Browse Source

fixed code format & merge

agapple 4 years ago
parent
commit
1369418c6a

+ 9 - 1
admin/admin-web/src/main/resources/canal-template.properties

@@ -64,6 +64,9 @@ canal.instance.filter.query.ddl = false
 canal.instance.filter.table.error = false
 canal.instance.filter.rows = false
 canal.instance.filter.transaction.entry = false
+canal.instance.filter.dml.insert = false
+canal.instance.filter.dml.update = false
+canal.instance.filter.dml.delete = false
 
 # binlog format/image check
 canal.instance.binlog.format = ROW,STATEMENT,MIXED 
@@ -99,6 +102,9 @@ canal.conf.dir = ../conf
 # auto scan instance dir add/remove and start/stop instance
 canal.auto.scan = true
 canal.auto.scan.interval = 5
+# set this value to 'true' means that when binlog pos not found, skip to latest.
+# WARN: pls keep 'false' in production env, or if you know what you want.
+canal.auto.reset.latest.pos.mode = false
 
 canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
 #canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
@@ -155,6 +161,7 @@ rocketmq.namespace =
 rocketmq.namesrv.addr = 127.0.0.1:9876
 rocketmq.retry.times.when.send.failed = 0
 rocketmq.vip.channel.enabled = false
+rocketmq.tag = 
 
 ##################################################
 ######### 		    RabbitMQ	     #############
@@ -163,4 +170,5 @@ rabbitmq.host =
 rabbitmq.virtual.host =
 rabbitmq.exchange =
 rabbitmq.username =
-rabbitmq.password =
+rabbitmq.password =
+rabbitmq.deliveryMode =

+ 5 - 4
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/URLClassExtensionLoader.java

