开始
最近在做的事情,需要解析MySQL binlog中的previous_gtids,首先想到两种方法:
- google搜现成开源的解析代码;
- 移植MySQL内核中关于previous_gtid解析的代码;
探索
移植MySQL代码这个事情,本身工作量比较大,而且MySQL中关于GTID的类以及文件有好多,我只是需要一个解析previous_gtids_log_event的工具,没必要搞这么重,所以方法2是下下策。
然后很自然的去google搜索开源解析代码,本以为解析previous_gtids_log_event这种事情,网上应该有很多现成的代码可以拷贝,结果google不但没有找到,而且连previous_gtids_log_event的格式相关的说明都没有搜到(可能是我不会用google)。方法一行不通。
没办法,只能硬着头皮看MySQL源码中关于previous_gtids_log_event中相关的解析方法。
在MySQL中,所有的event都有一个公共的基类log_event,这个基类有个read_event函数,在解析event的时候,通过read_event函数读出一个event的数据,然后根据头部中的type字段,强制类型转换为对应type的event,好像也没有涉及到解析的过程。这可咋办?
源码中不可能没有线索,所以耐着性子看代码,终于发现一个线索:在log_event.cc文件中定义previous_gtids_log_event成员函数的地方,找到了get_str()函数。这个20行左右的函数作用是将previous_gtids_log_event以指定的格式转换为字符串。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| char *Previous_gtids_log_event::get_str( size_t *length_p, const Gtid_set::String_format *string_format) const { DBUG_ENTER("Previous_gtids_log_event::get_str(size_t *)"); Sid_map sid_map(NULL); Gtid_set set(&sid_map, NULL); DBUG_PRINT("info", ("temp_buf=%p buf=%p", temp_buf, buf)); if (set.add_gtid_encoding(buf, buf_size) != RETURN_STATUS_OK) DBUG_RETURN(NULL); set.dbug_print("set"); size_t length= set.get_string_length(string_format); DBUG_PRINT("info", ("string length= %lu", (ulong) length)); char* str= (char *)my_malloc(length + 1, MYF(MY_WME)); if (str != NULL) { set.to_string(str, string_format); if (length_p != NULL) *length_p= length; } DBUG_RETURN(str); }
|
深入
继续跟进到add_gtid_encoding函数,可以看到,在Gtid_set类中的add_gtid_encoding函数其实就是解析了previous_gtids_log_event:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| enum_return_status Gtid_set::add_gtid_encoding(const uchar *encoded, size_t length, size_t *actual_length) { DBUG_ENTER("Gtid_set::add_gtid_encoding(const uchar *, size_t)"); ...... size_t pos= 0; uint64 n_sids; ...... if (length < 8) { DBUG_PRINT("error", ("(length=%lu) < 8", (ulong) length)); goto report_error; } n_sids= uint8korr(encoded); pos+= 8; for (uint i= 0; i < n_sids; i++) { if (length - pos < 16 + 8) { DBUG_PRINT("error", ("(length=%lu) - (pos=%lu) < 16 + 8. " "[n_sids=%llu i=%u]", (ulong) length, (ulong) pos, n_sids, i)); goto report_error; } rpl_sid sid; sid.copy_from(encoded + pos); pos+= 16; uint64 n_intervals= uint8korr(encoded + pos); pos+= 8; ...... if (length - pos < 2 * 8 * n_intervals) { DBUG_PRINT("error", ("(length=%lu) - (pos=%lu) < 2 * 8 * (n_intervals=%llu)", (ulong) length, (ulong) pos, n_intervals)); goto report_error; } Interval_iterator ivit(this, sidno); rpl_gno last= 0; for (uint i= 0; i < n_intervals; i++) { rpl_gno start= sint8korr(encoded + pos); pos+= 8; rpl_gno end= sint8korr(encoded + pos); pos+= 8; if (start <= last || end <= start) { DBUG_PRINT("error", ("last=%lld start=%lld end=%lld", last, start, end)); goto report_error; } last= end; ..... } } ..... }
|
根据以上代码,很容易得到previous_gtid的格式如下:【event body格式,省略了前面19字节的common header】
1 2 3 4
| +-------+--------+----------+-------------+--------------+---------------+------------------------- |sid_num|sid1 |interv_num|interv1_start|interv1_end |interv2_start |interv2_end |sid2 |... |8 bytes|16 bytes|8 bytes |8 bytes |8 bytes |8 bytes |8 bytes |16 bytes|... +-------+--------+----------+-------------+--------------+---------------+-------------------------
|
结果
previous_gtids_log_event格式中,post_header长度为0, 所以除去前面19字节的common header, 剩余的body部分就是要解析的数据部分。
将mysql代码转为手动解析的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| ...... int64_t ev_body_len = ev->ev_length_ - BINLOG_EVENT_MIN_HEAD_LEN; unsigned char* buf = new unsigned char[ev_body_len + 1]; int32_t rc = fread(buf, 1, ev_body_len, fd_); ......
if (ev_body_len < 8) { }
decode_uint64_t(buf, &n_sids); buf_pos += 8;
previous_gtids_.clear(); for (uint64_t i=0; i<n_sids; i++) { if (ev_body_len - buf_pos < 16 + 8){ }
memcpy(sid, buf + buf_pos, ENCODED_SID_LENGTH); buf_pos += 16; char tmpbuf[37]; sid_to_string(sid, tmpbuf); uuid.assign(tmpbuf); previous_gtids_.append(uuid + ":");
decode_uint64_t(buf + buf_pos, &n_intervals); buf_pos += 8;
if (ev_body_len - buf_pos < 2 * 8 * n_intervals) { }
last = 0; for (uint64_t j=0; j<n_intervals; j++) { decode_int64_t(buf + buf_pos, &start); buf_pos += 8;
gno_to_string(start, gno_start); previous_gtids_.append(gno_start);
decode_int64_t(buf + buf_pos, &end); buf_pos += 8;
if (start <= last || end <= start) { } last = end;
if (start < end - 1) { gno_to_string(end - 1, gno_end); previous_gtids_.append("-" + gno_end); }
if (j < n_intervals - 1) { previous_gtids_.append(":"); } }
if (i < n_sids - 1) { previous_gtids_.append(",\n"); } }
|
靠谱参考: