Browse Source

CCR: Make AutoFollowMetadata immutable (#33977)

We should make AutoFollowMetadata immutable to avoid being inconsistent
when one thread modifies it while other reads it.
Nhat Nguyen 7 years ago
parent
commit
6ec36b1273

+ 12 - 11
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java

@@ -368,18 +368,19 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
                                                                                       Index indexToFollow) {
             return currentState -> {
                 AutoFollowMetadata currentAutoFollowMetadata = currentState.metaData().custom(AutoFollowMetadata.TYPE);
-
-                Map<String, List<String>> newFollowedIndexUUIDS =
-                    new HashMap<>(currentAutoFollowMetadata.getFollowedLeaderIndexUUIDs());
-                newFollowedIndexUUIDS.get(clusterAlias).add(indexToFollow.getUUID());
-
-                ClusterState.Builder newState = ClusterState.builder(currentState);
-                AutoFollowMetadata newAutoFollowMetadata = new AutoFollowMetadata(currentAutoFollowMetadata.getPatterns(),
+                Map<String, List<String>> newFollowedIndexUUIDS = new HashMap<>(currentAutoFollowMetadata.getFollowedLeaderIndexUUIDs());
+                newFollowedIndexUUIDS.compute(clusterAlias, (key, existingUUIDs) -> {
+                    assert existingUUIDs != null;
+                    List<String> newUUIDs = new ArrayList<>(existingUUIDs);
+                    newUUIDs.add(indexToFollow.getUUID());
+                    return Collections.unmodifiableList(newUUIDs);
+                });
+                final AutoFollowMetadata newAutoFollowMetadata = new AutoFollowMetadata(currentAutoFollowMetadata.getPatterns(),
                     newFollowedIndexUUIDS, currentAutoFollowMetadata.getHeaders());
-                newState.metaData(MetaData.builder(currentState.getMetaData())
-                    .putCustom(AutoFollowMetadata.TYPE, newAutoFollowMetadata)
-                    .build());
-                return newState.build();
+                return ClusterState.builder(currentState)
+                    .metaData(MetaData.builder(currentState.getMetaData())
+                        .putCustom(AutoFollowMetadata.TYPE, newAutoFollowMetadata).build())
+                    .build();
             };
         }
 

+ 5 - 4
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java

@@ -135,12 +135,13 @@ public class TransportPutAutoFollowPatternAction extends
         }
 
         AutoFollowPattern previousPattern = patterns.get(request.getLeaderClusterAlias());
-        List<String> followedIndexUUIDs = followedLeaderIndices.get(request.getLeaderClusterAlias());
-        if (followedIndexUUIDs == null) {
+        final List<String> followedIndexUUIDs;
+        if (followedLeaderIndices.containsKey(request.getLeaderClusterAlias())) {
+            followedIndexUUIDs = new ArrayList<>(followedLeaderIndices.get(request.getLeaderClusterAlias()));
+        } else {
             followedIndexUUIDs = new ArrayList<>();
-            followedLeaderIndices.put(request.getLeaderClusterAlias(), followedIndexUUIDs);
         }
-
+        followedLeaderIndices.put(request.getLeaderClusterAlias(), followedIndexUUIDs);
         // Mark existing leader indices as already auto followed:
         if (previousPattern != null) {
             markExistingIndicesAsAutoFollowedForNewPatterns(request.getLeaderIndexPatterns(), leaderClusterState.metaData(),

+ 2 - 2
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java

@@ -85,7 +85,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
             void getLeaderClusterState(Map<String, String> headers,
                                        String leaderClusterAlias,
                                        BiConsumer<ClusterState, Exception> handler) {
-                assertThat(headers, sameInstance(autoFollowHeaders.get("remote")));
+                assertThat(headers, equalTo(autoFollowHeaders.get("remote")));
                 handler.accept(leaderState, null);
             }
 
@@ -94,7 +94,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
                                  FollowIndexAction.Request followRequest,
                                  Runnable successHandler,
                                  Consumer<Exception> failureHandler) {
-                assertThat(headers, sameInstance(autoFollowHeaders.get("remote")));
+                assertThat(headers, equalTo(autoFollowHeaders.get("remote")));
                 assertThat(followRequest.getLeaderIndex(), equalTo("remote:logs-20190101"));
                 assertThat(followRequest.getFollowerIndex(), equalTo("logs-20190101"));
                 successHandler.run();

+ 11 - 7
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java

@@ -29,6 +29,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * Custom metadata that contains auto follow patterns and what leader indices an auto follow pattern has already followed.
@@ -79,16 +80,19 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
     public AutoFollowMetadata(Map<String, AutoFollowPattern> patterns,
                               Map<String, List<String>> followedLeaderIndexUUIDs,
                               Map<String, Map<String, String>> headers) {
-        this.patterns = patterns;
-        this.followedLeaderIndexUUIDs = followedLeaderIndexUUIDs;
-        this.headers = Collections.unmodifiableMap(headers);
+        this.patterns = Collections.unmodifiableMap(patterns);
+        this.followedLeaderIndexUUIDs = Collections.unmodifiableMap(followedLeaderIndexUUIDs.entrySet().stream()
+            .collect(Collectors.toMap(Map.Entry::getKey, e -> Collections.unmodifiableList(e.getValue()))));
+        this.headers = Collections.unmodifiableMap(headers.entrySet().stream()
+            .collect(Collectors.toMap(Map.Entry::getKey, e -> Collections.unmodifiableMap(e.getValue()))));
     }
 
     public AutoFollowMetadata(StreamInput in) throws IOException {
-        patterns = in.readMap(StreamInput::readString, AutoFollowPattern::new);
-        followedLeaderIndexUUIDs = in.readMapOfLists(StreamInput::readString, StreamInput::readString);
-        headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString,
-            valIn -> Collections.unmodifiableMap(valIn.readMap(StreamInput::readString, StreamInput::readString))));
+        this(
+            in.readMap(StreamInput::readString, AutoFollowPattern::new),
+            in.readMapOfLists(StreamInput::readString, StreamInput::readString),
+            in.readMap(StreamInput::readString, valIn -> valIn.readMap(StreamInput::readString, StreamInput::readString))
+        );
     }
 
     public Map<String, AutoFollowPattern> getPatterns() {