Browse Source

Merge pull request #14993 from jasontedor/tribe-node-cluster-state-batch

Tribe nodes should apply cluster state updates in batches
Jason Tedor 9 years ago
parent
commit
709740efd2
1 changed files with 156 additions and 116 deletions
  1. 156 116
      core/src/main/java/org/elasticsearch/tribe/TribeService.java

+ 156 - 116
core/src/main/java/org/elasticsearch/tribe/TribeService.java

@@ -26,7 +26,8 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterService;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateListener;
-import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.cluster.ClusterStateTaskConfig;
+import org.elasticsearch.cluster.ClusterStateTaskExecutor;
 import org.elasticsearch.cluster.block.ClusterBlock;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.block.ClusterBlocks;
@@ -36,6 +37,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.inject.Inject;
@@ -205,142 +207,180 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
         }
     }
 
-    class TribeClusterStateListener implements ClusterStateListener {
 
+    class TribeClusterStateListener implements ClusterStateListener {
         private final String tribeName;
+        private final TribeNodeClusterStateTaskExecutor executor;
 
         TribeClusterStateListener(Node tribeNode) {
-            this.tribeName = tribeNode.settings().get(TRIBE_NAME);
+            String tribeName = tribeNode.settings().get(TRIBE_NAME);
+            this.tribeName = tribeName;
+            executor = new TribeNodeClusterStateTaskExecutor(tribeName);
         }
 
         @Override
         public void clusterChanged(final ClusterChangedEvent event) {
             logger.debug("[{}] received cluster event, [{}]", tribeName, event.source());
-            clusterService.submitStateUpdateTask("cluster event from " + tribeName + ", " + event.source(), new ClusterStateUpdateTask() {
-                @Override
-                public boolean runOnlyOnMaster() {
-                    return false;
-                }
+            clusterService.submitStateUpdateTask(
+                    "cluster event from " + tribeName + ", " + event.source(),
+                    event,
+                    ClusterStateTaskConfig.build(Priority.NORMAL),
+                    executor,
+                    (source, t) -> logger.warn("failed to process [{}]", t, source));
+        }
+    }
 
-                @Override
-                public ClusterState execute(ClusterState currentState) throws Exception {
-                    ClusterState tribeState = event.state();
-                    DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(currentState.nodes());
-                    // -- merge nodes
-                    // go over existing nodes, and see if they need to be removed
-                    for (DiscoveryNode discoNode : currentState.nodes()) {
-                        String markedTribeName = discoNode.attributes().get(TRIBE_NAME);
-                        if (markedTribeName != null && markedTribeName.equals(tribeName)) {
-                            if (tribeState.nodes().get(discoNode.id()) == null) {
-                                logger.info("[{}] removing node [{}]", tribeName, discoNode);
-                                nodes.remove(discoNode.id());
-                            }
-                        }
+    class TribeNodeClusterStateTaskExecutor implements ClusterStateTaskExecutor<ClusterChangedEvent> {
+        private final String tribeName;
+
+        TribeNodeClusterStateTaskExecutor(String tribeName) {
+            this.tribeName = tribeName;
+        }
+
+
+        @Override
+        public boolean runOnlyOnMaster() {
+            return false;
+        }
+
+        @Override
+        public BatchResult<ClusterChangedEvent> execute(ClusterState currentState, List<ClusterChangedEvent> tasks) throws Exception {
+            ClusterState accumulator = ClusterState.builder(currentState).build();
+            BatchResult.Builder<ClusterChangedEvent> builder = BatchResult.builder();
+
+            try {
+                // we only need to apply the latest cluster state update
+                accumulator = applyUpdate(accumulator, tasks.get(tasks.size() - 1));
+                builder.successes(tasks);
+            } catch (Throwable t) {
+                builder.failures(tasks, t);
+            }
+
+            return builder.build(accumulator);
+        }
+
+        private ClusterState applyUpdate(ClusterState currentState, ClusterChangedEvent task) {
+            boolean clusterStateChanged = false;
+            ClusterState tribeState = task.state();
+            DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(currentState.nodes());
+            // -- merge nodes
+            // go over existing nodes, and see if they need to be removed
+            for (DiscoveryNode discoNode : currentState.nodes()) {
+                String markedTribeName = discoNode.attributes().get(TRIBE_NAME);
+                if (markedTribeName != null && markedTribeName.equals(tribeName)) {
+                    if (tribeState.nodes().get(discoNode.id()) == null) {
+                        clusterStateChanged = true;
+                        logger.info("[{}] removing node [{}]", tribeName, discoNode);
+                        nodes.remove(discoNode.id());
                     }
-                    // go over tribe nodes, and see if they need to be added
-                    for (DiscoveryNode tribe : tribeState.nodes()) {
-                        if (currentState.nodes().get(tribe.id()) == null) {
-                            // a new node, add it, but also add the tribe name to the attributes
-                            Map<String, String> tribeAttr = new HashMap<>();
-                            for (ObjectObjectCursor<String, String> attr : tribe.attributes()) {
-                                tribeAttr.put(attr.key, attr.value);
-                            }
-                            tribeAttr.put(TRIBE_NAME, tribeName);
-                            DiscoveryNode discoNode = new DiscoveryNode(tribe.name(), tribe.id(), tribe.getHostName(), tribe.getHostAddress(), tribe.address(), unmodifiableMap(tribeAttr), tribe.version());
-                            logger.info("[{}] adding node [{}]", tribeName, discoNode);
-                            nodes.put(discoNode);
-                        }
+                }
+            }
+            // go over tribe nodes, and see if they need to be added
+            for (DiscoveryNode tribe : tribeState.nodes()) {
+                if (currentState.nodes().get(tribe.id()) == null) {
+                    // a new node, add it, but also add the tribe name to the attributes
+                    Map<String, String> tribeAttr = new HashMap<>();
+                    for (ObjectObjectCursor<String, String> attr : tribe.attributes()) {
+                        tribeAttr.put(attr.key, attr.value);
                     }
+                    tribeAttr.put(TRIBE_NAME, tribeName);
+                    DiscoveryNode discoNode = new DiscoveryNode(tribe.name(), tribe.id(), tribe.getHostName(), tribe.getHostAddress(), tribe.address(), unmodifiableMap(tribeAttr), tribe.version());
+                    clusterStateChanged = true;
+                    logger.info("[{}] adding node [{}]", tribeName, discoNode);
+                    nodes.put(discoNode);
+                }
+            }
 
-                    // -- merge metadata
-                    ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
-                    MetaData.Builder metaData = MetaData.builder(currentState.metaData());
-                    RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable());
-                    // go over existing indices, and see if they need to be removed
-                    for (IndexMetaData index : currentState.metaData()) {
-                        String markedTribeName = index.getSettings().get(TRIBE_NAME);
-                        if (markedTribeName != null && markedTribeName.equals(tribeName)) {
-                            IndexMetaData tribeIndex = tribeState.metaData().index(index.getIndex());
-                            if (tribeIndex == null || tribeIndex.getState() == IndexMetaData.State.CLOSE) {
-                                logger.info("[{}] removing index [{}]", tribeName, index.getIndex());
-                                removeIndex(blocks, metaData, routingTable, index);
-                            } else {
-                                // always make sure to update the metadata and routing table, in case
-                                // there are changes in them (new mapping, shards moving from initializing to started)
-                                routingTable.add(tribeState.routingTable().index(index.getIndex()));
-                                Settings tribeSettings = Settings.builder().put(tribeIndex.getSettings()).put(TRIBE_NAME, tribeName).build();
-                                metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings));
-                            }
-                        }
+            // -- merge metadata
+            ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
+            MetaData.Builder metaData = MetaData.builder(currentState.metaData());
+            RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable());
+            // go over existing indices, and see if they need to be removed
+            for (IndexMetaData index : currentState.metaData()) {
+                String markedTribeName = index.getSettings().get(TRIBE_NAME);
+                if (markedTribeName != null && markedTribeName.equals(tribeName)) {
+                    IndexMetaData tribeIndex = tribeState.metaData().index(index.getIndex());
+                    clusterStateChanged = true;
+                    if (tribeIndex == null || tribeIndex.getState() == IndexMetaData.State.CLOSE) {
+                        logger.info("[{}] removing index [{}]", tribeName, index.getIndex());
+                        removeIndex(blocks, metaData, routingTable, index);
+                    } else {
+                        // always make sure to update the metadata and routing table, in case
+                        // there are changes in them (new mapping, shards moving from initializing to started)
+                        routingTable.add(tribeState.routingTable().index(index.getIndex()));
+                        Settings tribeSettings = Settings.builder().put(tribeIndex.getSettings()).put(TRIBE_NAME, tribeName).build();
+                        metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings));
                     }
-                    // go over tribe one, and see if they need to be added
-                    for (IndexMetaData tribeIndex : tribeState.metaData()) {
-                        // if there is no routing table yet, do nothing with it...
-                        IndexRoutingTable table = tribeState.routingTable().index(tribeIndex.getIndex());
-                        if (table == null) {
-                            continue;
-                        }
-                        final IndexMetaData indexMetaData = currentState.metaData().index(tribeIndex.getIndex());
-                        if (indexMetaData == null) {
-                            if (!droppedIndices.contains(tribeIndex.getIndex())) {
-                                // a new index, add it, and add the tribe name as a setting
-                                logger.info("[{}] adding index [{}]", tribeName, tribeIndex.getIndex());
+                }
+            }
+            // go over tribe one, and see if they need to be added
+            for (IndexMetaData tribeIndex : tribeState.metaData()) {
+                // if there is no routing table yet, do nothing with it...
+                IndexRoutingTable table = tribeState.routingTable().index(tribeIndex.getIndex());
+                if (table == null) {
+                    continue;
+                }
+                final IndexMetaData indexMetaData = currentState.metaData().index(tribeIndex.getIndex());
+                if (indexMetaData == null) {
+                    if (!droppedIndices.contains(tribeIndex.getIndex())) {
+                        // a new index, add it, and add the tribe name as a setting
+                        clusterStateChanged = true;
+                        logger.info("[{}] adding index [{}]", tribeName, tribeIndex.getIndex());
+                        addNewIndex(tribeState, blocks, metaData, routingTable, tribeIndex);
+                    }
+                } else {
+                    String existingFromTribe = indexMetaData.getSettings().get(TRIBE_NAME);
+                    if (!tribeName.equals(existingFromTribe)) {
+                        // we have a potential conflict on index names, decide what to do...
+                        if (ON_CONFLICT_ANY.equals(onConflict)) {
+                            // we chose any tribe, carry on
+                        } else if (ON_CONFLICT_DROP.equals(onConflict)) {
+                            // drop the indices, there is a conflict
+                            clusterStateChanged = true;
+                            logger.info("[{}] dropping index [{}] due to conflict with [{}]", tribeName, tribeIndex.getIndex(), existingFromTribe);
+                            removeIndex(blocks, metaData, routingTable, tribeIndex);
+                            droppedIndices.add(tribeIndex.getIndex());
+                        } else if (onConflict.startsWith(ON_CONFLICT_PREFER)) {
+                            // on conflict, prefer a tribe...
+                            String preferredTribeName = onConflict.substring(ON_CONFLICT_PREFER.length());
+                            if (tribeName.equals(preferredTribeName)) {
+                                // the new one is hte preferred one, replace...
+                                clusterStateChanged = true;
+                                logger.info("[{}] adding index [{}], preferred over [{}]", tribeName, tribeIndex.getIndex(), existingFromTribe);
+                                removeIndex(blocks, metaData, routingTable, tribeIndex);
                                 addNewIndex(tribeState, blocks, metaData, routingTable, tribeIndex);
-                            }
-                        } else {
-                            String existingFromTribe = indexMetaData.getSettings().get(TRIBE_NAME);
-                            if (!tribeName.equals(existingFromTribe)) {
-                                // we have a potential conflict on index names, decide what to do...
-                                if (ON_CONFLICT_ANY.equals(onConflict)) {
-                                    // we chose any tribe, carry on
-                                } else if (ON_CONFLICT_DROP.equals(onConflict)) {
-                                    // drop the indices, there is a conflict
-                                    logger.info("[{}] dropping index [{}] due to conflict with [{}]", tribeName, tribeIndex.getIndex(), existingFromTribe);
-                                    removeIndex(blocks, metaData, routingTable, tribeIndex);
-                                    droppedIndices.add(tribeIndex.getIndex());
-                                } else if (onConflict.startsWith(ON_CONFLICT_PREFER)) {
-                                    // on conflict, prefer a tribe...
-                                    String preferredTribeName = onConflict.substring(ON_CONFLICT_PREFER.length());
-                                    if (tribeName.equals(preferredTribeName)) {
-                                        // the new one is hte preferred one, replace...
-                                        logger.info("[{}] adding index [{}], preferred over [{}]", tribeName, tribeIndex.getIndex(), existingFromTribe);
-                                        removeIndex(blocks, metaData, routingTable, tribeIndex);
-                                        addNewIndex(tribeState, blocks, metaData, routingTable, tribeIndex);
-                                    } // else: either the existing one is the preferred one, or we haven't seen one, carry on
-                                }
-                            }
+                            } // else: either the existing one is the preferred one, or we haven't seen one, carry on
                         }
                     }
-
-                    return ClusterState.builder(currentState).incrementVersion().blocks(blocks).nodes(nodes).metaData(metaData).routingTable(routingTable.build()).build();
                 }
+            }
 
-                private void removeIndex(ClusterBlocks.Builder blocks, MetaData.Builder metaData, RoutingTable.Builder routingTable, IndexMetaData index) {
-                    metaData.remove(index.getIndex());
-                    routingTable.remove(index.getIndex());
-                    blocks.removeIndexBlocks(index.getIndex());
-                }
+            if (!clusterStateChanged) {
+                return currentState;
+            } else {
+                return ClusterState.builder(currentState).incrementVersion().blocks(blocks).nodes(nodes).metaData(metaData).routingTable(routingTable.build()).build();
+            }
+        }
 
-                private void addNewIndex(ClusterState tribeState, ClusterBlocks.Builder blocks, MetaData.Builder metaData, RoutingTable.Builder routingTable, IndexMetaData tribeIndex) {
-                    Settings tribeSettings = Settings.builder().put(tribeIndex.getSettings()).put(TRIBE_NAME, tribeName).build();
-                    metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings));
-                    routingTable.add(tribeState.routingTable().index(tribeIndex.getIndex()));
-                    if (Regex.simpleMatch(blockIndicesMetadata, tribeIndex.getIndex())) {
-                        blocks.addIndexBlock(tribeIndex.getIndex(), IndexMetaData.INDEX_METADATA_BLOCK);
-                    }
-                    if (Regex.simpleMatch(blockIndicesRead, tribeIndex.getIndex())) {
-                        blocks.addIndexBlock(tribeIndex.getIndex(), IndexMetaData.INDEX_READ_BLOCK);
-                    }
-                    if (Regex.simpleMatch(blockIndicesWrite, tribeIndex.getIndex())) {
-                        blocks.addIndexBlock(tribeIndex.getIndex(), IndexMetaData.INDEX_WRITE_BLOCK);
-                    }
-                }
+        private void removeIndex(ClusterBlocks.Builder blocks, MetaData.Builder metaData, RoutingTable.Builder routingTable, IndexMetaData index) {
+            metaData.remove(index.getIndex());
+            routingTable.remove(index.getIndex());
+            blocks.removeIndexBlocks(index.getIndex());
+        }
 
-                @Override
-                public void onFailure(String source, Throwable t) {
-                    logger.warn("failed to process [{}]", t, source);
-                }
-            });
+        private void addNewIndex(ClusterState tribeState, ClusterBlocks.Builder blocks, MetaData.Builder metaData, RoutingTable.Builder routingTable, IndexMetaData tribeIndex) {
+            Settings tribeSettings = Settings.builder().put(tribeIndex.getSettings()).put(TRIBE_NAME, tribeName).build();
+            metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings));
+            routingTable.add(tribeState.routingTable().index(tribeIndex.getIndex()));
+            if (Regex.simpleMatch(blockIndicesMetadata, tribeIndex.getIndex())) {
+                blocks.addIndexBlock(tribeIndex.getIndex(), IndexMetaData.INDEX_METADATA_BLOCK);
+            }
+            if (Regex.simpleMatch(blockIndicesRead, tribeIndex.getIndex())) {
+                blocks.addIndexBlock(tribeIndex.getIndex(), IndexMetaData.INDEX_READ_BLOCK);
+            }
+            if (Regex.simpleMatch(blockIndicesWrite, tribeIndex.getIndex())) {
+                blocks.addIndexBlock(tribeIndex.getIndex(), IndexMetaData.INDEX_WRITE_BLOCK);
+            }
         }
     }
 }