Bladeren bron

HLRC: execute watch API (#35868)

This change adds support for the execute watch API in the high level rest client
Alan Woodward 6 jaren geleden
bovenliggende
commit
d2886e1c81

+ 29 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/WatcherClient.java

@@ -26,6 +26,8 @@ import org.elasticsearch.client.watcher.ActivateWatchRequest;
 import org.elasticsearch.client.watcher.ActivateWatchResponse;
 import org.elasticsearch.client.watcher.AckWatchRequest;
 import org.elasticsearch.client.watcher.AckWatchResponse;
+import org.elasticsearch.client.watcher.ExecuteWatchRequest;
+import org.elasticsearch.client.watcher.ExecuteWatchResponse;
 import org.elasticsearch.client.watcher.GetWatchRequest;
 import org.elasticsearch.client.watcher.GetWatchResponse;
 import org.elasticsearch.client.watcher.StartWatchServiceRequest;
@@ -269,6 +271,33 @@ public final class WatcherClient {
             ActivateWatchResponse::fromXContent, listener, singleton(404));
     }
 
+    /**
+     * Execute a watch on the cluster
+     * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/watcher-api-execute-watch.html">
+     * the docs</a> for more.
+     * @param request the request
+     * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+     * @return the response
+     * @throws IOException if there is a problem sending the request or parsing the response
+     */
+    public ExecuteWatchResponse executeWatch(ExecuteWatchRequest request, RequestOptions options) throws IOException {
+        return restHighLevelClient.performRequestAndParseEntity(request, WatcherRequestConverters::executeWatch, options,
+            ExecuteWatchResponse::fromXContent, emptySet());
+    }
+
+    /**
+     * Asynchronously execute a watch on the cluster
+     * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/watcher-api-execute-watch.html">
+     * the docs</a> for more.
+     * @param request the request
+     * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+     * @param listener the listener to be notifed upon request completion
+     */
+    public void executeWatchAsync(ExecuteWatchRequest request, RequestOptions options, ActionListener<ExecuteWatchResponse> listener) {
+        restHighLevelClient.performRequestAsyncAndParseEntity(request, WatcherRequestConverters::executeWatch, options,
+            ExecuteWatchResponse::fromXContent, listener, emptySet());
+    }
+
     /**
      * Get the watcher stats
      * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/watcher-api-stats.html">

+ 28 - 2
client/rest-high-level/src/main/java/org/elasticsearch/client/WatcherRequestConverters.java

@@ -25,16 +25,20 @@ import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpPut;
 import org.apache.http.entity.ByteArrayEntity;
 import org.apache.http.entity.ContentType;
-import org.elasticsearch.client.watcher.DeactivateWatchRequest;
-import org.elasticsearch.client.watcher.ActivateWatchRequest;
 import org.elasticsearch.client.watcher.AckWatchRequest;
+import org.elasticsearch.client.watcher.ActivateWatchRequest;
+import org.elasticsearch.client.watcher.DeactivateWatchRequest;
 import org.elasticsearch.client.watcher.DeleteWatchRequest;
+import org.elasticsearch.client.watcher.ExecuteWatchRequest;
 import org.elasticsearch.client.watcher.GetWatchRequest;
 import org.elasticsearch.client.watcher.PutWatchRequest;
 import org.elasticsearch.client.watcher.StartWatchServiceRequest;
 import org.elasticsearch.client.watcher.StopWatchServiceRequest;
 import org.elasticsearch.client.watcher.WatcherStatsRequest;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import java.io.IOException;
 
 final class WatcherRequestConverters {
 
@@ -108,6 +112,28 @@ final class WatcherRequestConverters {
         return request;
     }
 
+    static Request executeWatch(ExecuteWatchRequest executeWatchRequest) throws IOException {
+        String endpoint = new RequestConverters.EndpointBuilder()
+            .addPathPartAsIs("_xpack", "watcher", "watch")
+            .addPathPart(executeWatchRequest.getId())       // will ignore if ID is null
+            .addPathPartAsIs("_execute").build();
+
+        Request request = new Request(HttpPost.METHOD_NAME, endpoint);
+        RequestConverters.Params params = new RequestConverters.Params(request);
+        if (executeWatchRequest.isDebug()) {
+            params.putParam("debug", "true");
+        }
+        if (executeWatchRequest.ignoreCondition()) {
+            params.putParam("ignore_condition", "true");
+        }
+        if (executeWatchRequest.recordExecution()) {
+            params.putParam("record_execution", "true");
+        }
+
+        request.setEntity(RequestConverters.createEntity(executeWatchRequest, XContentType.JSON));
+        return request;
+    }
+
     public static Request ackWatch(AckWatchRequest ackWatchRequest) {
         String endpoint = new RequestConverters.EndpointBuilder()
             .addPathPartAsIs("_xpack", "watcher", "watch")

+ 85 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/common/XContentSource.java

@@ -0,0 +1,85 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.common;
+
+import org.elasticsearch.common.xcontent.ObjectPath;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.XContentUtils;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Encapsulates the xcontent source
+ */
+public class XContentSource {
+
+    private final Object data;
+
+    /**
+     * Constructs a new XContentSource out of the given parser
+     */
+    public XContentSource(XContentParser parser) throws IOException {
+        this.data = XContentUtils.readValue(parser, parser.nextToken());
+    }
+
+    /**
+     * @return true if the top level value of the source is a map
+     */
+    public boolean isMap() {
+        return data instanceof Map;
+    }
+
+    /**
+     * @return The source as a map
+     */
+    @SuppressWarnings("unchecked")
+    public Map<String, Object> getAsMap() {
+        return (Map<String, Object>) data;
+    }
+
+    /**
+     * @return true if the top level value of the source is a list
+     */
+    public boolean isList() {
+        return data instanceof List;
+    }
+
+    /**
+     * @return The source as a list
+     */
+    @SuppressWarnings("unchecked")
+    public List<Object> getAsList() {
+        return (List<Object>) data;
+    }
+
+    /**
+     * Extracts a value identified by the given path in the source.
+     *
+     * @param path a dot notation path to the requested value
+     * @return The extracted value or {@code null} if no value is associated with the given path
+     */
+    @SuppressWarnings("unchecked")
+    public <T> T getValue(String path) {
+        return (T) ObjectPath.eval(path, data);
+    }
+
+}

