Browse Source

Add alias support to fleet search API (#79285)

Currently the fleet search and msearch APIs do not support aliases. This
PR adds support if the alias resolves to a single concrete index.
Tim Brooks 4 years ago
parent
commit
e5c510c64f

+ 2 - 2
docs/reference/fleet/fleet-multi-search.asciidoc

@@ -24,9 +24,9 @@ without prior notice.
 [[fleet-multi-search-api-path-params]]
 ==== {api-path-parms-title}
 
-`<index>`::
+`<target>`::
 (Optional, string)
-A single index. Index aliases are not supported.
+A single target to search. If the target is an index alias, it must resolve to a single index.
 
 [role="child_attributes"]
 [[fleet-multi-search-api-query-parms]]

+ 5 - 4
docs/reference/fleet/fleet-search.asciidoc

@@ -26,7 +26,8 @@ refresh. The checkpoints are indexed by shard.
 If a timeout occurs before the the checkpoint has been refreshed into Elasticsearch,
 the search request will timeout.
 
-The fleet search API only supports searches against a single index.
+The fleet search API only supports searches against a single target. If an index alias
+is supplied as the search target, it must resolve to a single concrete index.
 
 [discrete]
 [[fleet-search-partial-responses]]
@@ -41,14 +42,14 @@ timed out.
 [[fleet-search-api-request]]
 ==== {api-request-title}
 
-`GET /<index>/_fleet/_search`
+`GET /<target>/_fleet/_search`
 
 [[fleet-search-api-path-params]]
 ==== {api-path-parms-title}
 
-`<index>`::
+`<target>`::
 (Required, string)
-A single index. Index aliases are not supported.
+A single target to search. If the target is an index alias, it must resolve to a single index.
 
 [role="child_attributes"]
 [[fleet-search-api-query-parms]]

+ 1 - 0
docs/reference/fleet/index.asciidoc

@@ -15,3 +15,4 @@ agent and action data. These APIs are experimental and for internal use by
 // top-level
 include::get-global-checkpoints.asciidoc[]
 include::fleet-search.asciidoc[]
+include::fleet-multi-search.asciidoc[]

+ 2 - 2
rest-api-spec/src/main/resources/rest-api-spec/api/fleet.msearch.json

@@ -27,8 +27,8 @@
           ],
           "parts":{
             "index":{
-              "type":"list",
-              "description":"A comma-separated list of index names to use as default"
+              "type":"string",
+              "description":"The index name to use as the default"
             }
           }
         }

+ 1 - 1
rest-api-spec/src/main/resources/rest-api-spec/api/fleet.search.json

@@ -21,7 +21,7 @@
           "parts":{
             "index":{
               "type":"string",
-              "description":"The name of the index."
+              "description":"The index name to search."
             }
           }
         }

+ 36 - 7
server/src/internalClusterTest/java/org/elasticsearch/action/search/TransportSearchIT.java

@@ -26,6 +26,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
+import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.xcontent.ObjectParser;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.index.IndexSettings;
@@ -59,6 +60,7 @@ import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.test.ESIntegTestCase;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -257,20 +259,47 @@ public class TransportSearchIT extends ESIntegTestCase {
     public void testWaitForRefreshIndexValidation() throws Exception {
         int numberOfShards = randomIntBetween(3, 10);
         assertAcked(prepareCreate("test1").setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)));
+        assertAcked(prepareCreate("test2").setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)));
+        assertAcked(prepareCreate("test3").setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)));
         client().admin().indices().prepareAliases().addAlias("test1", "testAlias").get();
+        client().admin().indices().prepareAliases().addAlias(new String[] {"test2", "test3"}, "testFailedAlias").get();
+
+        long[] validCheckpoints = new long[numberOfShards];
+        Arrays.fill(validCheckpoints, SequenceNumbers.UNASSIGNED_SEQ_NO);
 
         // no exception
-        client().prepareSearch("testAlias").get();
+        client().prepareSearch("testAlias").setWaitForCheckpoints(Collections.singletonMap("testAlias", validCheckpoints)).get();
+
 
         IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
