Prechádzať zdrojové kódy

Add query watches api to retrieve multiple watches. (#64582)

This api supports pagination (from / size) and
querying and sorting by watch _id and watcher metadata.

This avoids using .watch index directly.
On a per watch basis the same information that the get watch api returns is returned,
except version.

Relates #62501
Martijn van Groningen 4 rokov pred
rodič
commit
3adb6d8aee
14 zmenil súbory, kde vykonal 1109 pridanie a 10 odobranie
  1. 4 1
      x-pack/docs/en/rest-api/watcher.asciidoc
  2. 131 0
      x-pack/docs/en/rest-api/watcher/query-watches.asciidoc
  3. 345 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/QueryWatchesAction.java
  4. 1 0
      x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java
  5. 25 0
      x-pack/plugin/src/test/resources/rest-api-spec/api/watcher.query_watches.json
  6. 93 0
      x-pack/plugin/watcher/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/watcher/query_watches/10_basic.yml
  7. 1 3
      x-pack/plugin/watcher/qa/with-monitoring/src/javaRestTest/java/org/elasticsearch/smoketest/MonitoringWithWatcherRestIT.java
  8. 127 0
      x-pack/plugin/watcher/src/internalClusterTest/java/org/elasticsearch/xpack/watcher/test/integration/BasicWatcherTests.java
  9. 6 1
      x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java
  10. 45 0
      x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestQueryWatchesAction.java
  11. 113 0
      x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/TransportQueryWatchesAction.java
  12. 9 5
      x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/WatcherTestUtils.java
  13. 80 0
      x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/action/QueryWatchesRequestTests.java
  14. 129 0
      x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/action/QueryWatchesResponseTests.java

+ 4 - 1
x-pack/docs/en/rest-api/watcher.asciidoc

@@ -4,6 +4,7 @@
 
 * <<watcher-api-put-watch>>
 * <<watcher-api-get-watch>>
+* <<watcher-api-query-watches>>
 * <<watcher-api-delete-watch>>
 * <<watcher-api-execute-watch>>
 * <<watcher-api-ack-watch>>
@@ -26,9 +27,11 @@ include::watcher/execute-watch.asciidoc[]
 //GET
 include::watcher/get-watch.asciidoc[]
 include::watcher/stats.asciidoc[]
+//QUERY
+include::watcher/query-watches.asciidoc[]
 //PUT
 include::watcher/put-watch.asciidoc[]
 //START
 include::watcher/start.asciidoc[]
 //STOP
-include::watcher/stop.asciidoc[]
+include::watcher/stop.asciidoc[]

+ 131 - 0
x-pack/docs/en/rest-api/watcher/query-watches.asciidoc

@@ -0,0 +1,131 @@
+[role="xpack"]
+[[watcher-api-query-watches]]
+=== Query watches API
+++++
+<titleabbrev>Query watches</titleabbrev>
+++++
+
+Retrieves all registered watches.
+
+[[watcher-api-query-watches-request]]
+==== {api-request-title}
+
+`GET /_watcher/_query/watches`
+
+[[watcher-api-query-watches-prereqs]]
+==== {api-prereq-title}
+
+* You must have `manage_watcher` or `monitor_watcher` cluster privileges to use
+this API. For more information, see <<security-privileges>>.
+
+//[[watcher-api-query-watches-desc]]
+//==== {api-description-title}
+
+Retrieves all watches in a paginated manner and
+optionally filtering watches by a query.
+
+//[[watcher-api-query-watches-request-body]]
+//==== {api-request-body-title}
+
+This API supports the following fields:
+
+[cols=",^,^,", options="header"]
+|======
+| Name              | Required | Default  | Description
+
+| `from`            | no       | 0        | The offset from the first result to fetch. Needs to be non-negative.
+
+| `size`            | no       | 10       | The number of hits to return. Needs to be non-negative.
+
+| `query`           | no       | null     | Optional, <<query-dsl,query>> filter  watches to be returned.
+
+| `sort`            | no       | null     | Optional <<search-request-sort,sort definition>>.
+
+| `search_after`    | no       | null     | Optional <<search-request-search-after,search After>> to do pagination
+                                            using last hit's sort values.
+|======
+
+Note that only the `_id` and `metadata.*` fields are queryable or sortable.
+
+//[[watcher-api-query-watches-response-body]]
+//==== {api-response-body-title}
+
+This api returns the following top level fields:
+
+`count`::
+    The total number of watches found.
+
+`watches`::
+    A list of watches based on the `from`, `size` or `search_after` request body parameters.
+
+[[watcher-api-query-watches-example]]
+==== {api-examples-title}
+
+The following example list all stored watches:
+
+[source,console]
+--------------------------------------------------
+GET /_watcher/_query/watches
+--------------------------------------------------
+// TEST[setup:my_active_watch]
+
+Response:
+
+[source,console-result]
+--------------------------------------------------
+{
+    "count": 1,
+    "watches": [
+        {
+            "_id": "my_watch",
+            "watch": {
+                "trigger": {
+                    "schedule": {
+                        "hourly": {
+                            "minute": [
+                                0,
+                                5
+                            ]
+                        }
+                    }
+                },
+                "input": {
+                    "simple": {
+                        "payload": {
+                            "send": "yes"
+                        }
+                    }
+                },
+                "condition": {
+                    "always": {}
+                },
+                "actions": {
+                    "test_index": {
+                        "index": {
+                            "index": "test"
+                        }
+                    }
+                }
+            },
+            "status": {
+                "state": {
+                    "active": true,
+                    "timestamp": "2015-05-26T18:21:08.630Z"
+                },
+                "actions": {
+                    "test_index": {
+                        "ack": {
+                            "timestamp": "2015-05-26T18:21:08.630Z",
+                            "state": "awaits_successful_execution"
+                        }
+                    }
+                },
+                "version": -1
+            },
+            "_seq_no": 0,
+            "_primary_term": 1
+        }
+    ]
+}
+--------------------------------------------------
+// TESTRESPONSE[s/"timestamp": "2015-05-26T18:21:08.630Z"/"timestamp": "$body.watches.0.status.state.timestamp"/]

+ 345 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/QueryWatchesAction.java

@@ -0,0 +1,345 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.core.watcher.transport.actions;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ObjectParser;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.ToXContentFragment;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.XContentParser.Token;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.searchafter.SearchAfterBuilder;
+import org.elasticsearch.search.sort.FieldSortBuilder;
+import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource;
+import org.elasticsearch.xpack.core.watcher.watch.WatchStatus;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
+import static org.elasticsearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder;
+
+public class QueryWatchesAction extends ActionType<QueryWatchesAction.Response> {
+
+    public static final QueryWatchesAction INSTANCE = new QueryWatchesAction();
+    public static final String NAME = "cluster:monitor/xpack/watcher/watch/query";
+
+    private QueryWatchesAction() {
+        super(NAME, Response::new);
+    }
+
+    public static class Request extends ActionRequest implements ToXContentObject {
+
+        public static final ParseField FROM_FIELD = new ParseField("from");
+        public static final ParseField SIZE_FIELD = new ParseField("size");
+        public static final ParseField QUERY_FIELD = new ParseField("query");
+        public static final ParseField SORT_FIELD = new ParseField("sort");
+        public static final ParseField SEARCH_AFTER_FIELD = new ParseField("search_after");
+
+        @SuppressWarnings("unchecked")
+        private static final ConstructingObjectParser<Request, Void> PARSER = new ConstructingObjectParser<>(
+            "query_watches_request",
+            true,
+            (args, c) -> {
+                Integer from = (Integer) args[0];
+                Integer size = (Integer) args[1];
+                QueryBuilder query = (QueryBuilder) args[2];
+                List<FieldSortBuilder> sort = (List<FieldSortBuilder>) args[3];
+                SearchAfterBuilder searchAfter = (SearchAfterBuilder) args[4];
+                return new Request(from, size, query, sort, searchAfter);
+            }
+        );
+
+        static {
+            PARSER.declareInt(optionalConstructorArg(), FROM_FIELD);
+            PARSER.declareInt(optionalConstructorArg(), SIZE_FIELD);
+            PARSER.declareObject(optionalConstructorArg(), (p, c) -> parseInnerQueryBuilder(p), QUERY_FIELD);
+            PARSER.declareObjectArray(optionalConstructorArg(), (p, c) -> {
+                String fieldName = null;
+                FieldSortBuilder result = null;
+                for (Token token = p.nextToken(); token != Token.END_OBJECT; token = p.nextToken()) {
+                    if (token == Token.FIELD_NAME) {
+                        fieldName = p.currentName();
+                    } else {
+                        result = FieldSortBuilder.fromXContent(p, fieldName);
+                    }
+                }
+                return result;
+            }, SORT_FIELD);
+            PARSER.declareField(optionalConstructorArg(), (p, c) -> SearchAfterBuilder.fromXContent(p), SEARCH_AFTER_FIELD,
+                ObjectParser.ValueType.VALUE_ARRAY);
+        }
+
+        public static Request fromXContent(XContentParser parser) throws IOException {
+            return PARSER.parse(parser, null);
+        }
+
+        private final Integer from;
+        private final Integer size;
+        private final QueryBuilder query;
+        private final List<FieldSortBuilder> sorts;
+        private final SearchAfterBuilder searchAfter;
+
+        public Request(StreamInput in) throws IOException {
+            super(in);
+            from = in.readOptionalVInt();
+            size = in.readOptionalVInt();
+            query = in.readOptionalNamedWriteable(QueryBuilder.class);
+            if (in.readBoolean()) {
+                sorts = in.readList(FieldSortBuilder::new);
+            } else {
+                sorts = null;
+            }
+            searchAfter = in.readOptionalWriteable(SearchAfterBuilder::new);
+        }
+
+        public Request(Integer from,
+                       Integer size,
+                       QueryBuilder query,
+                       List<FieldSortBuilder> sorts,
+                       SearchAfterBuilder searchAfter) {
+            this.from = from;
+            this.size = size;
+            this.query = query;
+            this.sorts = sorts;
+            this.searchAfter = searchAfter;
+        }
+
+        public Integer getFrom() {
+            return from;
+        }
+
+        public Integer getSize() {
+            return size;
+        }
+
+        public QueryBuilder getQuery() {
+            return query;
+        }
+
+        public List<FieldSortBuilder> getSorts() {
+            return sorts;
+        }
+
+        public SearchAfterBuilder getSearchAfter() {
+            return searchAfter;
+        }
+
+        @Override
+        public ActionRequestValidationException validate() {
+            return null;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeOptionalVInt(from);
+            out.writeOptionalVInt(size);
+            out.writeOptionalNamedWriteable(query);
+            if (sorts != null) {
+                out.writeBoolean(true);
+                out.writeList(sorts);
+            } else {
+                out.writeBoolean(false);
+            }
+            out.writeOptionalWriteable(searchAfter);
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
+            builder.startObject();
+            if (from != null) {
+                builder.field(FROM_FIELD.getPreferredName(), from);
+            }
+            if (size != null) {
+                builder.field(SIZE_FIELD.getPreferredName(), size);
+            }
+            if (query != null) {
+                builder.field(QUERY_FIELD.getPreferredName(), query);
+            }
+            if (sorts != null) {
+                builder.startArray(SORT_FIELD.getPreferredName());
+                for (FieldSortBuilder sort : sorts) {
+                    sort.toXContent(builder, params);
+                }
+                builder.endArray();
+            }
+            if (searchAfter != null) {
+                builder.array(SEARCH_AFTER_FIELD.getPreferredName(), searchAfter.getSortValues());
+            }
+            return builder.endObject();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Request request = (Request) o;
+            return Objects.equals(from, request.from) &&
+                Objects.equals(size, request.size) &&
+                Objects.equals(query, request.query) &&
+                Objects.equals(sorts, request.sorts) &&
+                Objects.equals(searchAfter, request.searchAfter);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(from, size, query, sorts, searchAfter);
+        }
+    }
+
+    public static class Response extends ActionResponse implements ToXContentObject {
+
+        private final List<Item> watches;
+        private final long watchTotalCount;
+
+        public Response(long watchTotalCount, List<Item> watches) {
+            this.watches = watches;
+            this.watchTotalCount = watchTotalCount;
+        }
+
+        public Response(StreamInput in) throws IOException {
+            super(in);
+            watches = in.readList(Item::new);
+            watchTotalCount = in.readVLong();
+        }
+
+        public List<Item> getWatches() {
+            return watches;
+        }
+
+        public long getWatchTotalCount() {
+            return watchTotalCount;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeList(watches);
+            out.writeVLong(watchTotalCount);
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.startObject();
+            builder.field("count", watchTotalCount);
+            builder.startArray("watches");
+            for (Item watch : watches) {
+                builder.startObject();
+                watch.toXContent(builder, params);
+                builder.endObject();
+            }
+            builder.endArray();
+            return builder.endObject();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Response response = (Response) o;
+            return watchTotalCount == response.watchTotalCount &&
+                watches.equals(response.watches);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(watches, watchTotalCount);
+        }
+
+        public static class Item implements Writeable, ToXContentFragment {
+
+            private final String id;
+            private final XContentSource source;
+            private final WatchStatus status;
+            private final long seqNo;
+            private final long primaryTerm;
+
+            public Item(String id, XContentSource source, WatchStatus status, long seqNo, long primaryTerm) {
+                this.id = id;
+                this.source = source;
+                this.status = status;
+                this.seqNo = seqNo;
+                this.primaryTerm = primaryTerm;
+            }
+
+            public String getId() {
+                return id;
+            }
+
+            public XContentSource getSource() {
+                return source;
+            }
+
+            public WatchStatus getStatus() {
+                return status;
+            }
+
+            public long getSeqNo() {
+                return seqNo;
+            }
+
+            public long getPrimaryTerm() {
+                return primaryTerm;
+            }
+
+            public Item(StreamInput in) throws IOException {
+                id = in.readString();
+                source = XContentSource.readFrom(in);
+                status = new WatchStatus(in);
+                seqNo = in.readZLong();
+                primaryTerm = in.readVLong();
+            }
+
+            @Override
+            public void writeTo(StreamOutput out) throws IOException {
+                out.writeString(id);
+                XContentSource.writeTo(source, out);
+                status.writeTo(out);
+                out.writeZLong(seqNo);
+                out.writeVLong(primaryTerm);
+            }
+
+            @Override
+            public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+                builder.field("_id", id);
+                builder.field("watch", source, params);
+                builder.field("status", status,  params);
+                builder.field("_seq_no", seqNo);
+                builder.field("_primary_term", primaryTerm);
+                return builder;
+            }
+
+            @Override
+            public boolean equals(Object o) {
+                if (this == o) return true;
+                if (o == null || getClass() != o.getClass()) return false;
+                Item item = (Item) o;
+                return seqNo == item.seqNo &&
+                    primaryTerm == item.primaryTerm &&
+                    id.equals(item.id) &&
+                    source.equals(item.source);
+            }
+
+            @Override
+            public int hashCode() {
+                return Objects.hash(id, source, seqNo, primaryTerm);
+            }
+        }
+    }
+
+}

