Browse Source

rename log manager module and ajust xml

yinxiu 8 years ago
parent
commit
a78622ae02

+ 5 - 5
deployer/src/main/resources/spring/default-instance.xml

@@ -128,14 +128,14 @@
 		<!-- 解析位点记录 -->
 		<property name="logPositionManager">
 			<bean class="com.alibaba.otter.canal.parse.index.FailbackLogPositionManager">
-				<property name="primary">
+				<constructor-arg>
 					<bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" />
-				</property>
-				<property name="failback">
+				</constructor-arg>
+				<constructor-arg>
 					<bean class="com.alibaba.otter.canal.parse.index.MetaLogPositionManager">
-						<property name="metaManager" ref="metaManager" />
+						<constructor-arg ref="metaManager"/>
 					</bean>
-				</property>
+				</constructor-arg>
 			</bean>
 		</property>
 		

+ 5 - 5
deployer/src/main/resources/spring/file-instance.xml

@@ -113,14 +113,14 @@
 		<!-- 解析位点记录 -->
 		<property name="logPositionManager">
 			<bean class="com.alibaba.otter.canal.parse.index.FailbackLogPositionManager">
-				<property name="primary">
+				<constructor-arg>
 					<bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" />
-				</property>
-				<property name="failback">
+				</constructor-arg>
+				<constructor-arg>
 					<bean class="com.alibaba.otter.canal.parse.index.MetaLogPositionManager">
-						<property name="metaManager" ref="metaManager" />
+						<constructor-arg ref="metaManager"/>
 					</bean>
-				</property>
+				</constructor-arg>
 			</bean>
 		</property>
 		

+ 5 - 5
deployer/src/main/resources/spring/local-instance.xml

@@ -123,14 +123,14 @@
 		<!-- 解析位点记录 -->
 		<property name="logPositionManager">
 			<bean class="com.alibaba.otter.canal.parse.index.FailbackLogPositionManager">
-				<property name="primary">
+				<constructor-arg>
 					<bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" />
-				</property>
-				<property name="failback">
+				</constructor-arg>
+				<constructor-arg>
 					<bean class="com.alibaba.otter.canal.parse.index.MetaLogPositionManager">
-						<property name="metaManager" ref="metaManager" />
+						<constructor-arg ref="metaManager"/>
 					</bean>
-				</property>
+				</constructor-arg>
 			</bean>
 		</property>
 		

+ 3 - 3
instance/core/src/main/java/com/alibaba/otter/canal/instance/core/AbstractCanalInstance.java

@@ -2,7 +2,7 @@ package com.alibaba.otter.canal.instance.core;
 
 import java.util.List;
 