-            () -> client().prepareSearch("testAlias").setWaitForCheckpoints(Collections.singletonMap("testAlias", new long[0])).get());
-        assertThat(e.getMessage(), containsString("Index configured with wait_for_checkpoints must be a concrete index resolved in this " +
-            "search. Index [testAlias] is not a concrete index resolved in this search."));
+            () -> client().prepareSearch("testFailedAlias")
+                .setWaitForCheckpoints(Collections.singletonMap("testFailedAlias", validCheckpoints))
+                .get());
+        assertThat(e.getMessage(), containsString("Failed to resolve wait_for_checkpoints target [testFailedAlias]. Configured target " +
+            "must resolve to a single open index."));
 
         IllegalArgumentException e2 = expectThrows(IllegalArgumentException.class,
-            () -> client().prepareSearch("test1").setWaitForCheckpoints(Collections.singletonMap("test1", new long[2])).get());
-        assertThat(e2.getMessage(), containsString("Index configured with wait_for_checkpoints must search the same number of shards as " +
-            "checkpoints provided. [2] checkpoints provided. Index [test1] has [" + numberOfShards + "] shards."));
+            () -> client().prepareSearch("test1")
+                .setWaitForCheckpoints(Collections.singletonMap("test1", new long[2]))
+                .get());
+        assertThat(e2.getMessage(), containsString("Target configured with wait_for_checkpoints must search the same number of shards as " +
+            "checkpoints provided. [2] checkpoints provided. Target [test1] which resolved to index [test1] has [" + numberOfShards +
+            "] shards."));
+
+        IllegalArgumentException e3 = expectThrows(IllegalArgumentException.class,
+            () -> client().prepareSearch("testAlias")
+                .setWaitForCheckpoints(Collections.singletonMap("testAlias", new long[2]))
+                .get());
+        assertThat(e3.getMessage(), containsString("Target configured with wait_for_checkpoints must search the same number of shards as " +
+            "checkpoints provided. [2] checkpoints provided. Target [testAlias] which resolved to index [test1] has [" + numberOfShards +
+            "] shards."));
+
+        IllegalArgumentException e4 = expectThrows(IllegalArgumentException.class,
+            () -> client().prepareSearch("testAlias")
+                .setWaitForCheckpoints(Collections.singletonMap("test2", validCheckpoints))
+                .get());
+        assertThat(e4.getMessage(), containsString("Target configured with wait_for_checkpoints must be a concrete index resolved in " +
+            "this search. Target [test2] is not a concrete index resolved in this search."));
     }
 
     public void testShardCountLimit() throws Exception {

+ 33 - 10
server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

@@ -9,6 +9,7 @@
 package org.elasticsearch.action.search;
 
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.IndicesRequest;
 import org.elasticsearch.action.OriginalIndices;
 import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
 import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
@@ -711,7 +712,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
             if (remoteShardIterators.isEmpty() == false) {
                 throw new IllegalArgumentException("Cannot use wait_for_checkpoints parameter with cross-cluster searches.");
             } else {
-                validateWaitForCheckpoint(clusterState, searchRequest, concreteLocalIndices);
+                validateAndResolveWaitForCheckpoint(clusterState, indexNameExpressionResolver, searchRequest, concreteLocalIndices);
             }
         }
 
@@ -889,24 +890,46 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
         }
     }
 
