瀏覽代碼

Merge pull request #1057 from rewerma/master

kafka canal-adapter 分布式开关的bug
rewerma 6 年之前
父節點
當前提交
8f1b66ab5a

+ 0 - 1
client-adapter/example/.gitignore

@@ -1 +0,0 @@
-/bin/

+ 7 - 2
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java

@@ -35,7 +35,7 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
     }
 
     @Override
-    protected  void closeConnection(){
+    protected void closeConnection() {
         connector.stopRunning();
     }
 
@@ -45,6 +45,7 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
             ;
         while (running) {
             try {
+                syncSwitch.get(canalDestination);
                 logger.info("=============> Start to connect topic: {} <=============", this.topic);
                 connector.connect();
                 logger.info("=============> Start to subscribe topic: {} <=============", this.topic);
@@ -52,7 +53,11 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
                 logger.info("=============> Subscribe topic: {} succeed <=============", this.topic);
                 while (running) {
                     try {
-                        syncSwitch.get(canalDestination);
+                        Boolean status = syncSwitch.status(canalDestination);
+                        if (status != null && !status) {
+                            connector.disconnect();
+                            break;
+                        }
 
                         List<?> messages;
                         if (!flatMessage) {

+ 7 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java

@@ -43,6 +43,7 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
             ;
         while (running) {
             try {
+                syncSwitch.get(canalDestination);
                 logger.info("=============> Start to connect topic: {} <=============", this.topic);
                 connector.connect();
                 logger.info("=============> Start to subscribe topic: {}<=============", this.topic);
@@ -50,6 +51,12 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
                 logger.info("=============> Subscribe topic: {} succeed<=============", this.topic);
                 while (running) {
                     try {
+                        Boolean status = syncSwitch.status(canalDestination);
+                        if (status != null && !status) {
+                            connector.disconnect();
+                            break;
+                        }
+
                         List<?> messages;
                         if (!flatMessage) {
                             messages = connector.getListWithoutAck(100L, TimeUnit.MILLISECONDS);

+ 0 - 24
client-launcher/src/main/resources/canal-client.yml

@@ -1,24 +0,0 @@
-#canalServerHost: 127.0.0.1:11111
-#zookeeperHosts: slave1:2181
-bootstrapServers: slave1:6667,slave2:6667 #or rocketmq nameservers:host1:9876;host2:9876
-flatMessage: false
-
-#canalInstances:
-#- instance: example
-#  adapterGroups:
-#  - outAdapters:
-#    - name: logger
-#    - name: hbase
-#      hosts: slave1:2181
-#      properties: {znodeParent: "/hbase-unsecure"}
-
-mqTopics:
-- mqMode: rocketmq
-  topic: example
-  groups:
-  - groupId: example
-    outAdapters:
-    - name: logger
-#    - name: hbase
-#      hosts: slave1:2181
-#      properties: {znodeParent: "/hbase-unsecure"}

+ 2 - 0
client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

@@ -87,9 +87,11 @@ public class KafkaCanalConnector implements CanalMQConnector {
     public void disconnect() {
         if (kafkaConsumer != null) {
             kafkaConsumer.close();
+            kafkaConsumer = null;
         }
         if (kafkaConsumer2 != null) {
             kafkaConsumer2.close();
+            kafkaConsumer2 = null;
         }
 
         connected = false;