Ver Fonte

Merge remote-tracking branch 'upstream/master'

wuwo há 6 anos atrás
pai
commit
0a0a39880c

+ 16 - 87
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/ExtensionLoader.java

@@ -2,13 +2,15 @@ package com.alibaba.otter.canal.client.adapter.support;
 
 import java.io.BufferedReader;
 import java.io.File;
-import java.io.IOException;
 import java.io.InputStreamReader;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.nio.file.Paths;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.regex.Pattern;
@@ -43,8 +45,6 @@ public class ExtensionLoader<T> {
 
     private static final ConcurrentMap<String, Object>               EXTENSION_KEY_INSTANCE     = new ConcurrentHashMap<>();
 
-    private static final ConcurrentMap<String, List<?>>              EXTENSION_KEY_INSTANCES    = new ConcurrentHashMap<>();
-
     private final Class<?>                                           type;
 
     private final String                                             classLoaderPolicy;
@@ -180,6 +180,9 @@ public class ExtensionLoader<T> {
 
     @SuppressWarnings("unchecked")
     private T createExtension(String name, String key) {
+        // System.out.println("xxxxxxxxxxxxx");
+        // getExtensionClasses().forEach((k, v) -> logger.info("fffff: " + k +
+        // " " + v.getName()));
         Class<?> clazz = getExtensionClasses().get(name);
         if (clazz == null) {
             throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type
@@ -210,6 +213,7 @@ public class ExtensionLoader<T> {
                 }
             }
         }
+
         return classes;
     }
 
@@ -255,13 +259,13 @@ public class ExtensionLoader<T> {
 
         Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
 
-        // 1. lib folder,customized extension classLoader (jar_dir/lib
-        String dir = File.separator + this.getJarDirectoryPath() + File.separator + "lib";
+        // 1. plugin folder,customized extension classLoader (jar_dir/plugin
+        String dir = File.separator + this.getJarDirectoryPath() + File.separator + "plugin";
 
         File externalLibDir = new File(dir);
         if (!externalLibDir.exists()) {
             externalLibDir = new File(File.separator + this.getJarDirectoryPath() + File.separator + "canal-adapter"
-                                      + File.separator + "lib");
+                                      + File.separator + "plugin");
         }
         logger.info("extension classpath dir: " + externalLibDir.getAbsolutePath());
         if (externalLibDir.exists()) {
@@ -279,49 +283,7 @@ public class ExtensionLoader<T> {
                     URLClassLoader localClassLoader;
                     if (classLoaderPolicy == null || "".equals(classLoaderPolicy)
                         || DEFAULT_CLASSLOADER_POLICY.equalsIgnoreCase(classLoaderPolicy)) {
-                        localClassLoader = new URLClassLoader(new URL[] { url }, parent) {
-
-                            @Override
-                            public Class<?> loadClass(String name) throws ClassNotFoundException {
-                                Class<?> c = findLoadedClass(name);
-                                if (c != null) {
-                                    return c;
-                                }
-
-                                if (name.startsWith("java.") || name.startsWith("org.slf4j.")
-                                    || name.startsWith("org.apache.logging")
-                                    || name.startsWith("org.apache.commons.logging.")) {
-                                    // || name.startsWith("org.apache.hadoop."))
-                                    // {
-                                    c = super.loadClass(name);
-                                }
-                                if (c != null) return c;
-
-                                try {
-                                    // 先加载jar内的class,可避免jar冲突
-                                    c = findClass(name);
-                                } catch (ClassNotFoundException e) {
-                                    c = null;
-                                }
-                                if (c != null) {
-                                    return c;
-                                }
-
-                                return super.loadClass(name);
-                            }
-
-                            @Override
-                            public Enumeration<URL> getResources(String name) throws IOException {
-                                @SuppressWarnings("unchecked")
-                                Enumeration<URL>[] tmp = (Enumeration<URL>[]) new Enumeration<?>[2];
-
-                                tmp[0] = findResources(name); // local class
-                                                              // path first
-                                // tmp[1] = super.getResources(name);
-
-                                return new CompoundEnumeration<>(tmp);
-                            }
-                        };
+                        localClassLoader = new URLClassExtensionLoader(new URL[] { url });
                     } else {
                         localClassLoader = new URLClassLoader(new URL[] { url }, parent);
                     }
@@ -331,48 +293,15 @@ public class ExtensionLoader<T> {
                 }
             }
         }
+        // 只加载外部spi, 不加载classpath
         // 2. load inner extension class with default classLoader
-        ClassLoader classLoader = findClassLoader();
-        loadFile(extensionClasses, CANAL_DIRECTORY, classLoader);
-        loadFile(extensionClasses, SERVICES_DIRECTORY, classLoader);
+        // ClassLoader classLoader = findClassLoader();
+        // loadFile(extensionClasses, CANAL_DIRECTORY, classLoader);
+        // loadFile(extensionClasses, SERVICES_DIRECTORY, classLoader);
 
         return extensionClasses;
     }
 
