Browse Source

增加adapter相关参数: retry timeout batchSize

mcy 6 years ago
parent
commit
e4e23c9ecd
18 changed files with 513 additions and 221 deletions
  1. 34 4
      client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java
  2. 7 3
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java
  3. 6 4
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java
  4. 271 0
      client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java
  5. 0 7
      client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/SqlParseTest.java
  6. 49 4
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java
  7. 21 33
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java
  8. 12 4
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java
  9. 19 30
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java
  10. 63 33
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java
  11. 17 14
      client-adapter/launcher/src/main/resources/application.yml
  12. 3 3
      client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java
  13. 6 2
      client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java
  14. 0 50
      client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnectors.java
  15. 0 24
      client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnectors.java
  16. 3 3
      client/src/test/java/com/alibaba/otter/canal/client/running/kafka/CanalKafkaClientExample.java
  17. 1 1
      client/src/test/java/com/alibaba/otter/canal/client/running/kafka/KafkaClientRunningTest.java
  18. 1 2
      client/src/test/java/com/alibaba/otter/canal/client/running/rocketmq/CanalRocketMQClientExample.java

+ 34 - 4
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/CanalClientConfig.java

@@ -19,6 +19,12 @@ public class CanalClientConfig {
 
     private Boolean             flatMessage = true; // 是否已flatMessage模式传输, 只适用于mq模式
 
+    private Integer             batchSize;          // 批大小
+
+    private Integer             retry;              // 重试次数
+
+    private Long                timeout;            // 消费超时时间
+
     private List<MQTopic>       mqTopics;           // mq topic 列表
 
     private List<CanalInstance> canalInstances;     // tcp 模式下 canal 实例列表, 与mq模式不能共存!!
@@ -63,6 +69,30 @@ public class CanalClientConfig {
         this.flatMessage = flatMessage;
     }
 
+    public Integer getBatchSize() {
+        return batchSize;
+    }
+
+    public void setBatchSize(Integer batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    public Integer getRetry() {
+        return retry;
+    }
+
+    public void setRetry(Integer retry) {
+        this.retry = retry;
+    }
+
+    public Long getTimeout() {
+        return timeout;
+    }
+
+    public void setTimeout(Long timeout) {
+        this.timeout = timeout;
+    }
+
     public List<CanalInstance> getCanalInstances() {
         return canalInstances;
     }
@@ -73,9 +103,9 @@ public class CanalClientConfig {
 
     public static class CanalInstance {
 
-        private String             instance;      // 实例名
+        private String      instance; // 实例名
 
-        private List<Group> groups;  // 适配器分组列表
+        private List<Group> groups;   // 适配器分组列表
 
         public String getInstance() {
             return instance;
@@ -112,9 +142,9 @@ public class CanalClientConfig {
 
     public static class MQTopic {
 
-        private String      mqMode;                     // mq模式 kafka or rocketMQ
+        private String        mqMode;                     // mq模式 kafka or rocketMQ
 
-        private String      topic;                      // topic名
+        private String        topic;                      // topic名
 
         private List<MQGroup> groups = new ArrayList<>(); // 分组列表
 

+ 7 - 3
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java

@@ -3,7 +3,6 @@ package com.alibaba.otter.canal.client.adapter.es;
 import java.util.List;
 import java.util.Map;
 
-import com.alibaba.otter.canal.client.adapter.es.service.ESSyncService;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.transport.client.PreBuiltTransportClient;
@@ -13,6 +12,8 @@ import org.slf4j.LoggerFactory;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
 import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfigLoader;
+import com.alibaba.otter.canal.client.adapter.es.service.ESSyncService;
+import com.alibaba.otter.canal.client.adapter.es.support.ESTemplate;
 import com.alibaba.otter.canal.client.adapter.support.Dml;
 import com.alibaba.otter.canal.client.adapter.support.EtlResult;
 import com.alibaba.otter.canal.client.adapter.support.OuterAdapterConfig;
@@ -50,7 +51,8 @@ public class ESAdapter implements OuterAdapter {
             properties.forEach(settingBuilder::put);
             Settings settings = settingBuilder.build();
             transportClient = new PreBuiltTransportClient(settings);
-            esSyncService = new ESSyncService(transportClient);
+            ESTemplate esTemplate = new ESTemplate(transportClient);
+            esSyncService = new ESSyncService(esTemplate);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -72,7 +74,9 @@ public class ESAdapter implements OuterAdapter {
 
     @Override
     public void destroy() {
-
+        if (transportClient != null) {
+            transportClient.close();
+        }
     }
 
     @Override

+ 6 - 4
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/service/ESSyncService.java

@@ -1,15 +1,17 @@
 package com.alibaba.otter.canal.client.adapter.es.service;
 
+import com.alibaba.otter.canal.client.adapter.es.support.ESTemplate;
 import org.elasticsearch.client.transport.TransportClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ESSyncService {
-    private Logger logger = LoggerFactory.getLogger(this.getClass());
 
-    private TransportClient transportClient;
+    private static Logger logger = LoggerFactory.getLogger(ESSyncService.class);
 
-    public ESSyncService(TransportClient transportClient) {
-        this.transportClient = transportClient;
+    private ESTemplate    esTemplate;
+
+    public ESSyncService(ESTemplate esTemplate){
+        this.esTemplate = esTemplate;
     }
 }

+ 271 - 0
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java

@@ -0,0 +1,271 @@
+package com.alibaba.otter.canal.client.adapter.es.support;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.reindex.BulkByScrollResponse;
+import org.elasticsearch.index.reindex.UpdateByQueryAction;
+import org.elasticsearch.index.reindex.UpdateByQueryRequestBuilder;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.script.Script;
+import org.elasticsearch.script.ScriptType;
+import org.elasticsearch.search.SearchHit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.CollectionUtils;
+
+import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig;
+import com.alibaba.otter.canal.client.adapter.es.config.ESSyncConfig.ESMapping;
+
+public class ESTemplate {
+
+    private static final Logger logger         = LoggerFactory.getLogger(ESTemplate.class);
+
+    private static final int    MAX_BATCH_SIZE = 1000;
+
+    private TransportClient     transportClient;
+
+    public ESTemplate(TransportClient transportClient){
+        this.transportClient = transportClient;
+    }
+
+    public void setTransportClient(TransportClient transportClient) {
+        this.transportClient = transportClient;
+    }
+
+    /**
+     * 插入数据
+     * 
+     * @param esSyncConfig
+     * @param pkVal
+     * @param esFieldData
+     * @return
+     */
+    public boolean insert(ESSyncConfig esSyncConfig, Object pkVal, Map<String, Object> esFieldData) {
+        BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
+        ESMapping mapping = esSyncConfig.getEsMapping();
+        if (mapping.get_id() != null) {
+            bulkRequestBuilder
+                .add(transportClient.prepareIndex(mapping.get_index(), mapping.get_type(), pkVal.toString())
+                    .setSource(esFieldData));
+        } else {
+            SearchResponse response = transportClient.prepareSearch(mapping.get_index())
+                .setTypes(mapping.get_type())
+                .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
+                .setSize(MAX_BATCH_SIZE)
+                .get();
+            for (SearchHit hit : response.getHits()) {
+                bulkRequestBuilder
+                    .add(transportClient.prepareDelete(mapping.get_index(), mapping.get_type(), hit.getId()));
+            }
+            bulkRequestBuilder
+                .add(transportClient.prepareIndex(mapping.get_index(), mapping.get_type()).setSource(esFieldData));
+        }
+        return commitBulkRequest(bulkRequestBuilder);
+    }
+
+    /**
+     * 根据主键更新数据
+     * 
+     * @param esSyncConfig
+     * @param pkVal
+     * @param esFieldData
+     * @return
+     */
+    public boolean update(ESSyncConfig esSyncConfig, Object pkVal, Map<String, Object> esFieldData) {
+        BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
+        append4Update(bulkRequestBuilder, esSyncConfig, pkVal, esFieldData);
+        return commitBulkRequest(bulkRequestBuilder);
+    }
+
+    /**
+     * 结合 addBulkRequest4Update 批量更新
+     * 
+     * @param consumer
+     * @return
+     */
+    public boolean updateBatch(Consumer<BulkRequestBuilder> consumer) {
+        BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
+        consumer.accept(bulkRequestBuilder);
+        return commitBulkRequest(bulkRequestBuilder);
+    }
+
+    public void append4Update(BulkRequestBuilder bulkRequestBuilder, ESSyncConfig esSyncConfig, Object pkVal,
+                                      Map<String, Object> esFieldData) {
+        ESMapping mapping = esSyncConfig.getEsMapping();
+        if (mapping.get_id() != null) {
+            bulkRequestBuilder
+                    .add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
+                            .setDoc(esFieldData));
+        } else {
+            SearchResponse response = transportClient.prepareSearch(mapping.get_index())
+                    .setTypes(mapping.get_type())
+                    .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
+                    .setSize(MAX_BATCH_SIZE)
+                    .get();
+            for (SearchHit hit : response.getHits()) { // 理论上只有一条
+                bulkRequestBuilder
+                        .add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), hit.getId())
+                                .setDoc(esFieldData));
+            }
+        }
+    }
+
+    /**
+     * update by query
+     * 
+     * @param esSyncConfig
+     * @param queryBuilder
+     * @param esFieldData
+     * @return
+     */
+    private boolean updateByQuery(ESSyncConfig esSyncConfig, QueryBuilder queryBuilder,
+                                  Map<String, Object> esFieldData) {
+        return updateByQuery(esSyncConfig, queryBuilder, esFieldData, 0);
+    }
+
+    private boolean updateByQuery(ESSyncConfig esSyncConfig, QueryBuilder queryBuilder, Map<String, Object> esFieldData,
+                                  int counter) {
+        if (CollectionUtils.isEmpty(esFieldData)) {
+            return true;
+        }
+
+        ESMapping mapping = esSyncConfig.getEsMapping();
+
+        StringBuilder sb = new StringBuilder();
+        esFieldData.forEach((key, value) -> {
+            if (value instanceof Map) {
+                HashMap mapValue = (HashMap) value;
+                if (mapValue.containsKey("lon") && mapValue.containsKey("lat") && mapValue.size() == 2) {
+                    sb.append("ctx._source")
+                        .append("['")
+                        .append(key)
+                        .append("']")
+                        .append(" = [")
+                        .append(mapValue.get("lon"))
+                        .append(", ")
+                        .append(mapValue.get("lat"))
+                        .append("];");
+                } else {
+                    logger.warn("Unsupported object type for script_update");
+                }
+            } else if (value instanceof String) {
+                sb.append("ctx._source")
+                    .append("['")
+                    .append(key)
+                    .append("']")
+                    .append(" = '")
+                    .append(value)
+                    .append("';");
+            } else {
+                sb.append("ctx._source").append("['").append(key).append("']").append(" = ").append(value).append(";");
+            }
+        });
+        String scriptLine = sb.toString();
+        if (logger.isDebugEnabled()) {
+            logger.debug(scriptLine);
+        }
+
+        UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(transportClient);
+        updateByQuery.source(mapping.get_index())
+            .abortOnVersionConflict(false)
+            .filter(queryBuilder)
+            .script(new Script(ScriptType.INLINE, "painless", scriptLine, Collections.emptyMap()));
+
+        BulkByScrollResponse response = updateByQuery.get();
+        if (logger.isDebugEnabled()) {
+            logger.debug("updateByQuery response: {}", response.getStatus());
+        }
+        if (!CollectionUtils.isEmpty(response.getSearchFailures())) {
+            logger.error("script update_for_search has search error: " + response.getBulkFailures());
+            return false;
+        }
+
+        if (!CollectionUtils.isEmpty(response.getBulkFailures())) {
+            logger.error("script update_for_search has update error: " + response.getBulkFailures());
+            return false;
+        }
+
+        if (response.getStatus().getVersionConflicts() > 0) {
+            if (counter >= 3) {
+                logger.error("第 {} 次执行updateByQuery, 依旧存在版本冲突,不再继续重试。", counter);
+                return false;
+            }
+            logger.warn("本次updateByQuery存在版本冲突,准备重新执行...");
+            try {
+                TimeUnit.SECONDS.sleep(1);
+            } catch (InterruptedException e) {
+                // ignore
+            }
+            return updateByQuery(esSyncConfig, queryBuilder, esFieldData, ++counter);
+        }
+
+        return true;
+    }
+
+    /**
+     * 通过主键删除数据
+     * 
+     * @param esSyncConfig
+     * @param pkVal
+     * @return
+     */
+    public boolean delete(ESSyncConfig esSyncConfig, Object pkVal) {
+        BulkRequestBuilder bulkRequestBuilder = transportClient.prepareBulk();
+        ESMapping mapping = esSyncConfig.getEsMapping();
+        if (mapping.get_id() != null) {
+            bulkRequestBuilder
+                .add(transportClient.prepareDelete(mapping.get_index(), mapping.get_type(), pkVal.toString()));
+        } else {
+            SearchResponse response = transportClient.prepareSearch(mapping.get_index())
+                .setTypes(mapping.get_type())
+                .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
+                .setSize(MAX_BATCH_SIZE)
+                .get();
+            for (SearchHit hit : response.getHits()) {
+                bulkRequestBuilder
+                    .add(transportClient.prepareDelete(mapping.get_index(), mapping.get_type(), hit.getId()));
+            }
+        }
+        return commitBulkRequest(bulkRequestBuilder);
+    }
+
+    /**
+     * 批量提交
+     * 
+     * @param bulkRequestBuilder
+     * @return
+     */
+    private static boolean commitBulkRequest(BulkRequestBuilder bulkRequestBuilder) {
+        if (bulkRequestBuilder.numberOfActions() > 0) {
+            BulkResponse response = bulkRequestBuilder.execute().actionGet();
+            if (response.hasFailures()) {
+                for (BulkItemResponse itemResponse : response.getItems()) {
+                    if (!itemResponse.isFailed()) {
+                        continue;
+                    }
+
+                    if (itemResponse.getFailure().getStatus() == RestStatus.NOT_FOUND) {
+                        logger.warn(itemResponse.getFailureMessage());
+                    } else {
+                        logger.error("ES 同步数据错误 {}", itemResponse.getFailureMessage());
+                    }
+                }
+            }
+
+            return !response.hasFailures();
+        }
+
+        return true;
+    }
+}

+ 0 - 7
client-adapter/elasticsearch/src/test/java/com/alibaba/otter/canal/client/adapter/es/test/SqlParseTest.java

@@ -14,13 +14,6 @@ public class SqlParseTest {
 
     @Test
     public void parseTest() {
-        // String sql = "select a.id,d.user_id2, concat(name,'_', a.nick) as name,
-        // b.name as roleNme, d.name as typeName,concat(d.label,'_') as label,"
-        // + " c.name as refName from user a left join role b on b.user_id=a.id "
-        // + "left join type d on d.user_id=a.id and d.user_id2=a.p_id "
-        // + "left join ( select ref_id,group_concat(name,',') as name from role group
-        // by ref_id ) c on c.ref_id=a.id "
-        // + "where a.id=1";
         String sql = "select a.id, concat(a.name,'_test') as name, a.role_id, b.name as role_name, c.labels from user a "
                      + "left join role b on a.role_id=b.id "
                      + "left join (select user_id, group_concat(label,',') as labels from user_label "

+ 49 - 4
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

@@ -2,11 +2,10 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;
 
+import com.alibaba.otter.canal.client.CanalConnector;
+import com.alibaba.otter.canal.client.CanalMQConnector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -117,6 +116,52 @@ public abstract class AbstractCanalAdapterWorker {
         });
     }
 
+    protected void mqWriteOutData(int retry, long timeout, boolean flatMessage, CanalMQConnector connector,
+                        ExecutorService workerExecutor) {
+        for (int i = 0; i < retry; i++) {
+            try {
+                List<?> messages;
+                if (!flatMessage) {
+                    messages = connector.getListWithoutAck(100L, TimeUnit.MILLISECONDS);
+                } else {
+                    messages = connector.getFlatListWithoutAck(100L, TimeUnit.MILLISECONDS);
+                }
+                if (messages != null) {
+                    Future<Boolean> future = workerExecutor.submit(() -> {
+                        for (final Object message : messages) {
+                            if (message instanceof FlatMessage) {
+                                writeOut((FlatMessage) message);
+                            } else {
+                                writeOut((Message) message);
+                            }
+                        }
+                        return true;
+                    });
+
+                    try {
+                        future.get(timeout, TimeUnit.MILLISECONDS);
+                    } catch (Exception e) {
+                        future.cancel(true);
+                        throw e;
+                    }
+                }
+                connector.ack();
+                break;
+            } catch (Throwable e) {
+                if (i == retry - 1) {
+                    connector.ack();
+                }
+
+                logger.error(e.getMessage(), e);
+                try {
+                    TimeUnit.SECONDS.sleep(1L);
+                } catch (InterruptedException e1) {
+                    // ignore
+                }
+            }
+        }
+    }
+
     public void start() {
         if (!running) {
             thread = new Thread(this::process);

+ 21 - 33
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java

@@ -1,14 +1,14 @@
 package com.alibaba.otter.canal.adapter.launcher.loader;
 
 import java.util.List;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.common.errors.WakeupException;
 
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
+import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
 import com.alibaba.otter.canal.client.kafka.KafkaCanalConnector;
-import com.alibaba.otter.canal.client.kafka.KafkaCanalConnectors;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
 
@@ -20,17 +20,24 @@ import com.alibaba.otter.canal.protocol.Message;
  */
 public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
 
+    private CanalClientConfig   canalClientConfig;
     private KafkaCanalConnector connector;
     private String              topic;
     private boolean             flatMessage;
 
-    public CanalAdapterKafkaWorker(String bootstrapServers, String topic, String groupId,
-                                   List<List<OuterAdapter>> canalOuterAdapters, boolean flatMessage){
+    public CanalAdapterKafkaWorker(CanalClientConfig canalClientConfig, String bootstrapServers, String topic,
+                                   String groupId, List<List<OuterAdapter>> canalOuterAdapters, boolean flatMessage){
         super(canalOuterAdapters);
+        this.canalClientConfig = canalClientConfig;
         this.topic = topic;
         this.canalDestination = topic;
         this.flatMessage = flatMessage;
-        this.connector = KafkaCanalConnectors.newKafkaConnector(bootstrapServers, topic, null, groupId, flatMessage);
+        this.connector = new KafkaCanalConnector(bootstrapServers,
+            topic,
+            null,
+            groupId,
+            canalClientConfig.getBatchSize(),
+            flatMessage);
         // connector.setSessionTimeout(1L, TimeUnit.MINUTES);
     }
 
@@ -38,6 +45,10 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
     protected void process() {
         while (!running)
             ;
+        ExecutorService workerExecutor = Executors.newSingleThreadExecutor();
+        int retry = canalClientConfig.getRetry() == null ? 1 : canalClientConfig.getRetry();
+        long timeout = canalClientConfig.getTimeout() == null ? 30000 : canalClientConfig.getTimeout(); // 默认超时30秒
+
         while (running) {
             try {
                 syncSwitch.get(canalDestination);
@@ -47,35 +58,12 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
                 connector.subscribe();
                 logger.info("=============> Subscribe topic: {} succeed <=============", this.topic);
                 while (running) {
-                    try {
-                        Boolean status = syncSwitch.status(canalDestination);
-                        if (status != null && !status) {
-                            connector.disconnect();
-                            break;
-                        }
-
-                        List<?> messages;
-                        if (!flatMessage) {
-                            messages = connector.getListWithoutAck(100L, TimeUnit.MILLISECONDS);
-                        } else {
-                            messages = connector.getFlatListWithoutAck(100L, TimeUnit.MILLISECONDS);
-                        }
-                        if (messages != null) {
-                            for (final Object message : messages) {
-                                if (message instanceof FlatMessage) {
-                                    writeOut((FlatMessage) message);
-                                } else {
-                                    writeOut((Message) message);
-                                }
-                            }
-                        }
-                        connector.ack();
-                    } catch (CommitFailedException e) {
-                        logger.warn(e.getMessage());
-                    } catch (Throwable e) {
-                        logger.error(e.getMessage(), e);
-                        TimeUnit.SECONDS.sleep(1L);
+                    Boolean status = syncSwitch.status(canalDestination);
+                    if (status != null && !status) {
+                        connector.disconnect();
+                        break;
                     }
+                    mqWriteOutData(retry, timeout, flatMessage, connector, workerExecutor);
                 }
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);

+ 12 - 4
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java

@@ -67,9 +67,15 @@ public class CanalAdapterLoader {
                 }
                 CanalAdapterWorker worker;
                 if (sa != null) {
-                    worker = new CanalAdapterWorker(instance.getInstance(), sa, canalOuterAdapterGroups);
+                    worker = new CanalAdapterWorker(canalClientConfig,
+                        instance.getInstance(),
+                        sa,
+                        canalOuterAdapterGroups);
                 } else if (zkHosts != null) {
-                    worker = new CanalAdapterWorker(instance.getInstance(), zkHosts, canalOuterAdapterGroups);
+                    worker = new CanalAdapterWorker(canalClientConfig,
+                        instance.getInstance(),
+                        zkHosts,
+                        canalOuterAdapterGroups);
                 } else {
                     throw new RuntimeException("No canal server connector found");
                 }
@@ -90,7 +96,8 @@ public class CanalAdapterLoader {
                     }
                     canalOuterAdapterGroups.add(canalOuterAdapters);
                     if (StringUtils.isBlank(topic.getMqMode()) || "rocketmq".equalsIgnoreCase(topic.getMqMode())) {
-                        CanalAdapterRocketMQWorker rocketMQWorker = new CanalAdapterRocketMQWorker(canalClientConfig.getBootstrapServers(),
+                        CanalAdapterRocketMQWorker rocketMQWorker = new CanalAdapterRocketMQWorker(canalClientConfig,
+                            canalClientConfig.getBootstrapServers(),
                             topic.getTopic(),
                             group.getGroupId(),
                             canalOuterAdapterGroups,
@@ -98,7 +105,8 @@ public class CanalAdapterLoader {
                         canalMQWorker.put(topic.getTopic() + "-rocketmq-" + group.getGroupId(), rocketMQWorker);
                         rocketMQWorker.start();
                     } else if ("kafka".equalsIgnoreCase(topic.getMqMode())) {
-                        CanalAdapterKafkaWorker canalKafkaWorker = new CanalAdapterKafkaWorker(canalClientConfig.getBootstrapServers(),
+                        CanalAdapterKafkaWorker canalKafkaWorker = new CanalAdapterKafkaWorker(canalClientConfig,
+                            canalClientConfig.getBootstrapServers(),
                             topic.getTopic(),
                             group.getGroupId(),
                             canalOuterAdapterGroups,

+ 19 - 30
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java

@@ -1,13 +1,16 @@
 package com.alibaba.otter.canal.adapter.launcher.loader;
 
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
 import org.apache.kafka.common.errors.WakeupException;
 
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
 import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
-import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnectors;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
 
@@ -18,17 +21,19 @@ import com.alibaba.otter.canal.protocol.Message;
  */
 public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
 
+    private CanalClientConfig      canalClientConfig;
     private RocketMQCanalConnector connector;
     private String                 topic;
     private boolean                flatMessage;
 
-    public CanalAdapterRocketMQWorker(String nameServers, String topic, String groupId,
-                                      List<List<OuterAdapter>> canalOuterAdapters, boolean flatMessage){
+    public CanalAdapterRocketMQWorker(CanalClientConfig canalClientConfig, String nameServers, String topic,
+                                      String groupId, List<List<OuterAdapter>> canalOuterAdapters, boolean flatMessage){
         super(canalOuterAdapters);
+        this.canalClientConfig = canalClientConfig;
         this.topic = topic;
         this.flatMessage = flatMessage;
         this.canalDestination = topic;
-        this.connector = RocketMQCanalConnectors.newRocketMQConnector(nameServers, topic, groupId, flatMessage);
+        this.connector = new RocketMQCanalConnector(nameServers, topic, groupId, flatMessage);
         logger.info("RocketMQ consumer config topic:{}, nameServer:{}, groupId:{}", topic, nameServers, groupId);
     }
 
@@ -36,6 +41,11 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
     protected void process() {
         while (!running)
             ;
+
+        ExecutorService workerExecutor = Executors.newSingleThreadExecutor();
+        int retry = canalClientConfig.getRetry() == null ? 1 : canalClientConfig.getRetry();
+        long timeout = canalClientConfig.getTimeout() == null ? 30000 : canalClientConfig.getTimeout(); // 默认超时30秒
+
         while (running) {
             try {
                 syncSwitch.get(canalDestination);
@@ -45,33 +55,12 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
                 connector.subscribe();
                 logger.info("=============> Subscribe topic: {} succeed<=============", this.topic);
                 while (running) {
-                    try {
-                        Boolean status = syncSwitch.status(canalDestination);
-                        if (status != null && !status) {
-                            connector.disconnect();
-                            break;
-                        }
-
-                        List<?> messages;
-                        if (!flatMessage) {
-                            messages = connector.getListWithoutAck(100L, TimeUnit.MILLISECONDS);
-                        } else {
-                            messages = connector.getFlatListWithoutAck(100L, TimeUnit.MILLISECONDS);
-                        }
-                        if (messages != null) {
-                            for (final Object message : messages) {
-                                if (message instanceof FlatMessage) {
-                                    writeOut((FlatMessage) message);
-                                } else {
-                                    writeOut((Message) message);
-                                }
-                            }
-                        }
-                        connector.ack();
-                    } catch (Throwable e) {
-                        logger.error(e.getMessage(), e);
-                        TimeUnit.SECONDS.sleep(1L);
+                    Boolean status = syncSwitch.status(canalDestination);
+                    if (status != null && !status) {
+                        connector.disconnect();
+                        break;
                     }
+                    mqWriteOutData(retry, timeout, flatMessage, connector, workerExecutor);
                 }
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);

+ 63 - 33
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java

@@ -2,12 +2,12 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
 
 import java.net.SocketAddress;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.*;
 
 import com.alibaba.otter.canal.client.CanalConnector;
 import com.alibaba.otter.canal.client.CanalConnectors;
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
+import com.alibaba.otter.canal.client.adapter.support.CanalClientConfig;
 import com.alibaba.otter.canal.client.impl.ClusterCanalConnector;
 import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;
 import com.alibaba.otter.canal.protocol.Message;
@@ -20,10 +20,12 @@ import com.alibaba.otter.canal.protocol.Message;
  */
 public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
 
-    private static final int BATCH_SIZE = 50;
-    private static final int SO_TIMEOUT = 0;
+    private static final int  BATCH_SIZE = 50;
+    private static final int  SO_TIMEOUT = 0;
 
-    private CanalConnector   connector;
+    private CanalConnector    connector;
+
+    private CanalClientConfig canalClientConfig;
 
     /**
      * 单台client适配器worker的构造方法
@@ -32,9 +34,10 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
      * @param address canal-server地址
      * @param canalOuterAdapters 外部适配器组
      */
-    public CanalAdapterWorker(String canalDestination, SocketAddress address,
+    public CanalAdapterWorker(CanalClientConfig canalClientConfig, String canalDestination, SocketAddress address,
                               List<List<OuterAdapter>> canalOuterAdapters){
         super(canalOuterAdapters);
+        this.canalClientConfig = canalClientConfig;
         this.canalDestination = canalDestination;
         connector = CanalConnectors.newSingleConnector(address, canalDestination, "", "");
     }
@@ -46,10 +49,11 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
      * @param zookeeperHosts zookeeper地址
      * @param canalOuterAdapters 外部适配器组
      */
-    public CanalAdapterWorker(String canalDestination, String zookeeperHosts,
+    public CanalAdapterWorker(CanalClientConfig canalClientConfig, String canalDestination, String zookeeperHosts,
                               List<List<OuterAdapter>> canalOuterAdapters){
         super(canalOuterAdapters);
         this.canalDestination = canalDestination;
+        this.canalClientConfig = canalClientConfig;
         connector = CanalConnectors.newClusterConnector(zookeeperHosts, canalDestination, "", "");
         ((ClusterCanalConnector) connector).setSoTimeout(SO_TIMEOUT);
     }
@@ -58,6 +62,15 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
     protected void process() {
         while (!running)
             ; // waiting until running == true
+
+        ExecutorService workerExecutor = Executors.newSingleThreadExecutor();
+        int retry = canalClientConfig.getRetry() == null ? 1 : canalClientConfig.getRetry();
+        long timeout = canalClientConfig.getTimeout() == null ? 300000 : canalClientConfig.getTimeout(); // 默认超时5分钟
+        Integer batchSize = canalClientConfig.getBatchSize();
+        if (batchSize == null) {
+            batchSize = BATCH_SIZE;
+        }
+
         while (running) {
             try {
                 syncSwitch.get(canalDestination);
@@ -77,35 +90,50 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
                         break;
                     }
 
-                    // server配置canal.instance.network.soTimeout(默认: 30s)
-                    // 范围内未与server交互,server将关闭本次socket连接
-                    Message message = connector.getWithoutAck(BATCH_SIZE); // 获取指定数量的数据
-                    long batchId = message.getId();
-                    try {
-                        int size = message.getEntries().size();
-                        if (batchId == -1 || size == 0) {
-                            Thread.sleep(1000);
-                        } else {
-                            if (logger.isDebugEnabled()) {
-                                logger.debug("destination: {} batchId: {} batchSize: {} ",
-                                    this.canalDestination,
-                                    batchId,
-                                    size);
+                    for (int i = 0; i < retry; i++) {
+                        Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
+                        long batchId = message.getId();
+                        try {
+                            int size = message.getEntries().size();
+                            if (batchId == -1 || size == 0) {
+                                Thread.sleep(500);
+                            } else {
+                                Future<Boolean> future = workerExecutor.submit(() -> {
+                                    if (logger.isDebugEnabled()) {
+                                        logger.debug("destination: {} batchId: {} batchSize: {} ",
+                                            canalDestination,
+                                            batchId,
+                                            size);
+                                    }
+                                    long begin = System.currentTimeMillis();
+                                    writeOut(message);
+                                    if (logger.isDebugEnabled()) {
+                                        logger.debug("destination: {} batchId: {} elapsed time: {} ms",
+                                            canalDestination,
+                                            batchId,
+                                            System.currentTimeMillis() - begin);
+                                    }
+                                    return true;
+                                });
+
+                                try {
+                                    future.get(timeout, TimeUnit.MILLISECONDS);
+                                } catch (Exception e) {
+                                    future.cancel(true);
+                                    throw e;
+                                }
                             }
-                            long begin = System.currentTimeMillis();
-                            writeOut(message);
-                            if (logger.isDebugEnabled()) {
-                                logger.debug("destination: {} batchId: {} elapsed time: {} ms",
-                                    this.canalDestination,
-                                    batchId,
-                                    System.currentTimeMillis() - begin);
+                            connector.ack(batchId); // 提交确认
+                            break;
+                        } catch (Exception e) {
+                            if (i != retry - 1) {
+                                connector.rollback(batchId); // 处理失败, 回滚数据
+                            } else {
+                                connector.ack(batchId);
                             }
+                            logger.error("sync error!", e);
+                            Thread.sleep(500);
                         }
-                        connector.ack(batchId); // 提交确认
-                    } catch (Exception e) {
-                        connector.rollback(batchId); // 处理失败, 回滚数据
-                        logger.error("sync error!", e);
-                        Thread.sleep(500);
                     }
                 }
 
@@ -124,6 +152,8 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
                 }
             }
         }
+
+        workerExecutor.shutdown();
     }
 
     @Override

+ 17 - 14
client-adapter/launcher/src/main/resources/application.yml

@@ -15,27 +15,30 @@ hbase.zookeeper.property.clientPort: 2181
 hbase.zookeeper.znode.parent: /hbase
 
 canal.conf:
-  canalServerHost: 127.0.0.1:11111
+#  canalServerHost: 127.0.0.1:11111
 #  zookeeperHosts: slave1:2181
-#  bootstrapServers: slave1:6667 #or rocketmq nameservers:host1:9876;host2:9876
+  bootstrapServers: slave1:6667 #or rocketmq nameservers:host1:9876;host2:9876
   flatMessage: true
-  canalInstances:
-  - instance: example
-    groups:
-    - outAdapters:
-      - name: logger
+  retry: 3
+  timeout: 20000
+  batchSize: 50
+#  canalInstances:
+#  - instance: example
+#    groups:
+#    - outAdapters:
+#      - name: logger
 #      - name: hbase
 #        properties:
 #          hbase.zookeeper.quorum: ${hbase.zookeeper.quorum}
 #          hbase.zookeeper.property.clientPort: ${hbase.zookeeper.property.clientPort}
 #          zookeeper.znode.parent: ${hbase.zookeeper.znode.parent}
-#  mqTopics:
-#  - mqMode: kafka
-#    topic: example
-#    groups:
-#    - groupId: g2
-#      outAdapters:
-#      - name: logger
+  mqTopics:
+  - mqMode: kafka
+    topic: example
+    groups:
+    - groupId: g2
+      outAdapters:
+      - name: logger
 #  mqTopics:
 #  - mqMode: rocketmq
 #    topic: example

+ 3 - 3
client/src/main/java/com/alibaba/otter/canal/client/impl/SimpleCanalConnector.java

@@ -133,7 +133,7 @@ public class SimpleCanalConnector implements CanalConnector {
                 runningMonitor.stop();
             }
         } else {
-            doDisconnnect();
+            doDisconnect();
         }
     }
 
@@ -190,7 +190,7 @@ public class SimpleCanalConnector implements CanalConnector {
         }
     }
 
-    private void doDisconnnect() throws CanalClientException {
+    private void doDisconnect() throws CanalClientException {
         if (readableChannel != null) {
             quietlyClose(readableChannel);
             readableChannel = null;
@@ -434,7 +434,7 @@ public class SimpleCanalConnector implements CanalConnector {
 
                 public void processActiveExit() {
                     mutex.set(false);
-                    doDisconnnect();
+                    doDisconnect();
                 }
 
             });

+ 6 - 2
client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

@@ -42,7 +42,8 @@ public class KafkaCanalConnector implements CanalMQConnector {
     private volatile boolean               running   = false;
     private boolean                        flatMessage;
 
-    public KafkaCanalConnector(String servers, String topic, Integer partition, String groupId, boolean flatMessage){
+    public KafkaCanalConnector(String servers, String topic, Integer partition, String groupId, Integer batchSize,
+                               boolean flatMessage){
         this.topic = topic;
         this.partition = partition;
         this.flatMessage = flatMessage;
@@ -55,7 +56,10 @@ public class KafkaCanalConnector implements CanalMQConnector {
         properties.put("auto.offset.reset", "latest"); // 如果没有offset则从最后的offset开始读
         properties.put("request.timeout.ms", "40000"); // 必须大于session.timeout.ms的设置
         properties.put("session.timeout.ms", "30000"); // 默认为30秒
-        properties.put("max.poll.records", "100");
+        if (batchSize == null) {
+            batchSize = 100;
+        }
+        properties.put("max.poll.records", batchSize.toString());
         properties.put("key.deserializer", StringDeserializer.class.getName());
         if (!flatMessage) {
             properties.put("value.deserializer", MessageDeserializer.class.getName());

+ 0 - 50
client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnectors.java

@@ -1,50 +0,0 @@
-package com.alibaba.otter.canal.client.kafka;
-
-/**
- * canal kafka connectors创建工具类
- *
- * @author machengyuan @ 2018-6-12
- * @version 1.0.0
- */
-public class KafkaCanalConnectors {
-
-    /**
-     * 创建kafka客户端链接,独立运行不注册zk信息
-     *
-     * @param servers
-     * @param topic
-     * @param partition
-     * @param groupId
-     * @return
-     */
-    public static KafkaCanalConnector newKafkaConnector(String servers, String topic, Integer partition, String groupId) {
-        return new KafkaCanalConnector(servers, topic, partition, groupId, false);
-    }
-
-    /**
-     * 创建kafka客户端链接,独立运行不注册zk信息
-     *
-     * @param servers
-     * @param topic
-     * @param groupId
-     * @return
-     */
-    public static KafkaCanalConnector newKafkaConnector(String servers, String topic, String groupId) {
-        return new KafkaCanalConnector(servers, topic, null, groupId, false);
-    }
-
-    /**
-     * 创建kafka客户端链接
-     *
-     * @param servers
-     * @param topic
-     * @param partition
-     * @param groupId
-     * @param flatMessage
-     * @return
-     */
-    public static KafkaCanalConnector newKafkaConnector(String servers, String topic, Integer partition,
-                                                        String groupId, boolean flatMessage) {
-        return new KafkaCanalConnector(servers, topic, partition, groupId, flatMessage);
-    }
-}

+ 0 - 24
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnectors.java

@@ -1,24 +0,0 @@
-package com.alibaba.otter.canal.client.rocketmq;
-
-/**
- * RocketMQ connector provider.
- */
-public class RocketMQCanalConnectors {
-
-    /**
-     * Create RocketMQ connector
-     *
-     * @param nameServers name servers for RocketMQ
-     * @param topic
-     * @param groupId
-     * @return {@link RocketMQCanalConnector}
-     */
-    public static RocketMQCanalConnector newRocketMQConnector(String nameServers, String topic, String groupId) {
-        return new RocketMQCanalConnector(nameServers, topic, groupId, false);
-    }
-
-    public static RocketMQCanalConnector newRocketMQConnector(String nameServers, String topic, String groupId,
-                                                              boolean flatMessage) {
-        return new RocketMQCanalConnector(nameServers, topic, groupId, flatMessage);
-    }
-}

+ 3 - 3
client/src/test/java/com/alibaba/otter/canal/client/running/kafka/CanalKafkaClientExample.java

@@ -9,7 +9,6 @@ import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
 import com.alibaba.otter.canal.client.kafka.KafkaCanalConnector;
-import com.alibaba.otter.canal.client.kafka.KafkaCanalConnectors;
 import com.alibaba.otter.canal.protocol.Message;
 
 /**
@@ -36,12 +35,13 @@ public class CanalKafkaClientExample {
                                                     };
 
     public CanalKafkaClientExample(String zkServers, String servers, String topic, Integer partition, String groupId){
-        connector = KafkaCanalConnectors.newKafkaConnector(servers, topic, partition, groupId, false);
+        connector = new KafkaCanalConnector(servers, topic, partition, groupId, null, false);
     }
 
     public static void main(String[] args) {
         try {
-            final CanalKafkaClientExample kafkaCanalClientExample = new CanalKafkaClientExample(AbstractKafkaTest.zkServers,
+            final CanalKafkaClientExample kafkaCanalClientExample = new CanalKafkaClientExample(
+                AbstractKafkaTest.zkServers,
                 AbstractKafkaTest.servers,
                 AbstractKafkaTest.topic,
                 AbstractKafkaTest.partition,

+ 1 - 1
client/src/test/java/com/alibaba/otter/canal/client/running/kafka/KafkaClientRunningTest.java

@@ -30,7 +30,7 @@ public class KafkaClientRunningTest extends AbstractKafkaTest {
     public void testKafkaConsumer() {
         final ExecutorService executor = Executors.newFixedThreadPool(1);
 
-        final KafkaCanalConnector connector = KafkaCanalConnectors.newKafkaConnector(servers, topic, partition, groupId);
+        final KafkaCanalConnector connector = new KafkaCanalConnector(servers, topic, partition, groupId, null, false);
 
         executor.submit(new Runnable() {
 

+ 1 - 2
client/src/test/java/com/alibaba/otter/canal/client/running/rocketmq/CanalRocketMQClientExample.java

@@ -9,7 +9,6 @@ import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
 import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
-import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnectors;
 import com.alibaba.otter.canal.client.running.kafka.AbstractKafkaTest;
 import com.alibaba.otter.canal.protocol.Message;
 
@@ -37,7 +36,7 @@ public class CanalRocketMQClientExample extends AbstractRocektMQTest {
                                                     };
 
     public CanalRocketMQClientExample(String nameServers, String topic, String groupId){
-        connector = RocketMQCanalConnectors.newRocketMQConnector(nameServers, topic, groupId);
+        connector = new RocketMQCanalConnector(nameServers, topic, groupId, false);
     }
 
     public static void main(String[] args) {