Browse Source

Bulk process of shard started/failed should not execute on already processed events
closes #5061

Shay Banon 11 years ago
parent
commit
152edd1804

+ 11 - 0
src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java

@@ -118,6 +118,9 @@ public class ShardStateAction extends AbstractComponent {
         clusterService.submitStateUpdateTask("shard-failed (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.reason + "]", Priority.HIGH, new ClusterStateUpdateTask() {
             @Override
             public ClusterState execute(ClusterState currentState) {
+                if (shardRoutingEntry.processed) {
+                    return currentState;
+                }
 
                 List<ShardRoutingEntry> shardRoutingEntries = new ArrayList<ShardRoutingEntry>();
                 failedShardQueue.drainTo(shardRoutingEntries);
@@ -132,6 +135,7 @@ public class ShardStateAction extends AbstractComponent {
                 List<ShardRouting> shardRoutingsToBeApplied = new ArrayList<ShardRouting>(shardRoutingEntries.size());
                 for (int i = 0; i < shardRoutingEntries.size(); i++) {
                     ShardRoutingEntry shardRoutingEntry = shardRoutingEntries.get(i);
+                    shardRoutingEntry.processed = true;
                     ShardRouting shardRouting = shardRoutingEntry.shardRouting;
                     IndexMetaData indexMetaData = metaData.index(shardRouting.index());
                     // if there is no metadata or the current index is not of the right uuid, the index has been deleted while it was being allocated
@@ -175,6 +179,10 @@ public class ShardStateAction extends AbstractComponent {
                     @Override
                     public ClusterState execute(ClusterState currentState) {
 
+                        if (shardRoutingEntry.processed) {
+                            return currentState;
+                        }
+
                         List<ShardRoutingEntry> shardRoutingEntries = new ArrayList<ShardRoutingEntry>();
                         startedShardsQueue.drainTo(shardRoutingEntries);
 
@@ -190,6 +198,7 @@ public class ShardStateAction extends AbstractComponent {
 
                         for (int i = 0; i < shardRoutingEntries.size(); i++) {
                             ShardRoutingEntry shardRoutingEntry = shardRoutingEntries.get(i);
+                            shardRoutingEntry.processed = true;
                             ShardRouting shardRouting = shardRoutingEntry.shardRouting;
                             try {
                                 IndexMetaData indexMetaData = metaData.index(shardRouting.index());
@@ -305,6 +314,8 @@ public class ShardStateAction extends AbstractComponent {
 
         private String reason;
 
+        volatile boolean processed; // state field, no need to serialize
+
         private ShardRoutingEntry() {
         }