Ver código fonte

Use OpType.CREATE in GeoIpDownloader (#69951)

When indexing new chunks in GeoIpDwonloader we should never have to overwrite old chunks, if we try to that means that there are 2 simultaneous executions. This change forces one of them to throw error in such case.
Przemko Robakowski 4 anos atrás
pai
commit
d683ee12e0

+ 1 - 0
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java

@@ -170,6 +170,7 @@ class GeoIpDownloader extends AllocatedPersistentTask {
         for (byte[] buf = getChunk(is); buf.length != 0; buf = getChunk(is)) {
             md.update(buf);
             client.prepareIndex(DATABASES_INDEX).setId(name + "_" + chunk)
+                .setCreate(true)
                 .setSource(XContentType.SMILE, "name", name, "chunk", chunk, "data", buf)
                 .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
                 .setWaitForActiveShards(ActiveShardCount.ALL)

+ 2 - 0
modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java

@@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.DocWriteRequest.OpType;
 import org.elasticsearch.action.index.IndexAction;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexResponse;
@@ -162,6 +163,7 @@ public class GeoIpDownloaderTests extends ESTestCase {
 
         client.addHandler(IndexAction.INSTANCE, (IndexRequest request, ActionListener<IndexResponse> listener) -> {
             int chunk = chunkIndex.getAndIncrement();
+            assertEquals(OpType.CREATE, request.opType());
             assertEquals("test_" + (chunk + 15), request.id());
             assertEquals(XContentType.SMILE, request.getContentType());
             Map<String, Object> source = request.sourceAsMap();