Przeglądaj źródła

Resolve conflict

duhengforever 6 lat temu
rodzic
commit
86fd77f9c4

+ 15 - 24
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java

@@ -15,9 +15,8 @@ import com.alibaba.otter.canal.client.rocketmq.RocketMQCanalConnectorProvider;
 import com.alibaba.otter.canal.protocol.Message;
 
 /**
- * kafka对应的client适配器工作线程
+ * rocketmq对应的client适配器工作线程
  *
- * @author rewerma 2018-8-19 下午11:30:49
  * @version 1.0.0
  */
 public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
@@ -65,13 +64,19 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
                             message = connector.getFlatMessageWithoutAck();
                         }
                         if (message != null) {
-<<<<<<< HEAD:client-launcher/src/main/java/com/alibaba/otter/canal/client/adapter/loader/CanalAdapterRocketMQWorker.java
                             final Object msg = message;
                             executor.submit(new Runnable() {
                                 @Override
                                 public void run() {
                                     try {
+                                        if (logger.isDebugEnabled()) {
+                                            logger.debug("topic: {} batchId: {} batchSize: {} ",
+                                                topic,
+                                                message.getId(),
+                                                message.getEntries().size());
+                                        }
                                         if (msg != null) {
+                                            long begin = System.currentTimeMillis();
                                             if (msg instanceof Message) {
                                                 Message receive = (Message) msg;
                                                 writeOut(receive, topic);
@@ -81,31 +86,17 @@ public class CanalAdapterRocketMQWorker extends AbstractCanalAdapterWorker {
                                                 writeOut(receive);
                                                 connector.ack(receive.getId());
                                             }
+                                            long now = System.currentTimeMillis();
+                                            if ((now - begin) > 5 * 60 * 1000) {
+                                                logger.error("topic: {} batchId {} elapsed time: {} ms",
+                                                    topic,
+                                                    message.getId(),
+                                                    now - begin);
+                                            }
                                         }
                                     } catch (Exception e) {
                                         logger.error(e.getMessage(), e);
                                     }
-=======
-                            executor.submit(() -> {
-                                try {
-                                    if (logger.isDebugEnabled()) {
-                                        logger.debug("topic: {} batchId: {} batchSize: {} ",
-                                            topic,
-                                            message.getId(),
-                                            message.getEntries().size());
-                                    }
-                                    long begin = System.currentTimeMillis();
-                                    writeOut(message);
-                                    long now = System.currentTimeMillis();
-                                    if ((System.currentTimeMillis() - begin) > 5 * 60 * 1000) {
-                                        logger.error("topic: {} batchId {} elapsed time: {} ms",
-                                            topic,
-                                            message.getId(),
-                                            now - begin);
-                                    }
-                                } catch (Exception e) {
-                                    logger.error(e.getMessage(), e);
->>>>>>> 2c6bd3ee1b23f30fdb2b1f748805284a4fed872c:client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterRocketMQWorker.java
                                 }
                                 connector.ack(message.getId());
                             });