Browse Source

Remove optimisations to reuse objects when applying a new `ClusterState` (#27317)

In order to avoid churn when applying a new `ClusterState`, there are some checks that compare parts of the old and new states and, if equal, the new object is discarded and the old one reused. Since `ClusterState` updates are now largely diff-based, this code is unnecessary: applying a diff also reuses any old objects if unchanged. Moreover, the code compares the parts of the `ClusterState` using their `version()` values which is not guaranteed to be correct, because of a lack of consensus.

This change removes this optimisation, and tests that objects are still reused as expected via the diff mechanism.
David Turner 8 years ago
parent
commit
4abb5fa297

+ 6 - 38
core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

@@ -735,7 +735,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
 
         final ClusterState newClusterState = pendingStatesQueue.getNextClusterStateToProcess();
         final ClusterState currentState = committedState.get();
-        final ClusterState adaptedNewClusterState;
         // all pending states have been processed
         if (newClusterState == null) {
             return false;
@@ -773,54 +772,23 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
         if (currentState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock())) {
             // its a fresh update from the master as we transition from a start of not having a master to having one
             logger.debug("got first state from fresh master [{}]", newClusterState.nodes().getMasterNodeId());
-            adaptedNewClusterState = newClusterState;
-        } else if (newClusterState.nodes().isLocalNodeElectedMaster() == false) {
-            // some optimizations to make sure we keep old objects where possible
-            ClusterState.Builder builder = ClusterState.builder(newClusterState);
-
-            // if the routing table did not change, use the original one
-            if (newClusterState.routingTable().version() == currentState.routingTable().version()) {
-                builder.routingTable(currentState.routingTable());
-            }
-            // same for metadata
-            if (newClusterState.metaData().version() == currentState.metaData().version()) {
-                builder.metaData(currentState.metaData());
-            } else {
-                // if its not the same version, only copy over new indices or ones that changed the version
-                MetaData.Builder metaDataBuilder = MetaData.builder(newClusterState.metaData()).removeAllIndices();
-                for (IndexMetaData indexMetaData : newClusterState.metaData()) {
-                    IndexMetaData currentIndexMetaData = currentState.metaData().index(indexMetaData.getIndex());
-                    if (currentIndexMetaData != null && currentIndexMetaData.isSameUUID(indexMetaData.getIndexUUID()) &&
-                        currentIndexMetaData.getVersion() == indexMetaData.getVersion()) {
-                        // safe to reuse
-                        metaDataBuilder.put(currentIndexMetaData, false);
-                    } else {
-                        metaDataBuilder.put(indexMetaData, false);
-                    }
-                }
-                builder.metaData(metaDataBuilder);
-            }
-
-            adaptedNewClusterState = builder.build();
-        } else {
-            adaptedNewClusterState = newClusterState;
         }
 
-        if (currentState == adaptedNewClusterState) {
+        if (currentState == newClusterState) {
             return false;
         }
 
-        committedState.set(adaptedNewClusterState);
+        committedState.set(newClusterState);
 
         // update failure detection only after the state has been updated to prevent race condition with handleLeaveRequest
         // and handleNodeFailure as those check the current state to determine whether the failure is to be handled by this node
-        if (adaptedNewClusterState.nodes().isLocalNodeElectedMaster()) {
+        if (newClusterState.nodes().isLocalNodeElectedMaster()) {
             // update the set of nodes to ping
-            nodesFD.updateNodesAndPing(adaptedNewClusterState);
+            nodesFD.updateNodesAndPing(newClusterState);
         } else {
             // check to see that we monitor the correct master of the cluster
-            if (masterFD.masterNode() == null || !masterFD.masterNode().equals(adaptedNewClusterState.nodes().getMasterNode())) {
-                masterFD.restart(adaptedNewClusterState.nodes().getMasterNode(),
+            if (masterFD.masterNode() == null || !masterFD.masterNode().equals(newClusterState.nodes().getMasterNode())) {
+                masterFD.restart(newClusterState.nodes().getMasterNode(),
                     "new cluster state received and we are monitoring the wrong master [" + masterFD.masterNode() + "]");
             }
         }

+ 57 - 0
core/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java

@@ -28,6 +28,7 @@ import org.elasticsearch.cluster.ESAllocationTestCase;
 import org.elasticsearch.cluster.RestoreInProgress;
 import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.RoutingTable;
@@ -43,6 +44,8 @@ import org.elasticsearch.snapshots.Snapshot;
 import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.test.VersionUtils;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 
 import static org.hamcrest.Matchers.equalTo;
@@ -154,4 +157,58 @@ public class ClusterSerializationTests extends ESAllocationTestCase {
         assertThat(stateAfterDiffs.custom(SnapshotDeletionsInProgress.TYPE), notNullValue());
     }
 
+    private ClusterState updateUsingSerialisedDiff(ClusterState original, Diff<ClusterState> diff) throws IOException {
+        BytesStreamOutput outStream = new BytesStreamOutput();
+        outStream.setVersion(Version.CURRENT);
+        diff.writeTo(outStream);
+        StreamInput inStream = new NamedWriteableAwareStreamInput(outStream.bytes().streamInput(),
+            new NamedWriteableRegistry(ClusterModule.getNamedWriteables()));
+        diff = ClusterState.readDiffFrom(inStream, newNode("node-name"));
+        return diff.apply(original);
+    }
+
+    public void testObjectReuseWhenApplyingClusterStateDiff() throws Exception {
+        IndexMetaData indexMetaData
+            = IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(10).numberOfReplicas(1).build();
+        IndexTemplateMetaData indexTemplateMetaData
+            = IndexTemplateMetaData.builder("test-template").patterns(new ArrayList<>()).build();
+        MetaData metaData = MetaData.builder().put(indexMetaData, true).put(indexTemplateMetaData).build();
+
+        RoutingTable routingTable = RoutingTable.builder().addAsNew(metaData.index("test")).build();
+
+        ClusterState clusterState1 = ClusterState.builder(new ClusterName("clusterName1"))
+            .metaData(metaData).routingTable(routingTable).build();
+        BytesStreamOutput outStream = new BytesStreamOutput();
+        outStream.setVersion(Version.CURRENT);
+        clusterState1.writeTo(outStream);
+        StreamInput inStream = new NamedWriteableAwareStreamInput(outStream.bytes().streamInput(),
+            new NamedWriteableRegistry(ClusterModule.getNamedWriteables()));
+        ClusterState serializedClusterState1 = ClusterState.readFrom(inStream, newNode("node4"));
+
+        // Create a new, albeit equal, IndexMetadata object
+        ClusterState clusterState2 = ClusterState.builder(clusterState1).incrementVersion()
+            .metaData(MetaData.builder().put(IndexMetaData.builder(indexMetaData).numberOfReplicas(1).build(), true)).build();
+        assertNotSame("Should have created a new, equivalent, IndexMetaData object in clusterState2",
+            clusterState1.metaData().index("test"), clusterState2.metaData().index("test"));
+
+        ClusterState serializedClusterState2 = updateUsingSerialisedDiff(serializedClusterState1, clusterState2.diff(clusterState1));
+        assertSame("Unchanged metadata should not create new IndexMetaData objects",
+            serializedClusterState1.metaData().index("test"), serializedClusterState2.metaData().index("test"));
+        assertSame("Unchanged routing table should not create new IndexRoutingTable objects",
+            serializedClusterState1.routingTable().index("test"), serializedClusterState2.routingTable().index("test"));
+
+        // Create a new and different IndexMetadata object
+        ClusterState clusterState3 = ClusterState.builder(clusterState1).incrementVersion()
+            .metaData(MetaData.builder().put(IndexMetaData.builder(indexMetaData).numberOfReplicas(2).build(), true)).build();
+        ClusterState serializedClusterState3 = updateUsingSerialisedDiff(serializedClusterState2, clusterState3.diff(clusterState2));
+        assertNotEquals("Should have a new IndexMetaData object",
+            serializedClusterState2.metaData().index("test"), serializedClusterState3.metaData().index("test"));
+        assertSame("Unchanged routing table should not create new IndexRoutingTable objects",
+            serializedClusterState2.routingTable().index("test"), serializedClusterState3.routingTable().index("test"));
+
+        assertSame("nodes", serializedClusterState2.nodes(), serializedClusterState3.nodes());
+        assertSame("blocks", serializedClusterState2.blocks(), serializedClusterState3.blocks());
+        assertSame("template", serializedClusterState2.metaData().templates().get("test-template"),
+            serializedClusterState3.metaData().templates().get("test-template"));
+    }
 }