Browse Source

Add configurable op_type for index watcher action (#64590)

Dan Hermann 5 years ago
parent
commit
ad71547001

+ 7 - 3
x-pack/docs/en/watcher/actions/index.asciidoc

@@ -26,8 +26,8 @@ The following snippet shows a simple `index` action definition:
 <1> The id of the action
 <2> An optional <<condition,condition>> to restrict action execution
 <3> An optional <<transform,transform>> to transform the payload and prepare the data that should be indexed
-<4> The elasticsearch index to store the data to
-<5> An optional `_id` for the document, if it should always be the same document.
+<4> The index, alias, or data stream to which the data will be written
+<5> An optional `_id` for the document
 
 
 [[index-action-attributes]]
@@ -37,11 +37,15 @@ The following snippet shows a simple `index` action definition:
 |======
 |Name                     |Required    | Default    | Description
 
-| `index`                 | yes        | -          | The Elasticsearch index to index into.
+| `index`                 | yes        | -          | The index, alias, or data stream to index into.
 
 
 | `doc_id`                | no         | -          | The optional `_id` of the document.
 
+| `op_type`               | no         | `index`    | The <<docs-index-api-op_type,op_type>> for the index operation.
+                                                      Must be one of either `index` or `create`. Must be `create` if
+                                                      `index` is a data stream.
+
 | `execution_time_field`  | no         | -          | The field that will store/index the watch execution
                                                       time.
 

+ 6 - 0
x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/actions/index/ExecutableIndexAction.java

@@ -83,6 +83,9 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
 
         indexRequest.index(getField(actionId, ctx.id().watchId(), "index", data, INDEX_FIELD, action.index));
         indexRequest.id(getField(actionId, ctx.id().watchId(), "id",data, ID_FIELD, action.docId));
+        if (action.opType != null) {
+            indexRequest.opType(action.opType);
+        }
 
         data = addTimestampToDocument(data, ctx.executionTime());
         BytesReference bytesReference;
@@ -128,6 +131,9 @@ public class ExecutableIndexAction extends ExecutableAction<IndexAction> {
             IndexRequest indexRequest = new IndexRequest();
             indexRequest.index(getField(actionId, ctx.id().watchId(), "index", doc, INDEX_FIELD, action.index));
             indexRequest.id(getField(actionId, ctx.id().watchId(), "id",doc, ID_FIELD, action.docId));
+            if (action.opType != null) {
+                indexRequest.opType(action.opType);
+            }
 
             doc = addTimestampToDocument(doc, ctx.executionTime());
             try (XContentBuilder builder = jsonBuilder()) {

+ 39 - 7
x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/actions/index/IndexAction.java

@@ -6,6 +6,7 @@
 package org.elasticsearch.xpack.watcher.actions.index;
 
 import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.ParseField;
@@ -19,6 +20,7 @@ import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource;
 
 import java.io.IOException;
 import java.time.ZoneId;
+import java.util.List;
 import java.util.Objects;
 
 import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
@@ -29,16 +31,18 @@ public class IndexAction implements Action {
 
     @Nullable final String index;
     @Nullable final String docId;
+    @Nullable final DocWriteRequest.OpType opType;
     @Nullable final String executionTimeField;
     @Nullable final TimeValue timeout;
     @Nullable final ZoneId dynamicNameTimeZone;
     @Nullable final RefreshPolicy refreshPolicy;
 
-    public IndexAction(@Nullable String index, @Nullable String docId,
-                       @Nullable String executionTimeField,
-                       @Nullable TimeValue timeout, @Nullable ZoneId dynamicNameTimeZone, @Nullable RefreshPolicy refreshPolicy) {
+    public IndexAction(@Nullable String index, @Nullable String docId, @Nullable DocWriteRequest.OpType opType,
+                       @Nullable String executionTimeField, @Nullable TimeValue timeout, @Nullable ZoneId dynamicNameTimeZone,
+                       @Nullable RefreshPolicy refreshPolicy) {
         this.index = index;
         this.docId = docId;
+        this.opType = opType;
         this.executionTimeField = executionTimeField;
         this.timeout = timeout;
         this.dynamicNameTimeZone = dynamicNameTimeZone;
@@ -58,6 +62,10 @@ public class IndexAction implements Action {
         return docId;
     }
 
+    public DocWriteRequest.OpType getOpType() {
+        return opType;
+    }
+
     public String getExecutionTimeField() {
         return executionTimeField;
     }
@@ -77,7 +85,9 @@ public class IndexAction implements Action {
 
         IndexAction that = (IndexAction) o;
 
-        return Objects.equals(index, that.index) && Objects.equals(docId, that.docId)
+        return Objects.equals(index, that.index)
+                && Objects.equals(docId, that.docId)
+                && Objects.equals(opType, that.opType)
                 && Objects.equals(executionTimeField, that.executionTimeField)
                 && Objects.equals(timeout, that.timeout)
                 && Objects.equals(dynamicNameTimeZone, that.dynamicNameTimeZone)
@@ -86,7 +96,7 @@ public class IndexAction implements Action {
 
     @Override
     public int hashCode() {
-        return Objects.hash(index, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
+        return Objects.hash(index, docId, opType, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
     }
 
     @Override
@@ -98,6 +108,9 @@ public class IndexAction implements Action {
         if (docId != null) {
             builder.field(Field.DOC_ID.getPreferredName(), docId);
         }
+        if (opType != null) {
+            builder.field(Field.OP_TYPE.getPreferredName(), opType);
+        }
         if (executionTimeField != null) {
             builder.field(Field.EXECUTION_TIME_FIELD.getPreferredName(), executionTimeField);
         }
@@ -116,6 +129,7 @@ public class IndexAction implements Action {
     public static IndexAction parse(String watchId, String actionId, XContentParser parser) throws IOException {
         String index = null;
         String docId = null;
+        DocWriteRequest.OpType opType = null;
         String executionTimeField = null;
         TimeValue timeout = null;
         ZoneId dynamicNameTimeZone = null;
@@ -143,6 +157,17 @@ public class IndexAction implements Action {
             } else if (token == XContentParser.Token.VALUE_STRING) {
                 if (Field.DOC_ID.match(currentFieldName, parser.getDeprecationHandler())) {
                     docId = parser.text();
+                } else if (Field.OP_TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
+                    try {
+                        opType = DocWriteRequest.OpType.fromString(parser.text());
+                        if (List.of(DocWriteRequest.OpType.CREATE, DocWriteRequest.OpType.INDEX).contains(opType) == false) {
+                            throw new ElasticsearchParseException("could not parse [{}] action [{}/{}]. op_type value for field [{}] " +
+                                "must be [index] or [create]", TYPE, watchId, actionId, currentFieldName);
+                        }
+                    } catch (IllegalArgumentException e) {
+                        throw new ElasticsearchParseException("could not parse [{}] action [{}/{}]. failed to parse op_type value for " +
+                            "field [{}]", TYPE, watchId, actionId, currentFieldName);
+                    }
                 } else if (Field.EXECUTION_TIME_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                     executionTimeField = parser.text();
                 } else if (Field.TIMEOUT_HUMAN.match(currentFieldName, parser.getDeprecationHandler())) {
@@ -167,7 +192,7 @@ public class IndexAction implements Action {
             }
         }
 
-        return new IndexAction(index, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
+        return new IndexAction(index, docId, opType, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
     }
 
     public static Builder builder(String index) {
@@ -247,6 +272,7 @@ public class IndexAction implements Action {
 
         final String index;
         String docId;
+        DocWriteRequest.OpType opType;
         String executionTimeField;
         TimeValue timeout;
         ZoneId dynamicNameTimeZone;
@@ -261,6 +287,11 @@ public class IndexAction implements Action {
             return this;
         }
 
+        public Builder setOpType(DocWriteRequest.OpType opType) {
+            this.opType = opType;
+            return this;
+        }
+
         public Builder setExecutionTimeField(String executionTimeField) {
             this.executionTimeField = executionTimeField;
             return this;
@@ -283,13 +314,14 @@ public class IndexAction implements Action {
 
         @Override
         public IndexAction build() {
-            return new IndexAction(index, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
+            return new IndexAction(index, docId, opType, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
         }
     }
 
     interface Field {
         ParseField INDEX = new ParseField("index");
         ParseField DOC_ID = new ParseField("doc_id");
+        ParseField OP_TYPE = new ParseField("op_type");
         ParseField EXECUTION_TIME_FIELD = new ParseField("execution_time_field");
         ParseField SOURCE = new ParseField("source");
         ParseField RESPONSE = new ParseField("response");

+ 43 - 9
x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/index/IndexActionTests.java

@@ -51,6 +51,7 @@ import static java.util.Map.entry;
 import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
 import static org.elasticsearch.common.util.set.Sets.newHashSet;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.hasSize;
@@ -89,6 +90,10 @@ public class IndexActionTests extends ESTestCase {
         if (writeTimeout != null) {
             builder.field(IndexAction.Field.TIMEOUT.getPreferredName(), writeTimeout.millis());
         }
+        DocWriteRequest.OpType opType = randomBoolean() ? DocWriteRequest.OpType.fromId(randomFrom(new Byte[] { 0, 1 })) : null;
+        if (opType != null) {
+            builder.field(IndexAction.Field.OP_TYPE.getPreferredName(), opType.getLowercase());
+        }
         builder.endObject();
         IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client);
         XContentParser parser = createParser(builder);
@@ -102,6 +107,9 @@ public class IndexActionTests extends ESTestCase {
         if (timestampField != null) {
             assertThat(executable.action().executionTimeField, equalTo(timestampField));
         }
+        if (opType != null) {
+            assertThat(executable.action().opType, equalTo(opType));
+        }
         assertThat(executable.action().timeout, equalTo(writeTimeout));
     }
 
@@ -130,20 +138,47 @@ public class IndexActionTests extends ESTestCase {
                 .endObject());
     }
 
+    public void testOpTypeThatCannotBeParsed() throws Exception {
+        expectParseFailure(jsonBuilder()
+            .startObject()
+            .field(IndexAction.Field.OP_TYPE.getPreferredName(), randomAlphaOfLength(10))
+            .endObject(),
+            "failed to parse op_type value for field [op_type]");
+    }
+
+    public void testUnsupportedOpType() throws Exception {
+        expectParseFailure(jsonBuilder()
+            .startObject()
+            .field(IndexAction.Field.OP_TYPE.getPreferredName(),
+                randomFrom(DocWriteRequest.OpType.UPDATE.name(), DocWriteRequest.OpType.DELETE.name()))
+            .endObject(),
+            "op_type value for field [op_type] must be [index] or [create]");
+    }
+
+    private void expectParseFailure(XContentBuilder builder, String expectedMessage) throws Exception {
+        expectFailure(ElasticsearchParseException.class, builder, expectedMessage);
+    }
+
     private void expectParseFailure(XContentBuilder builder) throws Exception {
         expectFailure(ElasticsearchParseException.class, builder);
     }
 
     private void expectFailure(Class clazz, XContentBuilder builder) throws Exception {
+        expectFailure(clazz, builder, null);
+    }
+
+    private void expectFailure(Class clazz, XContentBuilder builder, String expectedMessage) throws Exception {
         IndexActionFactory actionParser = new IndexActionFactory(Settings.EMPTY, client);
         XContentParser parser = createParser(builder);
         parser.nextToken();
-        expectThrows(clazz, () ->
-                actionParser.parseExecutable(randomAlphaOfLength(4), randomAlphaOfLength(5), parser));
+        Throwable t = expectThrows(clazz, () -> actionParser.parseExecutable(randomAlphaOfLength(4), randomAlphaOfLength(5), parser));
+        if (expectedMessage != null) {
+            assertThat(t.getMessage(), containsString(expectedMessage));
+        }
     }
 
     public void testUsingParameterIdWithBulkOrIdFieldThrowsIllegalState() {
-        final IndexAction action = new IndexAction("test-index", "123", null, null, null, refreshPolicy);
+        final IndexAction action = new IndexAction("test-index", "123", null, null, null, null, refreshPolicy);
         final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
                 TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
         final Map<String, Object> docWithId = Map.of(
@@ -191,7 +226,7 @@ public class IndexActionTests extends ESTestCase {
 
         final IndexAction action = new IndexAction(configureIndexDynamically ? null : "my_index",
                 configureIdDynamically ? null : "my_id",
-                null, null, null, refreshPolicy);
+                null, null, null, null, refreshPolicy);
         final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
                 TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
 
@@ -211,7 +246,7 @@ public class IndexActionTests extends ESTestCase {
     }
 
     public void testThatIndexActionCanBeConfiguredWithDynamicIndexNameAndBulk() throws Exception {
-        final IndexAction action = new IndexAction(null, null, null, null, null, refreshPolicy);
+        final IndexAction action = new IndexAction(null, null, null, null, null, null, refreshPolicy);
         final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
                 TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
 
@@ -239,7 +274,7 @@ public class IndexActionTests extends ESTestCase {
     public void testConfigureIndexInMapAndAction() {
         String fieldName = "_index";
         final IndexAction action = new IndexAction("my_index",
-                null,null, null, null, refreshPolicy);
+                null, null,null, null, null, refreshPolicy);
         final ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
                 TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
 
@@ -258,8 +293,7 @@ public class IndexActionTests extends ESTestCase {
         String docId = randomAlphaOfLength(5);
         String timestampField = randomFrom("@timestamp", null);
 
-        IndexAction action = new IndexAction("test-index", docIdAsParam ? docId : null, timestampField, null, null,
-                refreshPolicy);
+        IndexAction action = new IndexAction("test-index", docIdAsParam ? docId : null, null, timestampField, null, null, refreshPolicy);
         ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30),
                 TimeValue.timeValueSeconds(30));
         ZonedDateTime executionTime = DateUtils.nowWithMillisResolution();
@@ -308,7 +342,7 @@ public class IndexActionTests extends ESTestCase {
     }
 
     public void testFailureResult() throws Exception {
-        IndexAction action = new IndexAction("test-index", null, "@timestamp", null, null, refreshPolicy);
+        IndexAction action = new IndexAction("test-index", null, null, "@timestamp", null, null, refreshPolicy);
         ExecutableIndexAction executable = new ExecutableIndexAction(action, logger, client,
                 TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30));
 

+ 4 - 2
x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.watcher.watch;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.client.Client;
@@ -592,8 +593,9 @@ public class WatchTests extends ESTestCase {
             ZoneOffset timeZone = randomBoolean() ? ZoneOffset.UTC : null;
             TimeValue timeout = randomBoolean() ? timeValueSeconds(between(1, 10000)) : null;
             WriteRequest.RefreshPolicy refreshPolicy = randomBoolean() ? null : randomFrom(WriteRequest.RefreshPolicy.values());
-            IndexAction action = new IndexAction("_index", randomBoolean() ? "123" : null, null, timeout, timeZone,
-                    refreshPolicy);
+            IndexAction action = new IndexAction("_index", randomBoolean() ? "123" : null,
+                randomBoolean() ? DocWriteRequest.OpType.fromId(randomFrom(new Byte[] { 0, 1 })) : null, null, timeout, timeZone,
+                refreshPolicy);
             list.add(new ActionWrapper("_index_" + randomAlphaOfLength(8), randomThrottler(),
                     AlwaysConditionTests.randomCondition(scriptService),  randomTransform(),
                     new ExecutableIndexAction(action, logger, client, TimeValue.timeValueSeconds(30),