Переглянути джерело

Merge pull request #15023 from jasontedor/shard-started-batch

Use general cluster state batching mechanism for shard started
Jason Tedor 10 роки тому
батько
коміт
dd520161ab

+ 35 - 55
core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java

@@ -36,14 +36,12 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.*;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.BlockingQueue;
 
 import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry;
 
@@ -60,8 +58,6 @@ public class ShardStateAction extends AbstractComponent {
     private final AllocationService allocationService;
     private final RoutingService routingService;
 
-    private final BlockingQueue<ShardRoutingEntry> startedShardsQueue = ConcurrentCollections.newBlockingQueue();
-
     @Inject
     public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService,
                             AllocationService allocationService, RoutingService routingService) {
@@ -155,7 +151,6 @@ public class ShardStateAction extends AbstractComponent {
             BatchResult.Builder<ShardRoutingEntry> batchResultBuilder = BatchResult.builder();
             List<FailedRerouteAllocation.FailedShard> shardRoutingsToBeApplied = new ArrayList<>(tasks.size());
             for (ShardRoutingEntry task : tasks) {
-                task.processed = true;
                 shardRoutingsToBeApplied.add(new FailedRerouteAllocation.FailedShard(task.shardRouting, task.message, task.failure));
             }
             ClusterState maybeUpdatedState = currentState;
@@ -185,60 +180,47 @@ public class ShardStateAction extends AbstractComponent {
         }
     }
 
+    private final ShardStartedClusterStateHandler shardStartedClusterStateHandler =
+            new ShardStartedClusterStateHandler();
+
     private void shardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) {
         logger.debug("received shard started for {}", shardRoutingEntry);
-        // buffer shard started requests, and the state update tasks will simply drain it
-        // this is to optimize the number of "started" events we generate, and batch them
-        // possibly, we can do time based batching as well, but usually, we would want to
-        // process started events as fast as possible, to make shards available
-        startedShardsQueue.add(shardRoutingEntry);
-
-        clusterService.submitStateUpdateTask("shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]",
-                new ClusterStateUpdateTask() {
-                    @Override
-                    public Priority priority() {
-                        return Priority.URGENT;
-                    }
-
-                    @Override
-                    public ClusterState execute(ClusterState currentState) {
-
-                        if (shardRoutingEntry.processed) {
-                            return currentState;
-                        }
-
-                        List<ShardRoutingEntry> shardRoutingEntries = new ArrayList<>();
-                        startedShardsQueue.drainTo(shardRoutingEntries);
-
-                        // nothing to process (a previous event has processed it already)
-                        if (shardRoutingEntries.isEmpty()) {
-                            return currentState;
-                        }
 
-                        List<ShardRouting> shardRoutingToBeApplied = new ArrayList<>(shardRoutingEntries.size());
-
-                        // mark all entries as processed
-                        for (ShardRoutingEntry entry : shardRoutingEntries) {
-                            entry.processed = true;
-                            shardRoutingToBeApplied.add(entry.shardRouting);
-                        }
+        clusterService.submitStateUpdateTask(
+                "shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]",
+                shardRoutingEntry,
+                ClusterStateTaskConfig.build(Priority.URGENT),
+                shardStartedClusterStateHandler,
+                shardStartedClusterStateHandler);
+    }
 
-                        if (shardRoutingToBeApplied.isEmpty()) {
-                            return currentState;
-                        }
+    class ShardStartedClusterStateHandler implements ClusterStateTaskExecutor<ShardRoutingEntry>, ClusterStateTaskListener {
+        @Override
+        public BatchResult<ShardRoutingEntry> execute(ClusterState currentState, List<ShardRoutingEntry> tasks) throws Exception {
+            BatchResult.Builder<ShardRoutingEntry> builder = BatchResult.builder();
+            List<ShardRouting> shardRoutingsToBeApplied = new ArrayList<>(tasks.size());
+            for (ShardRoutingEntry task : tasks) {
+                shardRoutingsToBeApplied.add(task.shardRouting);
+            }
+            ClusterState maybeUpdatedState = currentState;
+            try {
+                RoutingAllocation.Result result =
+                    allocationService.applyStartedShards(currentState, shardRoutingsToBeApplied, true);
+                if (result.changed()) {
+                    maybeUpdatedState = ClusterState.builder(currentState).routingResult(result).build();
+                }
+                builder.successes(tasks);
+            } catch (Throwable t) {
+                builder.failures(tasks, t);
+            }
 
-                        RoutingAllocation.Result routingResult = allocationService.applyStartedShards(currentState, shardRoutingToBeApplied, true);
-                        if (!routingResult.changed()) {
-                            return currentState;
-                        }
-                        return ClusterState.builder(currentState).routingResult(routingResult).build();
-                    }
+            return builder.build(maybeUpdatedState);
+        }
 
-                    @Override
-                    public void onFailure(String source, Throwable t) {
-                        logger.error("unexpected failure during [{}]", t, source);
-                    }
-                });
+        @Override
+        public void onFailure(String source, Throwable t) {
+            logger.error("unexpected failure during [{}]", t, source);
+        }
     }
 
     private class ShardFailedTransportHandler implements TransportRequestHandler<ShardRoutingEntry> {
@@ -266,8 +248,6 @@ public class ShardStateAction extends AbstractComponent {
         String message;
         Throwable failure;
 
-        volatile boolean processed; // state field, no need to serialize
-
         public ShardRoutingEntry() {
         }