浏览代码

Merge pull request #807 from qmz/master

instance with manager模式一些微调
agapple 6 年之前
父节点
当前提交
764e1b6dc0

+ 8 - 13
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java

@@ -6,6 +6,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import com.alibaba.otter.canal.meta.FileMixedMetaManager;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -20,13 +21,7 @@ import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
 import com.alibaba.otter.canal.instance.core.AbstractCanalInstance;
 import com.alibaba.otter.canal.instance.manager.model.Canal;
 import com.alibaba.otter.canal.instance.manager.model.CanalParameter;
-import com.alibaba.otter.canal.instance.manager.model.CanalParameter.DataSourcing;
-import com.alibaba.otter.canal.instance.manager.model.CanalParameter.HAMode;
-import com.alibaba.otter.canal.instance.manager.model.CanalParameter.IndexMode;
-import com.alibaba.otter.canal.instance.manager.model.CanalParameter.MetaMode;
-import com.alibaba.otter.canal.instance.manager.model.CanalParameter.SourcingType;
-import com.alibaba.otter.canal.instance.manager.model.CanalParameter.StorageMode;
-import com.alibaba.otter.canal.instance.manager.model.CanalParameter.StorageScavengeMode;
+import com.alibaba.otter.canal.instance.manager.model.CanalParameter.*;
 import com.alibaba.otter.canal.meta.MemoryMetaManager;
 import com.alibaba.otter.canal.meta.PeriodMixedMetaManager;
 import com.alibaba.otter.canal.meta.ZooKeeperMetaManager;
@@ -37,12 +32,7 @@ import com.alibaba.otter.canal.parse.inbound.AbstractEventParser;
 import com.alibaba.otter.canal.parse.inbound.group.GroupEventParser;
 import com.alibaba.otter.canal.parse.inbound.mysql.LocalBinlogEventParser;
 import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
-import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
-import com.alibaba.otter.canal.parse.index.FailbackLogPositionManager;
-import com.alibaba.otter.canal.parse.index.MemoryLogPositionManager;
-import com.alibaba.otter.canal.parse.index.MetaLogPositionManager;
-import com.alibaba.otter.canal.parse.index.PeriodMixedLogPositionManager;
-import com.alibaba.otter.canal.parse.index.ZooKeeperLogPositionManager;
+import com.alibaba.otter.canal.parse.index.*;
 import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
 import com.alibaba.otter.canal.protocol.position.EntryPosition;
 import com.alibaba.otter.canal.sink.entry.EntryEventSink;
@@ -120,6 +110,11 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
             ZooKeeperMetaManager zooKeeperMetaManager = new ZooKeeperMetaManager();
             zooKeeperMetaManager.setZkClientx(getZkclientx());
             ((PeriodMixedMetaManager) metaManager).setZooKeeperMetaManager(zooKeeperMetaManager);
+        } else if (mode.isLocalFile()){
+            FileMixedMetaManager fileMixedMetaManager = new FileMixedMetaManager();
+            fileMixedMetaManager.setDataDir(parameters.getDataDir());
+            fileMixedMetaManager.setPeriod(parameters.getMetaFileFlushPeriod());
+            metaManager = fileMixedMetaManager;
         } else {
             throw new CanalException("unsupport MetaMode for " + mode);
         }

+ 25 - 1
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/model/CanalParameter.java

