Selaa lähdekoodia

update中增加upsert判断

mcy 6 vuotta sitten
vanhempi
commit
c14ed724ff

+ 29 - 25
client-adapter/elasticsearch/src/main/java/com/alibaba/otter/canal/client/adapter/es/support/ESTemplate.java

@@ -100,34 +100,12 @@ public class ESTemplate {
      * @return
      */
     public void update(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
-        append4Update(getBulk(), mapping, pkVal, esFieldData);
+        append4Update(mapping, pkVal, esFieldData);
         commitBatch();
     }
 
-    private void append4Update(BulkRequestBuilder bulkRequestBuilder, ESMapping mapping, Object pkVal,
-                               Map<String, Object> esFieldData) {
-        if (mapping.get_id() != null) {
-            bulkRequestBuilder
-                .add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
-                    .setDoc(esFieldData));
-        } else {
-            // TODO SearchResponse response =
-            // transportClient.prepareSearch(mapping.get_index())
-            // .setTypes(mapping.get_type())
-            // .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
-            // .setSize(MAX_BATCH_SIZE)
-            // .get();
-            // for (SearchHit hit : response.getHits()) {
-            // bulkRequestBuilder
-            // .add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(),
-            // hit.getId())
-            // .setDoc(esFieldData));
-            // }
-        }
-    }
-
     /**
-     * TODO XXX update by query
+     * update by query
      *
      * @param config
      * @param paramsTmp
@@ -154,7 +132,7 @@ public class ESTemplate {
             try {
                 while (rs.next()) {
                     Object idVal = getIdValFromRS(mapping, rs);
-                    append4Update(getBulk(), mapping, idVal, esFieldData);
+                    append4Update(mapping, idVal, esFieldData);
                     commitBatch();
                     count++;
                 }
@@ -227,6 +205,32 @@ public class ESTemplate {
         }
     }
 
+    private void append4Update(ESMapping mapping, Object pkVal, Map<String, Object> esFieldData) {
+        if (mapping.get_id() != null) {
+            if (mapping.isUpsert()) {
+                getBulk().add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
+                        .setDoc(esFieldData)
+                        .setDocAsUpsert(true));
+            } else {
+                getBulk().add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(), pkVal.toString())
+                        .setDoc(esFieldData));
+            }
+        } else {
+            // TODO SearchResponse response =
+            // transportClient.prepareSearch(mapping.get_index())
+            // .setTypes(mapping.get_type())
+            // .setQuery(QueryBuilders.termQuery(mapping.getPk(), pkVal))
+            // .setSize(MAX_BATCH_SIZE)
+            // .get();
+            // for (SearchHit hit : response.getHits()) {
+            // bulkRequestBuilder
+            // .add(transportClient.prepareUpdate(mapping.get_index(), mapping.get_type(),
+            // hit.getId())
+            // .setDoc(esFieldData));
+            // }
+        }
+    }
+
     public Object getValFromRS(ESMapping mapping, ResultSet resultSet, String fieldName,
                                String columnName) throws SQLException {
         String esType = getEsType(mapping, fieldName);