开始
最近在做的事情,需要解析MySQL binlog中的previous_gtids,首先想到两种方法:
- google搜现成开源的解析代码;
 
- 移植MySQL内核中关于previous_gtid解析的代码;
 
探索
移植MySQL代码这个事情,本身工作量比较大,而且MySQL中关于GTID的类以及文件有好多,我只是需要一个解析previous_gtids_log_event的工具,没必要搞这么重,所以方法2是下下策。
然后很自然的去google搜索开源解析代码,本以为解析previous_gtids_log_event这种事情,网上应该有很多现成的代码可以拷贝,结果google不但没有找到,而且连previous_gtids_log_event的格式相关的说明都没有搜到(可能是我不会用google)。方法一行不通。
没办法,只能硬着头皮看MySQL源码中关于previous_gtids_log_event中相关的解析方法。
在MySQL中,所有的event都有一个公共的基类log_event,这个基类有个read_event函数,在解析event的时候,通过read_event函数读出一个event的数据,然后根据头部中的type字段,强制类型转换为对应type的event,好像也没有涉及到解析的过程。这可咋办?
源码中不可能没有线索,所以耐着性子看代码,终于发现一个线索:在log_event.cc文件中定义previous_gtids_log_event成员函数的地方,找到了get_str()函数。这个20行左右的函数作用是将previous_gtids_log_event以指定的格式转换为字符串。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
   | char *Previous_gtids_log_event::get_str(   size_t *length_p, const Gtid_set::String_format *string_format) const {   DBUG_ENTER("Previous_gtids_log_event::get_str(size_t *)");   Sid_map sid_map(NULL);   Gtid_set set(&sid_map, NULL);   DBUG_PRINT("info", ("temp_buf=%p buf=%p", temp_buf, buf));      if (set.add_gtid_encoding(buf, buf_size) != RETURN_STATUS_OK)     DBUG_RETURN(NULL);   set.dbug_print("set");   size_t length= set.get_string_length(string_format);   DBUG_PRINT("info", ("string length= %lu", (ulong) length));   char* str= (char *)my_malloc(length + 1, MYF(MY_WME));   if (str != NULL)   {     set.to_string(str, string_format);     if (length_p != NULL)       *length_p= length;   }   DBUG_RETURN(str); }
   | 
 
深入
继续跟进到add_gtid_encoding函数,可以看到,在Gtid_set类中的add_gtid_encoding函数其实就是解析了previous_gtids_log_event:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
   | enum_return_status Gtid_set::add_gtid_encoding(const uchar *encoded,                                                size_t length,                                                size_t *actual_length) {   DBUG_ENTER("Gtid_set::add_gtid_encoding(const uchar *, size_t)");   ......   size_t pos= 0;   uint64 n_sids;   ......      if (length < 8)   {     DBUG_PRINT("error", ("(length=%lu) < 8", (ulong) length));     goto report_error;   }   n_sids= uint8korr(encoded);   pos+= 8;      for (uint i= 0; i < n_sids; i++)   {          if (length - pos < 16 + 8)     {       DBUG_PRINT("error", ("(length=%lu) - (pos=%lu) < 16 + 8. "                            "[n_sids=%llu i=%u]",                            (ulong) length, (ulong) pos, n_sids, i));       goto report_error;     }     rpl_sid sid;     sid.copy_from(encoded + pos);     pos+= 16;     uint64 n_intervals= uint8korr(encoded + pos);     pos+= 8;     ......          if (length - pos < 2 * 8 * n_intervals)     {       DBUG_PRINT("error", ("(length=%lu) - (pos=%lu) < 2 * 8 * (n_intervals=%llu)",                            (ulong) length, (ulong) pos, n_intervals));       goto report_error;     }     Interval_iterator ivit(this, sidno);     rpl_gno last= 0;     for (uint i= 0; i < n_intervals; i++)     {              rpl_gno start= sint8korr(encoded + pos);       pos+= 8;       rpl_gno end= sint8korr(encoded + pos);       pos+= 8;       if (start <= last || end <= start)       {         DBUG_PRINT("error", ("last=%lld start=%lld end=%lld",                              last, start, end));         goto report_error;       }       last= end;       .....     }   }   ..... }
   | 
 
根据以上代码,很容易得到previous_gtid的格式如下:【event body格式,省略了前面19字节的common header】
1 2 3 4
   | +-------+--------+----------+-------------+--------------+---------------+------------------------- |sid_num|sid1    |interv_num|interv1_start|interv1_end   |interv2_start  |interv2_end |sid2    |... |8 bytes|16 bytes|8 bytes   |8 bytes      |8 bytes       |8 bytes        |8 bytes     |16 bytes|... +-------+--------+----------+-------------+--------------+---------------+-------------------------
   | 
 
结果
previous_gtids_log_event格式中,post_header长度为0, 所以除去前面19字节的common header, 剩余的body部分就是要解析的数据部分。
将mysql代码转为手动解析的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
   | ...... int64_t ev_body_len = ev->ev_length_ - BINLOG_EVENT_MIN_HEAD_LEN; unsigned char* buf = new unsigned char[ev_body_len + 1]; int32_t rc = fread(buf, 1, ev_body_len, fd_); ......
  if (ev_body_len < 8) {      }
 
  decode_uint64_t(buf, &n_sids); buf_pos += 8;
  previous_gtids_.clear(); for (uint64_t i=0; i<n_sids; i++) {     if (ev_body_len - buf_pos < 16 + 8){              }
           memcpy(sid, buf + buf_pos, ENCODED_SID_LENGTH);     buf_pos += 16;     char tmpbuf[37];     sid_to_string(sid, tmpbuf);     uuid.assign(tmpbuf);     previous_gtids_.append(uuid + ":");
           decode_uint64_t(buf + buf_pos, &n_intervals);     buf_pos += 8;
      if (ev_body_len - buf_pos < 2 * 8 * n_intervals) {              }
           last = 0;     for (uint64_t j=0; j<n_intervals; j++) {                           decode_int64_t(buf + buf_pos, &start);         buf_pos += 8;
                   gno_to_string(start, gno_start);         previous_gtids_.append(gno_start);
                   decode_int64_t(buf + buf_pos, &end);               buf_pos += 8;
                   if (start <= last || end <= start) {                      }         last = end;
          if (start < end - 1) {                          gno_to_string(end - 1, gno_end);             previous_gtids_.append("-" + gno_end);         }
          if (j < n_intervals - 1) {             previous_gtids_.append(":");         }     }
      if (i < n_sids - 1) {         previous_gtids_.append(",\n");     } }
   | 
 
靠谱参考: