Browse Source

Clarify RemoteClusterService#groupIndices behaviour (#33899)

When executing a cross-cluster search, we need to search against all local indices (and no remote indices) in case no indices are specified. Also, if only remote indices are specified, no local indices will be queried. We previously added empty local indices whenever they were not present in the map of the grouped indices, then we would act differently later based on the extracted remote indices. Instead, we now add the empty array for local indices only in case we need to search all local indices; the entry for local indices is not added when local indices should not be searched. This way the grouped indices reflect reality and provide a better indication of what indices will be searched.
Luca Cavanna 7 years ago
parent
commit
e389d9e296

+ 2 - 3
server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java

@@ -70,9 +70,8 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
             request.indices(), idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState));
         final OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
         final String[] concreteIndices;
-        if (remoteClusterIndices.isEmpty() == false && localIndices.indices().length == 0) {
-            // in the case we have one or more remote indices but no local we don't expand to all local indices and just do remote
-            // indices
+        if (localIndices == null) {
+            // in the case we have one or more remote indices but no local we don't expand to all local indices and just do remote indices
             concreteIndices = Strings.EMPTY_ARRAY;
         } else {
             concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, localIndices);

+ 18 - 12
server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

@@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
 import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.HandledTransportAction;
+import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -193,7 +194,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
                 searchRequest.indices(), idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState));
             OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
             if (remoteClusterIndices.isEmpty()) {
-                executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, remoteClusterIndices, Collections.emptyList(),
+                executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, Collections.emptyList(),
                     (clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener, SearchResponse.Clusters.EMPTY);
             } else {
                 remoteClusterService.collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(),
@@ -203,7 +204,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
                         BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards(searchShardsResponses,
                             remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
                         SearchResponse.Clusters clusters = buildClusters(localIndices, remoteClusterIndices, searchShardsResponses);
-                        executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, remoteClusterIndices,
+                        executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices,
                             remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener,
                             clusters);
                     }, listener::onFailure));
@@ -219,7 +220,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
 
     static SearchResponse.Clusters buildClusters(OriginalIndices localIndices, Map<String, OriginalIndices> remoteIndices,
                                                  Map<String, ClusterSearchShardsResponse> searchShardsResponses) {
-        int localClusters = Math.min(localIndices.indices().length, 1);
+        int localClusters = localIndices == null ? 0 : 1;
         int totalClusters = remoteIndices.size() + localClusters;
         int successfulClusters = localClusters;
         for (ClusterSearchShardsResponse searchShardsResponse : searchShardsResponses.values()) {
@@ -277,8 +278,19 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
         };
     }
 
-    private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest, OriginalIndices localIndices,
-                               Map<String, OriginalIndices> remoteClusterIndices, List<SearchShardIterator> remoteShardIterators,
+    private Index[] resolveLocalIndices(OriginalIndices localIndices,
+                                IndicesOptions indicesOptions,
+                                ClusterState clusterState,
+                                SearchTimeProvider timeProvider) {
+        if (localIndices == null) {
+            return Index.EMPTY_ARRAY; //don't search on any local index (happens when only remote indices were specified)
+        }
+        return indexNameExpressionResolver.concreteIndices(clusterState, indicesOptions,
+            timeProvider.getAbsoluteStartMillis(), localIndices.indices());
+    }
+
+    private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest,
+                               OriginalIndices localIndices, List<SearchShardIterator> remoteShardIterators,
                                BiFunction<String, String, DiscoveryNode> remoteConnections, ClusterState clusterState,
                                Map<String, AliasFilter> remoteAliasMap, ActionListener<SearchResponse> listener,
                                SearchResponse.Clusters clusters) {
@@ -287,13 +299,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
         // TODO: I think startTime() should become part of ActionRequest and that should be used both for index name
         // date math expressions and $now in scripts. This way all apis will deal with now in the same way instead
         // of just for the _search api
-        final Index[] indices;
-        if (localIndices.indices().length == 0 && remoteClusterIndices.isEmpty() == false) {
-            indices = Index.EMPTY_ARRAY; // don't search on _all if only remote indices were specified
-        } else {
-            indices = indexNameExpressionResolver.concreteIndices(clusterState, searchRequest.indicesOptions(),
-                timeProvider.getAbsoluteStartMillis(), localIndices.indices());
-        }
+        final Index[] indices = resolveLocalIndices(localIndices, searchRequest.indicesOptions(), clusterState, timeProvider);
         Map<String, AliasFilter> aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap);
         Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(),
             searchRequest.indices());