+ 175 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/ExecuteWatchRequest.java

@@ -0,0 +1,175 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.watcher;
+
+import org.elasticsearch.client.Validatable;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * An execute watch request to execute a watch by id or inline
+ */
+public class ExecuteWatchRequest implements Validatable, ToXContentObject {
+
+    public enum ActionExecutionMode {
+        SIMULATE, FORCE_SIMULATE, EXECUTE, FORCE_EXECUTE, SKIP
+    }
+
+    private final String id;
+    private final BytesReference watchContent;
+
+    private boolean ignoreCondition = false;
+    private boolean recordExecution = false;
+    private boolean debug = false;
+
+    @Nullable
+    private BytesReference triggerData = null;
+
+    @Nullable
+    private BytesReference alternativeInput = null;
+
+    private Map<String, ActionExecutionMode> actionModes = new HashMap<>();
+
+    /**
+     * Execute an existing watch on the cluster
+     *
+     * @param id the id of the watch to execute
+     */
+    public static ExecuteWatchRequest byId(String id) {
+        return new ExecuteWatchRequest(Objects.requireNonNull(id, "Watch id cannot be null"), null);
+    }
+
+    /**
+     * Execute an inline watch
+     * @param watchContent the JSON definition of the watch
+     */
+    public static ExecuteWatchRequest inline(String watchContent) {
+        return new ExecuteWatchRequest(null, Objects.requireNonNull(watchContent, "Watch content cannot be null"));
+    }
+
+    private ExecuteWatchRequest(String id, String watchContent) {
+        this.id = id;
+        this.watchContent = watchContent == null ? null : new BytesArray(watchContent);
+    }
+
+    public String getId() {
+        return this.id;
+    }
+
+    /**
+     * @param ignoreCondition set if the condition for this execution be ignored
+     */
+    public void setIgnoreCondition(boolean ignoreCondition) {
+        this.ignoreCondition = ignoreCondition;
+    }
+
+    public boolean ignoreCondition() {
+        return ignoreCondition;
+    }
+
+    /**
+     * @param recordExecution Sets if this execution be recorded in the history index
+     */
+    public void setRecordExecution(boolean recordExecution) {
+        if (watchContent != null && recordExecution) {
+            throw new IllegalArgumentException("The execution of an inline watch cannot be recorded");
+        }
+        this.recordExecution = recordExecution;
+    }
+
+    public boolean recordExecution() {
+        return recordExecution;
+    }
+
+    /**
+     * @param alternativeInput Sets the alternative input
+     */
+    public void setAlternativeInput(String alternativeInput) {
+        this.alternativeInput = new BytesArray(alternativeInput);
+    }
+
+    /**
+     * @param data A JSON string representing the data that should be associated with the trigger event.
+     */
+    public void setTriggerData(String data) {
+        this.triggerData = new BytesArray(data);
+    }
+
+    /**
+     * Sets the action execution mode for the give action (identified by its id).
+     *
+     * @param actionId      the action id.
+     * @param actionMode    the execution mode of the action.
+     */
+    public void setActionMode(String actionId, ActionExecutionMode actionMode) {
+        Objects.requireNonNull(actionId, "actionId cannot be null");
+        actionModes.put(actionId, actionMode);
+    }
+
+    public Map<String, ActionExecutionMode> getActionModes() {
+        return this.actionModes;
+    }
+
+    /**
+     * @param debug indicates whether the watch should execute in debug mode. In debug mode the
+     *              returned watch record will hold the execution {@code vars}
+     */
+    public void setDebug(boolean debug) {
+        this.debug = debug;
+    }
+
+    public boolean isDebug() {
+        return debug;
+    }
+
+    @Override
+    public String toString() {
+        return "execute[" + id + "]";
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        if (triggerData != null) {
+            builder.rawField("trigger_data", triggerData.streamInput(), XContentType.JSON);
+        }
+        if (alternativeInput != null) {
+            builder.rawField("alternative_input", alternativeInput.streamInput(), XContentType.JSON);
+        }
+        if (actionModes.size() > 0) {
+            builder.field("action_modes", actionModes);
+        }
+        if (watchContent != null) {
+            builder.rawField("watch", watchContent.streamInput(), XContentType.JSON);
+        }
+        builder.endObject();
+        return builder;
+    }
+}
+

