Browse Source

fixed testcase

agapple 8 years ago
parent
commit
5d9d5b1e44

+ 73 - 56
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParserTest.java

@@ -13,6 +13,7 @@ import com.alibaba.otter.canal.parse.stub.AbstractCanalEventSinkTest;
 import com.alibaba.otter.canal.parse.stub.AbstractCanalLogPositionManager;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
+import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
 import com.alibaba.otter.canal.protocol.position.LogIdentity;
 import com.alibaba.otter.canal.protocol.position.LogPosition;
@@ -22,8 +23,8 @@ public class MysqlEventParserTest {
 
     private static final String DETECTING_SQL = "insert into retl.xdual values(1,now()) on duplicate key update x=now()";
     private static final String MYSQL_ADDRESS = "127.0.0.1";
-    private static final String USERNAME      = "xxxxx";
-    private static final String PASSWORD      = "xxxxx";
+    private static final String USERNAME      = "root";
+    private static final String PASSWORD      = "xxxxxx";
 
     @Test
     public void test_position() throws InterruptedException {
@@ -45,20 +46,23 @@ public class MysqlEventParserTest {
             public boolean sink(List<Entry> entrys, InetSocketAddress remoteAddress, String destination)
                                                                                                         throws CanalSinkException {
                 for (Entry entry : entrys) {
-                    entryCount.incrementAndGet();
-                    String logfilename = entry.getHeader().getLogfileName();
-                    long logfileoffset = entry.getHeader().getLogfileOffset();
-                    long executeTime = entry.getHeader().getExecuteTime();
-                    entryPosition.setJournalName(logfilename);
-                    entryPosition.setPosition(logfileoffset);
-                    entryPosition.setTimestamp(executeTime);
-
-                    break;
+                    if (entry.getEntryType() != EntryType.HEARTBEAT) {
+                        entryCount.incrementAndGet();
+                        String logfilename = entry.getHeader().getLogfileName();
+                        long logfileoffset = entry.getHeader().getLogfileOffset();
+                        long executeTime = entry.getHeader().getExecuteTime();
+                        entryPosition.setJournalName(logfilename);
+                        entryPosition.setPosition(logfileoffset);
+                        entryPosition.setTimestamp(executeTime);
+                        break;
+                    }
                 }
 
-                controller.stop();
-                timeoutChecker.stop();
-                timeoutChecker.touch();
+                if (entryCount.get() > 0) {
+                    controller.stop();
+                    timeoutChecker.stop();
+                    timeoutChecker.touch();
+                }
                 return true;
             }
         });
@@ -92,14 +96,14 @@ public class MysqlEventParserTest {
 
     @Test
     public void test_timestamp() throws InterruptedException {
-        final TimeoutChecker timeoutChecker = new TimeoutChecker(30 * 1000);
+        final TimeoutChecker timeoutChecker = new TimeoutChecker(3000 * 1000);
         final AtomicLong entryCount = new AtomicLong(0);
         final EntryPosition entryPosition = new EntryPosition();
 
         final MysqlEventParser controller = new MysqlEventParser();
-        final EntryPosition defaultPosition = buildPosition(null, null, 1322803601000L);
+        final EntryPosition defaultPosition = buildPosition(null, null, 1475116855000L);
         controller.setSlaveId(3344L);
-        controller.setDetectingEnable(true);
+        controller.setDetectingEnable(false);
         controller.setDetectingSQL(DETECTING_SQL);
         controller.setMasterInfo(buildAuthentication());
         controller.setMasterPosition(defaultPosition);
@@ -109,22 +113,25 @@ public class MysqlEventParserTest {
             public boolean sink(List<Entry> entrys, InetSocketAddress remoteAddress, String destination)
                                                                                                         throws CanalSinkException {
                 for (Entry entry : entrys) {
-                    entryCount.incrementAndGet();
-
-                    String logfilename = entry.getHeader().getLogfileName();
-                    long logfileoffset = entry.getHeader().getLogfileOffset();
-                    long executeTime = entry.getHeader().getExecuteTime();
-
-                    entryPosition.setJournalName(logfilename);
-                    entryPosition.setPosition(logfileoffset);
-                    entryPosition.setTimestamp(executeTime);
-
-                    break;
+                    if (entry.getEntryType() != EntryType.HEARTBEAT) {
+                        entryCount.incrementAndGet();
+
+                        String logfilename = entry.getHeader().getLogfileName();
+                        long logfileoffset = entry.getHeader().getLogfileOffset();
+                        long executeTime = entry.getHeader().getExecuteTime();
+
+                        entryPosition.setJournalName(logfilename);
+                        entryPosition.setPosition(logfileoffset);
+                        entryPosition.setTimestamp(executeTime);
+                        break;
+                    }
                 }
 
-                controller.stop();
-                timeoutChecker.stop();
-                timeoutChecker.touch();
+                if (entryCount.get() > 0) {
+                    controller.stop();
+                    timeoutChecker.stop();
+                    timeoutChecker.touch();
+                }
                 return true;
             }
         });
@@ -174,21 +181,26 @@ public class MysqlEventParserTest {
             public boolean sink(List<Entry> entrys, InetSocketAddress remoteAddress, String destination)
                                                                                                         throws CanalSinkException {
                 for (Entry entry : entrys) {
-                    entryCount.incrementAndGet();
+                    if (entry.getEntryType() != EntryType.HEARTBEAT) {
+
+                        entryCount.incrementAndGet();
 
-                    String logfilename = entry.getHeader().getLogfileName();
-                    long logfileoffset = entry.getHeader().getLogfileOffset();
-                    long executeTime = entry.getHeader().getExecuteTime();
+                        String logfilename = entry.getHeader().getLogfileName();
+                        long logfileoffset = entry.getHeader().getLogfileOffset();
+                        long executeTime = entry.getHeader().getExecuteTime();
 
-                    entryPosition.setJournalName(logfilename);
-                    entryPosition.setPosition(logfileoffset);
-                    entryPosition.setTimestamp(executeTime);
-                    break;
+                        entryPosition.setJournalName(logfilename);
+                        entryPosition.setPosition(logfileoffset);
+                        entryPosition.setTimestamp(executeTime);
+                        break;
+                    }
                 }
 
-                controller.stop();
-                timeoutChecker.stop();
-                timeoutChecker.touch();
+                if (entryCount.get() > 0) {
+                    controller.stop();
+                    timeoutChecker.stop();
+                    timeoutChecker.touch();
+                }
                 return true;
             }
         });
@@ -244,22 +256,27 @@ public class MysqlEventParserTest {
             public boolean sink(List<Entry> entrys, InetSocketAddress remoteAddress, String destination)
                                                                                                         throws CanalSinkException {
                 for (Entry entry : entrys) {
-                    entryCount.incrementAndGet();
-
-                    // String logfilename = entry.getHeader().getLogfileName();
-                    // long logfileoffset =
-                    // entry.getHeader().getLogfileOffset();
-                    long executeTime = entry.getHeader().getExecuteTime();
-
-                    // entryPosition.setJournalName(logfilename);
-                    // entryPosition.setPosition(logfileoffset);
-                    entryPosition.setTimestamp(executeTime);
-                    break;
+                    if (entry.getEntryType() != EntryType.HEARTBEAT) {
+                        entryCount.incrementAndGet();
+
+                        // String logfilename =
+                        // entry.getHeader().getLogfileName();
+                        // long logfileoffset =
+                        // entry.getHeader().getLogfileOffset();
+                        long executeTime = entry.getHeader().getExecuteTime();
+
+                        // entryPosition.setJournalName(logfilename);
+                        // entryPosition.setPosition(logfileoffset);
+                        entryPosition.setTimestamp(executeTime);
+                        break;
+                    }
                 }
 
-                controller.stop();
-                timeoutChecker.stop();
-                timeoutChecker.touch();
+                if (entryCount.get() > 0) {
+                    controller.stop();
+                    timeoutChecker.stop();
+                    timeoutChecker.touch();
+                }
                 return true;
             }
         });