+ 1 - 0
x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java

@@ -310,6 +310,7 @@ public class Constants {
         "cluster:monitor/xpack/usage/watcher",
         "cluster:monitor/xpack/watcher/stats/dist",
         "cluster:monitor/xpack/watcher/watch/get",
+        "cluster:monitor/xpack/watcher/watch/query",
         "indices:admin/aliases",
         "indices:admin/aliases/get",
         "indices:admin/analyze",

+ 25 - 0
x-pack/plugin/src/test/resources/rest-api-spec/api/watcher.query_watches.json

@@ -0,0 +1,25 @@
+{
+  "watcher.query_watches":{
+    "documentation":{
+      "url":"https://www.elastic.co/guide/en/elasticsearch/reference/current/watcher-api-query-watches.html",
+      "description":"Retrieves stored watches."
+    },
+    "stability":"stable",
+    "url":{
+      "paths":[
+        {
+          "path":"/_watcher/_query/watches",
+          "methods":[
+            "GET",
+            "POST"
+          ]
+        }
+      ]
+    },
+    "params":{},
+    "body":{
+      "description":"From, size, query, sort and search_after",
+      "required":false
+    }
+  }
+}

+ 93 - 0
x-pack/plugin/watcher/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/watcher/query_watches/10_basic.yml

@@ -0,0 +1,93 @@
+---
+setup:
+  - do:
+      cluster.health:
+          wait_for_status: yellow
+
+---
+teardown:
+  - do:
+      watcher.delete_watch:
+        id: "my_watch1"
+        ignore: 404
+  - do:
+      watcher.delete_watch:
+        id: "my_watch2"
+        ignore: 404
+
+---
+"Test query watches api":
+  - skip:
+      features: warnings
+  - do:
+      watcher.put_watch:
+        id: "my_watch1"
+        body:  >
+          {
+            "trigger": {
+              "schedule": {
+                "hourly": {
+                  "minute": [ 0, 5 ]
+                  }
+                }
+            },
+            "input": {
+              "simple": {
+                "payload": {
+                  "send": "yes"
+                }
+              }
+            },
+            "condition": {
+              "always": {}
+            },
+            "actions": {
+                "test_index": {
+                  "index": {
+                    "index": "test"
+                  }
+                }
+              }
+            }
+  - match: { _id: "my_watch1" }
+  - match: { created: true }
+
+  - do:
+      watcher.put_watch:
+        id: "my_watch2"
+        body:  >
+          {
+            "trigger": {
+              "schedule": {
+                "hourly": {
+                  "minute": [ 0, 5 ]
+                  }
+                }
+            },
+            "input": {
+              "simple": {
+                "payload": {
+                  "send": "yes"
+                }
+              }
+            },
+            "condition": {
+              "always": {}
+            },
+            "actions": {
+                "test_index": {
+                  "index": {
+                    "index": "test"
+                  }
+                }
+              }
+            }
+  - match: { _id: "my_watch2" }
+  - match: { created: true }
+
+  - do:
+      watcher.query_watches:
+        body: {}
+  - match: { count: 2}
+  - match: { watches.0._id: "my_watch1" }
+  - match: { watches.1._id: "my_watch2" }

+ 1 - 3
x-pack/plugin/watcher/qa/with-monitoring/src/javaRestTest/java/org/elasticsearch/smoketest/MonitoringWithWatcherRestIT.java

@@ -102,9 +102,7 @@ public class MonitoringWithWatcherRestIT extends ESRestTestCase {
     private void assertTotalWatchCount(int expectedWatches) throws Exception {
         assertBusy(() -> {
             refreshAllIndices();
-            final Request countRequest = new Request("POST", "/.watches/_count");
-            countRequest.setOptions(expectWarnings("this request accesses system indices: [.watches], but in a future major " +
-                "version, direct access to system indices will be prevented by default"));
+            final Request countRequest = new Request("POST", "/_watcher/_query/watches");
             ObjectPath path = ObjectPath.createFromResponse(client().performRequest(countRequest));
             int count = path.evaluate("count");
             assertThat(count, is(expectedWatches));

+ 127 - 0
x-pack/plugin/watcher/src/internalClusterTest/java/org/elasticsearch/xpack/watcher/test/integration/BasicWatcherTests.java

@@ -6,18 +6,24 @@
 package org.elasticsearch.xpack.watcher.test.integration;
 
 import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchType;
+import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.query.TermQueryBuilder;
 import org.elasticsearch.protocol.xpack.watcher.DeleteWatchResponse;
 import org.elasticsearch.protocol.xpack.watcher.PutWatchResponse;
 import org.elasticsearch.script.Script;
 import org.elasticsearch.script.ScriptType;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.searchafter.SearchAfterBuilder;
+import org.elasticsearch.search.sort.FieldSortBuilder;
 import org.elasticsearch.xpack.core.watcher.client.WatchSourceBuilder;
 import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource;
+import org.elasticsearch.xpack.core.watcher.transport.actions.QueryWatchesAction;
 import org.elasticsearch.xpack.core.watcher.transport.actions.delete.DeleteWatchRequestBuilder;
 import org.elasticsearch.xpack.core.watcher.transport.actions.get.GetWatchRequestBuilder;
 import org.elasticsearch.xpack.core.watcher.transport.actions.get.GetWatchResponse;
@@ -35,6 +41,8 @@ import org.elasticsearch.xpack.watcher.trigger.schedule.support.WeekTimes;
 import java.time.Clock;
 import java.time.ZonedDateTime;
 import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
@@ -375,4 +383,123 @@ public class BasicWatcherTests extends AbstractWatcherIntegrationTestCase {
         timeWarp().trigger(watchName);
         assertWatchWithMinimumPerformedActionsCount(watchName, 1);
     }
+
+    public void testQueryWatches() {
+        int numWatches = 6;
+        for (int i = 0; i < numWatches; i++) {
+            PutWatchResponse putWatchResponse = new PutWatchRequestBuilder(client()).setId("" + i)
+                .setSource(watchBuilder()
+                    .trigger(schedule(interval(1, IntervalSchedule.Interval.Unit.DAYS)))
+                    .addAction("_logger", loggingAction("log me"))
+                    .metadata(Map.of("key1", i, "key2", numWatches - i)))
+                .get();
+            assertThat(putWatchResponse.isCreated(), is(true));
+        }
+        refresh();
+
+        QueryWatchesAction.Request request =
+            new QueryWatchesAction.Request(0, 2, null, List.of(new FieldSortBuilder("metadata.key1")), null);
+        QueryWatchesAction.Response response = client().execute(QueryWatchesAction.INSTANCE, request).actionGet();
+        assertThat(response.getWatchTotalCount(), equalTo((long) numWatches));
+        assertThat(response.getWatches().size(), equalTo(2));
+        assertThat(response.getWatches().get(0).getId(), equalTo("0"));
+        Map<?, ?> watcherMetadata = (Map<?, ?>) response.getWatches().get(0).getSource().getAsMap().get("metadata");
+        assertThat(watcherMetadata.get("key2"), equalTo(6));
+        assertThat(response.getWatches().get(1).getId(), equalTo("1"));
+        watcherMetadata = (Map<?, ?>) response.getWatches().get(1).getSource().getAsMap().get("metadata");
+        assertThat(watcherMetadata.get("key2"), equalTo(5));
+
+        request = new QueryWatchesAction.Request(2, 2, null, List.of(new FieldSortBuilder("metadata.key1")), null);
+        response = client().execute(QueryWatchesAction.INSTANCE, request).actionGet();
+        assertThat(response.getWatchTotalCount(), equalTo((long) numWatches));
+        assertThat(response.getWatches().size(), equalTo(2));
+        assertThat(response.getWatches().get(0).getId(), equalTo("2"));
+        watcherMetadata = (Map<?, ?>) response.getWatches().get(0).getSource().getAsMap().get("metadata");
+        assertThat(watcherMetadata.get("key2"), equalTo(4));
+        assertThat(response.getWatches().get(1).getId(), equalTo("3"));
+        watcherMetadata = (Map<?, ?>) response.getWatches().get(1).getSource().getAsMap().get("metadata");
+        assertThat(watcherMetadata.get("key2"), equalTo(3));
+
+        request = new QueryWatchesAction.Request(null, null, new TermQueryBuilder("_id", "4"), null, null);
+        response = client().execute(QueryWatchesAction.INSTANCE, request).actionGet();
+        assertThat(response.getWatchTotalCount(), equalTo(1L));
+        assertThat(response.getWatches().size(), equalTo(1));
+        assertThat(response.getWatches().get(0).getId(), equalTo("4"));
+        watcherMetadata = (Map<?, ?>) response.getWatches().get(0).getSource().getAsMap().get("metadata");
+        assertThat(watcherMetadata.get("key1"), equalTo(4));
+        assertThat(watcherMetadata.get("key2"), equalTo(2));
+
+        request = new QueryWatchesAction.Request(4, 2, null, List.of(new FieldSortBuilder("metadata.key2")), null);
+        response = client().execute(QueryWatchesAction.INSTANCE, request).actionGet();
+        assertThat(response.getWatchTotalCount(), equalTo((long) numWatches));
+        assertThat(response.getWatches().size(), equalTo(2));
+        assertThat(response.getWatches().get(0).getId(), equalTo("1"));
+        watcherMetadata = (Map<?, ?>) response.getWatches().get(0).getSource().getAsMap().get("metadata");
+        assertThat(watcherMetadata.get("key1"), equalTo(1));
+        assertThat(watcherMetadata.get("key2"), equalTo(5));
+        assertThat(response.getWatches().get(1).getId(), equalTo("0"));
+        watcherMetadata = (Map<?, ?>) response.getWatches().get(1).getSource().getAsMap().get("metadata");
+        assertThat(watcherMetadata.get("key1"), equalTo(0));
+        assertThat(watcherMetadata.get("key2"), equalTo(6));
+    }
+
+    public void testQueryWatchesNoWatches() {
+        QueryWatchesAction.Request request = new QueryWatchesAction.Request(null, null, null, null, null);
+        QueryWatchesAction.Response response = client().execute(QueryWatchesAction.INSTANCE, request).actionGet();
+        assertThat(response.getWatchTotalCount(), equalTo(0L));
+        assertThat(response.getWatches().size(), equalTo(0));
+
+        // Even if there is no .watches index this api should work and return 0 watches.
+        DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest("*");
+        deleteIndexRequest.indicesOptions(IndicesOptions.lenientExpandOpenHidden());
+        client().admin().indices().delete(deleteIndexRequest).actionGet();
+        request = new QueryWatchesAction.Request(null, null, null, null, null);
+        response = client().execute(QueryWatchesAction.INSTANCE, request).actionGet();
+        assertThat(response.getWatchTotalCount(), equalTo(0L));
+        assertThat(response.getWatches().size(), equalTo(0));
+    }
+
+    public void testQueryWatchesSearchAfter() {
+        int numWatches = 6;
+        for (int i = 0; i < numWatches; i++) {
+            PutWatchResponse putWatchResponse = new PutWatchRequestBuilder(client()).setId("" + i)
+                .setSource(watchBuilder()
+                    .trigger(schedule(interval(1, IntervalSchedule.Interval.Unit.DAYS)))
+                    .addAction("_logger", loggingAction("log me"))
+                    .metadata(Map.of("_id", i)))
+                .get();
+            assertThat(putWatchResponse.isCreated(), is(true));
+        }
+        refresh();
+
+        QueryWatchesAction.Request request =
+            new QueryWatchesAction.Request(0, 2, null, List.of(new FieldSortBuilder("metadata._id")), null);
+        QueryWatchesAction.Response response = client().execute(QueryWatchesAction.INSTANCE, request).actionGet();
+        assertThat(response.getWatchTotalCount(), equalTo((long) numWatches));
+        assertThat(response.getWatches().size(), equalTo(2));
+        assertThat(response.getWatches().get(0).getId(), equalTo("0"));
+        assertThat(response.getWatches().get(1).getId(), equalTo("1"));
+
+        request = new QueryWatchesAction.Request(0, 2, null, List.of(new FieldSortBuilder("metadata._id")),
+            new SearchAfterBuilder().setSortValues(new Object[]{"1"}));
+        response = client().execute(QueryWatchesAction.INSTANCE, request).actionGet();
+        assertThat(response.getWatchTotalCount(), equalTo((long) numWatches));
+        assertThat(response.getWatches().size(), equalTo(2));
+        assertThat(response.getWatches().get(0).getId(), equalTo("2"));
+        assertThat(response.getWatches().get(1).getId(), equalTo("3"));
+
+        request = new QueryWatchesAction.Request(0, 2, null, List.of(new FieldSortBuilder("metadata._id")),
+            new SearchAfterBuilder().setSortValues(new Object[]{"3"}));
+        response = client().execute(QueryWatchesAction.INSTANCE, request).actionGet();
+        assertThat(response.getWatchTotalCount(), equalTo((long) numWatches));
+        assertThat(response.getWatches().size(), equalTo(2));
+        assertThat(response.getWatches().get(0).getId(), equalTo("4"));
+        assertThat(response.getWatches().get(1).getId(), equalTo("5"));
+
+        request = new QueryWatchesAction.Request(0, 2, null, List.of(new FieldSortBuilder("metadata._id")),
+            new SearchAfterBuilder().setSortValues(new Object[]{"5"}));
+        response = client().execute(QueryWatchesAction.INSTANCE, request).actionGet();
+        assertThat(response.getWatchTotalCount(), equalTo((long) numWatches));
+        assertThat(response.getWatches().size(), equalTo(0));
+    }
 }

+ 6 - 1
x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java

@@ -71,6 +71,7 @@ import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField;
 import org.elasticsearch.xpack.core.watcher.history.HistoryStoreField;
 import org.elasticsearch.xpack.core.watcher.input.none.NoneInput;
 import org.elasticsearch.xpack.core.watcher.transform.TransformRegistry;
+import org.elasticsearch.xpack.core.watcher.transport.actions.QueryWatchesAction;
 import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchAction;
 import org.elasticsearch.xpack.core.watcher.transport.actions.activate.ActivateWatchAction;
 import org.elasticsearch.xpack.core.watcher.transport.actions.delete.DeleteWatchAction;
@@ -143,6 +144,7 @@ import org.elasticsearch.xpack.watcher.rest.action.RestActivateWatchAction.Deact
 import org.elasticsearch.xpack.watcher.rest.action.RestDeleteWatchAction;
 import org.elasticsearch.xpack.watcher.rest.action.RestExecuteWatchAction;
 import org.elasticsearch.xpack.watcher.rest.action.RestGetWatchAction;
+import org.elasticsearch.xpack.watcher.rest.action.RestQueryWatchesAction;
 import org.elasticsearch.xpack.watcher.rest.action.RestPutWatchAction;
 import org.elasticsearch.xpack.watcher.rest.action.RestWatchServiceAction;
 import org.elasticsearch.xpack.watcher.rest.action.RestWatcherStatsAction;
@@ -153,6 +155,7 @@ import org.elasticsearch.xpack.watcher.transform.script.ScriptTransformFactory;
 import org.elasticsearch.xpack.watcher.transform.script.WatcherTransformScript;
 import org.elasticsearch.xpack.watcher.transform.search.SearchTransform;
 import org.elasticsearch.xpack.watcher.transform.search.SearchTransformFactory;
+import org.elasticsearch.xpack.watcher.transport.actions.TransportQueryWatchesAction;
 import org.elasticsearch.xpack.watcher.transport.actions.TransportAckWatchAction;
 import org.elasticsearch.xpack.watcher.transport.actions.TransportActivateWatchAction;
 import org.elasticsearch.xpack.watcher.transport.actions.TransportDeleteWatchAction;
@@ -555,6 +558,7 @@ public class Watcher extends Plugin implements SystemIndexPlugin, ScriptPlugin,
                 new ActionHandler<>(ActivateWatchAction.INSTANCE, TransportActivateWatchAction.class),
                 new ActionHandler<>(WatcherServiceAction.INSTANCE, TransportWatcherServiceAction.class),
                 new ActionHandler<>(ExecuteWatchAction.INSTANCE, TransportExecuteWatchAction.class),
+                new ActionHandler<>(QueryWatchesAction.INSTANCE, TransportQueryWatchesAction.class),
                 usageAction,
                 infoAction);
     }
@@ -576,7 +580,8 @@ public class Watcher extends Plugin implements SystemIndexPlugin, ScriptPlugin,
                 new RestAckWatchAction(),
                 new RestActivateWatchAction(),
                 new DeactivateRestHandler(),
-                new RestExecuteWatchAction());
+                new RestExecuteWatchAction(),
+                new RestQueryWatchesAction());
     }
 
     @Override