-    public static class CompoundEnumeration<E> implements Enumeration<E> {
-
-        private Enumeration<E>[] enums;
-        private int              index = 0;
-
-        public CompoundEnumeration(Enumeration<E>[] enums){
-            this.enums = enums;
-        }
-
-        private boolean next() {
-            while (this.index < this.enums.length) {
-                if (this.enums[this.index] != null && this.enums[this.index].hasMoreElements()) {
-                    return true;
-                }
-
-                ++this.index;
-            }
-
-            return false;
-        }
-
-        public boolean hasMoreElements() {
-            return this.next();
-        }
-
-        public E nextElement() {
-            if (!this.next()) {
-                throw new NoSuchElementException();
-            } else {
-                return this.enums[this.index].nextElement();
-            }
-        }
-    }
-
     private void loadFile(Map<String, Class<?>> extensionClasses, String dir, ClassLoader classLoader) {
         String fileName = dir + type.getName();
         try {

+ 88 - 0
client-adapter/common/src/main/java/com/alibaba/otter/canal/client/adapter/support/URLClassExtensionLoader.java

@@ -0,0 +1,88 @@
+package com.alibaba.otter.canal.client.adapter.support;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Enumeration;
+import java.util.NoSuchElementException;
+
+public class URLClassExtensionLoader extends URLClassLoader {
+    public URLClassExtensionLoader(URL[] urls) {
+        super(urls);
+    }
+
+    @Override
+    public Class<?> loadClass(String name) throws ClassNotFoundException {
+        Class<?> c = findLoadedClass(name);
+        if (c != null) {
+            return c;
+        }
+
+        if (name.startsWith("java.") || name.startsWith("org.slf4j.")
+                || name.startsWith("org.apache.logging")
+                || name.startsWith("org.apache.commons.logging.")) {
+            // || name.startsWith("org.apache.hadoop."))
+            // {
+            c = super.loadClass(name);
+        }
+        if (c != null) return c;
+
+        try {
+            // 先加载jar内的class,可避免jar冲突
+            c = findClass(name);
+        } catch (ClassNotFoundException e) {
+            c = null;
+        }
+        if (c != null) {
+            return c;
+        }
+
+        return super.loadClass(name);
+    }
+
+    @Override
+    public Enumeration<URL> getResources(String name) throws IOException {
+        @SuppressWarnings("unchecked")
+        Enumeration<URL>[] tmp = (Enumeration<URL>[]) new Enumeration<?>[2];
+
+        tmp[0] = findResources(name); // local class
+        // path first
+        // tmp[1] = super.getResources(name);
+
+        return new CompoundEnumeration<>(tmp);
+    }
+
+    private static class CompoundEnumeration<E> implements Enumeration<E> {
+
+        private Enumeration<E>[] enums;
+        private int              index = 0;
+
+        public CompoundEnumeration(Enumeration<E>[] enums){
+            this.enums = enums;
+        }
+
+        private boolean next() {
+            while (this.index < this.enums.length) {
+                if (this.enums[this.index] != null && this.enums[this.index].hasMoreElements()) {
+                    return true;
+                }
+
+                ++this.index;
+            }
+
+            return false;
+        }
+
+        public boolean hasMoreElements() {
+            return this.next();
+        }
+
+        public E nextElement() {
+            if (!this.next()) {
+                throw new NoSuchElementException();
+            } else {
+                return this.enums[this.index].nextElement();
+            }
+        }
+    }
+}

+ 1 - 1
client-adapter/launcher/pom.xml

@@ -137,7 +137,7 @@
                         </goals>
                         <configuration>
                             <includeClassifiers>jar-with-dependencies</includeClassifiers>
-                            <outputDirectory>${project.basedir}/target/canal-adapter/lib</outputDirectory>
+                            <outputDirectory>${project.basedir}/target/canal-adapter/plugin</outputDirectory>
                         </configuration>
                     </execution>
                 </executions>

+ 49 - 49
client-adapter/launcher/src/main/assembly/dev.xml

@@ -6,36 +6,36 @@
 	</formats>
 	<includeBaseDirectory>false</includeBaseDirectory>
 	<fileSets>
-		<fileSet>
-			<directory>.</directory>
-			<outputDirectory>/</outputDirectory>
-			<includes>
-				<include>README*</include>
-			</includes>
-		</fileSet>
-		<fileSet>
-			<directory>./src/main/bin</directory>
-			<outputDirectory>bin</outputDirectory>
-			<includes>
-				<include>**/*</include>
-			</includes>
-			<fileMode>0755</fileMode>
-		</fileSet>
-		<fileSet>
-			<directory>./src/main/resources</directory>
-			<outputDirectory>/conf</outputDirectory>
-			<includes>
-				<include>**/*</include>
+        <fileSet>
+            <directory>.</directory>
+            <outputDirectory>/</outputDirectory>
+            <includes>
+                <include>README*</include>
+            </includes>
+        </fileSet>
+        <fileSet>
+            <directory>./src/main/bin</directory>
+            <outputDirectory>bin</outputDirectory>
+            <includes>
+                <include>**/*</include>
+            </includes>
+            <fileMode>0755</fileMode>
+        </fileSet>
+        <fileSet>
+            <directory>./src/main/resources</directory>
+            <outputDirectory>/conf</outputDirectory>
+            <includes>
+                <include>**/*</include>
 
-			</includes>
-		</fileSet>
-		<fileSet>
-			<directory>../elasticsearch/src/main/resources/es</directory>
-			<outputDirectory>/conf/es</outputDirectory>
-			<includes>
-				<include>**/*</include>
-			</includes>
-		</fileSet>
+            </includes>
+        </fileSet>
+        <fileSet>
+            <directory>../elasticsearch/src/main/resources/es</directory>
+            <outputDirectory>/conf/es</outputDirectory>
+            <includes>
+                <include>**/*</include>
+            </includes>
+        </fileSet>
         <fileSet>
             <directory>../hbase/src/main/resources/hbase</directory>
             <outputDirectory>/conf/hbase</outputDirectory>
@@ -43,27 +43,27 @@
                 <include>**/*</include>
             </includes>
         </fileSet>
-		<fileSet>
-			<directory>../rdb/src/main/resources/</directory>
-			<outputDirectory>/conf</outputDirectory>
+        <fileSet>
+            <directory>../rdb/src/main/resources/</directory>
+            <outputDirectory>/conf</outputDirectory>
             <excludes>
                 <exclude>META-INF/**</exclude>
             </excludes>
-		</fileSet>
-		<fileSet>
-			<directory>target</directory>
-			<outputDirectory>logs</outputDirectory>
-			<excludes>
-				<exclude>**/*</exclude>
-			</excludes>
-		</fileSet>
-	</fileSets>
-	<dependencySets>
-		<dependencySet>
-			<outputDirectory>lib</outputDirectory>
-			<excludes>
-				<exclude>junit:junit</exclude>
-			</excludes>
-		</dependencySet>
-	</dependencySets>
+        </fileSet>
+        <fileSet>
+            <directory>target</directory>
+            <outputDirectory>logs</outputDirectory>
+            <excludes>
+                <exclude>**/*</exclude>
+            </excludes>
+        </fileSet>
+    </fileSets>
+    <dependencySets>
+        <dependencySet>
+            <outputDirectory>lib</outputDirectory>
+            <excludes>
+                <exclude>junit:junit</exclude>
+            </excludes>
+        </dependencySet>
+    </dependencySets>
 </assembly>

+ 2 - 6
client-adapter/launcher/src/main/assembly/release.xml

@@ -58,13 +58,9 @@
                 <exclude>**/*</exclude>
             </excludes>
         </fileSet>
-
         <fileSet>
-            <directory>${project.basedir}/target/canal-adapter/lib</directory>
-            <outputDirectory>/lib/</outputDirectory>
-            <includes>
-                <include>*-jar-with-dependencies.jar</include>
-            </includes>
+            <directory>${project.basedir}/target/canal-adapter/plugin</directory>
+            <outputDirectory>/plugin/</outputDirectory>
         </fileSet>
     </fileSets>
     <dependencySets>

+ 1 - 0
client-adapter/launcher/src/main/bin/startup.sh

@@ -53,6 +53,7 @@ ADAPTER_OPTS="-DappName=canal-adapter"
 for i in $base/lib/*;
     do CLASSPATH=$i:"$CLASSPATH";
 done
+
 CLASSPATH="$base/conf:$CLASSPATH";
 
 echo "cd to $bin_abs_path for workaround relative path"

+ 1 - 1
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -652,7 +652,7 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
                 throw new CanalParseException("command : 'show master status' has an error! pls check. you need (at least one of) the SUPER,REPLICATION CLIENT privilege(s) for this operation");
             }
             EntryPosition endPosition = new EntryPosition(fields.get(0), Long.valueOf(fields.get(1)));
-            if (isGTIDMode && fields.size() > 4) {
+            if (isGTIDMode() && fields.size() > 4) {
                 endPosition.setGtid(fields.get(4));
             }
             return endPosition;

+ 4 - 3
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -646,15 +646,16 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
                 continue;
             }
 
-            if (fieldMeta != null && existOptionalMetaData) {
+            if (fieldMeta != null && existOptionalMetaData && tableMetaCache.isOnTSDB()) {
                 // check column info
                 boolean check = StringUtils.equalsIgnoreCase(fieldMeta.getColumnName(), info.name);
                 check &= (fieldMeta.isUnsigned() == info.unsigned);
                 check &= (fieldMeta.isNullable() == info.nullable);
 
                 if (!check) {
-                    throw new CanalParseException("MySQL8.0 unmatch column metadata & pls submit issue , db : "
-                                                  + fieldMeta.toString() + " , binlog : " + info.toString()
+                    throw new CanalParseException("MySQL8.0 unmatch column metadata & pls submit issue , table : "
+                                                  + tableMeta.getFullName() + ", db fieldMeta : "
+                                                  + fieldMeta.toString() + " , binlog fieldMeta : " + info.toString()
                                                   + " , on : " + event.getHeader().getLogFileName() + ":"
                                                   + event.getHeader().getLogPos());
                 }

+ 12 - 0
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java

@@ -39,6 +39,7 @@ public class TableMetaCache {
     public static final String              EXTRA          = "EXTRA";
     private MysqlConnection                 connection;
     private boolean                         isOnRDS        = false;
+    private boolean                         isOnTSDB       = false;
 
     private TableMetaTSDB                   tableMetaTSDB;
     // 第一层tableId,第二层schema.table,解决tableId重复,对应多张表
@@ -67,6 +68,8 @@ public class TableMetaCache {
                 }
 
             });
+        } else {
+            isOnTSDB = true;
         }
 
         try {
@@ -244,6 +247,15 @@ public class TableMetaCache {
             .toString();
     }
 
+
+    public boolean isOnTSDB() {
+        return isOnTSDB;
+    }
+
+    public void setOnTSDB(boolean isOnTSDB) {
+        this.isOnTSDB = isOnTSDB;
+    }
+
     public boolean isOnRDS() {
         return isOnRDS;
     }