+ 107 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/ExecuteWatchResponse.java

@@ -0,0 +1,107 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.watcher;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.common.xcontent.XContentUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+
+public class ExecuteWatchResponse {
+
+    public static final ParseField ID_FIELD = new ParseField("_id");
+    public static final ParseField WATCH_FIELD = new ParseField("watch_record");
+
+    private String recordId;
+    private BytesReference contentSource;
+
+    private Map<String, Object> data;
+
+    public ExecuteWatchResponse() {
+    }
+
+    public ExecuteWatchResponse(String recordId, BytesReference contentSource) {
+        this.recordId = recordId;
+        this.contentSource = contentSource;
+    }
+
+    /**
+     * @return The id of the watch record holding the watch execution result.
+     */
+    public String getRecordId() {
+        return recordId;
+    }
+
+    /**
+     * @return The watch record source
+     */
+    public BytesReference getRecord() {
+        return contentSource;
+    }
+
+    /**
+     * Returns the watch record as a map
+     *
+     * Use {@link org.elasticsearch.common.xcontent.ObjectPath} to navigate through the data
+     */
+    @SuppressWarnings("unchecked")
+    public Map<String, Object> getRecordAsMap() {
+        if (data == null) {
+            // EMPTY is safe here because we never use namedObject
+            try (InputStream stream = contentSource.streamInput();
+                 XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, null, stream)) {
+                data = (Map<String, Object>) XContentUtils.readValue(parser, parser.nextToken());
+            } catch (IOException ex) {
+                throw new ElasticsearchException("failed to read value", ex);
+            }
+        }
+        return data;
+    }
+
+    private static final ConstructingObjectParser<ExecuteWatchResponse, Void> PARSER
+        = new ConstructingObjectParser<>("x_pack_execute_watch_response", false,
+        (fields) -> new ExecuteWatchResponse((String)fields[0], (BytesReference) fields[1]));
+    static {
+        PARSER.declareString(ConstructingObjectParser.constructorArg(), ID_FIELD);
+        PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> readBytesReference(p), WATCH_FIELD);
+    }
+
+    public static ExecuteWatchResponse fromXContent(XContentParser parser) throws IOException {
+        return PARSER.parse(parser, null);
+    }
+
+    private static BytesReference readBytesReference(XContentParser parser) throws IOException {
+        try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
+            builder.copyCurrentStructure(parser);
+            return BytesReference.bytes(builder);
+        }
+    }
+
+}

+ 50 - 14
client/rest-high-level/src/test/java/org/elasticsearch/client/WatcherIT.java

@@ -19,10 +19,6 @@
 package org.elasticsearch.client;
 
 import org.elasticsearch.ElasticsearchStatusException;
-import org.elasticsearch.client.watcher.DeactivateWatchRequest;
-import org.elasticsearch.client.watcher.DeactivateWatchResponse;
-import org.elasticsearch.client.watcher.ActivateWatchRequest;
-import org.elasticsearch.client.watcher.ActivateWatchResponse;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.watcher.AckWatchRequest;
 import org.elasticsearch.client.watcher.AckWatchResponse;
@@ -30,6 +26,14 @@ import org.elasticsearch.client.watcher.ActionStatus;
 import org.elasticsearch.client.watcher.ActionStatus.AckStatus;
 import org.elasticsearch.client.watcher.ActivateWatchRequest;
 import org.elasticsearch.client.watcher.ActivateWatchResponse;
+import org.elasticsearch.client.watcher.DeactivateWatchRequest;
+import org.elasticsearch.client.watcher.DeactivateWatchResponse;
+import org.elasticsearch.client.watcher.DeleteWatchRequest;
+import org.elasticsearch.client.watcher.DeleteWatchResponse;
+import org.elasticsearch.client.watcher.ExecuteWatchRequest;
+import org.elasticsearch.client.watcher.ExecuteWatchResponse;
+import org.elasticsearch.client.watcher.PutWatchRequest;
+import org.elasticsearch.client.watcher.PutWatchResponse;
 import org.elasticsearch.client.watcher.StartWatchServiceRequest;
 import org.elasticsearch.client.watcher.StopWatchServiceRequest;
 import org.elasticsearch.client.watcher.WatcherState;
@@ -37,13 +41,13 @@ import org.elasticsearch.client.watcher.WatcherStatsRequest;
 import org.elasticsearch.client.watcher.WatcherStatsResponse;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.xcontent.ObjectPath;
 import org.elasticsearch.common.xcontent.XContentType;
-import org.elasticsearch.client.watcher.DeleteWatchRequest;
-import org.elasticsearch.client.watcher.DeleteWatchResponse;
-import org.elasticsearch.client.watcher.PutWatchRequest;
-import org.elasticsearch.client.watcher.PutWatchResponse;
 import org.elasticsearch.rest.RestStatus;
 
+import java.util.Map;
+
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThan;
@@ -81,13 +85,14 @@ public class WatcherIT extends ESRestHighLevelClientTestCase {
         assertThat(putWatchResponse.getVersion(), is(1L));
     }
 