+ 45 - 0
x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestQueryWatchesAction.java

@@ -0,0 +1,45 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.watcher.rest.action;
+
+import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.action.RestToXContentListener;
+import org.elasticsearch.xpack.core.watcher.transport.actions.QueryWatchesAction;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.elasticsearch.rest.RestRequest.Method.GET;
+import static org.elasticsearch.rest.RestRequest.Method.POST;
+
+public class RestQueryWatchesAction extends BaseRestHandler {
+
+    @Override
+    public List<Route> routes() {
+        return List.of(
+            new Route(GET, "/_watcher/_query/watches"),
+            new Route(POST, "/_watcher/_query/watches")
+        );
+    }
+
+    @Override
+    public String getName() {
+        return "watcher_query_watches";
+    }
+
+    @Override
+    protected RestChannelConsumer prepareRequest(final RestRequest request, NodeClient client) throws IOException {
+        final QueryWatchesAction.Request queryWatchesRequest;
+        if (request.hasContentOrSourceParam()) {
+            queryWatchesRequest = QueryWatchesAction.Request.fromXContent(request.contentOrSourceParamParser());
+        } else {
+            queryWatchesRequest = new QueryWatchesAction.Request(null, null, null, null, null);
+        }
+        return channel -> client.execute(QueryWatchesAction.INSTANCE, queryWatchesRequest, new RestToXContentListener<>(channel));
+    }
+}

