浏览代码

支持用户自定义的CanalAlarmHandler (#1736)

* 支持用户自定义的CanalAlarmHandler

* RowsLogEvent增加对TableMapLogEvent判空检查,防止NPE异常
dataccs 6 年之前
父节点
当前提交
3165cb9640

+ 5 - 0
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogEvent.java

@@ -2,6 +2,7 @@ package com.taobao.tddl.dbsync.binlog.event;
 
 import java.util.BitSet;
 
+import com.taobao.tddl.dbsync.binlog.exception.TableIdNotFoundException;
 import com.taobao.tddl.dbsync.binlog.LogBuffer;
 import com.taobao.tddl.dbsync.binlog.LogContext;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
@@ -179,6 +180,10 @@ public abstract class RowsLogEvent extends LogEvent {
     public final void fillTable(LogContext context) {
         table = context.getTable(tableId);
 
+        if (table == null) {
+            throw new TableIdNotFoundException("not found tableId:" + tableId);
+        }
+
         // end of statement check:
         if ((flags & RowsLogEvent.STMT_END_F) != 0) {
             // Now is safe to clear ignored map (clear_tables will also

+ 1 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/exception/TableIdNotFoundException.java → dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/exception/TableIdNotFoundException.java

@@ -1,4 +1,4 @@
-package com.alibaba.otter.canal.parse.exception;
+package com.taobao.tddl.dbsync.binlog.exception;
 
 import com.alibaba.otter.canal.common.CanalException;
 

+ 32 - 1
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/CanalInstanceWithManager.java

@@ -1,6 +1,9 @@
 package com.alibaba.otter.canal.instance.manager;
 
+import java.io.File;
 import java.net.InetSocketAddress;
+import java.net.URL;
+import java.net.URLClassLoader;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -8,6 +11,7 @@ import java.util.List;
 
 import org.apache.commons.lang.BooleanUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.CollectionUtils;
@@ -106,7 +110,34 @@ public class CanalInstanceWithManager extends AbstractCanalInstance {
 
     protected void initAlarmHandler() {
         logger.info("init alarmHandler begin...");
-        alarmHandler = new LogAlarmHandler();
+        String alarmHandlerClass = parameters.getAlarmHandlerClass();
+        String alarmHandlerPluginDir = parameters.getAlarmHandlerPluginDir();
+        if (alarmHandlerClass == null || alarmHandlerPluginDir == null) {
+            alarmHandler = new LogAlarmHandler();
+        } else {
+            try {
+                File externalLibDir = new File(alarmHandlerPluginDir);
+                File[] jarFiles = externalLibDir.listFiles((dir1, name) -> name.endsWith(".jar"));
+                if (jarFiles == null || jarFiles.length == 0) {
+                    throw new IllegalStateException(String.format("alarmHandlerPluginDir [%s] can't find any name endswith \".jar\" file.",
+                        alarmHandlerPluginDir));
+                }
+                URL[] urls = new URL[jarFiles.length];
+                for (int i = 0; i < jarFiles.length; i++) {
+                    urls[i] = jarFiles[i].toURI().toURL();
+                }
+                ClassLoader currentClassLoader = new URLClassLoader(urls, CanalInstanceWithManager.class.getClassLoader());
+                Class<CanalAlarmHandler> _alarmClass =
+                    (Class<CanalAlarmHandler>)currentClassLoader.loadClass(alarmHandlerClass);
+                alarmHandler = _alarmClass.newInstance();
+                logger.info("init [{}] alarm handler success.", alarmHandlerClass);
+            } catch (Throwable e) {
+                String errorMsg = String.format("init alarmHandlerPluginDir [%s] alarm handler [%s] error: %s",
+                    alarmHandlerPluginDir, alarmHandlerClass, ExceptionUtils.getFullStackTrace(e));
+                logger.error(errorMsg);
+                throw new CanalException(errorMsg, e);
+            }
+        }
         logger.info("init alarmHandler end! \n\t load CanalAlarmHandler:{} ", alarmHandler.getClass().getName());
     }
 

+ 21 - 0
instance/manager/src/main/java/com/alibaba/otter/canal/instance/manager/model/CanalParameter.java

@@ -120,6 +120,11 @@ public class CanalParameter implements Serializable {
     private Long                     standbyTimestamp                   = null;
     private Boolean                  parallel                           = Boolean.FALSE;
 
+    //自定义alarmHandler类全路径
+    private String                   alarmHandlerClass                  = null;
+    //自定义alarmHandler插件文件夹路径
+    private String                   alarmHandlerPluginDir              = null;
+
     public static enum RunMode {
 
         /** 嵌入式 */
@@ -991,6 +996,22 @@ public class CanalParameter implements Serializable {
         this.parallel = parallel;
     }
 
+    public String getAlarmHandlerClass() {
+        return alarmHandlerClass;
+    }
+
+    public void setAlarmHandlerClass(String alarmHandlerClass) {
+        this.alarmHandlerClass = alarmHandlerClass;
+    }
+
+    public String getAlarmHandlerPluginDir() {
+        return alarmHandlerPluginDir;
+    }
+
+    public void setAlarmHandlerPluginDir(String alarmHandlerPluginDir) {
+        this.alarmHandlerPluginDir = alarmHandlerPluginDir;
+    }
+
     public String toString() {
         return ToStringBuilder.reflectionToString(this, CanalToStringStyle.DEFAULT_STYLE);
     }

+ 1 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -23,7 +23,7 @@ import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
 import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.exception.PositionNotFoundException;
-import com.alibaba.otter.canal.parse.exception.TableIdNotFoundException;
+import com.taobao.tddl.dbsync.binlog.exception.TableIdNotFoundException;
 import com.alibaba.otter.canal.parse.inbound.EventTransactionBuffer.TransactionFlushCallback;
 import com.alibaba.otter.canal.parse.inbound.mysql.MysqlMultiStageCoprocessor;
 import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;

+ 1 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -18,7 +18,7 @@ import org.slf4j.LoggerFactory;
 import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
 import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
-import com.alibaba.otter.canal.parse.exception.TableIdNotFoundException;
+import com.taobao.tddl.dbsync.binlog.exception.TableIdNotFoundException;
 import com.alibaba.otter.canal.parse.inbound.BinlogParser;
 import com.alibaba.otter.canal.parse.inbound.TableMeta;
 import com.alibaba.otter.canal.parse.inbound.TableMeta.FieldMeta;