Browse Source

移除kafka client基于zk的HA机制

mcy 6 years ago
parent
commit
6b94f396ff

+ 2 - 2
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java

@@ -20,9 +20,9 @@ public class CanalClientConfig {
 
 
     private String              bootstrapServers;
     private String              bootstrapServers;
 
 
-    private List<KafkaTopic>    kafkaTopics    = new ArrayList<>();
+    private List<KafkaTopic>    kafkaTopics;
 
 
-    private List<CanalInstance> canalInstances = new ArrayList<>();
+    private List<CanalInstance> canalInstances;
 
 
     public String getCanalServerHost() {
     public String getCanalServerHost() {
         return canalServerHost;
         return canalServerHost;

+ 3 - 2
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterKafkaWorker.java

@@ -91,7 +91,7 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
     private void process() {
     private void process() {
         while (!running)
         while (!running)
             ;
             ;
-        ExecutorService executor = Executors.newFixedThreadPool(1);
+        ExecutorService executor = Executors.newSingleThreadExecutor();
         final AtomicBoolean executing = new AtomicBoolean(true);
         final AtomicBoolean executing = new AtomicBoolean(true);
         while (running) {
         while (running) {
             try {
             try {
@@ -142,7 +142,8 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
                                 }
                                 }
                             });
                             });
 
 
-                            while (executing.get()) { // keeping kafka client active
+                            // 间隔一段时间ack一次, 防止因超时未响应切换到另外台客户端
+                            while (executing.get()) {
                                 connector.ack();
                                 connector.ack();
                                 Thread.sleep(500);
                                 Thread.sleep(500);
                             }
                             }

+ 47 - 42
client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterLoader.java

@@ -26,13 +26,13 @@ public class CanalAdapterLoader {
 
 
     private static final Logger                  logger            = LoggerFactory.getLogger(CanalAdapterLoader.class);
     private static final Logger                  logger            = LoggerFactory.getLogger(CanalAdapterLoader.class);
 
 
-    private CanalClientConfig canalClientConfig;
+    private CanalClientConfig                    canalClientConfig;
 
 
     private Map<String, CanalAdapterWorker>      canalWorkers      = new HashMap<>();
     private Map<String, CanalAdapterWorker>      canalWorkers      = new HashMap<>();
 
 
     private Map<String, CanalAdapterKafkaWorker> canalKafkaWorkers = new HashMap<>();
     private Map<String, CanalAdapterKafkaWorker> canalKafkaWorkers = new HashMap<>();
 
 
-    private ExtensionLoader<CanalOuterAdapter> loader;
+    private ExtensionLoader<CanalOuterAdapter>   loader;
 
 
     public CanalAdapterLoader(CanalClientConfig canalClientConfig){
     public CanalAdapterLoader(CanalClientConfig canalClientConfig){
         this.canalClientConfig = canalClientConfig;
         this.canalClientConfig = canalClientConfig;
@@ -43,7 +43,7 @@ public class CanalAdapterLoader {
      */
      */
     public void init() {
     public void init() {
         // canal instances 和 kafka topics 配置不能同时为空
         // canal instances 和 kafka topics 配置不能同时为空
-        if (canalClientConfig.getCanalInstances().isEmpty() && canalClientConfig.getKafkaTopics().isEmpty()) {
+        if (canalClientConfig.getCanalInstances() == null && canalClientConfig.getKafkaTopics() == null) {
             throw new RuntimeException("Blank config property: canalInstances or canalKafkaTopics");
             throw new RuntimeException("Blank config property: canalInstances or canalKafkaTopics");
         }
         }
 
 
@@ -58,56 +58,61 @@ public class CanalAdapterLoader {
         }
         }
         String zkHosts = this.canalClientConfig.getZookeeperHosts();
         String zkHosts = this.canalClientConfig.getZookeeperHosts();
 
 
-        if (zkHosts == null && sa == null) {
-            throw new RuntimeException("Blank config property: canalServerHost or zookeeperHosts");
-        }
+        // if (zkHosts == null && sa == null) {
+        // throw new RuntimeException("Blank config property: canalServerHost or
+        // zookeeperHosts");
+        // }
 
 
         // 初始化canal-client的适配器
         // 初始化canal-client的适配器
-        for (CanalClientConfig.CanalInstance instance : canalClientConfig.getCanalInstances()) {
-            List<List<CanalOuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
+        if (canalClientConfig.getCanalInstances() != null) {
+            for (CanalClientConfig.CanalInstance instance : canalClientConfig.getCanalInstances()) {
+                List<List<CanalOuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
 
 
-            for (CanalClientConfig.AdapterGroup connectorGroup : instance.getAdapterGroups()) {
-                List<CanalOuterAdapter> canalOutConnectors = new ArrayList<>();
-                for (CanalOuterAdapterConfiguration c : connectorGroup.getOutAdapters()) {
-                    loadConnector(c, canalOutConnectors);
+                for (CanalClientConfig.AdapterGroup connectorGroup : instance.getAdapterGroups()) {
+                    List<CanalOuterAdapter> canalOutConnectors = new ArrayList<>();
+                    for (CanalOuterAdapterConfiguration c : connectorGroup.getOutAdapters()) {
+                        loadConnector(c, canalOutConnectors);
+                    }
+                    canalOuterAdapterGroups.add(canalOutConnectors);
                 }
                 }
-                canalOuterAdapterGroups.add(canalOutConnectors);
-            }
-            CanalAdapterWorker worker;
-            if (zkHosts != null) {
-                worker = new CanalAdapterWorker(instance.getInstance(), zkHosts, canalOuterAdapterGroups);
-            } else {
-                worker = new CanalAdapterWorker(instance.getInstance(), sa, canalOuterAdapterGroups);
+                CanalAdapterWorker worker;
+                if (zkHosts != null) {
+                    worker = new CanalAdapterWorker(instance.getInstance(), zkHosts, canalOuterAdapterGroups);
+                } else {
+                    worker = new CanalAdapterWorker(instance.getInstance(), sa, canalOuterAdapterGroups);
+                }
+                canalWorkers.put(instance.getInstance(), worker);
+                worker.start();
+                logger.info("Start adapter for canal instance: {} succeed", instance.getInstance());
             }
             }
-            canalWorkers.put(instance.getInstance(), worker);
-            worker.start();
-            logger.info("Start adapter for canal instance: {} succeed", instance.getInstance());
         }
         }
 
 
         // 初始化canal-client-kafka的适配器
         // 初始化canal-client-kafka的适配器
-        for (CanalClientConfig.KafkaTopic kafkaTopic : canalClientConfig.getKafkaTopics()) {
-            for (CanalClientConfig.Group group : kafkaTopic.getGroups()) {
-                List<List<CanalOuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
+        if (canalClientConfig.getKafkaTopics() != null) {
+            for (CanalClientConfig.KafkaTopic kafkaTopic : canalClientConfig.getKafkaTopics()) {
+                for (CanalClientConfig.Group group : kafkaTopic.getGroups()) {
+                    List<List<CanalOuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
 
 
-                List<CanalOuterAdapter> canalOuterAdapters = new ArrayList<>();
+                    List<CanalOuterAdapter> canalOuterAdapters = new ArrayList<>();
 
 
-                for (CanalOuterAdapterConfiguration config : group.getOutAdapters()) {
-                    // for (CanalOuterAdapterConfiguration config : adaptor.getOutAdapters()) {
-                    loadConnector(config, canalOuterAdapters);
-                    // }
+                    for (CanalOuterAdapterConfiguration config : group.getOutAdapters()) {
+                        // for (CanalOuterAdapterConfiguration config : adaptor.getOutAdapters()) {
+                        loadConnector(config, canalOuterAdapters);
+                        // }
+                    }
+                    canalOuterAdapterGroups.add(canalOuterAdapters);
+
+                    // String zkServers = canalClientConfig.getZookeeperHosts();
+                    CanalAdapterKafkaWorker canalKafkaWorker = new CanalAdapterKafkaWorker(zkHosts,
+                        canalClientConfig.getBootstrapServers(),
+                        kafkaTopic.getTopic(),
+                        group.getGroupId(),
+                        canalOuterAdapterGroups);
+                    canalKafkaWorkers.put(kafkaTopic.getTopic() + "-" + group.getGroupId(), canalKafkaWorker);
+                    canalKafkaWorker.start();
+                    logger.info("Start adapter for canal-client kafka topic: {} succeed",
+                        kafkaTopic.getTopic() + "-" + group.getGroupId());
                 }
                 }
-                canalOuterAdapterGroups.add(canalOuterAdapters);
-
-                String zkServers = canalClientConfig.getZookeeperHosts();
-                CanalAdapterKafkaWorker canalKafkaWorker = new CanalAdapterKafkaWorker(zkServers,
-                    canalClientConfig.getBootstrapServers(),
-                    kafkaTopic.getTopic(),
-                    group.getGroupId(),
-                    canalOuterAdapterGroups);
-                canalKafkaWorkers.put(kafkaTopic.getTopic() + "-" + group.getGroupId(), canalKafkaWorker);
-                canalKafkaWorker.start();
-                logger.info("Start adapter for canal-client kafka topic: {} succeed",
-                    kafkaTopic.getTopic() + "-" + group.getGroupId());
             }
             }
         }
         }
     }
     }

+ 9 - 10
client-launcher/src/main/resources/canal-client.yml

@@ -1,6 +1,6 @@
 canalServerHost: 127.0.0.1:11111
 canalServerHost: 127.0.0.1:11111
-#zookeeperHosts: 127.0.0.1:2181
-#bootstrapServers: kafka1.mytest.com:9092,kafka2.mytest.com:9092
+#zookeeperHosts: slave1:2181
+#bootstrapServers: slave1:6667,slave2:6667
 
 
 canalInstances:
 canalInstances:
 - instance: example
 - instance: example
@@ -10,13 +10,12 @@ canalInstances:
     - name: hbase
     - name: hbase
       hosts: slave1:2181
       hosts: slave1:2181
       properties: {znodeParent: "/hbase-unsecure"}
       properties: {znodeParent: "/hbase-unsecure"}
-
 #kafkaTopics:
 #kafkaTopics:
-#- topic: devmysql4308
+#- topic: example
 #  groups:
 #  groups:
-#  - groupId: devmysql4308_es
-#    adapters:
-#    - name: es
-#      hosts:
-#      zkHosts:
-#      properties: {clusterName: es-service-test}
+#  - groupId: example_g1
+#    outAdapters:
+#    - name: logger
+#    - name: hbase
+#      hosts: slave1:2181
+#      properties: {znodeParent: "/hbase-unsecure"}

+ 39 - 41
client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

@@ -10,10 +10,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 
 
 import com.alibaba.otter.canal.client.kafka.running.ClientRunningData;
 import com.alibaba.otter.canal.client.kafka.running.ClientRunningData;
-import com.alibaba.otter.canal.client.kafka.running.ClientRunningListener;
-import com.alibaba.otter.canal.client.kafka.running.ClientRunningMonitor;
 import com.alibaba.otter.canal.common.utils.AddressUtils;
 import com.alibaba.otter.canal.common.utils.AddressUtils;
-import com.alibaba.otter.canal.common.utils.BooleanMutex;
 import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
 import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.protocol.exception.CanalClientException;
 import com.alibaba.otter.canal.protocol.exception.CanalClientException;
@@ -27,16 +24,16 @@ import com.alibaba.otter.canal.protocol.exception.CanalClientException;
 public class KafkaCanalConnector {
 public class KafkaCanalConnector {
 
 
     private KafkaConsumer<String, Message> kafkaConsumer;
     private KafkaConsumer<String, Message> kafkaConsumer;
-    private String topic;
-    private Integer partition;
-    private Properties properties;
-    private ClientRunningMonitor runningMonitor;  // 运行控制
-    private ZkClientx zkClientx;
-    private BooleanMutex mutex = new BooleanMutex(false);
-    private volatile boolean connected = false;
-    private volatile boolean running = false;
-
-    public KafkaCanalConnector(String zkServers, String servers, String topic, Integer partition, String groupId) {
+    private String                         topic;
+    private Integer                        partition;
+    private Properties                     properties;
+    // private ClientRunningMonitor runningMonitor; // 运行控制
+    // private BooleanMutex mutex = new BooleanMutex(false);
+    private ZkClientx                      zkClientx;
+    private volatile boolean               connected = false;
+    private volatile boolean               running   = false;
+
+    public KafkaCanalConnector(String zkServers, String servers, String topic, Integer partition, String groupId){
         this.topic = topic;
         this.topic = topic;
         this.partition = partition;
         this.partition = partition;
 
 
@@ -45,7 +42,7 @@ public class KafkaCanalConnector {
         properties.put("group.id", groupId);
         properties.put("group.id", groupId);
         properties.put("enable.auto.commit", false);
         properties.put("enable.auto.commit", false);
         properties.put("auto.commit.interval.ms", "1000");
         properties.put("auto.commit.interval.ms", "1000");
-        properties.put("auto.offset.reset", "latest"); //如果没有offset则从最后的offset开始读
+        properties.put("auto.offset.reset", "latest"); // 如果没有offset则从最后的offset开始读
         properties.put("request.timeout.ms", "40000"); // 必须大于session.timeout.ms的设置
         properties.put("request.timeout.ms", "40000"); // 必须大于session.timeout.ms的设置
         properties.put("session.timeout.ms", "30000"); // 默认为30秒
         properties.put("session.timeout.ms", "30000"); // 默认为30秒
         properties.put("max.poll.records", "1"); // 所以一次只取一条数据
         properties.put("max.poll.records", "1"); // 所以一次只取一条数据
@@ -59,19 +56,19 @@ public class KafkaCanalConnector {
             clientData.setGroupId(groupId);
             clientData.setGroupId(groupId);
             clientData.setAddress(AddressUtils.getHostIp());
             clientData.setAddress(AddressUtils.getHostIp());
 
 
-            runningMonitor = new ClientRunningMonitor();
-            runningMonitor.setTopic(topic);
-            runningMonitor.setZkClient(zkClientx);
-            runningMonitor.setClientData(clientData);
-            runningMonitor.setListener(new ClientRunningListener() {
-                public void processActiveEnter() {
-                    mutex.set(true);
-                }
-
-                public void processActiveExit() {
-                    mutex.set(false);
-                }
-            });
+            // runningMonitor = new ClientRunningMonitor();
+            // runningMonitor.setTopic(topic);
+            // runningMonitor.setZkClient(zkClientx);
+            // runningMonitor.setClientData(clientData);
+            // runningMonitor.setListener(new ClientRunningListener() {
+            // public void processActiveEnter() {
+            // mutex.set(true);
+            // }
+            //
+            // public void processActiveExit() {
+            // mutex.set(false);
+            // }
+            // });
         }
         }
 
 
     }
     }
@@ -96,11 +93,11 @@ public class KafkaCanalConnector {
             return;
             return;
         }
         }
 
 
-        if (runningMonitor != null) {
-            if (!runningMonitor.isStart()) {
-                runningMonitor.start();
-            }
-        }
+        // if (runningMonitor != null) {
+        // if (!runningMonitor.isStart()) {
+        // runningMonitor.start();
+        // }
+        // }
 
 
         connected = true;
         connected = true;
 
 
@@ -116,9 +113,9 @@ public class KafkaCanalConnector {
         kafkaConsumer.close();
         kafkaConsumer.close();
 
 
         connected = false;
         connected = false;
-        if (runningMonitor.isStart()) {
-            runningMonitor.stop();
-        }
+        // if (runningMonitor.isStart()) {
+        // runningMonitor.stop();
+        // }
     }
     }
 
 
     private void waitClientRunning() {
     private void waitClientRunning() {
@@ -129,12 +126,12 @@ public class KafkaCanalConnector {
                 }
                 }
 
 
                 running = true;
                 running = true;
-                mutex.get();// 阻塞等待
+                // mutex.get();// 阻塞等待
             } else {
             } else {
                 // 单机模式直接设置为running
                 // 单机模式直接设置为running
                 running = true;
                 running = true;
             }
             }
-        } catch (InterruptedException e) {
+        } catch (Exception e) {
             Thread.currentThread().interrupt();
             Thread.currentThread().interrupt();
             throw new CanalClientException(e);
             throw new CanalClientException(e);
         }
         }
@@ -142,7 +139,8 @@ public class KafkaCanalConnector {
 
 
     public boolean checkValid() {
     public boolean checkValid() {
         if (zkClientx != null) {
         if (zkClientx != null) {
-            return mutex.state();
+            // return mutex.state();
+            return true;
         } else {
         } else {
             return true;// 默认都放过
             return true;// 默认都放过
         }
         }
@@ -235,9 +233,9 @@ public class KafkaCanalConnector {
     public void stopRunning() {
     public void stopRunning() {
         if (running) {
         if (running) {
             running = false; // 设置为非running状态
             running = false; // 设置为非running状态
-            if (!mutex.state()) {
-                mutex.set(true); // 中断阻塞
-            }
+            // if (!mutex.state()) {
+            // mutex.set(true); // 中断阻塞
+            // }
         }
         }
     }
     }
 }
 }