agapple 6 years ago
parent
commit
03e6662ce5

+ 6 - 5
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterKafkaWorker.java

@@ -3,11 +3,8 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-import com.alibaba.otter.canal.protocol.FlatMessage;
-import com.alibaba.otter.canal.protocol.Message;
 import org.apache.kafka.common.errors.WakeupException;
 
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
@@ -44,8 +41,12 @@ public class CanalAdapterKafkaWorker extends AbstractCanalAdapterWorker {
 
     @Override
     protected void process() {
-        while (!running)
-            ;
+        while (!running) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+            }
+        }
         ExecutorService workerExecutor = Executors.newSingleThreadExecutor();
         int retry = canalClientConfig.getRetries() == null || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
         long timeout = canalClientConfig.getTimeout() == null ? 30000 : canalClientConfig.getTimeout(); // 默认超时30秒

+ 14 - 9
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java

@@ -3,11 +3,7 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 
-import com.alibaba.otter.canal.protocol.FlatMessage;
-import com.alibaba.otter.canal.protocol.Message;
 import org.apache.kafka.common.errors.WakeupException;
 
 import com.alibaba.otter.canal.client.adapter.OuterAdapter;
@@ -33,18 +29,27 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
         this.topic = topic;
         this.flatMessage = flatMessage;
         this.canalDestination = topic;
-        this.connector = new RocketMQCanalConnector(nameServers, topic, groupId, accessKey, secretKey, flatMessage);
+        this.connector = new RocketMQCanalConnector(nameServers,
+            topic,
+            groupId,
+            accessKey,
+            secretKey,
+            canalClientConfig.getBatchSize(),
+            flatMessage);
         logger.info("RocketMQ consumer config topic:{}, nameServer:{}, groupId:{}", topic, nameServers, groupId);
     }
 
     @Override
     protected void process() {
-        while (!running)
-            ;
+        while (!running) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+            }
+        }
 
         ExecutorService workerExecutor = Executors.newSingleThreadExecutor();