+    private static final String WATCH_JSON = "{ \n" +
+        "  \"trigger\": { \"schedule\": { \"interval\": \"10h\" } },\n" +
+        "  \"input\": { \"none\": {} },\n" +
+        "  \"actions\": { \"logme\": { \"logging\": { \"text\": \"{{ctx.payload}}\" } } }\n" +
+        "}";
+
     private PutWatchResponse createWatch(String watchId) throws Exception {
-        String json = "{ \n" +
-            "  \"trigger\": { \"schedule\": { \"interval\": \"10h\" } },\n" +
-            "  \"input\": { \"none\": {} },\n" +
-            "  \"actions\": { \"logme\": { \"logging\": { \"text\": \"{{ctx.payload}}\" } } }\n" +
-            "}";
-        BytesReference bytesReference = new BytesArray(json);
+        BytesReference bytesReference = new BytesArray(WATCH_JSON);
         PutWatchRequest putWatchRequest = new PutWatchRequest(watchId, bytesReference, XContentType.JSON);
         return highLevelClient().watcher().putWatch(putWatchRequest, RequestOptions.DEFAULT);
     }
@@ -185,6 +190,37 @@ public class WatcherIT extends ESRestHighLevelClientTestCase {
         assertEquals(RestStatus.NOT_FOUND, exception.status());
     }
 