-import com.alibaba.otter.canal.parse.index.LogPositionManager;
+import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -171,7 +171,7 @@ public class AbstractCanalInstance extends AbstractCanalLifeCycle implements Can
         if (eventParser instanceof AbstractEventParser) {
             AbstractEventParser abstractEventParser = (AbstractEventParser) eventParser;
             // 首先启动log position管理器
-            LogPositionManager logPositionManager = abstractEventParser.getLogPositionManager();
+            CanalLogPositionManager logPositionManager = abstractEventParser.getLogPositionManager();
             if (!logPositionManager.isStart()) {
                 logPositionManager.start();
             }
@@ -196,7 +196,7 @@ public class AbstractCanalInstance extends AbstractCanalLifeCycle implements Can
         if (eventParser instanceof AbstractEventParser) {
             AbstractEventParser abstractEventParser = (AbstractEventParser) eventParser;
             // 首先启动log position管理器
-            LogPositionManager logPositionManager = abstractEventParser.getLogPositionManager();
+            CanalLogPositionManager logPositionManager = abstractEventParser.getLogPositionManager();
             if (logPositionManager.isStart()) {
                 logPositionManager.stop();
             }

+ 3 - 3
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java

@@ -338,10 +338,10 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
         return haController;
     }
 
-    protected LogPositionManager initLogPositionManager() {
+    protected CanalLogPositionManager initLogPositionManager() {
         logger.info("init logPositionPersistManager begin...");
         IndexMode indexMode = parameters.getIndexMode();
-        LogPositionManager logPositionManager;
+        CanalLogPositionManager logPositionManager;
         if (indexMode.isMemory()) {
             logPositionManager = new MemoryLogPositionManager();
         } else if (indexMode.isZookeeper()) {
@@ -356,7 +356,7 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
             MemoryLogPositionManager primary = new MemoryLogPositionManager();
             MetaLogPositionManager secondary = new MetaLogPositionManager(metaManager);
 
-            logPositionManager = new FailoverLogPositionManager(primary, secondary);
+            logPositionManager = new FailbackLogPositionManager(primary, secondary);
         } else {
             throw new CanalException("unsupport indexMode for " + indexMode);
         }

+ 5 - 5
instance/spring/src/test/resources/spring/default-instance.xml

@@ -120,14 +120,14 @@
 		<!-- 解析位点记录 -->
 		<property name="logPositionManager">
 			<bean class="com.alibaba.otter.canal.parse.index.FailbackLogPositionManager">
-				<property name="primary">
+				<constructor-arg>
 					<bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" />
-				</property>
-				<property name="failback">
+				</constructor-arg>
+				<constructor-arg>
 					<bean class="com.alibaba.otter.canal.parse.index.MetaLogPositionManager">
-						<property name="metaManager" ref="metaManager" />
+						<constructor-arg ref="metaManager"/>
 					</bean>
-				</property>
+				</constructor-arg>
 			</bean>
 		</property>
 		

+ 8 - 6
instance/spring/src/test/resources/spring/file-instance.xml

@@ -102,18 +102,20 @@
 		<!-- 解析编码 -->
 		<!-- property name="connectionCharsetNumber" value="${canal.instance.connectionCharsetNumber:33}" /-->
 		<property name="connectionCharset" value="${canal.instance.connectionCharset:UTF-8}" />
-	
+
 		<!-- 解析位点记录 -->
 		<property name="logPositionManager">
 			<bean class="com.alibaba.otter.canal.parse.index.FailbackLogPositionManager">
-				<property name="primary">
+				<constructor-arg>
 					<bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" />
-				</property>
-				<property name="failback">
+				</constructor-arg>
+				<constructor-arg>
 					<bean class="com.alibaba.otter.canal.parse.index.MetaLogPositionManager">
-						<property name="metaManager" ref="metaManager" />
+						<constructor-arg>
+							<ref bean="metaManager"/>
+						</constructor-arg>
 					</bean>
-				</property>
+				</constructor-arg>
 			</bean>
 		</property>
 		

+ 4 - 4
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -8,7 +8,7 @@ import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
-import com.alibaba.otter.canal.parse.index.LogPositionManager;
+import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.lang.math.RandomUtils;
@@ -44,7 +44,7 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
 
     protected final Logger                           logger                     = LoggerFactory.getLogger(this.getClass());
 
-    protected LogPositionManager                logPositionManager         = null;
+    protected CanalLogPositionManager logPositionManager         = null;
     protected CanalEventSink<List<CanalEntry.Entry>> eventSink                  = null;
     protected CanalEventFilter                       eventFilter                = null;
     protected CanalEventFilter                       eventBlackFilter           = null;
@@ -495,7 +495,7 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
         return this.alarmHandler;
     }
 
-    public void setLogPositionManager(LogPositionManager logPositionManager) {
+    public void setLogPositionManager(CanalLogPositionManager logPositionManager) {
         this.logPositionManager = logPositionManager;
     }
 
@@ -503,7 +503,7 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
         this.transactionSize = transactionSize;
     }
 
-    public LogPositionManager getLogPositionManager() {
+    public CanalLogPositionManager getLogPositionManager() {
         return logPositionManager;
     }
 

+ 2 - 2
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinlogEventParser.java

@@ -2,7 +2,7 @@ package com.alibaba.otter.canal.parse.inbound.mysql;
 
 import java.io.IOException;
 
-import com.alibaba.otter.canal.parse.index.LogPositionManager;
+import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
 import org.apache.commons.lang.StringUtils;
 
 import com.alibaba.otter.canal.parse.CanalEventParser;
@@ -150,7 +150,7 @@ public class LocalBinlogEventParser extends AbstractMysqlEventParser implements
 
     // ========================= setter / getter =========================
 
-    public void setLogPositionManager(LogPositionManager logPositionManager) {
+    public void setLogPositionManager(CanalLogPositionManager logPositionManager) {
         this.logPositionManager = logPositionManager;
     }
 

+ 1 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/index/AbstractLogPositionManager.java

@@ -6,5 +6,5 @@ import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
  * Created by yinxiu on 17/3/17.
  * Email: marklin.hz@gmail.com
  */
-public abstract class AbstractLogPositionManager extends AbstractCanalLifeCycle implements LogPositionManager {
+public abstract class AbstractLogPositionManager extends AbstractCanalLifeCycle implements CanalLogPositionManager {
 }

+ 1 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/index/LogPositionManager.java → parse/src/main/java/com/alibaba/otter/canal/parse/index/CanalLogPositionManager.java

@@ -8,7 +8,7 @@ import com.alibaba.otter.canal.protocol.position.LogPosition;
  * Created by yinxiu on 17/3/17.
  * Email: marklin.hz@gmail.com
  */
-public interface LogPositionManager extends CanalLifeCycle {
+public interface CanalLogPositionManager extends CanalLifeCycle {
 
     LogPosition getLatestIndexBy(String destination);
 

+ 5 - 5
parse/src/main/java/com/alibaba/otter/canal/parse/index/FailoverLogPositionManager.java → parse/src/main/java/com/alibaba/otter/canal/parse/index/FailbackLogPositionManager.java

@@ -15,14 +15,14 @@ import org.slf4j.LoggerFactory;
  * 应用场景:比如针对内存buffer,出现HA切换,先尝试从内存buffer区中找到lastest position,如果不存在才尝试找一下meta里消费的信息
  * </pre>
  */
-public class FailoverLogPositionManager extends AbstractLogPositionManager {
+public class FailbackLogPositionManager extends AbstractLogPositionManager {
 
-    private final static Logger logger = LoggerFactory.getLogger(FailoverLogPositionManager.class);
+    private final static Logger logger = LoggerFactory.getLogger(FailbackLogPositionManager.class);
 
-    private final LogPositionManager primary;
-    private final LogPositionManager secondary;
+    private final CanalLogPositionManager primary;
+    private final CanalLogPositionManager secondary;
 
-    public FailoverLogPositionManager(LogPositionManager primary, LogPositionManager secondary) {
+    public FailbackLogPositionManager(CanalLogPositionManager primary, CanalLogPositionManager secondary) {
         if (primary == null) {
             throw new NullPointerException("nul primary LogPositionManager");
         }

+ 4 - 6
parse/src/test/java/com/alibaba/otter/canal/parse/index/FileMixedLogPositionManagerTest.java

@@ -26,17 +26,15 @@ public class FileMixedLogPositionManagerTest extends AbstractLogPositionManagerT
 
     @Test
     public void testAll() {
-        FileMixedLogPositionManager logPositionManager = new FileMixedLogPositionManager();
-        logPositionManager.setDataDir(dataDir);
-        logPositionManager.setPeriod(100);
+        MemoryLogPositionManager memoryLogPositionManager = new MemoryLogPositionManager();
+
+        FileMixedLogPositionManager logPositionManager = new FileMixedLogPositionManager(dataDir, 1000, memoryLogPositionManager);
         logPositionManager.start();
 
         LogPosition position2 = doTest(logPositionManager);
         sleep(1500);
 
-        FileMixedLogPositionManager logPositionManager2 = new FileMixedLogPositionManager();
-        logPositionManager2.setDataDir(dataDir);
-        logPositionManager2.setPeriod(100);
+        FileMixedLogPositionManager logPositionManager2 = new FileMixedLogPositionManager(dataDir, 1000, memoryLogPositionManager);
         logPositionManager2.start();
 
         LogPosition getPosition2 = logPositionManager2.getLatestIndexBy(destination);