-        int retry = canalClientConfig.getRetries() == null
-                    || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
+        int retry = canalClientConfig.getRetries() == null || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
         long timeout = canalClientConfig.getTimeout() == null ? 30000 : canalClientConfig.getTimeout(); // 默认超时30秒
 
         while (running) {

+ 11 - 5
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterWorker.java

@@ -2,7 +2,8 @@ package com.alibaba.otter.canal.adapter.launcher.loader;
 
 import java.net.SocketAddress;
 import java.util.List;
-import java.util.concurrent.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import com.alibaba.otter.canal.client.CanalConnector;
 import com.alibaba.otter.canal.client.CanalConnectors;
@@ -58,11 +59,16 @@ public class CanalAdapterWorker extends AbstractCanalAdapterWorker {
 
     @Override
     protected void process() {
-        while (!running)
-            ; // waiting until running == true
+        while (!running) { // waiting until running == true
+            while (!running) {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                }
+            }
+        }
 
-        int retry = canalClientConfig.getRetries() == null
-                    || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
+        int retry = canalClientConfig.getRetries() == null || canalClientConfig.getRetries() == 0 ? 1 : canalClientConfig.getRetries();
         if (retry == -1) {
             // 重试次数-1代表异常时一直阻塞重试
             retry = Integer.MAX_VALUE;

+ 13 - 9
client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java

@@ -1,7 +1,5 @@
 package com.alibaba.otter.canal.client.rocketmq;
 
-import com.aliyun.openservices.apache.api.impl.authority.SessionCredentials;
-import com.aliyun.openservices.apache.api.impl.rocketmq.ClientRPCHook;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -26,6 +24,8 @@ import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+import com.aliyun.openservices.apache.api.impl.authority.SessionCredentials;
+import com.aliyun.openservices.apache.api.impl.rocketmq.ClientRPCHook;
 import com.google.common.collect.Lists;
 
 /**
@@ -48,24 +48,26 @@ public class RocketMQCanalConnector implements CanalMQConnector {
     private volatile boolean                    connected           = false;
     private DefaultMQPushConsumer               rocketMQConsumer;
     private BlockingQueue<ConsumerBatchMessage> messageBlockingQueue;
+    private int                                 batchSize           = -1;
     private long                                batchProcessTimeout = 60 * 1000;
     private boolean                             flatMessage;
     private volatile ConsumerBatchMessage       lastGetBatchMessage = null;
     private String                              accessKey;
     private String                              secretKey;
 
-
-    public RocketMQCanalConnector(String nameServer, String topic, String groupName, boolean flatMessage){
+    public RocketMQCanalConnector(String nameServer, String topic, String groupName, Integer batchSize,
+                                  boolean flatMessage){
         this.nameServer = nameServer;
         this.topic = topic;
         this.groupName = groupName;
         this.flatMessage = flatMessage;
         this.messageBlockingQueue = new LinkedBlockingQueue<>(1024);
+        this.batchSize = batchSize;
     }
 
-    public RocketMQCanalConnector(String nameServer, String topic, String groupName,
-        String accessKey, String secretKey, boolean flatMessage){
-        this(nameServer, topic, groupName, flatMessage);
+    public RocketMQCanalConnector(String nameServer, String topic, String groupName, String accessKey,
+                                  String secretKey, Integer batchSize, boolean flatMessage){
+        this(nameServer, topic, groupName, batchSize, flatMessage);
         this.accessKey = accessKey;
         this.secretKey = secretKey;
     }
@@ -73,8 +75,7 @@ public class RocketMQCanalConnector implements CanalMQConnector {
     public void connect() throws CanalClientException {
 
         RPCHook rpcHook = null;
-        if(null != accessKey && accessKey.length() > 0
-            && null != secretKey && secretKey.length() > 0){
+        if (null != accessKey && accessKey.length() > 0 && null != secretKey && secretKey.length() > 0) {
             SessionCredentials sessionCredentials = new SessionCredentials();
             sessionCredentials.setAccessKey(accessKey);
             sessionCredentials.setSecretKey(secretKey);
@@ -85,6 +86,9 @@ public class RocketMQCanalConnector implements CanalMQConnector {
         if (!StringUtils.isBlank(nameServer)) {
             rocketMQConsumer.setNamesrvAddr(nameServer);
         }
+        if (batchSize != -1) {
+            rocketMQConsumer.setConsumeMessageBatchMaxSize(batchSize);
+        }
     }
 
     public void disconnect() throws CanalClientException {

+ 7 - 2
client/src/test/java/com/alibaba/otter/canal/client/running/kafka/CanalKafkaClientExample.java

@@ -99,8 +99,13 @@ public class CanalKafkaClientExample {
     }
 
     private void process() {
-        while (!running)
-            ;
+        while (!running) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+            }
+        }
+
         while (running) {
             try {
                 connector.connect();

+ 12 - 6
client/src/test/java/com/alibaba/otter/canal/client/running/kafka/CanalKafkaOffsetClientExample.java

@@ -1,14 +1,15 @@
 package com.alibaba.otter.canal.client.running.kafka;
 
-import com.alibaba.otter.canal.client.kafka.KafkaOffsetCanalConnector;
-import com.alibaba.otter.canal.client.kafka.protocol.KafkaMessage;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.kafka.common.errors.WakeupException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
-import java.util.List;
-import java.util.concurrent.TimeUnit;
+import com.alibaba.otter.canal.client.kafka.KafkaOffsetCanalConnector;
+import com.alibaba.otter.canal.client.kafka.protocol.KafkaMessage;
 
 /**
  * KafkaOffsetCanalConnector 使用示例
@@ -99,8 +100,13 @@ public class CanalKafkaOffsetClientExample {
     }
 
     private void process() {
-        while (!running)
-            ;
+        while (!running) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+            }
+        }
+
         while (running) {
             try {
                 // 修改 AutoOffsetReset 的值,默认(earliest)

+ 8 - 3
client/src/test/java/com/alibaba/otter/canal/client/running/rocketmq/CanalRocketMQClientExample.java

@@ -36,7 +36,7 @@ public class CanalRocketMQClientExample extends AbstractRocektMQTest {
                                                     };
 
     public CanalRocketMQClientExample(String nameServers, String topic, String groupId){
-        connector = new RocketMQCanalConnector(nameServers, topic, groupId, false);
+        connector = new RocketMQCanalConnector(nameServers, topic, groupId, 500, false);
     }
 
     public static void main(String[] args) {
@@ -97,8 +97,13 @@ public class CanalRocketMQClientExample extends AbstractRocektMQTest {
     }
 
     private void process() {
-        while (!running)
-            ;
+        while (!running) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+            }
+        }
+
         while (running) {
             try {
                 connector.connect();

+ 16 - 9
deployer/src/main/java/com/alibaba/otter/canal/deployer/monitor/ManagerRemoteConfigMonitor.java

@@ -4,8 +4,16 @@ import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileWriter;
 import java.nio.charset.StandardCharsets;
-import java.sql.*;
-import java.util.*;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -39,7 +47,7 @@ public class ManagerRemoteConfigMonitor {
 
     private long                     scanIntervalInSecond   = 5;
     private ScheduledExecutorService executor               = Executors.newScheduledThreadPool(2,
-        new NamedThreadFactory("remote-canal-config-scan"));
+                                                                new NamedThreadFactory("remote-canal-config-scan"));
 
     public ManagerRemoteConfigMonitor(String jdbcUrl, String jdbcUsername, String jdbcPassword){
         this.jdbcUrl = jdbcUrl;
@@ -75,8 +83,8 @@ public class ManagerRemoteConfigMonitor {
                     overrideLocalCanalConfig(configItem.getContent());
                     properties = new Properties();
                     properties.load(new ByteArrayInputStream(configItem.getContent().getBytes(StandardCharsets.UTF_8)));
-                    scanIntervalInSecond = Integer
-                        .valueOf(properties.getProperty(CanalConstants.CANAL_AUTO_SCAN_INTERVAL, "5"));
+                    scanIntervalInSecond = Integer.valueOf(properties.getProperty(CanalConstants.CANAL_AUTO_SCAN_INTERVAL,
+                        "5"));
                 }
             }
         } catch (Exception e) {
@@ -223,8 +231,7 @@ public class ManagerRemoteConfigMonitor {
         Map<String, ConfigItem> changedInstanceConfigs = modifiedInstanceConfigs[0];
         if (changedInstanceConfigs != null) {
             for (ConfigItem configItem : changedInstanceConfigs.values()) {
-                try (FileWriter writer = new FileWriter(
-                    getConfPath() + configItem.getName() + "/instance.properties")) {
+                try (FileWriter writer = new FileWriter(getConfPath() + configItem.getName() + "/instance.properties")) {
                     writer.write(configItem.getContent());
                     writer.flush();
                 } catch (Exception e) {
@@ -351,8 +358,8 @@ public class ManagerRemoteConfigMonitor {
         }
     }
 
-    public interface Listener<Properties> {
+    public interface Listener<T> {
 
-        void onChange(Properties properties);
+        void onChange(T properties);
     }
 }