Browse Source

fixed issue #928, support Dml.executeTime

七锋 6 years ago
parent
commit
017fe87e4b

+ 13 - 2
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/Dml.java

@@ -17,6 +17,9 @@ public class Dml implements Serializable {
     private String                    database;
     private String                    table;
     private String                    type;
+    // binlog executeTime
+    private Long                      es;
+    // dml build timeStamp
     private Long                      ts;
     private String                    sql;
     private List<Map<String, Object>> data;
@@ -78,6 +81,14 @@ public class Dml implements Serializable {
         this.old = old;
     }
 
+    public Long getEs() {
+        return es;
+    }
+
+    public void setEs(Long es) {
+        this.es = es;
+    }
+
     public void clear() {
         database = null;
         table = null;
@@ -90,7 +101,7 @@ public class Dml implements Serializable {
 
     @Override
     public String toString() {
-        return "Dml{" + "database='" + database + '\'' + ", table='" + table + '\'' + ", type='" + type + '\''
-               + ", ts=" + ts + ", sql='" + sql + '\'' + ", data=" + data + ", old=" + old + '}';
+        return "Dml [database=" + database + ", table=" + table + ", type=" + type + ", es=" + es + ", ts=" + ts
+               + ", sql=" + sql + ", data=" + data + ", old=" + old + "]";
     }
 }

+ 2 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/MessageUtil.java

@@ -45,6 +45,7 @@ public class MessageUtil {
             dml.setDatabase(entry.getHeader().getSchemaName());
             dml.setTable(entry.getHeader().getTableName());
             dml.setType(eventType.toString());
+            dml.setEs(entry.getHeader().getExecuteTime());
             dml.setTs(System.currentTimeMillis());
             dml.setSql(rowChange.getSql());
             List<Map<String, Object>> data = new ArrayList<>();
@@ -118,6 +119,7 @@ public class MessageUtil {
         dml.setTable(flatMessage.getTable());
         dml.setType(flatMessage.getType());
         dml.setTs(flatMessage.getTs());
+        dml.setEs(flatMessage.getEs());
         dml.setSql(flatMessage.getSql());
         if (flatMessage.getSqlType() == null || flatMessage.getMysqlType() == null) {
             throw new RuntimeException("SqlType or mysqlType is null");

+ 25 - 7
protocol/src/main/java/com/alibaba/otter/canal/protocol/FlatMessage.java

@@ -1,7 +1,12 @@
 package com.alibaba.otter.canal.protocol;
 
 import java.io.Serializable;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import com.google.protobuf.ByteString;
 
@@ -18,6 +23,9 @@ public class FlatMessage implements Serializable {
     private String                    table;
     private Boolean                   isDdl;
     private String                    type;
+    // binlog executeTime
+    private Long                      es;
+    // dml build timeStamp
     private Long                      ts;
     private String                    sql;
     private Map<String, Integer>      sqlType;
@@ -120,6 +128,14 @@ public class FlatMessage implements Serializable {
         this.old = old;
     }
 
+    public Long getEs() {
+        return es;
+    }
+
+    public void setEs(Long es) {
+        this.es = es;
+    }
+
     /**
      * 将Message转换为FlatMessage
      * 
@@ -147,9 +163,8 @@ public class FlatMessage implements Serializable {
                 try {
                     rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                 } catch (Exception e) {
-                    throw new RuntimeException(
-                        "ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
-                        e);
+                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"
+                                               + entry.toString(), e);
                 }
 
                 CanalEntry.EventType eventType = rowChange.getEventType();
@@ -160,6 +175,7 @@ public class FlatMessage implements Serializable {
                 flatMessage.setTable(entry.getHeader().getTableName());
                 flatMessage.setIsDdl(rowChange.getIsDdl());
                 flatMessage.setType(eventType.toString());
+                flatMessage.setEs(entry.getHeader().getExecuteTime());
                 flatMessage.setTs(System.currentTimeMillis());
                 flatMessage.setSql(rowChange.getSql());
 
@@ -273,6 +289,8 @@ public class FlatMessage implements Serializable {
                         flatMessageTmp.setSql(flatMessage.getSql());
                         flatMessageTmp.setSqlType(flatMessage.getSqlType());
                         flatMessageTmp.setMysqlType(flatMessage.getMysqlType());
+                        flatMessageTmp.setEs(flatMessage.getEs());
+                        flatMessageTmp.setTs(flatMessage.getTs());
                     }
                     List<Map<String, String>> data = flatMessageTmp.getData();
                     if (data == null) {
@@ -297,8 +315,8 @@ public class FlatMessage implements Serializable {
 
     @Override
     public String toString() {
-        return "FlatMessage{" + "id=" + id + ", database='" + database + '\'' + ", table='" + table + '\'' + ", isDdl="
-               + isDdl + ", type='" + type + '\'' + ", ts=" + ts + ", sql='" + sql + '\'' + ", sqlType=" + sqlType
-               + ", mysqlType=" + mysqlType + ", data=" + data + ", old=" + old + '}';
+        return "FlatMessage [id=" + id + ", database=" + database + ", table=" + table + ", isDdl=" + isDdl + ", type="
+               + type + ", es=" + es + ", ts=" + ts + ", sql=" + sql + ", sqlType=" + sqlType + ", mysqlType="
+               + mysqlType + ", data=" + data + ", old=" + old + "]";
     }
 }