+ 9 - 7
server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

@@ -262,14 +262,16 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
         Map<String, OriginalIndices> originalIndicesMap = new HashMap<>();
         if (isCrossClusterSearchEnabled()) {
             final Map<String, List<String>> groupedIndices = groupClusterIndices(indices, indexExists);
-            for (Map.Entry<String, List<String>> entry : groupedIndices.entrySet()) {
-                String clusterAlias = entry.getKey();
-                List<String> originalIndices = entry.getValue();
-                originalIndicesMap.put(clusterAlias,
-                        new OriginalIndices(originalIndices.toArray(new String[originalIndices.size()]), indicesOptions));
-            }
-            if (originalIndicesMap.containsKey(LOCAL_CLUSTER_GROUP_KEY) == false) {
+            if (groupedIndices.isEmpty()) {
+                //search on _all in the local cluster if neither local indices nor remote indices were specified
                 originalIndicesMap.put(LOCAL_CLUSTER_GROUP_KEY, new OriginalIndices(Strings.EMPTY_ARRAY, indicesOptions));
+            } else {
+                for (Map.Entry<String, List<String>> entry : groupedIndices.entrySet()) {
+                    String clusterAlias = entry.getKey();
+                    List<String> originalIndices = entry.getValue();
+                    originalIndicesMap.put(clusterAlias,
+                        new OriginalIndices(originalIndices.toArray(new String[originalIndices.size()]), indicesOptions));
+                }
             }
         } else {
             originalIndicesMap.put(LOCAL_CLUSTER_GROUP_KEY, new OriginalIndices(indices, indicesOptions));

+ 4 - 5
server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java

@@ -44,7 +44,6 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.RemoteClusterService;
 import org.elasticsearch.transport.TransportService;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -64,7 +63,7 @@ public class TransportSearchActionTests extends ESTestCase {
         ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
     }
 
-    public void testMergeShardsIterators() throws IOException {
+    public void testMergeShardsIterators() {
         List<ShardIterator> localShardIterators = new ArrayList<>();
         {
             ShardId shardId = new ShardId("local_index", "local_index_uuid", 0);
@@ -146,7 +145,7 @@ public class TransportSearchActionTests extends ESTestCase {
         }
     }
 
-    public void testProcessRemoteShards() throws IOException {
+    public void testProcessRemoteShards() {
         try (TransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool,
             null)) {
             RemoteClusterService service = transportService.getRemoteClusterService();
@@ -241,12 +240,12 @@ public class TransportSearchActionTests extends ESTestCase {
     }
 
     public void testBuildClusters() {
-        OriginalIndices localIndices = randomOriginalIndices();
+        OriginalIndices localIndices = randomBoolean() ? null : randomOriginalIndices();
         Map<String, OriginalIndices> remoteIndices = new HashMap<>();
         Map<String, ClusterSearchShardsResponse> searchShardsResponses = new HashMap<>();
         int numRemoteClusters = randomIntBetween(0, 10);
         boolean onlySuccessful = randomBoolean();
-        int localClusters = localIndices.indices().length == 0 ? 0 : 1;
+        int localClusters = localIndices == null ? 0 : 1;
         int total = numRemoteClusters + localClusters;
         int successful = localClusters;
         int skipped = 0;

+ 66 - 5
server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java

@@ -25,6 +25,7 @@ import org.elasticsearch.action.OriginalIndices;
 import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.AbstractScopedSettings;
 import org.elasticsearch.common.settings.ClusterSettings;
@@ -151,7 +152,6 @@ public class RemoteClusterServiceTests extends ESTestCase {
         assertEquals(boom.getVersion(), Version.CURRENT.minimumCompatibilityVersion());
     }
 
-
     public void testGroupClusterIndices() throws IOException {
         List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
         try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT);
@@ -179,10 +179,9 @@ public class RemoteClusterServiceTests extends ESTestCase {
                     Map<String, List<String>> perClusterIndices = service.groupClusterIndices(new String[]{"foo:bar", "cluster_1:bar",
                         "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo", "cluster*:baz", "*:boo", "no*match:boo"},
                         i -> false);
-                    String[] localIndices = perClusterIndices.computeIfAbsent(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
-                        k -> Collections.emptyList()).toArray(new String[0]);
-                    assertNotNull(perClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY));
-                    assertArrayEquals(new String[]{"foo:bar", "foo", "no*match:boo"}, localIndices);
+                    List<String> localIndices = perClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
+                    assertNotNull(localIndices);
+                    assertEquals(Arrays.asList("foo:bar", "foo", "no*match:boo"), localIndices);
                     assertEquals(2, perClusterIndices.size());
                     assertEquals(Arrays.asList("bar", "test", "baz", "boo"), perClusterIndices.get("cluster_1"));
                     assertEquals(Arrays.asList("foo:bar", "foo*", "baz", "boo"), perClusterIndices.get("cluster_2"));
@@ -198,6 +197,68 @@ public class RemoteClusterServiceTests extends ESTestCase {
         }
     }
 
+    public void testGroupIndices() throws IOException {
+        List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
+        try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT);
+             MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) {
+            DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
+            DiscoveryNode otherSeedNode = otherSeedTransport.getLocalDiscoNode();
+            knownNodes.add(seedTransport.getLocalDiscoNode());
+            knownNodes.add(otherSeedTransport.getLocalDiscoNode());
+            Collections.shuffle(knownNodes, random());
+
+            try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool,
+                null)) {
+                transportService.start();
+                transportService.acceptIncomingRequests();
+                Settings.Builder builder = Settings.builder();
+                builder.putList("cluster.remote.cluster_1.seeds", seedNode.getAddress().toString());
+                builder.putList("cluster.remote.cluster_2.seeds", otherSeedNode.getAddress().toString());
+                try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
+                    assertFalse(service.isCrossClusterSearchEnabled());
+                    service.initializeRemoteClusters();
+                    assertTrue(service.isCrossClusterSearchEnabled());
+                    assertTrue(service.isRemoteClusterRegistered("cluster_1"));
+                    assertTrue(service.isRemoteClusterRegistered("cluster_2"));
+                    assertFalse(service.isRemoteClusterRegistered("foo"));
+                    {
+                        Map<String, OriginalIndices> perClusterIndices = service.groupIndices(IndicesOptions.LENIENT_EXPAND_OPEN,
+                            new String[]{"foo:bar", "cluster_1:bar", "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo",
+                                "cluster*:baz", "*:boo", "no*match:boo"},
+                            i -> false);
+                        assertEquals(3, perClusterIndices.size());
+                        assertArrayEquals(new String[]{"foo:bar", "foo", "no*match:boo"},
+                            perClusterIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).indices());
+                        assertArrayEquals(new String[]{"bar", "test", "baz", "boo"}, perClusterIndices.get("cluster_1").indices());
+                        assertArrayEquals(new String[]{"foo:bar", "foo*", "baz", "boo"}, perClusterIndices.get("cluster_2").indices());
+                    }
+                    {
+                        IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () ->
+                            service.groupClusterIndices(new String[]{"foo:bar", "cluster_1:bar",
+                                "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo"}, "cluster_1:bar"::equals));
+                        assertEquals("Can not filter indices; index cluster_1:bar exists but there is also a remote cluster named:" +
+                            " cluster_1", iae.getMessage());
+                    }
+                    {
+                        Map<String, OriginalIndices> perClusterIndices = service.groupIndices(IndicesOptions.LENIENT_EXPAND_OPEN,
+                            new String[]{"cluster_1:bar", "cluster_2:foo*"},
+                            i -> false);
+                        assertEquals(2, perClusterIndices.size());
+                        assertArrayEquals(new String[]{"bar"}, perClusterIndices.get("cluster_1").indices());
+                        assertArrayEquals(new String[]{"foo*"}, perClusterIndices.get("cluster_2").indices());
+                    }
+                    {
+                        Map<String, OriginalIndices> perClusterIndices = service.groupIndices(IndicesOptions.LENIENT_EXPAND_OPEN,
+                            Strings.EMPTY_ARRAY,
+                            i -> false);
+                        assertEquals(1, perClusterIndices.size());
+                        assertArrayEquals(Strings.EMPTY_ARRAY, perClusterIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).indices());
+                    }
+                }
+            }
+        }
+    }
+
     public void testIncrementallyAddClusters() throws IOException {
         List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
         try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT);