1
0
Эх сурвалжийг харах

Do not mutate request on scripted upsert (#49578)

Fixes a bug where a scripted upsert that causes a dynamic mapping update is retried (because
mapping update is still in-flight), and the request is mutated multiple times.

Closes #48670
Yannick Welsch 5 жил өмнө
parent
commit
3429f79cce

+ 4 - 5
server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java

@@ -99,8 +99,7 @@ public class UpdateHelper {
      * Execute a scripted upsert, where there is an existing upsert document and a script to be executed. The script is executed and a new
      * Tuple of operation and updated {@code _source} is returned.
      */
-    Tuple<UpdateOpType, Map<String, Object>> executeScriptedUpsert(IndexRequest upsert, Script script, LongSupplier nowInMillis) {
-        Map<String, Object> upsertDoc = upsert.sourceAsMap();
+    Tuple<UpdateOpType, Map<String, Object>> executeScriptedUpsert(Map<String, Object> upsertDoc, Script script, LongSupplier nowInMillis) {
         Map<String, Object> ctx = new HashMap<>(3);
         // Tell the script that this is a create and not an update
         ctx.put(ContextFields.OP, UpdateOpType.CREATE.toString());
@@ -133,11 +132,11 @@ public class UpdateHelper {
             if (request.scriptedUpsert() && request.script() != null) {
                 // Run the script to perform the create logic
                 IndexRequest upsert = request.upsertRequest();
-                Tuple<UpdateOpType, Map<String, Object>> upsertResult = executeScriptedUpsert(upsert, request.script, nowInMillis);
+                Tuple<UpdateOpType, Map<String, Object>> upsertResult = executeScriptedUpsert(upsert.sourceAsMap(), request.script,
+                    nowInMillis);
                 switch (upsertResult.v1()) {
                     case CREATE:
-                        // Update the index request with the new "_source"
-                        indexRequest.source(upsertResult.v2());
+                        indexRequest = Requests.indexRequest(request.index()).source(upsertResult.v2());
                         break;
                     case NONE:
                         UpdateResponse update = new UpdateResponse(shardId, getResult.getId(),

+ 37 - 0
server/src/test/java/org/elasticsearch/action/bulk/BulkWithUpdatesIT.java

@@ -199,6 +199,43 @@ public class BulkWithUpdatesIT extends ESIntegTestCase {
         assertThat(((Number) getResponse.getSource().get("field")).longValue(), equalTo(4L));
     }
 
+    public void testBulkUpdateWithScriptedUpsertAndDynamicMappingUpdate() throws Exception {
+        assertAcked(prepareCreate("test").addAlias(new Alias("alias")));
+        ensureGreen();
+
+        final Script script = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "ctx._source.field += 1", Collections.emptyMap());
+
+        BulkResponse bulkResponse = client().prepareBulk()
+            .add(client().prepareUpdate().setIndex(indexOrAlias()).setId("1")
+                .setScript(script).setScriptedUpsert(true).setUpsert("field", 1))
+            .add(client().prepareUpdate().setIndex(indexOrAlias()).setId("2")
+                .setScript(script).setScriptedUpsert(true).setUpsert("field", 1))
+            .get();
+
+        logger.info(bulkResponse.buildFailureMessage());
+
+        assertThat(bulkResponse.hasFailures(), equalTo(false));
+        assertThat(bulkResponse.getItems().length, equalTo(2));
+        for (BulkItemResponse bulkItemResponse : bulkResponse) {
+            assertThat(bulkItemResponse.getIndex(), equalTo("test"));
+        }
+        assertThat(bulkResponse.getItems()[0].getResponse().getId(), equalTo("1"));
+        assertThat(bulkResponse.getItems()[0].getResponse().getVersion(), equalTo(1L));
+        assertThat(bulkResponse.getItems()[1].getResponse().getId(), equalTo("2"));
+        assertThat(bulkResponse.getItems()[1].getResponse().getVersion(), equalTo(1L));
+
+        GetResponse getResponse = client().prepareGet().setIndex("test").setId("1").execute()
+            .actionGet();
+        assertThat(getResponse.isExists(), equalTo(true));
+        assertThat(getResponse.getVersion(), equalTo(1L));
+        assertThat(((Number) getResponse.getSource().get("field")).longValue(), equalTo(2L));
+
+        getResponse = client().prepareGet().setIndex("test").setId("2").execute().actionGet();
+        assertThat(getResponse.isExists(), equalTo(true));
+        assertThat(getResponse.getVersion(), equalTo(1L));
+        assertThat(((Number) getResponse.getSource().get("field")).longValue(), equalTo(2L));
+    }
+
     public void testBulkWithCAS() throws Exception {
         createIndex("test", Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).build());
         ensureGreen();