@@ -28,8 +28,10 @@ public class CanalParameter implements Serializable {
     private Long                     zkClusterId;                                                    // zk集群id,为管理方便
     private List<String>             zkClusters;                                                     // zk集群地址
 
+    private String                   dataDir                            = "../conf";                 // 默认本地文件数据的目录默认是conf
     // meta相关参数
     private MetaMode                 metaMode                           = MetaMode.MEMORY;           // meta机制
+    private Integer                  metaFileFlushPeriod                = 1000;                      // meta刷新间隔
 
     // storage存储
     private Integer                  transactionSize                    = 1024;                      // 支持处理的transaction事务大小
@@ -243,7 +245,9 @@ public class CanalParameter implements Serializable {
         /** 文件存储模式 */
         ZOOKEEPER,
         /** 混合模式,内存+文件 */
-        MIXED;
+        MIXED,
+        /** 本地文件存储模式*/
+        LOCAL_FILE;
 
         public boolean isMemory() {
             return this.equals(MetaMode.MEMORY);
@@ -256,6 +260,10 @@ public class CanalParameter implements Serializable {
         public boolean isMixed() {
             return this.equals(MetaMode.MIXED);
         }
+
+        public boolean isLocalFile(){
+            return this.equals(MetaMode.LOCAL_FILE);
+        }
     }
 
     public static enum IndexMode {
@@ -390,6 +398,22 @@ public class CanalParameter implements Serializable {
         return storageMode;
     }
 
+    public String getDataDir() {
+        return dataDir;
+    }
+
+    public void setDataDir(String dataDir) {
+        this.dataDir = dataDir;
+    }
+
+    public Integer getMetaFileFlushPeriod() {
+        return metaFileFlushPeriod;
+    }
+
+    public void setMetaFileFlushPeriod(Integer metaFileFlushPeriod) {
+        this.metaFileFlushPeriod = metaFileFlushPeriod;
+    }
+
     public void setStorageMode(StorageMode storageMode) {
         this.storageMode = storageMode;
     }

+ 53 - 0
server/src/test/java/com/alibaba/otter/canal/server/CanalServerWithEmbedded_FileModeTest.java

@@ -0,0 +1,53 @@
+package com.alibaba.otter.canal.server;
+
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+
+import com.alibaba.otter.canal.instance.manager.model.Canal;
+import com.alibaba.otter.canal.instance.manager.model.CanalParameter;
+import com.alibaba.otter.canal.instance.manager.model.CanalParameter.*;
+
+public class CanalServerWithEmbedded_FileModeTest extends BaseCanalServerWithEmbededTest {
+
+    protected Canal buildCanal() {
+        Canal canal = new Canal();
+        canal.setId(1L);
+        canal.setName(DESTINATION);
+        canal.setDesc("my standalone server test ");
+
+        CanalParameter parameter = new CanalParameter();
+
+        parameter.setMetaMode(MetaMode.LOCAL_FILE);
+        parameter.setDataDir("./conf");
+        parameter.setMetaFileFlushPeriod(1000);
+        parameter.setHaMode(HAMode.HEARTBEAT);
+        parameter.setIndexMode(IndexMode.MEMORY_META_FAILBACK);
+
+        parameter.setStorageMode(StorageMode.MEMORY);
+        parameter.setMemoryStorageBufferSize(32 * 1024);
+
+        parameter.setSourcingType(SourcingType.MYSQL);
+        parameter.setDbAddresses(Arrays.asList(new InetSocketAddress(MYSQL_ADDRESS, 3306),
+            new InetSocketAddress(MYSQL_ADDRESS, 3306)));
+        parameter.setDbUsername(USERNAME);
+        parameter.setDbPassword(PASSWORD);
+        parameter.setPositions(Arrays.asList("{\"journalName\":\"mysql-bin.000001\",\"position\":332L,\"timestamp\":\"1505998863000\"}",
+            "{\"journalName\":\"mysql-bin.000001\",\"position\":332L,\"timestamp\":\"1505998863000\"}"));
+
+        parameter.setSlaveId(1234L);
+
+        parameter.setDefaultConnectionTimeoutInSeconds(30);
+        parameter.setConnectionCharset("UTF-8");
+        parameter.setConnectionCharsetNumber((byte) 33);
+        parameter.setReceiveBufferSize(8 * 1024);
+        parameter.setSendBufferSize(8 * 1024);
+
+        parameter.setDetectingEnable(false);
+        parameter.setDetectingIntervalInSeconds(10);
+        parameter.setDetectingRetryTimes(3);
+        parameter.setDetectingSQL(DETECTING_SQL);
+
+        canal.setCanalParameter(parameter);
+        return canal;
+    }
+}