Browse Source

Honor update request timeout

When executing an update request, the request timeout is not transferred
to the index/delete request executed on behalf of the update
request. This leads to update requests not timing out when they should
(e.g., if not all shards are available when the request specifies
wait_for_shards=all with a small timeout). This commit causes the
index/delete requests to honor the update request timeout.

Relates #23825
Jason Tedor 8 years ago
parent
commit
48357e43d3

+ 3 - 0
core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java

@@ -122,6 +122,7 @@ public class UpdateHelper extends AbstractComponent {
                     .setRefreshPolicy(request.getRefreshPolicy())
                     .routing(request.routing())
                     .parent(request.parent())
+                    .timeout(request.timeout())
                     .waitForActiveShards(request.waitForActiveShards());
             if (request.versionType() != VersionType.INTERNAL) {
                 // in all but the internal versioning mode, we want to create the new document using the given version.
@@ -188,12 +189,14 @@ public class UpdateHelper extends AbstractComponent {
                     .source(updatedSourceAsMap, updateSourceContentType)
                     .version(updateVersion).versionType(request.versionType())
                     .waitForActiveShards(request.waitForActiveShards())
+                    .timeout(request.timeout())
                     .setRefreshPolicy(request.getRefreshPolicy());
             return new Result(indexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
         } else if ("delete".equals(operation)) {
             DeleteRequest deleteRequest = Requests.deleteRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent)
                     .version(updateVersion).versionType(request.versionType())
                     .waitForActiveShards(request.waitForActiveShards())
+                    .timeout(request.timeout())
                     .setRefreshPolicy(request.getRefreshPolicy());
             return new Result(deleteRequest, DocWriteResponse.Result.DELETED, updatedSourceAsMap, updateSourceContentType);
         } else if ("none".equals(operation)) {

+ 123 - 39
core/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java

@@ -21,10 +21,12 @@ package org.elasticsearch.action.update;
 
 import org.elasticsearch.Version;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.replication.ReplicationRequest;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.Streamable;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentParser;
@@ -43,16 +45,20 @@ import org.elasticsearch.script.ScriptType;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.RandomObjects;
 import org.elasticsearch.watcher.ResourceWatcherService;
+import org.junit.Before;
 
 import java.io.IOException;
 import java.nio.file.Path;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.function.Function;
 
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonList;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.common.xcontent.XContentHelper.toXContent;
+import static org.elasticsearch.script.MockScriptEngine.mockInlineScript;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent;
 import static org.hamcrest.Matchers.arrayContaining;
 import static org.hamcrest.Matchers.equalTo;
@@ -61,6 +67,66 @@ import static org.hamcrest.Matchers.notNullValue;
 
 public class UpdateRequestTests extends ESTestCase {
 
+    private UpdateHelper updateHelper;
+
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        final Path genericConfigFolder = createTempDir();
+        final Settings baseSettings = Settings.builder()
+                .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
+                .put(Environment.PATH_CONF_SETTING.getKey(), genericConfigFolder)
+                .build();
+        final Environment environment = new Environment(baseSettings);
+        final Map<String, Function<Map<String, Object>, Object>> scripts =  new HashMap<>();
+        scripts.put(
+                "ctx._source.update_timestamp = ctx._now",
+                vars -> {
+                    @SuppressWarnings("unchecked")
+                    final Map<String, Object> ctx = (Map<String, Object>) vars.get("ctx");
+                    @SuppressWarnings("unchecked")
+                    final Map<String, Object> source = (Map<String, Object>) ctx.get("_source");
+                    source.put("update_timestamp", ctx.get("_now"));
+                    return null;
+                });
+        scripts.put(
+                "ctx._timestamp = ctx._now",
+                vars -> {
+                    @SuppressWarnings("unchecked")
+                    final Map<String, Object> ctx = (Map<String, Object>) vars.get("ctx");
+                    ctx.put("_timestamp", ctx.get("_now"));
+                    return null;
+                });
+        scripts.put(
+                "ctx.op = delete",
+                vars -> {
+                    @SuppressWarnings("unchecked")
+                    final Map<String, Object> ctx = (Map<String, Object>) vars.get("ctx");
+                    ctx.put("op", "delete");
+                    return null;
+                });
+        scripts.put("return", vars -> null);
+        final ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(emptyList());
+        final MockScriptEngine engine = new MockScriptEngine("mock", scripts);
+        final ScriptEngineRegistry scriptEngineRegistry =
+                new ScriptEngineRegistry(singletonList(engine));
+
+        final ScriptSettings scriptSettings =
+                new ScriptSettings(scriptEngineRegistry, scriptContextRegistry);
+        final ResourceWatcherService watcherService =
+                new ResourceWatcherService(baseSettings, null);
+        ScriptService scriptService = new ScriptService(
+                baseSettings,
+                environment,
+                watcherService,
+                scriptEngineRegistry,
+                scriptContextRegistry,
+                scriptSettings);
+        final Settings settings = settings(Version.CURRENT).build();
+
+        updateHelper = new UpdateHelper(settings, scriptService);
+    }
+
     public void testFromXContent() throws Exception {
         UpdateRequest request = new UpdateRequest("test", "type", "1");
         // simple script
@@ -74,7 +140,7 @@ public class UpdateRequestTests extends ESTestCase {
         assertThat(script.getType(), equalTo(ScriptType.INLINE));
         assertThat(script.getLang(), equalTo(Script.DEFAULT_SCRIPT_LANG));
         Map<String, Object> params = script.getParams();
-        assertThat(params, equalTo(Collections.emptyMap()));
+        assertThat(params, equalTo(emptyMap()));
 
         // simple verbose script
         request.fromXContent(createParser(XContentFactory.jsonBuilder().startObject()
@@ -86,7 +152,7 @@ public class UpdateRequestTests extends ESTestCase {
         assertThat(script.getType(), equalTo(ScriptType.INLINE));
         assertThat(script.getLang(), equalTo(Script.DEFAULT_SCRIPT_LANG));
         params = script.getParams();
-        assertThat(params, equalTo(Collections.emptyMap()));
+        assertThat(params, equalTo(emptyMap()));
 
         // script with params
         request = new UpdateRequest("test", "type", "1");
@@ -258,39 +324,6 @@ public class UpdateRequestTests extends ESTestCase {
     }
 
     public void testNowInScript() throws IOException {
-        Path genericConfigFolder = createTempDir();
-        Settings baseSettings = Settings.builder()
-            .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
-            .put(Environment.PATH_CONF_SETTING.getKey(), genericConfigFolder)
-            .build();
-        Environment environment = new Environment(baseSettings);
-        Map<String, Function<Map<String, Object>, Object>> scripts =  new HashMap<>();
-        scripts.put("ctx._source.update_timestamp = ctx._now",
-            (vars) -> {
-                Map<String, Object> vars2 = vars;
-                @SuppressWarnings("unchecked")
-                Map<String, Object> ctx = (Map<String, Object>) vars2.get("ctx");
-                @SuppressWarnings("unchecked")
-                Map<String, Object> source = (Map<String, Object>) ctx.get("_source");
-                source.put("update_timestamp", ctx.get("_now"));
-                return null;});
-        scripts.put("ctx._timestamp = ctx._now",
-            (vars) -> {
-                @SuppressWarnings("unchecked")
-                Map<String, Object> ctx = (Map<String, Object>) vars.get("ctx");
-                ctx.put("_timestamp", ctx.get("_now"));
-                return null;});
-        ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(Collections.emptyList());
-        ScriptEngineRegistry scriptEngineRegistry = new ScriptEngineRegistry(Collections.singletonList(new MockScriptEngine("mock",
-            scripts)));
-
-        ScriptSettings scriptSettings = new ScriptSettings(scriptEngineRegistry, scriptContextRegistry);
-        ScriptService scriptService = new ScriptService(baseSettings, environment,
-            new ResourceWatcherService(baseSettings, null), scriptEngineRegistry, scriptContextRegistry, scriptSettings);
-        Settings settings = settings(Version.CURRENT).build();
-
-        UpdateHelper updateHelper = new UpdateHelper(settings, scriptService);
-
         // We just upsert one document with now() using a script
         IndexRequest indexRequest = new IndexRequest("test", "type1", "2")
             .source(jsonBuilder().startObject().field("foo", "bar").endObject());
@@ -298,7 +331,7 @@ public class UpdateRequestTests extends ESTestCase {
         {
             UpdateRequest updateRequest = new UpdateRequest("test", "type1", "2")
                 .upsert(indexRequest)
-                .script(new Script(ScriptType.INLINE, "mock", "ctx._source.update_timestamp = ctx._now", Collections.emptyMap()))
+                .script(mockInlineScript("ctx._source.update_timestamp = ctx._now"))
                 .scriptedUpsert(true);
             long nowInMillis = randomNonNegativeLong();
             // We simulate that the document is not existing yet
@@ -307,12 +340,12 @@ public class UpdateRequestTests extends ESTestCase {
             Streamable action = result.action();
             assertThat(action, instanceOf(IndexRequest.class));
             IndexRequest indexAction = (IndexRequest) action;
-            assertEquals(indexAction.sourceAsMap().get("update_timestamp"), nowInMillis);
+            assertEquals(nowInMillis, indexAction.sourceAsMap().get("update_timestamp"));
         }
         {
             UpdateRequest updateRequest = new UpdateRequest("test", "type1", "2")
                 .upsert(indexRequest)
-                .script(new Script(ScriptType.INLINE, "mock", "ctx._timestamp = ctx._now", Collections.emptyMap()))
+                .script(mockInlineScript("ctx._timestamp = ctx._now"))
                 .scriptedUpsert(true);
             // We simulate that the document is not existing yet
             GetResult getResult = new GetResult("test", "type1", "2", 0, true, new BytesArray("{}"), null);
@@ -322,6 +355,57 @@ public class UpdateRequestTests extends ESTestCase {
         }
     }
 
+    public void testIndexTimeout() {
+        final GetResult getResult =
+                new GetResult("test", "type", "1", 0, true, new BytesArray("{\"f\":\"v\"}"), null);
+        final UpdateRequest updateRequest =
+                new UpdateRequest("test", "type", "1")
+                        .script(mockInlineScript("return"))
+                        .timeout(randomTimeValue());
+        runTimeoutTest(getResult, updateRequest);
+    }
+
+    public void testDeleteTimeout() {
+        final GetResult getResult =
+                new GetResult("test", "type", "1", 0, true, new BytesArray("{\"f\":\"v\"}"), null);
+        final UpdateRequest updateRequest =
+                new UpdateRequest("test", "type", "1")
+                        .script(mockInlineScript("ctx.op = delete"))
+                        .timeout(randomTimeValue());
+        runTimeoutTest(getResult, updateRequest);
+    }
+
+    public void testUpsertTimeout() throws IOException {
+        final boolean exists = randomBoolean();
+        final BytesReference source = exists ? new BytesArray("{\"f\":\"v\"}") : null;
+        final GetResult getResult = new GetResult("test", "type", "1", 0, exists, source, null);
+        final XContentBuilder sourceBuilder = jsonBuilder();
+        sourceBuilder.startObject();
+        {
+            sourceBuilder.field("f", "v");
+        }
+        sourceBuilder.endObject();
+        final IndexRequest upsert = new IndexRequest("test", "type", "1").source(sourceBuilder);
+        final UpdateRequest updateRequest =
+                new UpdateRequest("test", "type", "1")
+                .upsert(upsert)
+                .script(mockInlineScript("return"))
+                .timeout(randomTimeValue());
+        runTimeoutTest(getResult, updateRequest);
+    }
+
+    private void runTimeoutTest(final GetResult getResult, final UpdateRequest updateRequest) {
+        final UpdateHelper.Result result = updateHelper.prepare(
+                new ShardId("test", "", 0),
+                updateRequest,
+                getResult,
+                ESTestCase::randomNonNegativeLong);
+        final Streamable action = result.action();
+        assertThat(action, instanceOf(ReplicationRequest.class));
+        final ReplicationRequest request = (ReplicationRequest) action;
+        assertThat(request.timeout(), equalTo(updateRequest.timeout()));
+    }
+
     public void testToAndFromXContent() throws IOException {
         UpdateRequest updateRequest = new UpdateRequest();
         updateRequest.detectNoop(randomBoolean());

+ 7 - 0
test/framework/src/main/java/org/elasticsearch/script/MockScriptEngine.java

@@ -32,6 +32,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.function.Function;
 
+import static java.util.Collections.emptyMap;
+
 /**
  * A mocked script engine that can be used for testing purpose.
  *
@@ -215,4 +217,9 @@ public class MockScriptEngine implements ScriptEngineService {
             return true;
         }
     }
+
+    public static Script mockInlineScript(final String script) {
+        return new Script(ScriptType.INLINE, "mock", script, emptyMap());
+    }
+
 }