瀏覽代碼

fix upgrade proto2 to proto3

WithLin 6 年之前
父節點
當前提交
ca61be5f5a

+ 1 - 1
client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java

@@ -159,7 +159,7 @@ public class SimpleCanalConnector implements CanalConnector {
             }
             }
             //
             //
             Handshake handshake = Handshake.parseFrom(p.getBody());
             Handshake handshake = Handshake.parseFrom(p.getBody());
-            supportedCompressions.addAll(handshake.getSupportedCompressionsList());
+            supportedCompressions.add(handshake.getSupportedCompressions());
             //
             //
             ClientAuth ca = ClientAuth.newBuilder()
             ClientAuth ca = ClientAuth.newBuilder()
                 .setUsername(username != null ? username : "")
                 .setUsername(username != null ? username : "")

+ 114 - 114
client/src/test/java/com/alibaba/otter/canal/client/running/ClientRunningTest.java

@@ -20,117 +20,117 @@ import com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils;
 
 
 public class ClientRunningTest extends AbstractZkTest {
 public class ClientRunningTest extends AbstractZkTest {
 
 
-    private ZkClientx zkclientx = new ZkClientx(cluster1 + ";" + cluster2);
-    private short     clientId  = 1001;
-
-    @Before
-    public void setUp() {
-        String path = ZookeeperPathUtils.getDestinationPath(destination);
-        zkclientx.deleteRecursive(path);
-
-        zkclientx.createPersistent(ZookeeperPathUtils.getClientIdNodePath(this.destination, clientId), true);
-    }
-
-    @After
-    public void tearDown() {
-        String path = ZookeeperPathUtils.getDestinationPath(destination);
-        zkclientx.deleteRecursive(path);
-    }
-
-    @Test
-    public void testOneServer() {
-        final CountDownLatch countLatch = new CountDownLatch(2);
-        ClientRunningMonitor runningMonitor = buildClientRunning(countLatch, clientId, 2088);
-        runningMonitor.start();
-        sleep(2000L);
-        runningMonitor.stop();
-        sleep(2000L);
-
-        if (countLatch.getCount() != 0) {
-            Assert.fail();
-        }
-    }
-
-    @Test
-    public void testMultiServer() {
-        final CountDownLatch countLatch = new CountDownLatch(30);
-        final ClientRunningMonitor runningMonitor1 = buildClientRunning(countLatch, clientId, 2088);
-        final ClientRunningMonitor runningMonitor2 = buildClientRunning(countLatch, clientId, 2089);
-        final ClientRunningMonitor runningMonitor3 = buildClientRunning(countLatch, clientId, 2090);
-        final ExecutorService executor = Executors.newFixedThreadPool(3);
-        executor.submit(new Runnable() {
-
-            public void run() {
-                for (int i = 0; i < 10; i++) {
-                    if (!runningMonitor1.isStart()) {
-                        runningMonitor1.start();
-                    }
-                    sleep(2000L + RandomUtils.nextInt(500));
-                    runningMonitor1.stop();
-                    sleep(2000L + RandomUtils.nextInt(500));
-                }
-            }
-
-        });
-
-        executor.submit(new Runnable() {
-
-            public void run() {
-                for (int i = 0; i < 10; i++) {
-                    if (!runningMonitor2.isStart()) {
-                        runningMonitor2.start();
-                    }
-                    sleep(2000L + RandomUtils.nextInt(500));
-                    runningMonitor2.stop();
-                    sleep(2000L + RandomUtils.nextInt(500));
-                }
-            }
-
-        });
-
-        executor.submit(new Runnable() {
-
-            public void run() {
-                for (int i = 0; i < 10; i++) {
-                    if (!runningMonitor3.isStart()) {
-                        runningMonitor3.start();
-                    }
-                    sleep(2000L + RandomUtils.nextInt(500));
-                    runningMonitor3.stop();
-                    sleep(2000L + RandomUtils.nextInt(500));
-                }
-            }
-
-        });
-
-        sleep(30000L);
-    }
-
-    private ClientRunningMonitor buildClientRunning(final CountDownLatch countLatch, final short clientId,
-                                                    final int port) {
-        ClientRunningData clientData = new ClientRunningData();
-        clientData.setClientId(clientId);
-        clientData.setAddress(AddressUtils.getHostIp());
-
-        ClientRunningMonitor runningMonitor = new ClientRunningMonitor();
-        runningMonitor.setDestination(destination);
-        runningMonitor.setZkClient(zkclientx);
-        runningMonitor.setClientData(clientData);
-        runningMonitor.setListener(new ClientRunningListener() {
-
-            public InetSocketAddress processActiveEnter() {
-                System.out.println(String.format("clientId:%s port:%s has start", clientId, port));
-                countLatch.countDown();
-                return new InetSocketAddress(AddressUtils.getHostIp(), port);
-            }
-
-            public void processActiveExit() {
-                countLatch.countDown();
-                System.out.println(String.format("clientId:%s port:%s has stop", clientId, port));
-            }
-
-        });
-        runningMonitor.setDelayTime(1);
-        return runningMonitor;
-    }
-}
+	private ZkClientx zkclientx = new ZkClientx(cluster1 + ";" + cluster2);
+	private short     clientId  = 1001;
+
+	@Before
+	public void setUp() {
+		String path = ZookeeperPathUtils.getDestinationPath(destination);
+		zkclientx.deleteRecursive(path);
+
+		zkclientx.createPersistent(ZookeeperPathUtils.getClientIdNodePath(this.destination, clientId), true);
+	}
+
+	@After
+	public void tearDown() {
+		String path = ZookeeperPathUtils.getDestinationPath(destination);
+		zkclientx.deleteRecursive(path);
+	}
+
+	@Test
+	public void testOneServer() {
+		final CountDownLatch countLatch = new CountDownLatch(2);
+		ClientRunningMonitor runningMonitor = buildClientRunning(countLatch, clientId, 2088);
+		runningMonitor.start();
+		sleep(2000L);
+		runningMonitor.stop();
+		sleep(2000L);
+
+		if (countLatch.getCount() != 0) {
+			Assert.fail();
+		}
+	}
+
+	@Test
+	public void testMultiServer() {
+		final CountDownLatch countLatch = new CountDownLatch(30);
+		final ClientRunningMonitor runningMonitor1 = buildClientRunning(countLatch, clientId, 2088);
+		final ClientRunningMonitor runningMonitor2 = buildClientRunning(countLatch, clientId, 2089);
+		final ClientRunningMonitor runningMonitor3 = buildClientRunning(countLatch, clientId, 2090);
+		final ExecutorService executor = Executors.newFixedThreadPool(3);
+		executor.submit(new Runnable() {
+
+			public void run() {
+				for (int i = 0; i < 10; i++) {
+					if (!runningMonitor1.isStart()) {
+						runningMonitor1.start();
+					}
+					sleep(2000L + RandomUtils.nextInt(500));
+					runningMonitor1.stop();
+					sleep(2000L + RandomUtils.nextInt(500));
+				}
+			}
+
+		});
+
+		executor.submit(new Runnable() {
+
+			public void run() {
+				for (int i = 0; i < 10; i++) {
+					if (!runningMonitor2.isStart()) {
+						runningMonitor2.start();
+					}
+					sleep(2000L + RandomUtils.nextInt(500));
+					runningMonitor2.stop();
+					sleep(2000L + RandomUtils.nextInt(500));
+				}
+			}
+
+		});
+
+		executor.submit(new Runnable() {
+
+			public void run() {
+				for (int i = 0; i < 10; i++) {
+					if (!runningMonitor3.isStart()) {
+						runningMonitor3.start();
+					}
+					sleep(2000L + RandomUtils.nextInt(500));
+					runningMonitor3.stop();
+					sleep(2000L + RandomUtils.nextInt(500));
+				}
+			}
+
+		});
+
+		sleep(30000L);
+	}
+
+	private ClientRunningMonitor buildClientRunning(final CountDownLatch countLatch, final short clientId,
+																									final int port) {
+		ClientRunningData clientData = new ClientRunningData();
+		clientData.setClientId(clientId);
+		clientData.setAddress(AddressUtils.getHostIp());
+
+		ClientRunningMonitor runningMonitor = new ClientRunningMonitor();
+		runningMonitor.setDestination(destination);
+		runningMonitor.setZkClient(zkclientx);
+		runningMonitor.setClientData(clientData);
+		runningMonitor.setListener(new ClientRunningListener() {
+
+			public InetSocketAddress processActiveEnter() {
+				System.out.println(String.format("clientId:%s port:%s has start", clientId, port));
+				countLatch.countDown();
+				return new InetSocketAddress(AddressUtils.getHostIp(), port);
+			}
+
+			public void processActiveExit() {
+				countLatch.countDown();
+				System.out.println(String.format("clientId:%s port:%s has stop", clientId, port));
+			}
+
+		});
+		runningMonitor.setDelayTime(1);
+		return runningMonitor;
+	}
+}

