rewerma 5 năm trước cách đây
mục cha
commit
460479390a

+ 5 - 4
client-adapter/escore/src/main/java/com/alibaba/otter/canal/client/adapter/es/core/monitor/ESConfigMonitor.java

@@ -26,11 +26,11 @@ import com.alibaba.otter.canal.client.adapter.support.Util;
 
 public class ESConfigMonitor {
 
-    private static final Logger   logger      = LoggerFactory.getLogger(ESConfigMonitor.class);
+    private static final Logger   logger = LoggerFactory.getLogger(ESConfigMonitor.class);
 
-    private static final String   adapterName = "es";
+    private String                adapterName;
 
-    private ESAdapter esAdapter;
+    private ESAdapter             esAdapter;
 
     private Properties            envProperties;
 
@@ -39,6 +39,7 @@ public class ESConfigMonitor {
     public void init(ESAdapter esAdapter, Properties envProperties) {
         this.esAdapter = esAdapter;
         this.envProperties = envProperties;
+        this.adapterName = envProperties.getProperty("es.version");
         File confDir = Util.getConfDirPath(adapterName);
         try {
             FileAlterationObserver observer = new FileAlterationObserver(confDir,
@@ -130,7 +131,7 @@ public class ESConfigMonitor {
         private void addConfigToCache(File file, ESSyncConfig config) {
             esAdapter.getEsSyncConfig().put(file.getName(), config);
 
-            esAdapter.addSyncConfigToCache(file.getName(),config);
+            esAdapter.addSyncConfigToCache(file.getName(), config);
         }
 
         private void deleteConfigFromCache(File file) {

+ 10 - 9
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java

@@ -7,6 +7,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -63,10 +64,10 @@ public class CanalAdapterLoader {
         if ("tcp".equalsIgnoreCase(canalClientConfig.getMode())) {
             // 初始化canal-client的适配器
             for (CanalClientConfig.CanalAdapter canalAdapter : canalClientConfig.getCanalAdapters()) {
-                List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
+                List<List<OuterAdapter>> canalOuterAdapterGroups = new CopyOnWriteArrayList<>();
 
                 for (CanalClientConfig.Group connectorGroup : canalAdapter.getGroups()) {
-                    List<OuterAdapter> canalOutConnectors = new ArrayList<>();
+                    List<OuterAdapter> canalOutConnectors = new CopyOnWriteArrayList<>();
                     for (OuterAdapterConfig c : connectorGroup.getOuterAdapters()) {
                         loadAdapter(c, canalOutConnectors);
                     }
@@ -94,8 +95,8 @@ public class CanalAdapterLoader {
             // 初始化canal-client-kafka的适配器
             for (CanalClientConfig.CanalAdapter canalAdapter : canalClientConfig.getCanalAdapters()) {
                 for (CanalClientConfig.Group group : canalAdapter.getGroups()) {
-                    List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
-                    List<OuterAdapter> canalOuterAdapters = new ArrayList<>();
+                    List<List<OuterAdapter>> canalOuterAdapterGroups = new CopyOnWriteArrayList<>();
+                    List<OuterAdapter> canalOuterAdapters = new CopyOnWriteArrayList<>();
                     for (OuterAdapterConfig config : group.getOuterAdapters()) {
                         loadAdapter(config, canalOuterAdapters);
                     }
@@ -117,8 +118,8 @@ public class CanalAdapterLoader {
             // 初始化canal-client-rocketMQ的适配器
             for (CanalClientConfig.CanalAdapter canalAdapter : canalClientConfig.getCanalAdapters()) {
                 for (CanalClientConfig.Group group : canalAdapter.getGroups()) {
-                    List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
-                    List<OuterAdapter> canalOuterAdapters = new ArrayList<>();
+                    List<List<OuterAdapter>> canalOuterAdapterGroups = new CopyOnWriteArrayList<>();
+                    List<OuterAdapter> canalOuterAdapters = new CopyOnWriteArrayList<>();
                     for (OuterAdapterConfig config : group.getOuterAdapters()) {
                         loadAdapter(config, canalOuterAdapters);
                     }
@@ -146,8 +147,8 @@ public class CanalAdapterLoader {
             // 初始化canal-client-rabbitMQ的适配器
             for (CanalClientConfig.CanalAdapter canalAdapter : canalClientConfig.getCanalAdapters()) {
                 for (CanalClientConfig.Group group : canalAdapter.getGroups()) {
-                    List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>();
-                    List<OuterAdapter> canalOuterAdapters = new ArrayList<>();
+                    List<List<OuterAdapter>> canalOuterAdapterGroups = new CopyOnWriteArrayList<>();
+                    List<OuterAdapter> canalOuterAdapters = new CopyOnWriteArrayList<>();
                     for (OuterAdapterConfig config : group.getOuterAdapters()) {
                         loadAdapter(config, canalOuterAdapters);
                     }
@@ -184,7 +185,7 @@ public class CanalAdapterLoader {
                     if (propertySource instanceof EnumerablePropertySource) {
                         String[] names = ((EnumerablePropertySource<?>) propertySource).getPropertyNames();
                         for (String name : names) {
-                            Object val = propertySource.getProperty(name);
+                            Object val = env.getProperty(name);
                             if (val != null) {
                                 evnProperties.put(name, val);
                             }