Browse Source

Merge pull request #1340 from rewerma/master

fixed #1338
agapple 6 years ago
parent
commit
ade2a54369

+ 10 - 12
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/ESAdapter.java

@@ -75,8 +75,8 @@ public class ESAdapter implements OuterAdapter {
             // 过滤不匹配的key的配置
             esSyncConfigTmp.forEach((key, config) -> {
                 if ((config.getOuterAdapterKey() == null && configuration.getKey() == null)
-                    || (config.getOuterAdapterKey() != null && config.getOuterAdapterKey()
-                        .equalsIgnoreCase(configuration.getKey()))) {
+                    || (config.getOuterAdapterKey() != null
+                        && config.getOuterAdapterKey().equalsIgnoreCase(configuration.getKey()))) {
                     esSyncConfig.put(key, config);
                 }
             });
@@ -98,15 +98,11 @@ public class ESAdapter implements OuterAdapter {
                 }
                 String schema = matcher.group(2);
 
-                schemaItem.getAliasTableItems()
-                    .values()
-                    .forEach(tableItem -> {
-                        Map<String, ESSyncConfig> esSyncConfigMap = dbTableEsSyncConfig.computeIfAbsent(schema
-                                                                                                        + "-"
-                                                                                                        + tableItem.getTableName(),
-                            k -> new HashMap<>());
-                        esSyncConfigMap.put(configName, config);
-                    });
+                schemaItem.getAliasTableItems().values().forEach(tableItem -> {
+                    Map<String, ESSyncConfig> esSyncConfigMap = dbTableEsSyncConfig
+                        .computeIfAbsent(schema + "-" + tableItem.getTableName(), k -> new HashMap<>());
+                    esSyncConfigMap.put(configName, config);
+                });
             }
 
             Map<String, String> properties = configuration.getProperties();
@@ -140,7 +136,9 @@ public class ESAdapter implements OuterAdapter {
         String database = dml.getDatabase();
         String table = dml.getTable();
         Map<String, ESSyncConfig> configMap = dbTableEsSyncConfig.get(database + "-" + table);
-        esSyncService.sync(configMap.values(), dml);
+        if (configMap != null) {
+            esSyncService.sync(configMap.values(), dml);
+        }
     }
 
     @Override

+ 3 - 1
client-adapter/hbase/src/main/java/com/alibaba/otter/canal/client/adapter/hbase/HbaseAdapter.java

@@ -115,7 +115,9 @@ public class HbaseAdapter implements OuterAdapter {
         String database = dml.getDatabase();
         String table = dml.getTable();
         Map<String, MappingConfig> configMap = mappingConfigCache.get(destination + "." + database + "." + table);
-        configMap.values().forEach(config -> hbaseSyncService.sync(config, dml));
+        if (configMap != null) {
+            configMap.values().forEach(config -> hbaseSyncService.sync(config, dml));
+        }
     }
 
     @Override

+ 10 - 10
client/src/main/java/com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.java

@@ -30,16 +30,16 @@ import com.google.common.collect.Lists;
  */
 public class KafkaCanalConnector implements CanalMQConnector {
 
-    private KafkaConsumer<String, Message> kafkaConsumer;
-    private KafkaConsumer<String, String>  kafkaConsumer2;                   // 用于扁平message的数据消费
-    private String                         topic;
-    private Integer                        partition;
-    private Properties                     properties;
-    private volatile boolean               connected       = false;
-    private volatile boolean               running         = false;
-    private boolean                        flatMessage;
-
-    private Map<Integer, Long>             currentOffsets  = new HashMap<>();
+    protected KafkaConsumer<String, Message> kafkaConsumer;
+    protected KafkaConsumer<String, String>  kafkaConsumer2;                  // 用于扁平message的数据消费
+    protected String                         topic;
+    protected Integer                        partition;
+    protected Properties                     properties;
+    protected volatile boolean               connected      = false;
+    protected volatile boolean               running        = false;
+    protected boolean                        flatMessage;
+
+    private Map<Integer, Long>               currentOffsets = new HashMap<>();
 
     public KafkaCanalConnector(String servers, String topic, Integer partition, String groupId, Integer batchSize,
                                boolean flatMessage){