文件差異過大導致無法顯示
+ 259 - 228
protocol/src/main/java/com/alibaba/otter/canal/protocol/CanalEntry.java


文件差異過大導致無法顯示
+ 462 - 249
protocol/src/main/java/com/alibaba/otter/canal/protocol/CanalPacket.java


+ 82 - 43
protocol/src/main/java/com/alibaba/otter/canal/protocol/CanalProtocol.proto

@@ -1,4 +1,4 @@
-syntax = "proto2";
+syntax = "proto3";
 package com.alibaba.otter.canal.protocol;
 package com.alibaba.otter.canal.protocol;
 
 
 option java_package = "com.alibaba.otter.canal.protocol";
 option java_package = "com.alibaba.otter.canal.protocol";
@@ -6,6 +6,7 @@ option java_outer_classname = "CanalPacket";
 option optimize_for = SPEED;
 option optimize_for = SPEED;
 
 
 enum Compression {
 enum Compression {
+    COMPRESSIONCOMPATIBLEPROTO2 = 0;
     NONE = 1;
     NONE = 1;
     ZLIB = 2;
     ZLIB = 2;
     GZIP = 3;
     GZIP = 3;
@@ -13,6 +14,8 @@ enum Compression {
 }
 }
 
 
 enum PacketType {
 enum PacketType {
+    //compatible
+    PACKAGETYPECOMPATIBLEPROTO2 = 0;
     HANDSHAKE = 1;
     HANDSHAKE = 1;
     CLIENTAUTHENTICATION = 2;
     CLIENTAUTHENTICATION = 2;
     ACK = 3;
     ACK = 3;
@@ -30,86 +33,122 @@ enum PacketType {
 }
 }
 
 
 message Packet {
 message Packet {
-    optional int32 magic_number = 1 [default = 17];
-    optional int32 version = 2 [default = 1];
-    optional PacketType type = 3;
-    optional Compression compression = 4 [default = NONE];
-    optional bytes body = 5;
+     //[default = 17];
+     oneof magic_number_present {
+         int32 magic_number = 1;
+     }
+     //[default = 1];
+     oneof version_present {
+          int32 version = 2;
+     };
+     PacketType type = 3;
+     //[default = NONE];
+     oneof compression_present {
+          Compression compression = 4;
+     }
+
+     bytes body = 5;
 }
 }
 
 
 message HeartBeat {
 message HeartBeat {
-    optional int64 send_timestamp = 1;
-    optional int64 start_timestamp = 2;
+     int64 send_timestamp = 1;
+     int64 start_timestamp = 2;
 }
 }
 
 
 message Handshake {
 message Handshake {
-    optional string communication_encoding = 1 [default = "utf8"];
-    optional bytes seeds = 2;
-    repeated Compression supported_compressions = 3;
+    //  [default = "utf8"];
+    oneof communication_encoding_present {
+        string communication_encoding = 1;
+    }
+     bytes seeds = 2;
+     Compression supported_compressions = 3;
 }
 }
 
 
 // client authentication
 // client authentication
 message ClientAuth {
 message ClientAuth {
-    optional string username = 1;
-    optional bytes password = 2; // hashed password with seeds from Handshake message
-    optional int32 net_read_timeout = 3 [default = 0]; // in seconds
-    optional int32 net_write_timeout = 4 [default = 0]; // in seconds
-    optional string destination = 5;
-    optional string client_id = 6;
-    optional string filter = 7;
-    optional int64 start_timestamp = 8;
+    string username = 1;
+    bytes password = 2; // hashed password with seeds from Handshake message
+    // [default = 0]
+    oneof net_read_timeout_present {
+         int32 net_read_timeout = 3; // in seconds
+    }
+    // [default = 0];
+    oneof net_write_timeout_present {
+        int32 net_write_timeout = 4; // in seconds
+    }
+    string destination = 5;
+    string client_id = 6;
+    string filter = 7;
+    int64 start_timestamp = 8;
 }
 }
 
 
 message Ack {
 message Ack {
-    optional int32 error_code = 1 [default = 0];
-    optional string error_message = 2; // if something like compression is not supported, erorr_message will tell about it.
+    //[default = 0]
+    oneof error_code_present {
+        int32 error_code = 1;
+    }
+    string error_message = 2; // if something like compression is not supported, erorr_message will tell about it.
 }
 }
 
 
 message ClientAck {
 message ClientAck {
-    optional string destination = 1;
-    optional string client_id = 2;
-    optional int64 batch_id = 3;
+    string destination = 1;
+    string client_id = 2;
+    int64 batch_id = 3;
 }
 }
 
 
 // subscription
 // subscription
 message Sub {
 message Sub {
-    optional string destination = 1;
-    optional string client_id = 2;
-    optional string filter = 7;
+    string destination = 1;
+    string client_id = 2;
+    string filter = 7;
 }
 }
 
 
 // Unsubscription
 // Unsubscription
 message Unsub {
 message Unsub {
-    optional string destination = 1;
-    optional string client_id = 2;
-    optional string filter = 7;
+    string destination = 1;
+    string client_id = 2;
+    string filter = 7;
 }
 }
 
 
 //  PullRequest
 //  PullRequest
 message Get {
 message Get {
-    optional string destination = 1;
-    optional string client_id = 2;
-    optional int32 fetch_size = 3;
-    optional int64 timeout = 4 [default = -1]; // 默认-1时代表不控制
-    optional int32 unit = 5 [default = 2];// 数字类型,0:纳秒,1:毫秒,2:微秒,3:秒,4:分钟,5:小时,6:天
-    optional bool auto_ack = 6 [default = false]; // 是否自动ack
+    string destination = 1;
+    string client_id = 2;
+    int32 fetch_size = 3;
+    //[default = -1]
+    oneof timeout_present {
+        int64 timeout = 4; // 默认-1时代表不控制
+    }
+    //[default = 2]
+    oneof unit_present {
+        int32 unit = 5;// 数字类型,0:纳秒,1:毫秒,2:微秒,3:秒,4:分钟,5:小时,6:天
+    }
+    //[default = false]
+    oneof auto_ack_present {
+        bool auto_ack = 6; // 是否自动ack
+    }
+
 }
 }
 
 
 //
 //
 message Messages {
 message Messages {
-	optional int64 batch_id = 1;
+	int64 batch_id = 1;
     repeated bytes messages = 2;
     repeated bytes messages = 2;
 }
 }
 
 
 // TBD when new packets are required
 // TBD when new packets are required
 message Dump{
 message Dump{
-    optional string journal = 1;
-    optional int64  position = 2;
-    optional int64 timestamp = 3 [default = 0];
+    string journal = 1;
+    int64  position = 2;
+    // [default = 0]
+    oneof timestamp_present {
+        int64 timestamp = 3;
+    }
+
 }
 }
 
 
 message ClientRollback{
 message ClientRollback{
-    optional string destination = 1;
-    optional string client_id = 2;
-    optional int64 batch_id = 3;
+    string destination = 1;
+    string client_id = 2;
+    int64 batch_id = 3;
 }
 }

+ 107 - 84
protocol/src/main/java/com/alibaba/otter/canal/protocol/EntryProtocol.proto

@@ -1,4 +1,4 @@
-syntax = "proto2";
+syntax = "proto3";
 package com.alibaba.otter.canal.protocol;
 package com.alibaba.otter.canal.protocol;
 
 
 option java_package = "com.alibaba.otter.canal.protocol";
 option java_package = "com.alibaba.otter.canal.protocol";
@@ -11,88 +11,101 @@ option optimize_for = SPEED;
  ****************************************************************/
  ****************************************************************/
 message Entry {
 message Entry {
 	/**协议头部信息**/
 	/**协议头部信息**/
-	optional Header						header 				= 1;
-	
-	/**打散后的事件类型**/
-	optional EntryType					entryType			= 2 [default = ROWDATA];
-	
+     Header						header 				= 1;
+	///**打散后的事件类型**/ [default = ROWDATA]
+	oneof entryType_present{
+		EntryType					entryType			= 2;
+	}
+
 	/**传输的二进制数组**/
 	/**传输的二进制数组**/
-	optional bytes						storeValue			= 3;
+	bytes						storeValue			= 3;
 }
 }
 
 
 /**message Header**/
 /**message Header**/
 message Header {
 message Header {
-	/**协议的版本号**/
-	optional int32 					version				= 1 [default = 1];	
-	
+	/**协议的版本号**/  //[default = 1]
+	oneof version_present {
+		int32 					version				= 1;
+	}
+
+
 	/**binlog/redolog 文件名**/
 	/**binlog/redolog 文件名**/
-	optional string					logfileName			= 2;
-	
+	string					logfileName			= 2;
+
 	/**binlog/redolog 文件的偏移位置**/
 	/**binlog/redolog 文件的偏移位置**/
-	optional int64 					logfileOffset		= 3;
-	
+	int64 					logfileOffset		= 3;
+
 	/**服务端serverId**/
 	/**服务端serverId**/
-	optional 	int64				serverId         	= 4;	
-	
+	int64				serverId         	= 4;
+
 	/** 变更数据的编码 **/
 	/** 变更数据的编码 **/
-	optional string					serverenCode		= 5;
-	
+	string					serverenCode		= 5;
+
 	/**变更数据的执行时间 **/
 	/**变更数据的执行时间 **/
-	optional int64					executeTime			= 6;
-	
-	/** 变更数据的来源**/
-	optional Type					sourceType			= 7 [default = MYSQL];
-	
+	int64					executeTime			= 6;
+
+	/** 变更数据的来源**/ //[default = MYSQL]
+	oneof sourceType_present {
+		Type					sourceType			= 7;
+	}
+
+
 	/** 变更数据的schemaname**/
 	/** 变更数据的schemaname**/
-	optional string					schemaName			= 8;
-	
+	string					schemaName			= 8;
+
 	/**变更数据的tablename**/
 	/**变更数据的tablename**/
-	optional string					tableName			= 9;	
-	
+	string					tableName			= 9;
+
 	/**每个event的长度**/
 	/**每个event的长度**/
-	optional int64					eventLength         = 10;
-	
-	/**数据变更类型**/
-	optional EventType 				eventType			= 11 [default = UPDATE];
-	
+	int64					eventLength         = 10;
+
+	/**数据变更类型**/  // [default = UPDATE]
+	oneof eventType_present {
+		EventType 				eventType			= 11;
+	}
+
+
 	/**预留扩展**/
 	/**预留扩展**/
 	repeated Pair					props				= 12;
 	repeated Pair					props				= 12;
 
 
     /**当前事务的gitd**/
     /**当前事务的gitd**/
-	optional string                 gtid                = 13;
+	string                 gtid                = 13;
 }
 }
 
 
 /**每个字段的数据结构**/
 /**每个字段的数据结构**/
 message Column {
 message Column {
 	/**字段下标**/
 	/**字段下标**/
-	optional int32		index			= 		1;
-	
+	int32		index			= 		1;
+
 	/**字段java中类型**/
 	/**字段java中类型**/
-	optional int32 		sqlType			= 		2;
-	
+	int32 		sqlType			= 		2;
+
 	/**字段名称(忽略大小写),在mysql中是没有的**/
 	/**字段名称(忽略大小写),在mysql中是没有的**/
-	optional string		name			=		3;
-	
+	string		name			=		3;
+
 	/**是否是主键**/
 	/**是否是主键**/
-	optional bool 		isKey			= 		4;
-	
+	bool 		isKey			= 		4;
+
 	/**如果EventType=UPDATE,用于标识这个字段值是否有修改**/
 	/**如果EventType=UPDATE,用于标识这个字段值是否有修改**/
-	optional bool		updated			= 		5;
-	
-	/** 标识是否为空  **/
-	optional bool		isNull			= 		6 [default = false];
-	
+	bool		updated			= 		5;
+
+	/** 标识是否为空  **/ //[default = false]
+	oneof isNull_present {
+		bool		isNull			= 		6;
+	}
+
+
 	/**预留扩展**/
 	/**预留扩展**/
-	repeated Pair		props			=		7;	
-	
+	repeated Pair		props			=		7;
+
 	/** 字段值,timestamp,Datetime是一个时间格式的文本 **/
 	/** 字段值,timestamp,Datetime是一个时间格式的文本 **/
-	optional string		value			= 		8;
-	
+	string		value			= 		8;
+
 	/** 对应数据对象原始长度 **/
 	/** 对应数据对象原始长度 **/
-	optional int32		length			= 		9;
-	
+	int32		length			= 		9;
+
 	/**字段mysql类型**/
 	/**字段mysql类型**/
-	optional string		mysqlType		= 		10;
+	string		mysqlType		= 		10;
 }
 }
 
 
 message RowData {
 message RowData {
@@ -102,73 +115,81 @@ message RowData {
 
 
 	/** 字段信息,增量数据(修改后,新增后)  **/
 	/** 字段信息,增量数据(修改后,新增后)  **/
 	repeated Column			afterColumns	= 		2;
 	repeated Column			afterColumns	= 		2;
-	
+
 	/**预留扩展**/
 	/**预留扩展**/
-	repeated Pair			props			=		3;	
+	repeated Pair			props			=		3;
 }
 }
 
 
 /**message row 每行变更数据的数据结构**/
 /**message row 每行变更数据的数据结构**/
 message RowChange {
 message RowChange {
 
 
 	/**tableId,由数据库产生**/
 	/**tableId,由数据库产生**/
-	optional int64	 		tableId			=		1;
-	
-	/**数据变更类型**/
-	optional EventType 		eventType		= 		2 [default = UPDATE];
-	
-	/** 标识是否是ddl语句  **/
-	optional bool			isDdl			= 		10 [default = false];
+	int64	 		tableId			=		1;
+
+
+	/**数据变更类型**/ //[default = UPDATE]
+	oneof eventType_present {
+		EventType 		eventType		= 		2;
+	}
+
+
+	/** 标识是否是ddl语句  **/ // [default = false]
+	oneof isDdl_present {
+		bool			isDdl			= 		10;
+	}
+
 
 
 	/** ddl/query的sql语句  **/
 	/** ddl/query的sql语句  **/
-	optional string			sql 			= 		11;
-	
+	string			sql 			= 		11;
+
 	/** 一次数据库变更可能存在多行  **/
 	/** 一次数据库变更可能存在多行  **/
 	repeated RowData		rowDatas		= 		12;
 	repeated RowData		rowDatas		= 		12;
-	
+
 	/**预留扩展**/
 	/**预留扩展**/
-	repeated Pair			props			=		13;	
-	
+	repeated Pair			props			=		13;
+
 	/** ddl/query的schemaName,会存在跨库ddl,需要保留执行ddl的当前schemaName  **/
 	/** ddl/query的schemaName,会存在跨库ddl,需要保留执行ddl的当前schemaName  **/
-	optional string			ddlSchemaName 	= 		14;
+	string			ddlSchemaName 	= 		14;
 }
 }
 
 
 /**开始事务的一些信息**/
 /**开始事务的一些信息**/
 message TransactionBegin{
 message TransactionBegin{
-	
+
 	/**已废弃,请使用header里的executeTime**/
 	/**已废弃,请使用header里的executeTime**/
-	optional int64			executeTime		=		1;
-	
+	int64			executeTime		=		1;
+
 	/**已废弃,Begin里不提供事务id**/
 	/**已废弃,Begin里不提供事务id**/
-	optional string			transactionId	=		2;
-	
+	string			transactionId	=		2;
+
 	/**预留扩展**/
 	/**预留扩展**/
-	repeated Pair			props			=		3;	
-	
+	repeated Pair			props			=		3;
+
 	/**执行的thread Id**/
 	/**执行的thread Id**/
-	optional int64			threadId		=		4;
+	int64			threadId		=		4;
 }
 }
 
 
 /**结束事务的一些信息**/
 /**结束事务的一些信息**/
 message TransactionEnd{
 message TransactionEnd{
-	
+
 	/**已废弃,请使用header里的executeTime**/
 	/**已废弃,请使用header里的executeTime**/
-	optional int64			executeTime		=		1;
-	
+	int64			executeTime		=		1;
+
 	/**事务号**/
 	/**事务号**/
-	optional string			transactionId	=		2;
-	
+	string			transactionId	=		2;
+
 	/**预留扩展**/
 	/**预留扩展**/
-	repeated Pair			props			=		3;	
+	repeated Pair			props			=		3;
 }
 }
 
 
 /**预留扩展**/
 /**预留扩展**/
 message Pair{
 message Pair{
-	optional string 		key				= 			1;	
-	optional string 		value			= 			2;	
+	string 		key				= 			1;
+	string 		value			= 			2;
 }
 }
 
 
 /**打散后的事件类型,主要用于标识事务的开始,变更数据,结束**/
 /**打散后的事件类型,主要用于标识事务的开始,变更数据,结束**/
 enum EntryType{
 enum EntryType{
+	ENTRYTYPECOMPATIBLEPROTO2 = 0;
 	TRANSACTIONBEGIN 		=		1;
 	TRANSACTIONBEGIN 		=		1;
 	ROWDATA					=		2;
 	ROWDATA					=		2;
 	TRANSACTIONEND			=		3;
 	TRANSACTIONEND			=		3;
@@ -179,6 +200,7 @@ enum EntryType{
 
 
 /** 事件类型 **/
 /** 事件类型 **/
 enum EventType {
 enum EventType {
+	EVENTTYPECOMPATIBLEPROTO2 = 0;
     INSERT 		= 		1;
     INSERT 		= 		1;
     UPDATE 		= 		2;
     UPDATE 		= 		2;
     DELETE 		= 		3;
     DELETE 		= 		3;
@@ -201,6 +223,7 @@ enum EventType {
 
 
 /**数据库类型**/
 /**数据库类型**/
 enum Type {
 enum Type {
+	TYPECOMPATIBLEPROTO2 = 0;
     ORACLE		= 		1;
     ORACLE		= 		1;
     MYSQL		= 		2;
     MYSQL		= 		2;
     PGSQL		= 		3;
     PGSQL		= 		3;

+ 3 - 1
server/src/test/java/com/alibaba/otter/canal/server/CanalServerWithNettyTest.java

@@ -66,6 +66,8 @@ public class CanalServerWithNettyTest {
 
 
     @Test
     @Test
     public void testAuth() {
     public void testAuth() {
+
+
         try {
         try {
             SocketChannel channel = SocketChannel.open();
             SocketChannel channel = SocketChannel.open();
             channel.connect(new InetSocketAddress("127.0.0.1", 1088));
             channel.connect(new InetSocketAddress("127.0.0.1", 1088));
@@ -80,7 +82,7 @@ public class CanalServerWithNettyTest {
             }
             }
             //
             //
             Handshake handshake = Handshake.parseFrom(p.getBody());
             Handshake handshake = Handshake.parseFrom(p.getBody());
-            System.out.println(handshake.getSupportedCompressionsList());
+            System.out.println(handshake.getSupportedCompressions());
             //
             //
             ClientAuth ca = ClientAuth.newBuilder()
             ClientAuth ca = ClientAuth.newBuilder()
                 .setUsername("")
                 .setUsername("")

部分文件因文件數量過多而無法顯示