Răsfoiți Sursa

fixed mq config

七锋 7 ani în urmă
părinte
comite
071545987b

+ 1 - 2
client/src/test/java/com/alibaba/otter/canal/client/running/kafka/CanalKafkaClientExample.java

@@ -108,8 +108,7 @@ public class CanalKafkaClientExample {
                 connector.subscribe();
                 while (running) {
                     try {
-                        List<Message> messages = connector.getWithoutAck(1L, TimeUnit.SECONDS); // 获取message
-
+                        List<Message> messages = connector.getListWithoutAck(100L, TimeUnit.MILLISECONDS); // 获取message
                         if (messages == null) {
                             continue;
                         }

+ 15 - 10
client/src/test/java/com/alibaba/otter/canal/client/running/rocketmq/CanalRocketMQClientExample.java

@@ -1,12 +1,15 @@
 package com.alibaba.otter.canal.client.running.rocketmq;
 
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.kafka.common.errors.WakeupException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
 import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnector;
-import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnectorProvider;
+import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnectors;
 import com.alibaba.otter.canal.client.running.kafka.AbstractKafkaTest;
 import com.alibaba.otter.canal.protocol.Message;
 
@@ -34,7 +37,7 @@ public class CanalRocketMQClientExample extends AbstractRocektMQTest {
                                                     };
 
     public CanalRocketMQClientExample(String nameServers, String topic, String groupId){
-        connector = RocketMQCanalConnectorProvider.newRocketMQConnector(nameServers, topic, groupId);
+        connector = RocketMQCanalConnectors.newRocketMQConnector(nameServers, topic, groupId);
     }
 
     public static void main(String[] args) {
@@ -103,21 +106,23 @@ public class CanalRocketMQClientExample extends AbstractRocektMQTest {
                 connector.connect();
                 connector.subscribe();
                 while (running) {
-                    Message message = connector.getWithoutAck(1); // 获取message
-                    try {
-                        if (message == null) {
-                            continue;
-                        }
+                    List<Message> messages = connector.getListWithoutAck(100L, TimeUnit.MILLISECONDS); // 获取message
+                    for (Message message : messages) {
                         long batchId = message.getId();
                         int size = message.getEntries().size();
                         if (batchId == -1 || size == 0) {
+                            // try {
+                            // Thread.sleep(1000);
+                            // } catch (InterruptedException e) {
+                            // }
                         } else {
+                            // printSummary(message, batchId, size);
+                            // printEntry(message.getEntries());
                             logger.info(message.toString());
                         }
-                    } catch (Exception e) {
-                        logger.error(e.getMessage(), e);
                     }
-                    connector.ack(message.getId()); // 提交确认
+
+                    connector.ack(); // 提交确认
                 }
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);

+ 1 - 1
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java

@@ -387,7 +387,7 @@ public class CanalController {
         return config;
     }
 
-    public String getProperty(Properties properties, String key) {
+    public static String getProperty(Properties properties, String key) {
         key = StringUtils.trim(key);
         String value = System.getProperty(key);
 

+ 13 - 8
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java

@@ -38,6 +38,19 @@ public class CanalLauncher {
                 properties.load(new FileInputStream(conf));
             }
 
+            CanalMQProducer canalMQProducer = null;
+            String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
+            if (serverMode.equalsIgnoreCase("kafka")) {
+                canalMQProducer = new CanalKafkaProducer();
+            } else if (serverMode.equalsIgnoreCase("rocketmq")) {
+                canalMQProducer = new CanalRocketMQProducer();
+            }
+
+            if (canalMQProducer != null) {
+                // disable netty
+                System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
+            }
+
             logger.info("## start the canal server.");
             final CanalController controller = new CanalController(properties);
             controller.start();
@@ -57,19 +70,11 @@ public class CanalLauncher {
 
             });
 
-            CanalMQProducer canalMQProducer = null;
-            String serverMode = controller.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
-            if (serverMode.equalsIgnoreCase("kafka")) {
-                canalMQProducer = new CanalKafkaProducer();
-            } else if (serverMode.equalsIgnoreCase("rocketmq")) {
-                canalMQProducer = new CanalRocketMQProducer();
-            }
             if (canalMQProducer != null) {
                 CanalMQStarter canalServerStarter = new CanalMQStarter(canalMQProducer);
                 if (canalServerStarter != null) {
                     canalServerStarter.init();
                 }
-
             }
         } catch (Throwable e) {
             logger.error("## Something goes wrong when starting up the canal Server:", e);

+ 2 - 2
deployer/src/main/resources/mq.yml

@@ -6,9 +6,9 @@ bufferMemory: 33554432
 
 # Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
 canalBatchSize: 50
-# Canal get数据的超时时间, 单位: 毫秒, 为不限超时
+# Canal get数据的超时时间, 单位: 毫秒, 0为不限超时
 canalGetTimeout: 100
-flatMessage: false
+flatMessage: true
 
 canalDestinations:
   - canalDestination: example

+ 0 - 1
protocol/src/main/java/com/alibaba/otter/canal/protocol/FlatMessage.java

@@ -277,7 +277,6 @@ public class FlatMessage implements Serializable {
             partitionsNum = 1;
         }
         FlatMessage[] partitionMessages = new FlatMessage[partitionsNum];
-
         String pk = pkHashConfig.get(flatMessage.getDatabase() + "." + flatMessage.getTable());
         if (pk == null || flatMessage.getIsDdl()) {
             partitionMessages[0] = flatMessage;

+ 2 - 2
server/src/main/java/com/alibaba/otter/canal/common/MQProperties.java

@@ -12,7 +12,7 @@ import java.util.Map;
  */
 public class MQProperties {
 
-    private String                 servers                = "localhost:6667";
+    private String                 servers                = "127.0.0.1:6667";
     private int                    retries                = 0;
     private int                    batchSize              = 16384;
     private int                    lingerMs               = 1;
@@ -20,7 +20,7 @@ public class MQProperties {
     private boolean                filterTransactionEntry = true;
     private String                 producerGroup          = "Canal-Producer";
     private int                    canalBatchSize         = 50;
-    private Long                   canalGetTimeout;
+    private Long                   canalGetTimeout        = 100L;
     private boolean                flatMessage            = true;
 
     private List<CanalDestination> canalDestinations      = new ArrayList<CanalDestination>();

+ 5 - 6
server/src/main/java/com/alibaba/otter/canal/server/CanalMQStarter.java

@@ -121,15 +121,14 @@ public class CanalMQStarter {
                 server.subscribe(clientIdentity);
                 logger.info("## the canal consumer {} is running now ......", destination.getCanalDestination());
 
+                Long getTimeout = properties.getCanalGetTimeout();
+                int getBatchSize = properties.getCanalBatchSize();
                 while (running) {
                     Message message;
-                    if (properties.getCanalGetTimeout() != null) {
-                        message = server.getWithoutAck(clientIdentity,
-                            properties.getCanalBatchSize(),
-                            properties.getCanalGetTimeout(),
-                            TimeUnit.MILLISECONDS);
+                    if (getTimeout != null && getTimeout > 0) {
+                        message = server.getWithoutAck(clientIdentity, getBatchSize, getTimeout, TimeUnit.MILLISECONDS);
                     } else {
-                        message = server.getWithoutAck(clientIdentity, properties.getCanalBatchSize());
+                        message = server.getWithoutAck(clientIdentity, getBatchSize);
                     }
 
                     final long batchId = message.getId();