-    private static void validateWaitForCheckpoint(ClusterState clusterState, SearchRequest searchRequest, String[] concreteLocalIndices) {
+    private static void validateAndResolveWaitForCheckpoint(ClusterState clusterState, IndexNameExpressionResolver resolver,
+                                                            SearchRequest searchRequest, String[] concreteLocalIndices) {
         HashSet<String> searchedIndices = new HashSet<>(Arrays.asList(concreteLocalIndices));
+        HashMap<String, long[]> newWaitForCheckpoints = new HashMap<>(searchRequest.getWaitForCheckpoints().size());
         for (Map.Entry<String, long[]> waitForCheckpointIndex : searchRequest.getWaitForCheckpoints().entrySet()) {
-            int checkpointsProvided = waitForCheckpointIndex.getValue().length;
-            String index = waitForCheckpointIndex.getKey();
+            long[] checkpoints = waitForCheckpointIndex.getValue();
+            int checkpointsProvided = checkpoints.length;
+            String target = waitForCheckpointIndex.getKey();
+            Index resolved;
+            try {
+                resolved = resolver.concreteSingleIndex(clusterState, new IndicesRequest() {
+                    @Override
+                    public String[] indices() {
+                        return new String[] { target };
+                    }
+
+                    @Override
+                    public IndicesOptions indicesOptions() {
+                        return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
+                    }
+                });
+            } catch (Exception e) {
+                throw new IllegalArgumentException("Failed to resolve wait_for_checkpoints target [" + target + "]. Configured target " +
+                    "must resolve to a single open index.", e);
+            }
+            String index = resolved.getName();
             IndexMetadata indexMetadata = clusterState.metadata().index(index);
             if (searchedIndices.contains(index) == false) {
-                throw new IllegalArgumentException("Index configured with wait_for_checkpoints must be a concrete index resolved in " +
-                    "this search. Index [" + index + "] is not a concrete index resolved in this search.");
+                throw new IllegalArgumentException("Target configured with wait_for_checkpoints must be a concrete index resolved in " +
+                    "this search. Target [" + target + "] is not a concrete index resolved in this search.");
             } else if (indexMetadata == null) {
                 throw new IllegalArgumentException("Cannot find index configured for wait_for_checkpoints parameter [" + index + "].");
             } else if (indexMetadata.getNumberOfShards() != checkpointsProvided) {
-                throw new IllegalArgumentException("Index configured with wait_for_checkpoints must search the same number of shards as " +
-                    "checkpoints provided. [" + checkpointsProvided + "] checkpoints provided. Index [" + index + "] has " +
-                    "["  + indexMetadata.getNumberOfShards() + "] shards.");
-
+                throw new IllegalArgumentException("Target configured with wait_for_checkpoints must search the same number of shards as " +
+                    "checkpoints provided. [" + checkpointsProvided + "] checkpoints provided. Target [" + target + "] which resolved to " +
+                    "index [" + index + "] has " + "["  + indexMetadata.getNumberOfShards() + "] shards.");
             }
+            newWaitForCheckpoints.put(index, checkpoints);
         }
+        searchRequest.setWaitForCheckpoints(Collections.unmodifiableMap(newWaitForCheckpoints));
     }
 
     private static void failIfOverShardCountLimit(ClusterService clusterService, int shardCount) {

+ 9 - 3
x-pack/plugin/fleet/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/fleet/20_wait_for_checkpoints.yml

@@ -45,15 +45,17 @@ setup:
   - match: { hits.total.value: 2 }
 
 ---
-"Cannot use alias":
+"Can use alias":
   - do:
-      catch: bad_request
       fleet.search:
         index: "test-alias"
         allow_partial_search_results: false
         wait_for_checkpoints: 1
         body: { query: { match_all: {} } }
 
+  - match: { _shards.successful: 1 }
+  - match: { hits.total.value: 2 }
+
 ---
 "Must provide correct number of checkpoints":
   - do:
@@ -102,9 +104,13 @@ setup:
         body:
           - {"index": "test-after-refresh", "allow_partial_search_results" : false, wait_for_checkpoints: 1}
           - {query: { match_all: {} } }
+          - { "index": "test-alias", "allow_partial_search_results": false, wait_for_checkpoints: 1 }
+          - { query: { match_all: { } } }
           - {"index": "test-refresh-disabled", "allow_partial_search_results":  false, wait_for_checkpoints: 2}
           - {query: { match_all: {} } }
 
   - match: { responses.0._shards.successful: 1 }
   - match: { responses.0.hits.total.value: 2 }
-  - match: { responses.1.error.caused_by.type: "illegal_argument_exception" }
+  - match: { responses.1._shards.successful: 1 }
+  - match: { responses.1.hits.total.value: 2 }
+  - match: { responses.2.error.caused_by.type: "illegal_argument_exception" }