+
+    public void testExecuteWatchById() throws Exception {
+        String watchId = randomAlphaOfLength(10);
+        createWatch(watchId);
+
+        ExecuteWatchResponse response = highLevelClient().watcher()
+            .executeWatch(ExecuteWatchRequest.byId(watchId), RequestOptions.DEFAULT);
+        assertThat(response.getRecordId(), containsString(watchId));
+
+        Map<String, Object> source = response.getRecordAsMap();
+        assertThat(ObjectPath.eval("trigger_event.type", source), is("manual"));
+
+    }
+
+    public void testExecuteWatchThatDoesNotExist() throws Exception {
+        String watchId = randomAlphaOfLength(10);
+        // exception when activating a not existing watcher
+        ElasticsearchStatusException exception =  expectThrows(ElasticsearchStatusException.class, () ->
+            highLevelClient().watcher().executeWatch(ExecuteWatchRequest.byId(watchId), RequestOptions.DEFAULT));
+        assertEquals(RestStatus.NOT_FOUND, exception.status());
+    }
+
+    public void testExecuteInlineWatch() throws Exception {
+        ExecuteWatchResponse response = highLevelClient().watcher()
+            .executeWatch(ExecuteWatchRequest.inline(WATCH_JSON), RequestOptions.DEFAULT);
+        assertThat(response.getRecordId(), containsString("_inlined_"));
+
+        Map<String, Object> source = response.getRecordAsMap();
+        assertThat(ObjectPath.eval("trigger_event.type", source), is("manual"));
+    }
+
     public void testWatcherStatsMetrics() throws Exception {
         boolean includeCurrent = randomBoolean();
         boolean includeQueued = randomBoolean();

+ 145 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/WatcherRequestConvertersTests.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.client;
 
+import org.apache.http.HttpEntity;
 import org.apache.http.client.methods.HttpDelete;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
@@ -27,6 +28,7 @@ import org.elasticsearch.client.watcher.AckWatchRequest;
 import org.elasticsearch.client.watcher.ActivateWatchRequest;
 import org.elasticsearch.client.watcher.DeactivateWatchRequest;
 import org.elasticsearch.client.watcher.DeleteWatchRequest;
+import org.elasticsearch.client.watcher.ExecuteWatchRequest;
 import org.elasticsearch.client.watcher.PutWatchRequest;
 import org.elasticsearch.client.watcher.GetWatchRequest;
 import org.elasticsearch.client.watcher.StartWatchServiceRequest;
@@ -38,12 +40,15 @@ import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.test.ESTestCase;
 
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.StringJoiner;
 
+import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.hasSize;
@@ -53,6 +58,12 @@ import static org.hamcrest.Matchers.nullValue;
 
 public class WatcherRequestConvertersTests extends ESTestCase {
 
+    private static String toString(HttpEntity entity) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        entity.writeTo(baos);
+        return baos.toString(StandardCharsets.UTF_8.name());
+    }
+
     public void testStartWatchService() {
         Request request = WatcherRequestConverters.startWatchService(new StartWatchServiceRequest());
         assertEquals(HttpPost.METHOD_NAME, request.getMethod());
@@ -177,4 +188,138 @@ public class WatcherRequestConvertersTests extends ESTestCase {
         }
         assertThat(request.getEntity(), nullValue());
     }
+
+    public void testExecuteWatchByIdRequest() throws IOException {
+
+        boolean ignoreCondition = randomBoolean();
+        boolean recordExecution = randomBoolean();
+        boolean debug = randomBoolean();
+
+        ExecuteWatchRequest request = ExecuteWatchRequest.byId("my_id");
+        request.setIgnoreCondition(ignoreCondition);
+        request.setRecordExecution(recordExecution);
+        request.setDebug(debug);
+
+        boolean setActionMode = randomBoolean();
+        if (setActionMode) {
+            request.setActionMode("action1", ExecuteWatchRequest.ActionExecutionMode.SIMULATE);
+        }
+
+        boolean useTriggerData = randomBoolean();
+        String triggerData = "{ \"entry1\" : \"blah\", \"entry2\" : \"blah\" }";
+        if (useTriggerData) {
+            request.setTriggerData(triggerData);
+        }
+
+        boolean useAlternativeInput = randomBoolean();
+        String alternativeInput = "{ \"foo\" : \"bar\" }";
+        if (useAlternativeInput) {
+            request.setAlternativeInput(alternativeInput);
+        }
+
+        Request req = WatcherRequestConverters.executeWatch(request);
+        assertThat(req.getEndpoint(), equalTo("/_xpack/watcher/watch/my_id/_execute"));
+        assertThat(req.getMethod(), equalTo(HttpPost.METHOD_NAME));
+
+        if (ignoreCondition) {
+            assertThat(req.getParameters(), hasKey("ignore_condition"));
+            assertThat(req.getParameters().get("ignore_condition"), is("true"));
+        }
+
+        if (recordExecution) {
+            assertThat(req.getParameters(), hasKey("record_execution"));
+            assertThat(req.getParameters().get("record_execution"), is("true"));
+        }
+
+        if (debug) {
+            assertThat(req.getParameters(), hasKey("debug"));
+            assertThat(req.getParameters().get("debug"), is("true"));
+        }
+
+        String body = toString(req.getEntity());
+        if (setActionMode) {
+            assertThat(body, containsString("\"action_modes\":{\"action1\":\"SIMULATE\"}"));
+        }
+        else {
+            assertThat(body, not(containsString("action_modes")));
+        }
+        if (useTriggerData) {
+            assertThat(body, containsString("\"trigger_data\":" + triggerData));
+        }
+        else {
+            assertThat(body, not(containsString("trigger_data")));
+        }
+        if (useAlternativeInput) {
+            assertThat(body, containsString("\"alternative_input\":" + alternativeInput));
+        }
+        else {
+            assertThat(body, not(containsString("alternative_input")));
+        }
+        assertThat(body, not(containsString("\"watch\":")));
+
+    }
+
+    private static final String WATCH_JSON = "{ \n" +
+        "  \"trigger\": { \"schedule\": { \"interval\": \"10h\" } },\n" +
+        "  \"input\": { \"none\": {} },\n" +
+        "  \"actions\": { \"logme\": { \"logging\": { \"text\": \"{{ctx.payload}}\" } } }\n" +
+        "}";
+
+    public void testExecuteInlineWatchRequest() throws IOException {
+        boolean ignoreCondition = randomBoolean();
+
+        ExecuteWatchRequest request = ExecuteWatchRequest.inline(WATCH_JSON);
+        request.setIgnoreCondition(ignoreCondition);
+
+        expectThrows(IllegalArgumentException.class, () -> {
+            request.setRecordExecution(true);
+        });
+
+        boolean setActionMode = randomBoolean();
+        if (setActionMode) {
+            request.setActionMode("action1", ExecuteWatchRequest.ActionExecutionMode.SIMULATE);
+        }
+
+        boolean useTriggerData = randomBoolean();
+        String triggerData = "{ \"entry1\" : \"blah\", \"entry2\" : \"blah\" }";
+        if (useTriggerData) {
+            request.setTriggerData(triggerData);
+        }
+
+        boolean useAlternativeInput = randomBoolean();
+        String alternativeInput = "{ \"foo\" : \"bar\" }";
+        if (useAlternativeInput) {
+            request.setAlternativeInput(alternativeInput);
+        }
+
+        Request req = WatcherRequestConverters.executeWatch(request);
+        assertThat(req.getEndpoint(), equalTo("/_xpack/watcher/watch/_execute"));
+        assertThat(req.getMethod(), equalTo(HttpPost.METHOD_NAME));
+
+        if (ignoreCondition) {
+            assertThat(req.getParameters(), hasKey("ignore_condition"));
+            assertThat(req.getParameters().get("ignore_condition"), is("true"));
+        }
+
+        String body = toString(req.getEntity());
+        if (setActionMode) {
+            assertThat(body, containsString("\"action_modes\":{\"action1\":\"SIMULATE\"}"));
+        }
+        else {
+            assertThat(body, not(containsString("action_modes")));
+        }
+        if (useTriggerData) {
+            assertThat(body, containsString("\"trigger_data\":" + triggerData));
+        }
+        else {
+            assertThat(body, not(containsString("trigger_data")));
+        }
+        if (useAlternativeInput) {
+            assertThat(body, containsString("\"alternative_input\":" + alternativeInput));
+        }
+        else {
+            assertThat(body, not(containsString("alternative_input")));
+        }
+        assertThat(body, containsString("\"watch\":" + WATCH_JSON));
+    }
 }

+ 109 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/WatcherDocumentationIT.java

@@ -34,6 +34,8 @@ import org.elasticsearch.client.watcher.ActionStatus;
 import org.elasticsearch.client.watcher.ActionStatus.AckStatus;
 import org.elasticsearch.client.watcher.DeactivateWatchRequest;
 import org.elasticsearch.client.watcher.DeactivateWatchResponse;