@@ -7,7 +7,8 @@ import java.util.Enumeration;
 import java.util.NoSuchElementException;
 
 public class URLClassExtensionLoader extends URLClassLoader {
-    public URLClassExtensionLoader(URL[] urls) {
+
+    public URLClassExtensionLoader(URL[] urls){
         super(urls);
     }
 
@@ -18,9 +19,9 @@ public class URLClassExtensionLoader extends URLClassLoader {
             return c;
         }
 
-        if (name.startsWith("java.") || name.startsWith("org.slf4j.")
-                || name.startsWith("org.apache.logging")
-                || name.startsWith("org.apache.commons.logging.")) {
+        if (name.startsWith("java.") || name.startsWith("org.slf4j.") || name.startsWith("org.apache.logging")
+            || name.startsWith("org.apache.zookeeper.") || name.startsWith("org.I0Itec.zkclient.")
+            || name.startsWith("org.apache.commons.logging.")) {
             // || name.startsWith("org.apache.hadoop."))
             // {
             c = super.loadClass(name);

+ 0 - 1
connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/config/RabbitMQConstants.java

@@ -15,7 +15,6 @@ public class RabbitMQConstants {
     public static final String RABBITMQ_VIRTUAL_HOST     = ROOT + "." + "virtual.host";
     public static final String RABBITMQ_USERNAME         = ROOT + "." + "username";
     public static final String RABBITMQ_PASSWORD         = ROOT + "." + "password";
-    public static final String RABBITMQ_DELIVERY_NODE    = ROOT + "." + "deliveryMode";
 
     public static final String RABBITMQ_RESOURCE_OWNERID = ROOT + "." + "rabbitmq.resource.ownerId";
 }

+ 0 - 10
connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/config/RabbitMQProducerConfig.java

@@ -15,8 +15,6 @@ public class RabbitMQProducerConfig extends MQProperties {
     private String exchange;
     private String username;
     private String password;
-    // 1:transient, 2:"persistent"
-    private String  deliveryMode;
 
     public String getHost() {
         return host;
@@ -57,12 +55,4 @@ public class RabbitMQProducerConfig extends MQProperties {
     public void setPassword(String password) {
         this.password = password;
     }
-
-    public String getDeliveryMode() {
-        return deliveryMode;
-    }
-
-    public void setDeliveryMode(String deliveryMode) {
-        this.deliveryMode = deliveryMode;
-    }
 }

+ 5 - 8
connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/producer/CanalRabbitMQProducer.java

@@ -6,7 +6,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeoutException;
 
-import com.rabbitmq.client.*;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,6 +26,10 @@ import com.alibaba.otter.canal.connector.rabbitmq.config.RabbitMQConstants;
 import com.alibaba.otter.canal.connector.rabbitmq.config.RabbitMQProducerConfig;
 import com.alibaba.otter.canal.protocol.FlatMessage;
 import com.alibaba.otter.canal.protocol.Message;
+import com.rabbitmq.client.AlreadyClosedException;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
 
 /**
  * RabbitMQ Producer SPI 实现
@@ -103,10 +106,6 @@ public class CanalRabbitMQProducer extends AbstractMQProducer implements CanalMQ
         if (!StringUtils.isEmpty(password)) {
             rabbitMQProperties.setPassword(password);
         }
-        String deliveryMode = properties.getProperty(RabbitMQConstants.RABBITMQ_DELIVERY_NODE);
-        if (!StringUtils.isEmpty(deliveryMode)) {
-            rabbitMQProperties.setDeliveryMode(deliveryMode);
-        }
     }
 
     @Override
@@ -166,9 +165,7 @@ public class CanalRabbitMQProducer extends AbstractMQProducer implements CanalMQ
         // tips: 目前逻辑中暂不处理对exchange处理,请在Console后台绑定 才可使用routekey
         try {
             RabbitMQProducerConfig rabbitMQProperties = (RabbitMQProducerConfig) this.mqProperties;
-            AMQP.BasicProperties properties = rabbitMQProperties != null && rabbitMQProperties.getDeliveryMode() != null
-                    && "2".equals(rabbitMQProperties.getDeliveryMode()) ? MessageProperties.MINIMAL_PERSISTENT_BASIC : null;
-            channel.basicPublish(rabbitMQProperties.getExchange(), queueName, properties, message);
+            channel.basicPublish(rabbitMQProperties.getExchange(), queueName, null, message);
         } catch (Throwable e) {
             throw new RuntimeException(e);
         }

+ 14 - 9
connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.java

@@ -140,7 +140,7 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
         if (!StringUtils.isEmpty(vipChannelEnabled)) {
             rocketMQProperties.setVipChannelEnabled(Boolean.parseBoolean(vipChannelEnabled));
         }
-        String tag = properties.getProperty(RocketMQConstants.ROCKETMQ_TAG);
+        String tag = PropertiesUtils.getProperty(properties, RocketMQConstants.ROCKETMQ_TAG);
         if (!StringUtils.isEmpty(tag)) {
             rocketMQProperties.setTag(tag);
         }
@@ -207,8 +207,10 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
                     if (dataPartition != null) {
                         final int index = i;
                         template.submit(() -> {
-                            Message data = new Message(topicName, ((RocketMQProducerConfig)this.mqProperties).getTag(), CanalMessageSerializerUtil.serializer(dataPartition,
-                                mqProperties.isFilterTransactionEntry()));
+                            Message data = new Message(topicName,
+                                ((RocketMQProducerConfig) this.mqProperties).getTag(),
+                                CanalMessageSerializerUtil.serializer(dataPartition,
+                                    mqProperties.isFilterTransactionEntry()));
                             sendMessage(data, index);
                         });
                     }
@@ -217,8 +219,9 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
                 template.waitForResult();
             } else {
                 final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
-                Message data = new Message(topicName, ((RocketMQProducerConfig)this.mqProperties).getTag(), CanalMessageSerializerUtil.serializer(message,
-                    mqProperties.isFilterTransactionEntry()));
+                Message data = new Message(topicName,
+                    ((RocketMQProducerConfig) this.mqProperties).getTag(),
+                    CanalMessageSerializerUtil.serializer(message, mqProperties.isFilterTransactionEntry()));
                 sendMessage(data, partition);
             }
         } else {
@@ -254,8 +257,9 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
                         final int index = i;
                         template.submit(() -> {
                             List<Message> messages = flatMessagePart.stream()
-                                .map(flatMessage -> new Message(topicName, ((RocketMQProducerConfig)this.mqProperties).getTag(), JSON.toJSONBytes(flatMessage,
-                                    SerializerFeature.WriteMapNullValue)))
+                                .map(flatMessage -> new Message(topicName,
+                                    ((RocketMQProducerConfig) this.mqProperties).getTag(),
+                                    JSON.toJSONBytes(flatMessage, SerializerFeature.WriteMapNullValue)))
                                 .collect(Collectors.toList());
                             // 批量发送
                             sendMessage(messages, index);
@@ -268,8 +272,9 @@ public class CanalRocketMQProducer extends AbstractMQProducer implements CanalMQ
             } else {
                 final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
                 List<Message> messages = flatMessages.stream()
-                    .map(flatMessage -> new Message(topicName, ((RocketMQProducerConfig)this.mqProperties).getTag(), JSON.toJSONBytes(flatMessage,
-                        SerializerFeature.WriteMapNullValue)))
+                    .map(flatMessage -> new Message(topicName,
+                        ((RocketMQProducerConfig) this.mqProperties).getTag(),
+                        JSON.toJSONBytes(flatMessage, SerializerFeature.WriteMapNullValue)))
                     .collect(Collectors.toList());
                 // 批量发送
                 sendMessage(messages, partition);

+ 9 - 5
deployer/src/main/resources/canal.properties

@@ -2,7 +2,7 @@
 ######### 		common argument		#############
 #################################################
 # tcp bind ip
-canal.ip = 
+canal.ip =
 # register ip to zookeeper
 canal.register.ip =
 canal.port = 11111
@@ -16,6 +16,10 @@ canal.metrics.pull.port = 11112
 canal.admin.port = 11110
 canal.admin.user = admin
 canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
+# admin auto register
+#canal.admin.register.auto = true
+#canal.admin.register.cluster =
+#canal.admin.register.name =
 
 canal.zkServers =
 # flush data to zk
@@ -60,9 +64,9 @@ canal.instance.filter.query.ddl = false
 canal.instance.filter.table.error = false
 canal.instance.filter.rows = false
 canal.instance.filter.transaction.entry = false
-canal.instance.filter.dml.insert=false
-canal.instance.filter.dml.update=false
-canal.instance.filter.dml.delete=false
+canal.instance.filter.dml.insert = false
+canal.instance.filter.dml.update = false
+canal.instance.filter.dml.delete = false
 
 # binlog format/image check
 canal.instance.binlog.format = ROW,STATEMENT,MIXED 
@@ -157,7 +161,7 @@ rocketmq.namespace =
 rocketmq.namesrv.addr = 127.0.0.1:9876
 rocketmq.retry.times.when.send.failed = 0
 rocketmq.vip.channel.enabled = false
-rocketmq.tag = testTag
+rocketmq.tag = 
 
 ##################################################
 ######### 		    RabbitMQ	     #############

+ 22 - 18
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java

@@ -53,7 +53,7 @@ import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
  */
 public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implements MultiStageCoprocessor {
 
-    private static final int                  maxFullTimes = 10;
+    private static final int                  maxFullTimes    = 10;
     private LogEventConvert                   logEventConvert;
     private EventTransactionBuffer            transactionBuffer;
     private ErosaConnection                   connection;
@@ -71,13 +71,13 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
     private BatchEventProcessor<MessageEvent> simpleParserStage;
     private BatchEventProcessor<MessageEvent> sinkStoreStage;
     private LogContext                        logContext;
-
-    protected boolean              filterDmlInsert           = false;
-    protected boolean              filterDmlUpdate           = false;
-    protected boolean              filterDmlDelete           = false;
+    protected boolean                         filterDmlInsert = false;
+    protected boolean                         filterDmlUpdate = false;
+    protected boolean                         filterDmlDelete = false;
 
     public MysqlMultiStageCoprocessor(int ringBufferSize, int parserThreadCount, LogEventConvert logEventConvert,
-                                      EventTransactionBuffer transactionBuffer, String destination, boolean filterDmlInsert, boolean filterDmlUpdate, boolean filterDmlDelete){
+                                      EventTransactionBuffer transactionBuffer, String destination,
+                                      boolean filterDmlInsert, boolean filterDmlUpdate, boolean filterDmlDelete){
         this.ringBufferSize = ringBufferSize;
         this.parserThreadCount = parserThreadCount;
         this.logEventConvert = logEventConvert;
@@ -106,8 +106,8 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
         // stage 2
         this.logContext = new LogContext();
         simpleParserStage = new BatchEventProcessor<>(disruptorMsgBuffer,
-                sequenceBarrier,
-                new SimpleParserStage(logContext));
+            sequenceBarrier,
+            new SimpleParserStage(logContext));
         simpleParserStage.setExceptionHandler(exceptionHandler);
         disruptorMsgBuffer.addGatingSequences(simpleParserStage.getSequence());
 
@@ -126,9 +126,7 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
 
         // stage 4
         SequenceBarrier sinkSequenceBarrier = disruptorMsgBuffer.newBarrier(sequence);
-        sinkStoreStage = new BatchEventProcessor<>(disruptorMsgBuffer,
-                sinkSequenceBarrier,
-                new SinkStoreStage());
+        sinkStoreStage = new BatchEventProcessor<>(disruptorMsgBuffer, sinkSequenceBarrier, new SinkStoreStage());
         sinkStoreStage.setExceptionHandler(exceptionHandler);
         disruptorMsgBuffer.addGatingSequences(sinkStoreStage.getSequence());
 
@@ -278,19 +276,25 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
                 switch (eventType) {
                     case LogEvent.WRITE_ROWS_EVENT_V1:
                     case LogEvent.WRITE_ROWS_EVENT:
-                        tableMeta = logEventConvert.parseRowsEventForTableMeta((WriteRowsLogEvent) logEvent);
-                        needDmlParse = !filterDmlInsert;//true;
+                        if (!filterDmlInsert) {
+                            tableMeta = logEventConvert.parseRowsEventForTableMeta((WriteRowsLogEvent) logEvent);
+                            needDmlParse = true;
+                        }
                         break;
                     case LogEvent.UPDATE_ROWS_EVENT_V1:
                     case LogEvent.PARTIAL_UPDATE_ROWS_EVENT:
                     case LogEvent.UPDATE_ROWS_EVENT:
-                        tableMeta = logEventConvert.parseRowsEventForTableMeta((UpdateRowsLogEvent) logEvent);
-                        needDmlParse = !filterDmlUpdate;//true;
+                        if (!filterDmlUpdate) {
+                            tableMeta = logEventConvert.parseRowsEventForTableMeta((UpdateRowsLogEvent) logEvent);
+                            needDmlParse = true;
+                        }
                         break;
                     case LogEvent.DELETE_ROWS_EVENT_V1:
                     case LogEvent.DELETE_ROWS_EVENT:
-                        tableMeta = logEventConvert.parseRowsEventForTableMeta((DeleteRowsLogEvent) logEvent);
-                        needDmlParse = !filterDmlDelete;//true;
+                        if (!filterDmlDelete) {
+                            tableMeta = logEventConvert.parseRowsEventForTableMeta((DeleteRowsLogEvent) logEvent);
+                            needDmlParse = true;
+                        }
                         break;
                     case LogEvent.ROWS_QUERY_LOG_EVENT:
                         needDmlParse = true;
@@ -448,7 +452,7 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement
 
         @Override
         public void handleEventException(final Throwable ex, final long sequence, final Object event) {
-            //异常上抛,否则processEvents的逻辑会默认会mark为成功执行,有丢数据风险
+            // 异常上抛,否则processEvents的逻辑会默认会mark为成功执行,有丢数据风险
             throw new CanalParseException(ex);
         }
 

+ 4 - 5
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -12,8 +12,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import com.taobao.tddl.dbsync.binlog.event.mariadb.MariaGtidListLogEvent;
-import com.taobao.tddl.dbsync.binlog.event.mariadb.MariaGtidLogEvent;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.slf4j.Logger;
@@ -60,6 +58,8 @@ import com.taobao.tddl.dbsync.binlog.event.UserVarLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.XidLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.mariadb.AnnotateRowsEvent;
+import com.taobao.tddl.dbsync.binlog.event.mariadb.MariaGtidListLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.mariadb.MariaGtidLogEvent;
 import com.taobao.tddl.dbsync.binlog.exception.TableIdNotFoundException;
 
 /**
@@ -195,9 +195,9 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
         Pair.Builder builder = Pair.newBuilder();
         builder.setKey("gtid");
         if (logEvent instanceof MariaGtidLogEvent) {
-            builder.setValue(((MariaGtidLogEvent)logEvent).getGtidStr());
+            builder.setValue(((MariaGtidLogEvent) logEvent).getGtidStr());
         } else if (logEvent instanceof MariaGtidListLogEvent) {
-            builder.setValue(((MariaGtidListLogEvent)logEvent).getGtidStr());
+            builder.setValue(((MariaGtidListLogEvent) logEvent).getGtidStr());
         }
         Header header = createHeader(logHeader, "", "", EventType.GTID);
         return createEntry(header, EntryType.GTIDLOG, builder.build().toByteString());
@@ -294,7 +294,6 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
 
             Header header = createHeader(event.getHeader(), schemaName, tableName, type);
             RowChange.Builder rowChangeBuilder = RowChange.newBuilder();
-
             rowChangeBuilder.setIsDdl(!isDml);
             rowChangeBuilder.setSql(queryString);
             if (StringUtils.isNotEmpty(event.getDbName())) {// 可能为空