+ 113 - 0
x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/TransportQueryWatchesAction.java

@@ -0,0 +1,113 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.watcher.transport.actions;
+
+import org.apache.lucene.search.TotalHits;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.sort.FieldSortBuilder;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.watcher.support.xcontent.WatcherParams;
+import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource;
+import org.elasticsearch.xpack.core.watcher.transport.actions.QueryWatchesAction;
+import org.elasticsearch.xpack.core.watcher.watch.Watch;
+import org.elasticsearch.xpack.watcher.ClockHolder;
+import org.elasticsearch.xpack.watcher.watch.WatchParser;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.Clock;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN;
+import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
+
+public class TransportQueryWatchesAction extends WatcherTransportAction<QueryWatchesAction.Request, QueryWatchesAction.Response> {
+
+    private final Clock clock;
+    private final Client client;
+    private final WatchParser parser;
+
+    @Inject
+    public TransportQueryWatchesAction(TransportService transportService, ActionFilters actionFilters, XPackLicenseState licenseState,
+                                       ClockHolder clockHolder, Client client, WatchParser parser) {
+        super(QueryWatchesAction.NAME, transportService, actionFilters, licenseState, QueryWatchesAction.Request::new);
+        this.clock = clockHolder.clock;
+        this.client = client;
+        this.parser = parser;
+    }
+
+    @Override
+    protected void doExecute(QueryWatchesAction.Request request, ActionListener<QueryWatchesAction.Response> listener) {
+        SearchRequest searchRequest = createSearchRequest(request);
+        executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, searchRequest,
+            ActionListener.<SearchResponse>wrap(r -> transformResponse(r, listener), listener::onFailure), client::search);
+    }
+
+    SearchRequest createSearchRequest(QueryWatchesAction.Request request) {
+        SearchRequest searchRequest = new SearchRequest(Watch.INDEX);
+        if (request.getFrom() != null) {
+            searchRequest.source().from(request.getFrom());
+        }
+        if (request.getSize() != null) {
+            searchRequest.source().size(request.getSize());
+        }
+        if (request.getQuery() != null) {
+            searchRequest.source().query(request.getQuery());
+        }
+        if (request.getSorts() != null) {
+            for (FieldSortBuilder sort : request.getSorts()) {
+                searchRequest.source().sort(sort);
+            }
+        }
+        if (request.getSearchAfter() != null) {
+            searchRequest.source().searchAfter(request.getSearchAfter().getSortValues());
+        }
+        searchRequest.source().trackTotalHits(true);
+        searchRequest.source().seqNoAndPrimaryTerm(true);
+        searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
+        return searchRequest;
+    }
+
+
+    void transformResponse(SearchResponse searchResponse, ActionListener<QueryWatchesAction.Response> listener) {
+        assert searchResponse.getHits().getTotalHits().relation == TotalHits.Relation.EQUAL_TO;
+        List<QueryWatchesAction.Response.Item> items = Arrays.stream(searchResponse.getHits().getHits())
+            .map(this::transformSearchHit)
+            .collect(Collectors.toList());
+        listener.onResponse(new QueryWatchesAction.Response(searchResponse.getHits().getTotalHits().value, items));
+    }
+
+    QueryWatchesAction.Response.Item transformSearchHit(SearchHit searchHit) {
+        ZonedDateTime now = clock.instant().atZone(ZoneOffset.UTC);
+        try (XContentBuilder builder = jsonBuilder()) {
+            Watch watch = parser.parseWithSecrets(searchHit.getId(), true, searchHit.getSourceRef(), now,
+                XContentType.JSON, searchHit.getSeqNo(), searchHit.getPrimaryTerm());
+            watch.toXContent(builder, WatcherParams.builder()
+                .hideSecrets(true)
+                .includeStatus(false)
+                .build());
+            return new QueryWatchesAction.Response.Item(searchHit.getId(), new XContentSource(builder), watch.status(),
+                watch.getSourceSeqNo(), watch.getSourcePrimaryTerm());
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+}

+ 9 - 5
x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/WatcherTestUtils.java

@@ -15,9 +15,11 @@ import org.elasticsearch.common.xcontent.XContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.xpack.core.watcher.actions.ActionStatus;
 import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper;
+import org.elasticsearch.xpack.core.watcher.actions.throttler.ActionThrottler;
 import org.elasticsearch.xpack.core.watcher.common.secret.Secret;
 import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext;
 import org.elasticsearch.xpack.core.watcher.execution.Wid;
@@ -55,8 +57,8 @@ import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger;
 import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
 import org.hamcrest.Matcher;
 
-import javax.mail.internet.AddressException;
 import java.io.IOException;
+import java.time.Clock;
 import java.time.Instant;
 import java.time.ZoneOffset;
 import java.time.ZonedDateTime;
@@ -72,6 +74,7 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
 import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
 import static org.elasticsearch.test.ESTestCase.randomFrom;
 import static org.hamcrest.Matchers.is;
+import static org.mockito.Mockito.mock;
 
 public final class WatcherTestUtils {
 
@@ -149,7 +152,8 @@ public final class WatcherTestUtils {
 
 
     public static Watch createTestWatch(String watchName, Client client, HttpClient httpClient, EmailService emailService,
-                                        WatcherSearchTemplateService searchTemplateService, Logger logger) throws AddressException {
+                                        WatcherSearchTemplateService searchTemplateService, Logger logger) {
+        ActionThrottler actionThrottler = new ActionThrottler(Clock.systemUTC(), null, mock(XPackLicenseState.class));
         List<ActionWrapper> actions = new ArrayList<>();
         TextTemplateEngine engine = new MockTextTemplateEngine();
 
@@ -157,8 +161,8 @@ public final class WatcherTestUtils {
         httpRequest.method(HttpMethod.POST);
         httpRequest.path(new TextTemplate("/foobarbaz/{{ctx.watch_id}}"));
         httpRequest.body(new TextTemplate("{{ctx.watch_id}} executed with {{ctx.payload.response.hits.total_hits}} hits"));
-        actions.add(new ActionWrapper("_webhook", null, null, null, new ExecutableWebhookAction(new WebhookAction(httpRequest.build()),
-                logger, httpClient, engine), null, null));
+        actions.add(new ActionWrapper("_webhook", actionThrottler, null, null,
+            new ExecutableWebhookAction(new WebhookAction(httpRequest.build()), logger, httpClient, engine), null, null));
 
 
         EmailTemplate email = EmailTemplate.builder().from("from@test.com").to("to@test.com").build();
@@ -166,7 +170,7 @@ public final class WatcherTestUtils {
         EmailAction action = new EmailAction(email, "testaccount", auth, Profile.STANDARD, null, null);
         ExecutableEmailAction executale = new ExecutableEmailAction(action, logger, emailService, engine,
                 new HtmlSanitizer(Settings.EMPTY), Collections.emptyMap());
-        actions.add(new ActionWrapper("_email", null, null, null, executale, null, null));
+        actions.add(new ActionWrapper("_email", actionThrottler, null, null, executale, null, null));
 
         ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
         Map<String, ActionStatus> statuses = new HashMap<>();

+ 80 - 0
x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/action/QueryWatchesRequestTests.java

@@ -0,0 +1,80 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.watcher.transport.action;
+
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchModule;
+import org.elasticsearch.search.searchafter.SearchAfterBuilder;
+import org.elasticsearch.search.sort.FieldSortBuilder;
+import org.elasticsearch.search.sort.SortBuilders;
+import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.test.AbstractSerializingTestCase;
+import org.elasticsearch.xpack.core.watcher.transport.actions.QueryWatchesAction;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class QueryWatchesRequestTests extends AbstractSerializingTestCase<QueryWatchesAction.Request> {
+
+    @Override
+    protected QueryWatchesAction.Request doParseInstance(XContentParser parser) throws IOException {
+        return QueryWatchesAction.Request.fromXContent(parser);
+    }
+
+    @Override
+    protected Writeable.Reader<QueryWatchesAction.Request> instanceReader() {
+        return QueryWatchesAction.Request::new;
+    }
+
+    @Override
+    protected QueryWatchesAction.Request createTestInstance() {
+        QueryBuilder query = null;
+        if (randomBoolean()) {
+            query = QueryBuilders.termQuery(randomAlphaOfLengthBetween(5, 20), randomAlphaOfLengthBetween(5, 20));
+        }
+        List<FieldSortBuilder> sorts = null;
+        if (randomBoolean()) {
+            int numSorts = randomIntBetween(1, 3);
+            sorts = new ArrayList<>(numSorts);
+            for (int i = 0; i < numSorts; i++) {
+                sorts.add(SortBuilders.fieldSort(randomAlphaOfLengthBetween(5, 20)).order(randomFrom(SortOrder.values())));
+            }
+        }
+        SearchAfterBuilder searchAfter = null;
+        if (randomBoolean()) {
+            searchAfter = new SearchAfterBuilder();
+            searchAfter.setSortValues(new Object[]{randomInt()});
+        }
+        return new QueryWatchesAction.Request(
+            randomBoolean() ? randomIntBetween(0, 10000) : null,
+            randomBoolean() ? randomIntBetween(0, 10000) : null,
+            query,
+            sorts,
+            searchAfter
+        );
+    }
+
+    @Override
+    protected NamedWriteableRegistry getNamedWriteableRegistry() {
+        SearchModule searchModule = new SearchModule(Settings.EMPTY, Collections.emptyList());
+        return new NamedWriteableRegistry(searchModule.getNamedWriteables());
+    }
+
+    @Override
+    protected NamedXContentRegistry xContentRegistry() {
+        SearchModule searchModule = new SearchModule(Settings.EMPTY, Collections.emptyList());
+        return new NamedXContentRegistry(searchModule.getNamedXContents());
+    }
+}

+ 129 - 0
x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/action/QueryWatchesResponseTests.java

@@ -0,0 +1,129 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.watcher.transport.action;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ContextParser;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.test.AbstractSerializingTestCase;
+import org.elasticsearch.xpack.core.watcher.support.xcontent.WatcherParams;
+import org.elasticsearch.xpack.core.watcher.support.xcontent.WatcherXContentParser;
+import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource;
+import org.elasticsearch.xpack.core.watcher.transport.actions.QueryWatchesAction;
+import org.elasticsearch.xpack.core.watcher.watch.Watch;
+import org.elasticsearch.xpack.core.watcher.watch.WatchStatus;
+import org.elasticsearch.xpack.watcher.actions.email.EmailActionTests;
+import org.elasticsearch.xpack.watcher.common.http.HttpClient;
+import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateService;
+import org.elasticsearch.xpack.watcher.test.WatcherTestUtils;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.mock;
+
+public class QueryWatchesResponseTests extends AbstractSerializingTestCase<QueryWatchesAction.Response> {
+
+    private static final ConstructingObjectParser<QueryWatchesAction.Response.Item, Void> TEST_ITEM_PARSER = new ConstructingObjectParser<>(
+        "query_watches_response_item",
+        false,
+        (args, c) -> new QueryWatchesAction.Response.Item((String) args[0], (XContentSource) args[1],
+            (WatchStatus) args[2], (long) args[3], (long) args[4])
+    );
+
+    static {
+        TEST_ITEM_PARSER.declareString(constructorArg(), new ParseField("_id"));
+        TEST_ITEM_PARSER.declareObject(
+            constructorArg(),
+            (p, c) -> new XContentSource(XContentBuilder.builder(p.contentType().xContent()).copyCurrentStructure(p)),
+            new ParseField("watch")
+        );
+        TEST_ITEM_PARSER.declareObject(
+            constructorArg(),
+            (p, c) -> WatchStatus.parse("_not_used", new WatcherXContentParser(p, ZonedDateTime.now(ZoneOffset.UTC), null, false)),
+            new ParseField("status")
+        );
+        TEST_ITEM_PARSER.declareLong(constructorArg(), new ParseField("_seq_no"));
+        TEST_ITEM_PARSER.declareLong(constructorArg(), new ParseField("_primary_term"));
+    }
+
+    @SuppressWarnings("unchecked")
+    private static final ConstructingObjectParser<QueryWatchesAction.Response, Void> TEST_PARSER = new ConstructingObjectParser<>(
+        "query_watches_response",
+        false,
+        (args, c) -> new QueryWatchesAction.Response((long) args[0], (List<QueryWatchesAction.Response.Item>) args[1])
+    );
+
+    static {
+        TEST_PARSER.declareLong(constructorArg(), new ParseField("count"));
+        TEST_PARSER.declareObjectArray(constructorArg(), (ContextParser<Void, Object>) TEST_ITEM_PARSER::parse, new ParseField("watches"));
+    }
+
+    @Override
+    protected QueryWatchesAction.Response doParseInstance(XContentParser parser) throws IOException {
+        return TEST_PARSER.parse(parser, null);
+    }
+
+    @Override
+    protected Writeable.Reader<QueryWatchesAction.Response> instanceReader() {
+        return QueryWatchesAction.Response::new;
+    }
+
+    @Override
+    protected QueryWatchesAction.Response createTestInstance() {
+        int numWatches = randomIntBetween(0, 10);
+        List<QueryWatchesAction.Response.Item> items = new ArrayList<>(numWatches);
+        for (int i = 0; i < numWatches; i++) {
+            Watch watch = createWatch("_id + " + i);
+            try (XContentBuilder builder = jsonBuilder()) {
+                watch.toXContent(builder, WatcherParams.builder()
+                    .hideSecrets(true)
+                    .includeStatus(false)
+                    .build());
+                items.add(new QueryWatchesAction.Response.Item(randomAlphaOfLength(4),
+                    new XContentSource(builder), watch.status(), 1, 0));
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+        }
+        return new QueryWatchesAction.Response(numWatches + randomIntBetween(0, 100), items);
+    }
+
+    private Watch createWatch(String watchId)  {
+        return WatcherTestUtils.createTestWatch(watchId,
+            mock(Client.class),
+            mock(HttpClient.class),
+            new EmailActionTests.NoopEmailService(),
+            mock(WatcherSearchTemplateService.class),
+            logger);
+    }
+
+    @Override
+    protected void assertEqualInstances(QueryWatchesAction.Response expectedInstance, QueryWatchesAction.Response newInstance) {
+        assertThat(expectedInstance.getWatchTotalCount(), equalTo(newInstance.getWatchTotalCount()));
+        assertThat(expectedInstance.getWatches().size(), equalTo(newInstance.getWatches().size()));
+        for (int i = 0; i < expectedInstance.getWatches().size(); i++) {
+            QueryWatchesAction.Response.Item expected = expectedInstance.getWatches().get(i);
+            QueryWatchesAction.Response.Item actual = newInstance.getWatches().get(i);
+            assertThat(expected.getId(), equalTo(actual.getId()));
+            assertThat(expected.getSource(), equalTo(actual.getSource()));
+            assertThat(expected.getSeqNo(), equalTo(actual.getSeqNo()));
+            assertThat(expected.getPrimaryTerm(), equalTo(actual.getPrimaryTerm()));
+        }
+    }
+}