LogDecoder.java 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463
  1. package com.taobao.tddl.dbsync.binlog;
  2. import java.io.IOException;
  3. import java.util.BitSet;
  4. import org.apache.commons.logging.Log;
  5. import org.apache.commons.logging.LogFactory;
  6. import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
  7. import com.taobao.tddl.dbsync.binlog.event.AppendBlockLogEvent;
  8. import com.taobao.tddl.dbsync.binlog.event.BeginLoadQueryLogEvent;
  9. import com.taobao.tddl.dbsync.binlog.event.CreateFileLogEvent;
  10. import com.taobao.tddl.dbsync.binlog.event.DeleteFileLogEvent;
  11. import com.taobao.tddl.dbsync.binlog.event.DeleteRowsLogEvent;
  12. import com.taobao.tddl.dbsync.binlog.event.ExecuteLoadLogEvent;
  13. import com.taobao.tddl.dbsync.binlog.event.ExecuteLoadQueryLogEvent;
  14. import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
  15. import com.taobao.tddl.dbsync.binlog.event.GtidLogEvent;
  16. import com.taobao.tddl.dbsync.binlog.event.HeartbeatLogEvent;
  17. import com.taobao.tddl.dbsync.binlog.event.IgnorableLogEvent;
  18. import com.taobao.tddl.dbsync.binlog.event.IncidentLogEvent;
  19. import com.taobao.tddl.dbsync.binlog.event.IntvarLogEvent;
  20. import com.taobao.tddl.dbsync.binlog.event.LoadLogEvent;
  21. import com.taobao.tddl.dbsync.binlog.event.LogHeader;
  22. import com.taobao.tddl.dbsync.binlog.event.PreviousGtidsLogEvent;
  23. import com.taobao.tddl.dbsync.binlog.event.QueryLogEvent;
  24. import com.taobao.tddl.dbsync.binlog.event.RandLogEvent;
  25. import com.taobao.tddl.dbsync.binlog.event.RotateLogEvent;
  26. import com.taobao.tddl.dbsync.binlog.event.RowsLogEvent;
  27. import com.taobao.tddl.dbsync.binlog.event.RowsQueryLogEvent;
  28. import com.taobao.tddl.dbsync.binlog.event.StartLogEventV3;
  29. import com.taobao.tddl.dbsync.binlog.event.StopLogEvent;
  30. import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent;
  31. import com.taobao.tddl.dbsync.binlog.event.TransactionContextLogEvent;
  32. import com.taobao.tddl.dbsync.binlog.event.UnknownLogEvent;
  33. import com.taobao.tddl.dbsync.binlog.event.UpdateRowsLogEvent;
  34. import com.taobao.tddl.dbsync.binlog.event.UserVarLogEvent;
  35. import com.taobao.tddl.dbsync.binlog.event.ViewChangeEvent;
  36. import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
  37. import com.taobao.tddl.dbsync.binlog.event.XaPrepareLogEvent;
  38. import com.taobao.tddl.dbsync.binlog.event.XidLogEvent;
  39. import com.taobao.tddl.dbsync.binlog.event.mariadb.AnnotateRowsEvent;
  40. import com.taobao.tddl.dbsync.binlog.event.mariadb.BinlogCheckPointLogEvent;
  41. import com.taobao.tddl.dbsync.binlog.event.mariadb.MariaGtidListLogEvent;
  42. import com.taobao.tddl.dbsync.binlog.event.mariadb.MariaGtidLogEvent;
  43. import com.taobao.tddl.dbsync.binlog.event.mariadb.StartEncryptionLogEvent;
