|
@@ -28,6 +28,7 @@ import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
+import java.util.function.Function;
|
|
|
|
|
|
import static org.hamcrest.CoreMatchers.containsString;
|
|
|
import static org.hamcrest.Matchers.containsInAnyOrder;
|
|
@@ -80,6 +81,62 @@ public class OperationRoutingTests extends ESTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ public void testPreferCombine() throws InterruptedException, IOException {
|
|
|
+ TestThreadPool threadPool = null;
|
|
|
+ ClusterService clusterService = null;
|
|
|
+ try {
|
|
|
+ threadPool = new TestThreadPool("testPreferCombine");
|
|
|
+ clusterService = ClusterServiceUtils.createClusterService(threadPool);
|
|
|
+ final String indexName = "test";
|
|
|
+ ClusterServiceUtils.setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary(indexName, true, randomInt(8)));
|
|
|
+ final Index index = clusterService.state().metadata().index(indexName).getIndex();
|
|
|
+ final List<ShardRouting> shards = clusterService.state().getRoutingNodes().assignedShards(new ShardId(index, 0));
|
|
|
+ final ClusterState state = clusterService.state();
|
|
|
+
|
|
|
+ Function<String, List<ShardRouting>> func = prefer -> {
|
|
|
+ final ShardIterator it = new OperationRouting(
|
|
|
+ Settings.EMPTY,
|
|
|
+ new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
|
|
|
+ ).getShards(state, indexName, 0, prefer);
|
|
|
+ final List<ShardRouting> all = new ArrayList<>();
|
|
|
+ ShardRouting shard;
|
|
|
+ while (it != null && (shard = it.nextOrNull()) != null) {
|
|
|
+ all.add(shard);
|
|
|
+ }
|
|
|
+ return all;
|
|
|
+ };
|
|
|
+
|
|
|
+ // combine _shards with custom_string
|
|
|
+ final int numRepeated = 4;
|
|
|
+ for (int i = 0; i < numRepeated; i++) {
|
|
|
+ String custom_string = "a" + randomAlphaOfLength(10); // not start with _
|
|
|
+
|
|
|
+ List<ShardRouting> prefer_custom = func.apply(custom_string);
|
|
|
+ List<ShardRouting> prefer_custom_shard = func.apply("_shards:0|" + custom_string);
|
|
|
+ List<ShardRouting> prefer_custom_othershard = func.apply("_shards:1|" + custom_string);
|
|
|
+
|
|
|
+ assertThat(prefer_custom.size(), equalTo(shards.size()));
|
|
|
+ assertThat(prefer_custom_shard.size(), equalTo(shards.size()));
|
|
|
+ assertThat(prefer_custom_othershard.size(), equalTo(0));
|
|
|
+ assertThat(prefer_custom, equalTo(prefer_custom_shard)); // same order
|
|
|
+ }
|
|
|
+
|
|
|
+ // combine _shards with _local
|
|
|
+ String local = "_local";
|
|
|
+ List<ShardRouting> prefer_shard_local = func.apply("_shards:0|" + local);
|
|
|
+ assertThat(prefer_shard_local.size(), equalTo(shards.size()));
|
|
|
+
|
|
|
+ // combine _shards with failed_string (start with _)
|
|
|
+ String failed_string = "_xyz";
|
|
|
+ final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> func.apply("_shards:0|" + failed_string));
|
|
|
+ assertThat(e, hasToString(containsString("no Preference for [" + failed_string + "]")));
|
|
|
+
|
|
|
+ } finally {
|
|
|
+ IOUtils.close(clusterService);
|
|
|
+ terminate(threadPool);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
public void testFairSessionIdPreferences() throws InterruptedException, IOException {
|
|
|
// Ensure that a user session is re-routed back to same nodes for
|
|
|
// subsequent searches and that the nodes are selected fairly i.e.
|