|
@@ -5,8 +5,10 @@
|
|
|
*/
|
|
|
package org.elasticsearch.xpack.ccr.action;
|
|
|
|
|
|
+import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
|
|
import org.elasticsearch.Version;
|
|
|
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
|
|
+import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
|
|
|
import org.elasticsearch.client.Client;
|
|
|
import org.elasticsearch.cluster.ClusterName;
|
|
|
import org.elasticsearch.cluster.ClusterState;
|
|
@@ -18,11 +20,13 @@ import org.elasticsearch.cluster.routing.ShardRouting;
|
|
|
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
|
|
import org.elasticsearch.cluster.routing.TestShardRouting;
|
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
|
+import org.elasticsearch.common.UUIDs;
|
|
|
import org.elasticsearch.common.collect.Tuple;
|
|
|
import org.elasticsearch.common.settings.ClusterSettings;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
|
|
import org.elasticsearch.common.unit.TimeValue;
|
|
|
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
|
|
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
|
|
import org.elasticsearch.index.Index;
|
|
|
import org.elasticsearch.index.IndexSettings;
|
|
@@ -44,10 +48,12 @@ import java.util.HashMap;
|
|
|
import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Executors;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
+import java.util.concurrent.atomic.AtomicReference;
|
|
|
import java.util.function.BiConsumer;
|
|
|
import java.util.function.Consumer;
|
|
|
import java.util.function.Function;
|
|
@@ -57,6 +63,7 @@ import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollo
|
|
|
import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.recordLeaderIndexAsFollowFunction;
|
|
|
import static org.hamcrest.Matchers.equalTo;
|
|
|
import static org.hamcrest.Matchers.greaterThan;
|
|
|
+import static org.hamcrest.Matchers.hasItem;
|
|
|
import static org.hamcrest.Matchers.is;
|
|
|
import static org.hamcrest.Matchers.notNullValue;
|
|
|
import static org.hamcrest.Matchers.nullValue;
|
|
@@ -80,7 +87,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|
|
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
|
|
|
followedLeaderIndexUUIDS.put("remote", new ArrayList<>());
|
|
|
Map<String, Map<String, String>> autoFollowHeaders = new HashMap<>();
|
|
|
- autoFollowHeaders.put("remote", Collections.singletonMap("key", "val"));
|
|
|
+ autoFollowHeaders.put("remote", Map.of("key", "val"));
|
|
|
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, autoFollowHeaders);
|
|
|
|
|
|
ClusterState currentState = ClusterState.builder(new ClusterName("name"))
|
|
@@ -315,7 +322,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|
|
Map<String, Map<String, String>> headers = new HashMap<>();
|
|
|
ClusterState clusterState = ClusterState.builder(new ClusterName("remote"))
|
|
|
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
|
|
|
- new AutoFollowMetadata(Collections.singletonMap("remote", autoFollowPattern), Collections.emptyMap(), headers)))
|
|
|
+ new AutoFollowMetadata(Map.of("remote", autoFollowPattern), Collections.emptyMap(), headers)))
|
|
|
.build();
|
|
|
|
|
|
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
|
|
@@ -377,7 +384,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|
|
Map<String, Map<String, String>> headers = new HashMap<>();
|
|
|
ClusterState clusterState = ClusterState.builder(new ClusterName("remote"))
|
|
|
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
|
|
|
- new AutoFollowMetadata(Collections.singletonMap("remote", autoFollowPattern), Collections.emptyMap(), headers)))
|
|
|
+ new AutoFollowMetadata(Map.of("remote", autoFollowPattern), Collections.emptyMap(), headers)))
|
|
|
.build();
|
|
|
|
|
|
// 1 shard started and another not started:
|
|
@@ -416,9 +423,29 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|
|
assertThat(result.get(1).getName(), equalTo("index2"));
|
|
|
}
|
|
|
|
|
|
+ public void testGetLeaderIndicesToFollowWithClosedIndices() {
|
|
|
+ final AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("*"),
|
|
|
+ null, null, null, null, null, null, null, null, null, null, null);
|
|
|
+
|
|
|
+ // index is opened
|
|
|
+ ClusterState remoteState = ClusterStateCreationUtils.stateWithActivePrimary("test-index", true, randomIntBetween(1, 3), 0);
|
|
|
+ List<Index> result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, Collections.emptyList());
|
|
|
+ assertThat(result.size(), equalTo(1));
|
|
|
+ assertThat(result, hasItem(remoteState.metaData().index("test-index").getIndex()));
|
|
|
+
|
|
|
+ // index is closed
|
|
|
+ remoteState = ClusterState.builder(remoteState)
|
|
|
+ .metaData(MetaData.builder(remoteState.metaData())
|
|
|
+ .put(IndexMetaData.builder(remoteState.metaData().index("test-index")).state(IndexMetaData.State.CLOSE).build(), true)
|
|
|
+ .build())
|
|
|
+ .build();
|
|
|
+ result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, Collections.emptyList());
|
|
|
+ assertThat(result.size(), equalTo(0));
|
|
|
+ }
|
|
|
+
|
|
|
public void testRecordLeaderIndexAsFollowFunction() {
|
|
|
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(Collections.emptyMap(),
|
|
|
- Collections.singletonMap("pattern1", Collections.emptyList()), Collections.emptyMap());
|
|
|
+ Map.of("pattern1", Collections.emptyList()), Collections.emptyMap());
|
|
|
ClusterState clusterState = new ClusterState.Builder(new ClusterName("name"))
|
|
|
.metaData(new MetaData.Builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata))
|
|
|
.build();
|
|
@@ -445,7 +472,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|
|
|
|
|
public void testCleanFollowedLeaderIndices() {
|
|
|
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(Collections.emptyMap(),
|
|
|
- Collections.singletonMap("pattern1", Arrays.asList("index1", "index2", "index3")), Collections.emptyMap());
|
|
|
+ Map.of("pattern1", Arrays.asList("index1", "index2", "index3")), Collections.emptyMap());
|
|
|
ClusterState clusterState = new ClusterState.Builder(new ClusterName("name"))
|
|
|
.metaData(new MetaData.Builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata))
|
|
|
.build();
|
|
@@ -474,7 +501,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|
|
|
|
|
public void testCleanFollowedLeaderIndicesNoChanges() {
|
|
|
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(Collections.emptyMap(),
|
|
|
- Collections.singletonMap("pattern1", Arrays.asList("index1", "index2", "index3")), Collections.emptyMap());
|
|
|
+ Map.of("pattern1", Arrays.asList("index1", "index2", "index3")), Collections.emptyMap());
|
|
|
ClusterState clusterState = new ClusterState.Builder(new ClusterName("name"))
|
|
|
.metaData(new MetaData.Builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata))
|
|
|
.build();
|
|
@@ -507,7 +534,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|
|
|
|
|
public void testCleanFollowedLeaderIndicesNoEntry() {
|
|
|
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(Collections.emptyMap(),
|
|
|
- Collections.singletonMap("pattern2", Arrays.asList("index1", "index2", "index3")), Collections.emptyMap());
|
|
|
+ Map.of("pattern2", Arrays.asList("index1", "index2", "index3")), Collections.emptyMap());
|
|
|
ClusterState clusterState = new ClusterState.Builder(new ClusterName("name"))
|
|
|
.metaData(new MetaData.Builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata))
|
|
|
.build();
|
|
@@ -717,7 +744,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|
|
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
|
|
|
followedLeaderIndexUUIDS.put("remote", new ArrayList<>());
|
|
|
Map<String, Map<String, String>> autoFollowHeaders = new HashMap<>();
|
|
|
- autoFollowHeaders.put("remote", Collections.singletonMap("key", "val"));
|
|
|
+ autoFollowHeaders.put("remote", Map.of("key", "val"));
|
|
|
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, autoFollowHeaders);
|
|
|
|
|
|
final LinkedList<ClusterState> leaderStates = new LinkedList<>();
|
|
@@ -763,7 +790,9 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|
|
autoFollower.start();
|
|
|
assertThat(allResults.size(), equalTo(states.length));
|
|
|
for (int i = 0; i < states.length; i++) {
|
|
|
- assertThat(allResults.get(i).autoFollowExecutionResults.containsKey(new Index("logs-" + i, "_na_")), is(true));
|
|
|
+ final String indexName = "logs-" + i;
|
|
|
+ assertThat(allResults.get(i).autoFollowExecutionResults.keySet().stream()
|
|
|
+ .anyMatch(index -> index.getName().equals(indexName)), is(true));
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -778,7 +807,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|
|
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
|
|
|
followedLeaderIndexUUIDS.put("remote", new ArrayList<>());
|
|
|
Map<String, Map<String, String>> autoFollowHeaders = new HashMap<>();
|
|
|
- autoFollowHeaders.put("remote", Collections.singletonMap("key", "val"));
|
|
|
+ autoFollowHeaders.put("remote", Map.of("key", "val"));
|
|
|
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, autoFollowHeaders);
|
|
|
|
|
|
ClusterState[] states = new ClusterState[16];
|
|
@@ -836,7 +865,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|
|
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
|
|
|
followedLeaderIndexUUIDS.put("remote", new ArrayList<>());
|
|
|
Map<String, Map<String, String>> autoFollowHeaders = new HashMap<>();
|
|
|
- autoFollowHeaders.put("remote", Collections.singletonMap("key", "val"));
|
|
|
+ autoFollowHeaders.put("remote", Map.of("key", "val"));
|
|
|
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, autoFollowHeaders);
|
|
|
|
|
|
ClusterState currentState = ClusterState.builder(new ClusterName("name"))
|
|
@@ -902,14 +931,14 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|
|
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
|
|
|
followedLeaderIndexUUIDS.put("remote", new ArrayList<>());
|
|
|
Map<String, Map<String, String>> autoFollowHeaders = new HashMap<>();
|
|
|
- autoFollowHeaders.put("remote", Collections.singletonMap("key", "val"));
|
|
|
+ autoFollowHeaders.put("remote", Map.of("key", "val"));
|
|
|
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, autoFollowHeaders);
|
|
|
|
|
|
ClusterState currentState = ClusterState.builder(new ClusterName("name"))
|
|
|
.metaData(MetaData.builder()
|
|
|
.put(IndexMetaData.builder("logs-20190101")
|
|
|
.settings(settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true))
|
|
|
- .putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, Collections.singletonMap(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY,
|
|
|
+ .putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, Map.of(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY,
|
|
|
remoteState.metaData().index("logs-20190101").getIndexUUID()))
|
|
|
.numberOfShards(1)
|
|
|
.numberOfReplicas(0))
|
|
@@ -1045,6 +1074,85 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ public void testClosedIndicesAreNotAutoFollowed() {
|
|
|
+ final Client client = mock(Client.class);
|
|
|
+ when(client.getRemoteClusterClient(anyString())).thenReturn(client);
|
|
|
+
|
|
|
+ final String pattern = "pattern1";
|
|
|
+ final ClusterState localState = ClusterState.builder(new ClusterName("local"))
|
|
|
+ .metaData(MetaData.builder()
|
|
|
+ .putCustom(AutoFollowMetadata.TYPE,
|
|
|
+ new AutoFollowMetadata(Map.of(pattern, new AutoFollowPattern("remote", List.of("docs-*"), null,
|
|
|
+ null, null, null, null, null, null, null, null, null, null)),
|
|
|
+ Map.of(pattern, List.of()), Map.of(pattern, Map.of()))))
|
|
|
+ .build();
|
|
|
+
|
|
|
+ ClusterState remoteState = null;
|
|
|
+ final int nbLeaderIndices = randomInt(15);
|
|
|
+ for (int i = 0; i < nbLeaderIndices; i++) {
|
|
|
+ String indexName = "docs-" + i;
|
|
|
+ if (remoteState == null) {
|
|
|
+ remoteState = createRemoteClusterState(indexName, true);
|
|
|
+ } else {
|
|
|
+ remoteState = createRemoteClusterState(remoteState, indexName);
|
|
|
+ }
|
|
|
+ if (randomBoolean()) {
|
|
|
+ // randomly close the index
|
|
|
+ remoteState = ClusterState.builder(remoteState.getClusterName())
|
|
|
+ .routingTable(remoteState.routingTable())
|
|
|
+ .metaData(MetaData.builder(remoteState.metaData())
|
|
|
+ .put(IndexMetaData.builder(remoteState.metaData().index(indexName)).state(IndexMetaData.State.CLOSE).build(), true)
|
|
|
+ .build())
|
|
|
+ .build();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ final ClusterState finalRemoteState = remoteState;
|
|
|
+ final AtomicReference<ClusterState> lastModifiedClusterState = new AtomicReference<>(localState);
|
|
|
+ final List<AutoFollowCoordinator.AutoFollowResult> results = new ArrayList<>();
|
|
|
+ final Set<Object> followedIndices = ConcurrentCollections.newConcurrentSet();
|
|
|
+ final AutoFollower autoFollower =
|
|
|
+ new AutoFollower("remote", results::addAll, localClusterStateSupplier(localState), () -> 1L, Runnable::run) {
|
|
|
+ @Override
|
|
|
+ void getRemoteClusterState(String remoteCluster,
|
|
|
+ long metadataVersion,
|
|
|
+ BiConsumer<ClusterStateResponse, Exception> handler) {
|
|
|
+ assertThat(remoteCluster, equalTo("remote"));
|
|
|
+ handler.accept(new ClusterStateResponse(new ClusterName("remote"), finalRemoteState, false), null);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ void createAndFollow(Map<String, String> headers,
|
|
|
+ PutFollowAction.Request followRequest,
|
|
|
+ Runnable successHandler,
|
|
|
+ Consumer<Exception> failureHandler) {
|
|
|
+ followedIndices.add(followRequest.getLeaderIndex());
|
|
|
+ successHandler.run();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction, Consumer<Exception> handler) {
|
|
|
+ lastModifiedClusterState.updateAndGet(updateFunction::apply);
|
|
|
+ handler.accept(null);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List<String> patterns) {
|
|
|
+ // Ignore, to avoid invoking updateAutoFollowMetadata(...) twice
|
|
|
+ }
|
|
|
+ };
|
|
|
+ autoFollower.start();
|
|
|
+
|
|
|
+ assertThat(results, notNullValue());
|
|
|
+ assertThat(results.size(), equalTo(1));
|
|
|
+
|
|
|
+ for (ObjectObjectCursor<String, IndexMetaData> index : remoteState.metaData().indices()) {
|
|
|
+ boolean expect = index.value.getState() == IndexMetaData.State.OPEN;
|
|
|
+ assertThat(results.get(0).autoFollowExecutionResults.containsKey(index.value.getIndex()), is(expect));
|
|
|
+ assertThat(followedIndices.contains(index.key), is(expect));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private static ClusterState createRemoteClusterState(String indexName, boolean enableSoftDeletes) {
|
|
|
Settings.Builder indexSettings;
|
|
|
indexSettings = settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), enableSoftDeletes);
|
|
@@ -1067,11 +1175,13 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|
|
|
|
|
private static ClusterState createRemoteClusterState(ClusterState previous, String indexName) {
|
|
|
IndexMetaData indexMetaData = IndexMetaData.builder(indexName)
|
|
|
- .settings(settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true))
|
|
|
+ .settings(settings(Version.CURRENT)
|
|
|
+ .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
|
|
|
+ .put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random())))
|
|
|
.numberOfShards(1)
|
|
|
.numberOfReplicas(0)
|
|
|
.build();
|
|
|
- ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote"))
|
|
|
+ ClusterState.Builder csBuilder = ClusterState.builder(previous.getClusterName())
|
|
|
.metaData(MetaData.builder(previous.metaData())
|
|
|
.version(previous.metaData().version() + 1)
|
|
|
.put(indexMetaData, true));
|
|
@@ -1079,7 +1189,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|
|
ShardRouting shardRouting =
|
|
|
TestShardRouting.newShardRouting(indexName, 0, "1", true, ShardRoutingState.INITIALIZING).moveToStarted();
|
|
|
IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetaData.getIndex()).addShard(shardRouting).build();
|
|
|
- csBuilder.routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build();
|
|
|
+ csBuilder.routingTable(RoutingTable.builder(previous.routingTable()).add(indexRoutingTable).build()).build();
|
|
|
|
|
|
return csBuilder.build();
|
|
|
}
|