|
@@ -25,11 +25,13 @@ import org.elasticsearch.Version;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
import org.elasticsearch.action.LatchedActionListener;
|
|
|
import org.elasticsearch.action.OriginalIndices;
|
|
|
+import org.elasticsearch.action.OriginalIndicesTests;
|
|
|
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
|
|
|
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
|
|
|
import org.elasticsearch.action.support.IndicesOptions;
|
|
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
|
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
|
|
+import org.elasticsearch.cluster.routing.GroupShardsIteratorTests;
|
|
|
import org.elasticsearch.cluster.routing.PlainShardIterator;
|
|
|
import org.elasticsearch.cluster.routing.ShardIterator;
|
|
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
|
@@ -39,6 +41,7 @@ import org.elasticsearch.common.Strings;
|
|
|
import org.elasticsearch.common.collect.Tuple;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.transport.TransportAddress;
|
|
|
+import org.elasticsearch.index.Index;
|
|
|
import org.elasticsearch.index.query.InnerHitBuilder;
|
|
|
import org.elasticsearch.index.query.MatchAllQueryBuilder;
|
|
|
import org.elasticsearch.index.query.TermsQueryBuilder;
|
|
@@ -71,6 +74,7 @@ import org.elasticsearch.transport.TransportRequestOptions;
|
|
|
import org.elasticsearch.transport.TransportService;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Arrays;
|
|
|
import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
@@ -85,7 +89,6 @@ import java.util.concurrent.atomic.AtomicReference;
|
|
|
import java.util.function.BiFunction;
|
|
|
import java.util.function.Function;
|
|
|
|
|
|
-import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
|
|
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.awaitLatch;
|
|
|
import static org.hamcrest.CoreMatchers.containsString;
|
|
|
import static org.hamcrest.CoreMatchers.instanceOf;
|
|
@@ -101,92 +104,96 @@ public class TransportSearchActionTests extends ESTestCase {
|
|
|
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
|
|
|
}
|
|
|
|
|
|
+ private static SearchShardIterator createSearchShardIterator(int id, Index index,
|
|
|
+ OriginalIndices originalIndices, String clusterAlias) {
|
|
|
+ ShardId shardId = new ShardId(index, id);
|
|
|
+ List<ShardRouting> shardRoutings = GroupShardsIteratorTests.randomShardRoutings(shardId);
|
|
|
+ return new SearchShardIterator(clusterAlias, shardId, shardRoutings, originalIndices);
|
|
|
+ }
|
|
|
+
|
|
|
public void testMergeShardsIterators() {
|
|
|
- List<ShardIterator> localShardIterators = new ArrayList<>();
|
|
|
- {
|
|
|
- ShardId shardId = new ShardId("local_index", "local_index_uuid", 0);
|
|
|
- ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, "local_node", true, STARTED);
|
|
|
- ShardIterator shardIterator = new PlainShardIterator(shardId, Collections.singletonList(shardRouting));
|
|
|
- localShardIterators.add(shardIterator);
|
|
|
+ Index[] indices = new Index[randomIntBetween(1, 10)];
|
|
|
+ for (int i = 0; i < indices.length; i++) {
|
|
|
+ if (randomBoolean() && i > 0) {
|
|
|
+ Index existingIndex = indices[randomIntBetween(0, i - 1)];
|
|
|
+ indices[i] = new Index(existingIndex.getName(), randomAlphaOfLength(10));
|
|
|
+ } else {
|
|
|
+ indices[i] = new Index(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10));
|
|
|
+ }
|
|
|
}
|
|
|
- {
|
|
|
- ShardId shardId2 = new ShardId("local_index_2", "local_index_2_uuid", 1);
|
|
|
- ShardRouting shardRouting2 = TestShardRouting.newShardRouting(shardId2, "local_node", true, STARTED);
|
|
|
- ShardIterator shardIterator2 = new PlainShardIterator(shardId2, Collections.singletonList(shardRouting2));
|
|
|
- localShardIterators.add(shardIterator2);
|
|
|
+ Arrays.sort(indices, (o1, o2) -> {
|
|
|
+ int nameCompareTo = o1.getName().compareTo(o2.getName());
|
|
|
+ if (nameCompareTo == 0) {
|
|
|
+ return o1.getUUID().compareTo(o2.getUUID());
|
|
|
+ }
|
|
|
+ return nameCompareTo;
|
|
|
+ });
|
|
|
+ String[] remoteClusters = new String[randomIntBetween(1, 3)];
|
|
|
+ for (int i = 0; i < remoteClusters.length; i++) {
|
|
|
+ remoteClusters[i] = randomAlphaOfLengthBetween(5, 10);
|
|
|
}
|
|
|
- GroupShardsIterator<ShardIterator> localShardsIterator = new GroupShardsIterator<>(localShardIterators);
|
|
|
-
|
|
|
- OriginalIndices localIndices = new OriginalIndices(new String[]{"local_alias", "local_index_2"},
|
|
|
- SearchRequest.DEFAULT_INDICES_OPTIONS);
|
|
|
+ Arrays.sort(remoteClusters);
|
|
|
|
|
|
- OriginalIndices remoteIndices = new OriginalIndices(new String[]{"remote_alias", "remote_index_2"},
|
|
|
- IndicesOptions.strictExpandOpen());
|
|
|
+ List<SearchShardIterator> expected = new ArrayList<>();
|
|
|
+ String localClusterAlias = randomAlphaOfLengthBetween(5, 10);
|
|
|
+ OriginalIndices localIndices = OriginalIndicesTests.randomOriginalIndices();
|
|
|
+ List<ShardIterator> localShardIterators = new ArrayList<>();
|
|
|
List<SearchShardIterator> remoteShardIterators = new ArrayList<>();
|
|
|
- {
|
|
|
- ShardId remoteShardId = new ShardId("remote_index", "remote_index_uuid", 2);
|
|
|
- ShardRouting remoteShardRouting = TestShardRouting.newShardRouting(remoteShardId, "remote_node", true, STARTED);
|
|
|
- SearchShardIterator remoteShardIterator = new SearchShardIterator("remote", remoteShardId,
|
|
|
- Collections.singletonList(remoteShardRouting), remoteIndices);
|
|
|
- remoteShardIterators.add(remoteShardIterator);
|
|
|
- }
|
|
|
- {
|
|
|
- ShardId remoteShardId2 = new ShardId("remote_index_2", "remote_index_2_uuid", 3);
|
|
|
- ShardRouting remoteShardRouting2 = TestShardRouting.newShardRouting(remoteShardId2, "remote_node", true, STARTED);
|
|
|
- SearchShardIterator remoteShardIterator2 = new SearchShardIterator("remote", remoteShardId2,
|
|
|
- Collections.singletonList(remoteShardRouting2), remoteIndices);
|
|
|
- remoteShardIterators.add(remoteShardIterator2);
|
|
|
+ int numShards = randomIntBetween(0, 10);
|
|
|
+ for (int i = 0; i < numShards; i++) {
|
|
|
+ int numIndices = randomIntBetween(0, indices.length);
|
|
|
+ for (int j = 0; j < numIndices; j++) {
|
|
|
+ Index index = indices[j];
|
|
|
+ boolean localIndex = randomBoolean();
|
|
|
+ if (localIndex) {
|
|
|
+ SearchShardIterator localIterator = createSearchShardIterator(i, index, localIndices, localClusterAlias);
|
|
|
+ localShardIterators.add(new PlainShardIterator(localIterator.shardId(), localIterator.getShardRoutings()));
|
|
|
+ if (rarely()) {
|
|
|
+ String remoteClusterAlias = randomFrom(remoteClusters);
|
|
|
+ //simulate scenario where the local cluster is also registered as a remote one
|
|
|
+ SearchShardIterator remoteIterator = createSearchShardIterator(i, index,
|
|
|
+ OriginalIndicesTests.randomOriginalIndices(), remoteClusterAlias);
|
|
|
+ remoteShardIterators.add(remoteIterator);
|
|
|
+ assert remoteClusterAlias.equals(localClusterAlias) == false;
|
|
|
+ if (remoteClusterAlias.compareTo(localClusterAlias) < 0) {
|
|
|
+ expected.add(remoteIterator);
|
|
|
+ expected.add(localIterator);
|
|
|
+ } else {
|
|
|
+ expected.add(localIterator);
|
|
|
+ expected.add(remoteIterator);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ expected.add(localIterator);
|
|
|
+ }
|
|
|
+ } else if (rarely()) {
|
|
|
+ int numClusters = randomIntBetween(1, remoteClusters.length);
|
|
|
+ for (int k = 0; k < numClusters; k++) {
|
|
|
+ //simulate scenario where the same cluster is registered multiple times with different aliases
|
|
|
+ String clusterAlias = remoteClusters[k];
|
|
|
+ SearchShardIterator iterator = createSearchShardIterator(i, index, OriginalIndicesTests.randomOriginalIndices(),
|
|
|
+ clusterAlias);
|
|
|
+ expected.add(iterator);
|
|
|
+ remoteShardIterators.add(iterator);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ SearchShardIterator iterator = createSearchShardIterator(i, index, OriginalIndicesTests.randomOriginalIndices(),
|
|
|
+ randomFrom(remoteClusters));
|
|
|
+ expected.add(iterator);
|
|
|
+ remoteShardIterators.add(iterator);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
- OriginalIndices remoteIndices2 = new OriginalIndices(new String[]{"remote_index_3"}, IndicesOptions.strictExpand());
|
|
|
|
|
|
- {
|
|
|
- ShardId remoteShardId3 = new ShardId("remote_index_3", "remote_index_3_uuid", 4);
|
|
|
- ShardRouting remoteShardRouting3 = TestShardRouting.newShardRouting(remoteShardId3, "remote_node", true, STARTED);
|
|
|
- SearchShardIterator remoteShardIterator3 = new SearchShardIterator("remote", remoteShardId3,
|
|
|
- Collections.singletonList(remoteShardRouting3), remoteIndices2);
|
|
|
- remoteShardIterators.add(remoteShardIterator3);
|
|
|
- }
|
|
|
+ Collections.shuffle(localShardIterators, random());
|
|
|
+ Collections.shuffle(remoteShardIterators, random());
|
|
|
|
|
|
- String localClusterAlias = randomBoolean() ? null : "local";
|
|
|
- GroupShardsIterator<SearchShardIterator> searchShardIterators = TransportSearchAction.mergeShardsIterators(localShardsIterator,
|
|
|
- localIndices, localClusterAlias, remoteShardIterators);
|
|
|
-
|
|
|
- assertEquals(searchShardIterators.size(), 5);
|
|
|
- int i = 0;
|
|
|
- for (SearchShardIterator searchShardIterator : searchShardIterators) {
|
|
|
- switch(i++) {
|
|
|
- case 0:
|
|
|
- assertEquals("local_index", searchShardIterator.shardId().getIndexName());
|
|
|
- assertEquals(0, searchShardIterator.shardId().getId());
|
|
|
- assertSame(localIndices, searchShardIterator.getOriginalIndices());
|
|
|
- assertEquals(localClusterAlias, searchShardIterator.getClusterAlias());
|
|
|
- break;
|
|
|
- case 1:
|
|
|
- assertEquals("local_index_2", searchShardIterator.shardId().getIndexName());
|
|
|
- assertEquals(1, searchShardIterator.shardId().getId());
|
|
|
- assertSame(localIndices, searchShardIterator.getOriginalIndices());
|
|
|
- assertEquals(localClusterAlias, searchShardIterator.getClusterAlias());
|
|
|
- break;
|
|
|
- case 2:
|
|
|
- assertEquals("remote_index", searchShardIterator.shardId().getIndexName());
|
|
|
- assertEquals(2, searchShardIterator.shardId().getId());
|
|
|
- assertSame(remoteIndices, searchShardIterator.getOriginalIndices());
|
|
|
- assertEquals("remote", searchShardIterator.getClusterAlias());
|
|
|
- break;
|
|
|
- case 3:
|
|
|
- assertEquals("remote_index_2", searchShardIterator.shardId().getIndexName());
|
|
|
- assertEquals(3, searchShardIterator.shardId().getId());
|
|
|
- assertSame(remoteIndices, searchShardIterator.getOriginalIndices());
|
|
|
- assertEquals("remote", searchShardIterator.getClusterAlias());
|
|
|
- break;
|
|
|
- case 4:
|
|
|
- assertEquals("remote_index_3", searchShardIterator.shardId().getIndexName());
|
|
|
- assertEquals(4, searchShardIterator.shardId().getId());
|
|
|
- assertSame(remoteIndices2, searchShardIterator.getOriginalIndices());
|
|
|
- assertEquals("remote", searchShardIterator.getClusterAlias());
|
|
|
- break;
|
|
|
- }
|
|
|
+ GroupShardsIterator<SearchShardIterator> groupShardsIterator = TransportSearchAction.mergeShardsIterators(
|
|
|
+ new GroupShardsIterator<>(localShardIterators), localIndices, localClusterAlias, remoteShardIterators);
|
|
|
+ List<SearchShardIterator> result = new ArrayList<>();
|
|
|
+ for (SearchShardIterator searchShardIterator : groupShardsIterator) {
|
|
|
+ result.add(searchShardIterator);
|
|
|
}
|
|
|
+ assertEquals(expected, result);
|
|
|
}
|
|
|
|
|
|
public void testProcessRemoteShards() {
|