+import org.elasticsearch.client.watcher.ExecuteWatchRequest;
+import org.elasticsearch.client.watcher.ExecuteWatchResponse;
 import org.elasticsearch.client.watcher.GetWatchRequest;
 import org.elasticsearch.client.watcher.GetWatchResponse;
 import org.elasticsearch.client.watcher.StartWatchServiceRequest;
@@ -43,6 +45,7 @@ import org.elasticsearch.client.watcher.WatcherStatsRequest;
 import org.elasticsearch.client.watcher.WatcherStatsResponse;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.xcontent.ObjectPath;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.client.watcher.DeleteWatchRequest;
 import org.elasticsearch.client.watcher.DeleteWatchResponse;
@@ -51,6 +54,7 @@ import org.elasticsearch.client.watcher.PutWatchResponse;
 import org.elasticsearch.rest.RestStatus;
 
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -199,6 +203,52 @@ public class WatcherDocumentationIT extends ESRestHighLevelClientTestCase {
             assertTrue(latch.await(30L, TimeUnit.SECONDS));
         }
 
+        {
+            // tag::x-pack-execute-watch-by-id
+            ExecuteWatchRequest request = ExecuteWatchRequest.byId("my_watch_id");
+            request.setAlternativeInput("{ \"foo\" : \"bar\" }");                                         // <1>
+            request.setActionMode("action1", ExecuteWatchRequest.ActionExecutionMode.SIMULATE);             // <2>
+            request.setRecordExecution(true);                                                               // <3>
+            request.setIgnoreCondition(true);                                                               // <4>
+            request.setTriggerData("{\"triggered_time\":\"now\"}");                                         // <5>
+            request.setDebug(true);                                                                         // <6>
+            ExecuteWatchResponse response = client.watcher().executeWatch(request, RequestOptions.DEFAULT);
+            // end::x-pack-execute-watch-by-id
+
+            // tag::x-pack-execute-watch-by-id-response
+            String id = response.getRecordId();                                         // <1>
+            Map<String, Object> watch = response.getRecordAsMap();                      // <2>
+            String watch_id = ObjectPath.eval("watch_record.watch_id", watch);          // <3>
+            // end::x-pack-execute-watch-by-id-response
+        }
+
+        {
+            ExecuteWatchRequest request = ExecuteWatchRequest.byId("my_watch_id");
+            // tag::x-pack-execute-watch-by-id-execute-listener
+            ActionListener<ExecuteWatchResponse> listener = new ActionListener<ExecuteWatchResponse>() {
+                @Override
+                public void onResponse(ExecuteWatchResponse response) {
+                    // <1>
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    // <2>
+                }
+            };
+            // end::x-pack-execute-watch-by-id-execute-listener
+
+            // Replace the empty listener by a blocking listener in test
+            final CountDownLatch latch = new CountDownLatch(1);
+            listener = new LatchedActionListener<>(listener, latch);
+
+            // tag::x-pack-execute-watch-by-id-execute-async
+            client.watcher().executeWatchAsync(request, RequestOptions.DEFAULT, listener); // <1>
+            // end::x-pack-execute-watch-by-id-execute-async
+
+            assertTrue(latch.await(30L, TimeUnit.SECONDS));
+        }
+
         {
             //tag::get-watch-request
             GetWatchRequest request = new GetWatchRequest("my_watch_id");
@@ -285,6 +335,65 @@ public class WatcherDocumentationIT extends ESRestHighLevelClientTestCase {
         }
     }
 