/**
 * Implements a binary-log decoder.
 *
 * <pre>
 * LogDecoder decoder = new LogDecoder();
 * decoder.handle(...);
 *
 * LogEvent event;
 * do
 * {
 *     event = decoder.decode(buffer, context);
 *
 *     // process log event.
 * }
 * while (event != null);
 * // no more events in buffer.
 * </pre>
 *
 * @author <a href="mailto:changyuan.lh@taobao.com">Changyuan.lh</a>
 * @version 1.0
 */
  65. public final class LogDecoder {
  66. protected static final Log logger = LogFactory.getLog(LogDecoder.class);
  67. protected final BitSet handleSet = new BitSet(LogEvent.ENUM_END_EVENT);
  68. public LogDecoder(){
  69. }
  /**
   * Creates a decoder that fully decodes the event types in the given range.
   *
   * @param fromIndex first event-type code to handle (inclusive)
   * @param toIndex event-type code after the last one to handle (exclusive)
   * @throws IndexOutOfBoundsException if either index is negative or
   *         {@code fromIndex} is larger than {@code toIndex}
   */
  public LogDecoder(final int fromIndex, final int toIndex) {
    handleSet.set(fromIndex, toIndex);
  }
  73. public final void handle(final int fromIndex, final int toIndex) {
  74. handleSet.set(fromIndex, toIndex);
  75. }
  76. public final void handle(final int flagIndex) {
  77. handleSet.set(flagIndex);
  78. }
  79. /**
  80. * Decoding an event from binary-log buffer.
  81. *
  82. * @return <code>UknownLogEvent</code> if event type is unknown or skipped,
  83. * <code>null</code> if buffer is not including a full event.
  84. */
  85. public LogEvent decode(LogBuffer buffer, LogContext context) throws IOException {
  86. final int limit = buffer.limit();
  87. if (limit >= FormatDescriptionLogEvent.LOG_EVENT_HEADER_LEN) {
  88. LogHeader header = new LogHeader(buffer, context.getFormatDescription());
  89. final int len = header.getEventLen();
  90. if (limit >= len) {
  91. LogEvent event;
  92. /* Checking binary-log's header */
  93. if (handleSet.get(header.getType())) {
  94. buffer.limit(len);
  95. try {
  96. /* Decoding binary-log to event */
  97. event = decode(buffer, header, context);
  98. if (event != null) {
  99. // set logFileName
  100. event.getHeader().setLogFileName(context.getLogPosition().getFileName());
  101. event.setSemival(buffer.semival);
  102. }
  103. } catch (IOException e) {
  104. if (logger.isWarnEnabled()) {
  105. logger.warn("Decoding " + LogEvent.getTypeName(header.getType()) + " failed from: "
  106. + context.getLogPosition(), e);
  107. }
  108. throw e;
  109. } finally {
  110. buffer.limit(limit); /* Restore limit */
  111. }
  112. } else {
  113. /* Ignore unsupported binary-log. */
  114. event = new UnknownLogEvent(header);
  115. }
  116. /* consume this binary-log. */
  117. buffer.consume(len);
  118. return event;
  119. }
  120. }
  121. /* Rewind buffer's position to 0. */
  122. buffer.rewind();
  123. return null;
  124. }
  125. /**
  126. * Deserialize an event from buffer.
  127. *
  128. * @return <code>UknownLogEvent</code> if event type is unknown or skipped.
  129. */
  130. public static LogEvent decode(LogBuffer buffer, LogHeader header, LogContext context) throws IOException {
  131. FormatDescriptionLogEvent descriptionEvent = context.getFormatDescription();
  132. LogPosition logPosition = context.getLogPosition();
  133. int checksumAlg = LogEvent.BINLOG_CHECKSUM_ALG_UNDEF;
  134. if (header.getType() != LogEvent.FORMAT_DESCRIPTION_EVENT) {
  135. checksumAlg = descriptionEvent.header.getChecksumAlg();
  136. } else {
  137. // 如果是format事件自己,也需要处理checksum
  138. checksumAlg = header.getChecksumAlg();
  139. }
  140. if (checksumAlg != LogEvent.BINLOG_CHECKSUM_ALG_OFF && checksumAlg != LogEvent.BINLOG_CHECKSUM_ALG_UNDEF) {
  141. // remove checksum bytes
  142. buffer.limit(header.getEventLen() - LogEvent.BINLOG_CHECKSUM_LEN);
  143. }
  144. GTIDSet gtidSet = context.getGtidSet();
  145. switch (header.getType()) {
  146. case LogEvent.QUERY_EVENT: {
  147. QueryLogEvent event = new QueryLogEvent(header, buffer, descriptionEvent);
  148. /* updating position in context */
  149. logPosition.position = header.getLogPos();
  150. header.putGtidStr(context.getGtidSet());
  151. return event;
  152. }
  153. case LogEvent.XID_EVENT: {
  154. XidLogEvent event = new XidLogEvent(header, buffer, descriptionEvent);
  155. /* updating position in context */
  156. logPosition.position = header.getLogPos();
  157. header.putGtidStr(context.getGtidSet());
  158. return event;
  159. }
  160. case LogEvent.TABLE_MAP_EVENT: {
  161. TableMapLogEvent mapEvent = new TableMapLogEvent(header, buffer, descriptionEvent);
  162. /* updating position in context */
  163. logPosition.position = header.getLogPos();
  164. context.putTable(mapEvent);
  165. return mapEvent;
  166. }
  167. case LogEvent.WRITE_ROWS_EVENT_V1: {
  168. RowsLogEvent event = new WriteRowsLogEvent(header, buffer, descriptionEvent);
  169. /* updating position in context */
  170. logPosition.position = header.getLogPos();
  171. event.fillTable(context);
  172. return event;
  173. }
  174. case LogEvent.UPDATE_ROWS_EVENT_V1: {
  175. RowsLogEvent event = new UpdateRowsLogEvent(header, buffer, descriptionEvent);
  176. /* updating position in context */
  177. logPosition.position = header.getLogPos();
  178. event.fillTable(context);
  179. return event;
  180. }
  181. case LogEvent.DELETE_ROWS_EVENT_V1: {
  182. RowsLogEvent event = new DeleteRowsLogEvent(header, buffer, descriptionEvent);
  183. /* updating position in context */
  184. logPosition.position = header.getLogPos();
  185. event.fillTable(context);
  186. return event;
  187. }
  188. case LogEvent.ROTATE_EVENT: {
  189. RotateLogEvent event = new RotateLogEvent(header, buffer, descriptionEvent);
  190. /* updating position in context */
  191. logPosition = new LogPosition(event.getFilename(), event.getPosition());
  192. context.setLogPosition(logPosition);
  193. return event;
  194. }
  195. case LogEvent.LOAD_EVENT:
  196. case LogEvent.NEW_LOAD_EVENT: {
  197. LoadLogEvent event = new LoadLogEvent(header, buffer, descriptionEvent);
  198. /* updating position in context */
  199. logPosition.position = header.getLogPos();
  200. return event;
  201. }
  202. case LogEvent.SLAVE_EVENT: /* can never happen (unused event) */
  203. {
  204. if (logger.isWarnEnabled()) logger.warn("Skipping unsupported SLAVE_EVENT from: "
  205. + context.getLogPosition());
  206. break;
  207. }
  208. case LogEvent.CREATE_FILE_EVENT: {
  209. CreateFileLogEvent event = new CreateFileLogEvent(header, buffer, descriptionEvent);
  210. /* updating position in context */
  211. logPosition.position = header.getLogPos();
  212. return event;
  213. }
  214. case LogEvent.APPEND_BLOCK_EVENT: {
  215. AppendBlockLogEvent event = new AppendBlockLogEvent(header, buffer, descriptionEvent);
  216. /* updating position in context */
  217. logPosition.position = header.getLogPos();
  218. return event;
  219. }
  220. case LogEvent.DELETE_FILE_EVENT: {
  221. DeleteFileLogEvent event = new DeleteFileLogEvent(header, buffer, descriptionEvent);
  222. /* updating position in context */
  223. logPosition.position = header.getLogPos();
  224. return event;
  225. }
  226. case LogEvent.EXEC_LOAD_EVENT: {
  227. ExecuteLoadLogEvent event = new ExecuteLoadLogEvent(header, buffer, descriptionEvent);
  228. /* updating position in context */
  229. logPosition.position = header.getLogPos();
  230. return event;
  231. }
  232. case LogEvent.START_EVENT_V3: {
  233. /* This is sent only by MySQL <=4.x */
  234. StartLogEventV3 event = new StartLogEventV3(header, buffer, descriptionEvent);
  235. /* updating position in context */
  236. logPosition.position = header.getLogPos();
  237. return event;
  238. }
  239. case LogEvent.STOP_EVENT: {
  240. StopLogEvent event = new StopLogEvent(header, buffer, descriptionEvent);
  241. /* updating position in context */
  242. logPosition.position = header.getLogPos();
  243. return event;
  244. }
  245. case LogEvent.INTVAR_EVENT: {
  246. IntvarLogEvent event = new IntvarLogEvent(header, buffer, descriptionEvent);
  247. /* updating position in context */
  248. logPosition.position = header.getLogPos();
  249. return event;
  250. }
  251. case LogEvent.RAND_EVENT: {
  252. RandLogEvent event = new RandLogEvent(header, buffer, descriptionEvent);
  253. /* updating position in context */
  254. logPosition.position = header.getLogPos();
  255. header.putGtidStr(context.getGtidSet());
  256. return event;
  257. }
  258. case LogEvent.USER_VAR_EVENT: {
  259. UserVarLogEvent event = new UserVarLogEvent(header, buffer, descriptionEvent);
  260. /* updating position in context */
  261. logPosition.position = header.getLogPos();
  262. header.putGtidStr(context.getGtidSet());
  263. return event;
  264. }
  265. case LogEvent.FORMAT_DESCRIPTION_EVENT: {
  266. descriptionEvent = new FormatDescriptionLogEvent(header, buffer, descriptionEvent);
  267. context.setFormatDescription(descriptionEvent);
  268. return descriptionEvent;
  269. }
  270. case LogEvent.PRE_GA_WRITE_ROWS_EVENT: {
  271. if (logger.isWarnEnabled()) logger.warn("Skipping unsupported PRE_GA_WRITE_ROWS_EVENT from: "
  272. + context.getLogPosition());
  273. // ev = new Write_rows_log_event_old(buf, event_len,
  274. // description_event);
  275. break;
  276. }
  277. case LogEvent.PRE_GA_UPDATE_ROWS_EVENT: {
  278. if (logger.isWarnEnabled()) logger.warn("Skipping unsupported PRE_GA_UPDATE_ROWS_EVENT from: "
  279. + context.getLogPosition());
  280. // ev = new Update_rows_log_event_old(buf, event_len,
  281. // description_event);
  282. break;
  283. }
  284. case LogEvent.PRE_GA_DELETE_ROWS_EVENT: {
  285. if (logger.isWarnEnabled()) logger.warn("Skipping unsupported PRE_GA_DELETE_ROWS_EVENT from: "
  286. + context.getLogPosition());
  287. // ev = new Delete_rows_log_event_old(buf, event_len,
  288. // description_event);
  289. break;
  290. }
  291. case LogEvent.BEGIN_LOAD_QUERY_EVENT: {
  292. BeginLoadQueryLogEvent event = new BeginLoadQueryLogEvent(header, buffer, descriptionEvent);
  293. /* updating position in context */
  294. logPosition.position = header.getLogPos();
  295. return event;
  296. }
  297. case LogEvent.EXECUTE_LOAD_QUERY_EVENT: {
  298. ExecuteLoadQueryLogEvent event = new ExecuteLoadQueryLogEvent(header, buffer, descriptionEvent);
  299. /* updating position in context */
  300. logPosition.position = header.getLogPos();
  301. return event;
  302. }
  303. case LogEvent.INCIDENT_EVENT: {
  304. IncidentLogEvent event = new IncidentLogEvent(header, buffer, descriptionEvent);
  305. /* updating position in context */
  306. logPosition.position = header.getLogPos();
  307. return event;
  308. }
  309. case LogEvent.HEARTBEAT_LOG_EVENT: {
  310. HeartbeatLogEvent event = new HeartbeatLogEvent(header, buffer, descriptionEvent);
  311. /* updating position in context */
  312. logPosition.position = header.getLogPos();
  313. header.putGtidStr(context.getGtidSet());
  314. return event;
  315. }
  316. case LogEvent.IGNORABLE_LOG_EVENT: {
  317. IgnorableLogEvent event = new IgnorableLogEvent(header, buffer, descriptionEvent);
  318. /* updating position in context */
  319. logPosition.position = header.getLogPos();
  320. return event;
  321. }
  322. case LogEvent.ROWS_QUERY_LOG_EVENT: {
  323. RowsQueryLogEvent event = new RowsQueryLogEvent(header, buffer, descriptionEvent);
  324. /* updating position in context */
  325. logPosition.position = header.getLogPos();
  326. header.putGtidStr(context.getGtidSet());
  327. return event;
  328. }
  329. case LogEvent.WRITE_ROWS_EVENT: {
  330. RowsLogEvent event = new WriteRowsLogEvent(header, buffer, descriptionEvent);
  331. /* updating position in context */
  332. logPosition.position = header.getLogPos();
  333. event.fillTable(context);
  334. header.putGtidStr(context.getGtidSet());
  335. return event;
  336. }
  337. case LogEvent.UPDATE_ROWS_EVENT: {
  338. RowsLogEvent event = new UpdateRowsLogEvent(header, buffer, descriptionEvent);
  339. /* updating position in context */
  340. logPosition.position = header.getLogPos();
  341. event.fillTable(context);
  342. header.putGtidStr(context.getGtidSet());
  343. return event;
  344. }
  345. case LogEvent.DELETE_ROWS_EVENT: {
  346. RowsLogEvent event = new DeleteRowsLogEvent(header, buffer, descriptionEvent);
  347. /* updating position in context */
  348. logPosition.position = header.getLogPos();
  349. event.fillTable(context);
  350. header.putGtidStr(context.getGtidSet());
  351. return event;
  352. }
  353. case LogEvent.GTID_LOG_EVENT:
  354. case LogEvent.ANONYMOUS_GTID_LOG_EVENT: {
  355. GtidLogEvent event = new GtidLogEvent(header, buffer, descriptionEvent);
  356. /* updating position in context */
  357. logPosition.position = header.getLogPos();
  358. if (gtidSet != null) {
  359. gtidSet.update(event.getGtidStr());
  360. // update latest gtid
  361. header.putGtidStr(gtidSet);
  362. }
  363. return event;
  364. }
  365. case LogEvent.PREVIOUS_GTIDS_LOG_EVENT: {
  366. PreviousGtidsLogEvent event = new PreviousGtidsLogEvent(header, buffer, descriptionEvent);
  367. /* updating position in context */
  368. logPosition.position = header.getLogPos();
  369. return event;
  370. }
  371. case LogEvent.TRANSACTION_CONTEXT_EVENT: {
  372. TransactionContextLogEvent event = new TransactionContextLogEvent(header, buffer, descriptionEvent);
  373. /* updating position in context */
  374. logPosition.position = header.getLogPos();
  375. return event;
  376. }
  377. case LogEvent.VIEW_CHANGE_EVENT: {
  378. ViewChangeEvent event = new ViewChangeEvent(header, buffer, descriptionEvent);
  379. /* updating position in context */
  380. logPosition.position = header.getLogPos();
  381. return event;
  382. }
  383. case LogEvent.XA_PREPARE_LOG_EVENT: {
  384. XaPrepareLogEvent event = new XaPrepareLogEvent(header, buffer, descriptionEvent);
  385. /* updating position in context */
  386. logPosition.position = header.getLogPos();
  387. return event;
  388. }
  389. case LogEvent.ANNOTATE_ROWS_EVENT: {
  390. AnnotateRowsEvent event = new AnnotateRowsEvent(header, buffer, descriptionEvent);
  391. /* updating position in context */
  392. logPosition.position = header.getLogPos();
  393. header.putGtidStr(context.getGtidSet());
  394. return event;
  395. }
  396. case LogEvent.BINLOG_CHECKPOINT_EVENT: {
  397. BinlogCheckPointLogEvent event = new BinlogCheckPointLogEvent(header, buffer, descriptionEvent);
  398. /* updating position in context */
  399. logPosition.position = header.getLogPos();
  400. return event;
  401. }
  402. case LogEvent.GTID_EVENT: {
  403. MariaGtidLogEvent event = new MariaGtidLogEvent(header, buffer, descriptionEvent);
  404. /* updating position in context */
  405. logPosition.position = header.getLogPos();
  406. return event;
  407. }
  408. case LogEvent.GTID_LIST_EVENT: {
  409. MariaGtidListLogEvent event = new MariaGtidListLogEvent(header, buffer, descriptionEvent);
  410. /* updating position in context */
  411. logPosition.position = header.getLogPos();
  412. return event;
  413. }
  414. case LogEvent.START_ENCRYPTION_EVENT: {
  415. StartEncryptionLogEvent event = new StartEncryptionLogEvent(header, buffer, descriptionEvent);
  416. /* updating position in context */
  417. logPosition.position = header.getLogPos();
  418. return event;
  419. }
  420. default:
  421. /*
  422. * Create an object of Ignorable_log_event for unrecognized
  423. * sub-class. So that SLAVE SQL THREAD will only update the
  424. * position and continue.
  425. */
  426. if ((buffer.getUint16(LogEvent.FLAGS_OFFSET) & LogEvent.LOG_EVENT_IGNORABLE_F) > 0) {
  427. IgnorableLogEvent event = new IgnorableLogEvent(header, buffer, descriptionEvent);
  428. /* updating position in context */
  429. logPosition.position = header.getLogPos();
  430. return event;
  431. } else {
  432. if (logger.isWarnEnabled()) {
  433. logger.warn("Skipping unrecognized binlog event " + LogEvent.getTypeName(header.getType())
  434. + " from: " + context.getLogPosition());
  435. }
  436. }
  437. }
  438. /* updating position in context */
  439. logPosition.position = header.getLogPos();
  440. /* Unknown or unsupported log event */
  441. return new UnknownLogEvent(header);
  442. }
  443. }