+    public void testExecuteInlineWatch() throws Exception {
+        RestHighLevelClient client = highLevelClient();
+
+        {
+            // tag::x-pack-execute-inline-watch
+            String watchJson = "{ \n" +
+                "  \"trigger\": { \"schedule\": { \"interval\": \"10h\" } },\n" +
+                "  \"input\": { \"none\": {} },\n" +
+                "  \"actions\": { \"logme\": { \"logging\": { \"text\": \"{{ctx.payload}}\" } } }\n" +
+                "}";
+            ExecuteWatchRequest request = ExecuteWatchRequest.inline(watchJson);
+            request.setAlternativeInput("{ \"foo\" : \"bar\" }");                                         // <1>
+            request.setActionMode("action1", ExecuteWatchRequest.ActionExecutionMode.SIMULATE);             // <2>
+            request.setIgnoreCondition(true);                                                               // <3>
+            request.setTriggerData("{\"triggered_time\":\"now\"}");                                         // <4>
+            request.setDebug(true);                                                                         // <5>
+            ExecuteWatchResponse response = client.watcher().executeWatch(request, RequestOptions.DEFAULT);
+            // end::x-pack-execute-inline-watch
+
+            // tag::x-pack-execute-watch-by-id-response
+            String id = response.getRecordId();                                         // <1>
+            Map<String, Object> watch = response.getRecordAsMap();                      // <2>
+            String watch_id = ObjectPath.eval("watch_record.watch_id", watch);          // <3>
+            // end::x-pack-execute-watch-by-id-response
+        }
+
+        {
+            String watchJson = "{ \n" +
+                "  \"trigger\": { \"schedule\": { \"interval\": \"10h\" } },\n" +
+                "  \"input\": { \"none\": {} },\n" +
+                "  \"actions\": { \"logme\": { \"logging\": { \"text\": \"{{ctx.payload}}\" } } }\n" +
+                "}";
+            ExecuteWatchRequest request = ExecuteWatchRequest.inline(watchJson);
+            // tag::x-pack-execute-inline-watch-execute-listener
+            ActionListener<ExecuteWatchResponse> listener = new ActionListener<ExecuteWatchResponse>() {
+                @Override
+                public void onResponse(ExecuteWatchResponse response) {
+                    // <1>
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    // <2>
+                }
+            };
+            // end::x-pack-execute-inline-watch-execute-listener
+
+            // Replace the empty listener by a blocking listener in test
+            final CountDownLatch latch = new CountDownLatch(1);
+            listener = new LatchedActionListener<>(listener, latch);
+
+            // tag::x-pack-execute-inline-watch-execute-async
+            client.watcher().executeWatchAsync(request, RequestOptions.DEFAULT, listener); // <1>
+            // end::x-pack-execute-inline-watch-execute-async
+
+            assertTrue(latch.await(30L, TimeUnit.SECONDS));
+        }
+    }
+
     public void testAckWatch() throws Exception {
         RestHighLevelClient client = highLevelClient();
 

+ 2 - 0
docs/java-rest/high-level/supported-apis.asciidoc

@@ -427,6 +427,7 @@ The Java High Level REST Client supports the following Watcher APIs:
 * <<java-rest-high-watcher-deactivate-watch>>
 * <<{upid}-ack-watch>>
 * <<{upid}-activate-watch>>
+* <<{upid}-execute-watch>>
 * <<{upid}-watcher-stats>>
 
 include::watcher/start-watch-service.asciidoc[]
@@ -437,6 +438,7 @@ include::watcher/delete-watch.asciidoc[]
 include::watcher/ack-watch.asciidoc[]
 include::watcher/deactivate-watch.asciidoc[]
 include::watcher/activate-watch.asciidoc[]
+include::watcher/execute-watch.asciidoc[]
 include::watcher/watcher-stats.asciidoc[]
 
 == Graph APIs

+ 88 - 0
docs/java-rest/high-level/watcher/execute-watch.asciidoc

@@ -0,0 +1,88 @@
+--
+:api: execute-watch
+:request: ExecuteWatchRequest
+:response: ExecuteWatchResponse
+--
+[id="{upid}-{api}"]
+=== Execute Watch API
+
+The execute watch API allows clients to immediately execute a watch, either
+one that has been previously added via the
+{ref}/put-watch.html[Put Watch API] or inline as part of the request.
+
+[id="{upid}-{api}-request-by-id"]
+==== Execute by id
+
+Submit the following request to execute a previously added watch:
+
+["source","java",subs="attributes,callouts,macros"]
+---------------------------------------------------
+include-tagged::{doc-tests-file}[x-pack-execute-watch-by-id]
+---------------------------------------------------
+<1> Alternative input for the watch to use in json format
+<2> Set the mode for action "action1" to SIMULATE
+<3> Record this execution in watcher history
+<4> Execute the watch regardless of the watch's condition
+<5> Set the trigger data for the watch in json format
+<6> Enable debug mode
+
+[id="{upid}-{api}-response-by-id"]
+==== Execute by id Response
+
+The returned `Response` contains details of the execution:
+
+["source","java",subs="attributes,callouts,macros"]
+---------------------------------------------------
+include-tagged::{doc-tests-file}[x-pack-execute-watch-by-id-response]
+---------------------------------------------------
+<1> The record ID for this execution
+<2> The execution response as a java `Map`
+<3> Extract information from the response map using `ObjectPath`
+
+[id="{upid}-{api}-response-by-id-async"]
+==== Asynchronous execution by id
+
+This request can be executed asynchronously:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests-file}[x-pack-execute-watch-by-id-execute-async]
+--------------------------------------------------
+<1> The `ExecuteWatchRequest` to execute and the `ActionListener` to use when
+the execution completes
+
+The asynchronous method does not block and returns immediately. Once it is
+completed the `ActionListener` is called back using the `onResponse` method
+if the execution successfully completed or using the `onFailure` method if
+it failed.
+
+A typical listener for `ExecuteWatchResponse` looks like:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests-file}[x-pack-execute-watch-by-id-execute-listener]
+--------------------------------------------------
+<1> Called when the execution is successfully completed. The response is
+provided as an argument
+<2> Called in case of failure. The raised exception is provided as an argument
+
+
+[id="{upid}-{api}-request-inline"]
+==== Execute inline
+
+Submit the following request to execute a watch defined as part of the request:
+
+["source","java",subs="attributes,callouts,macros"]
+---------------------------------------------------
+include-tagged::{doc-tests-file}[x-pack-execute-watch-inline]
+---------------------------------------------------
+<1> Alternative input for the watch to use in json format
+<2> Set the mode for action "action1" to SIMULATE
+<3> Execute the watch regardless of the watch's condition
+<4> Set the trigger data for the watch in json format
+<5> Enable debug mode
+
+Note that inline watches cannot be recorded.
+
+The response format and asynchronous execution methods are the same as for the
+Execute Watch by ID API.

+ 7 - 3
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/support/xcontent/XContentSource.java

@@ -146,12 +146,16 @@ public class XContentSource implements ToXContent {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         XContentSource that = (XContentSource) o;
-        return Objects.equals(bytes, that.bytes) &&
-            contentType == that.contentType;
+        return Objects.equals(data(), that.data());
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(bytes, contentType);
+        return Objects.hash(data());
+    }
+
+    @Override
+    public String toString() {
+        return bytes.utf8ToString();
     }
 }

+ 59 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/execute/ExecuteWatchResponse.java

@@ -6,18 +6,28 @@
 package org.elasticsearch.xpack.core.watcher.transport.actions.execute;
 
 import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource;
 
 import java.io.IOException;
+import java.util.Objects;
 
 /**
  * This class contains the WatchHistory generated by running the watch
  */
-public class ExecuteWatchResponse extends ActionResponse {
+public class ExecuteWatchResponse extends ActionResponse implements ToXContentObject {
+
+    public static final ParseField ID_FIELD = new ParseField("_id");
+    public static final ParseField WATCH_FIELD = new ParseField("watch_record");
 
     private String recordId;
     private XContentSource recordSource;
@@ -30,6 +40,25 @@ public class ExecuteWatchResponse extends ActionResponse {
         this.recordSource = new XContentSource(recordSource, contentType);
     }
 
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ExecuteWatchResponse that = (ExecuteWatchResponse) o;
+        return Objects.equals(recordId, that.recordId) &&
+            Objects.equals(recordSource, that.recordSource);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(recordId, recordSource);
+    }
+
+    @Override
+    public String toString() {
+        return recordId + ":" + recordSource;
+    }
+
     /**
      * @return The id of the watch record holding the watch execution result.
      */
@@ -57,4 +86,33 @@ public class ExecuteWatchResponse extends ActionResponse {
         out.writeString(recordId);
         XContentSource.writeTo(recordSource, out);
     }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        builder.field("_id", recordId);
+        builder.field("watch_record");
+        recordSource.toXContent(builder, params);
+        builder.endObject();
+        return builder;
+    }
+
+    private static final ConstructingObjectParser<ExecuteWatchResponse, Void> PARSER
+        = new ConstructingObjectParser<>("x_pack_execute_watch_response", false,
+        (fields) -> new ExecuteWatchResponse((String)fields[0], (BytesReference) fields[1], XContentType.JSON));
+    static {
+        PARSER.declareString(ConstructingObjectParser.constructorArg(), ID_FIELD);
+        PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> readBytesReference(p), WATCH_FIELD);
+    }
+
+    public static ExecuteWatchResponse fromXContent(XContentParser parser) throws IOException {
+        return PARSER.parse(parser, null);
+    }
+
+    private static BytesReference readBytesReference(XContentParser parser) throws IOException {
+        try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
+            builder.copyCurrentStructure(parser);
+            return BytesReference.bytes(builder);
+        }
+    }
 }

+ 64 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/protocol/xpack/watcher/ExecuteWatchResponseTests.java

@@ -0,0 +1,64 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.protocol.xpack.watcher;
+
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.protocol.AbstractHlrcXContentTestCase;
+import org.elasticsearch.xpack.core.watcher.transport.actions.execute.ExecuteWatchResponse;
+
+import java.io.IOException;
+
+public class ExecuteWatchResponseTests
+    extends AbstractHlrcXContentTestCase<ExecuteWatchResponse, org.elasticsearch.client.watcher.ExecuteWatchResponse> {
+
+    @Override
+    public org.elasticsearch.client.watcher.ExecuteWatchResponse doHlrcParseInstance(XContentParser parser) throws IOException {
+        return org.elasticsearch.client.watcher.ExecuteWatchResponse.fromXContent(parser);
+    }
+
+    @Override
+    public ExecuteWatchResponse convertHlrcToInternal(org.elasticsearch.client.watcher.ExecuteWatchResponse instance) {
+        return new ExecuteWatchResponse(instance.getRecordId(), instance.getRecord(), XContentType.JSON);
+    }
+
+    @Override
+    protected ExecuteWatchResponse createTestInstance() {
+        String id = "my_watch_0-2015-06-02T23:17:55.124Z";
+        try {
+            XContentBuilder builder = XContentFactory.jsonBuilder();
+            builder.startObject();
+            builder.field("watch_id", "my_watch");
+            builder.field("node", "my_node");
+            builder.startArray("messages");
+            builder.endArray();
+            builder.startObject("trigger_event");
+            builder.field("type", "manual");
+            builder.endObject();
+            builder.field("state", "executed");
+            builder.endObject();
+            BytesReference bytes = BytesReference.bytes(builder);
+            return new ExecuteWatchResponse(id, bytes, XContentType.JSON);
+        }
+        catch (IOException e) {
+            throw new AssertionError(e);
+        }
+    }
+
+    @Override
+    protected ExecuteWatchResponse doParseInstance(XContentParser parser) throws IOException {
+        return ExecuteWatchResponse.fromXContent(parser);
+    }
+
+    @Override
+    protected boolean supportsUnknownFields() {
+        return false;
+    }
+}