Browse Source

Batch rollover cluster state updates. (#79945)

In cases where many indices are managed by ILM,
it is likely that rollovers for different indices or data streams
happen concurrently. This change allows the cluster state updates
that these rollovers generate to be batch.

This change also changes the rollover service to not do a reroute and
instead perform a single reroute for multiple batched rollovers.

Relates to #77466
Closes to #79782
Martijn van Groningen 4 years ago
parent
commit
ce1ed43d40

+ 11 - 0
server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java

@@ -46,6 +46,8 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ
 
     private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;
 
+    private boolean performReroute = true;
+
     public CreateIndexClusterStateUpdateRequest(String cause, String index, String providedName) {
         this.cause = cause;
         this.index = index;
@@ -175,6 +177,15 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ
         return this;
     }
 
+    public boolean performReroute() {
+        return performReroute;
+    }
+
+    public CreateIndexClusterStateUpdateRequest performReroute(boolean performReroute) {
+        this.performReroute = performReroute;
+        return this;
+    }
+
     @Override
     public String toString() {
         return "CreateIndexClusterStateUpdateRequest{"

+ 2 - 1
server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java

@@ -367,7 +367,8 @@ public class MetadataRolloverService {
             .settings(b.build())
             .aliases(createIndexRequest.aliases())
             .waitForActiveShards(ActiveShardCount.NONE) // not waiting for shards here, will wait on the alias switch operation
-            .mappings(createIndexRequest.mappings());
+            .mappings(createIndexRequest.mappings())
+            .performReroute(false);
     }
 
     /**

+ 174 - 130
server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java

@@ -23,13 +23,18 @@ import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.cluster.ClusterStateTaskConfig;
+import org.elasticsearch.cluster.ClusterStateTaskExecutor;
+import org.elasticsearch.cluster.ClusterStateTaskListener;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.core.Nullable;
@@ -44,6 +49,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 /**
@@ -52,9 +58,12 @@ import java.util.stream.Collectors;
 public class TransportRolloverAction extends TransportMasterNodeAction<RolloverRequest, RolloverResponse> {
 
     private static final Logger logger = LogManager.getLogger(TransportRolloverAction.class);
+    private static final ClusterStateTaskConfig ROLLOVER_TASK_CONFIG = ClusterStateTaskConfig.build(Priority.NORMAL);
+
     private final MetadataRolloverService rolloverService;
     private final ActiveShardsObserver activeShardsObserver;
     private final Client client;
+    private final ClusterStateTaskExecutor<RolloverTask> rolloverTaskExecutor;
 
     @Inject
     public TransportRolloverAction(
@@ -64,7 +73,8 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
         ActionFilters actionFilters,
         IndexNameExpressionResolver indexNameExpressionResolver,
         MetadataRolloverService rolloverService,
-        Client client
+        Client client,
+        AllocationService allocationService
     ) {
         super(
             RolloverAction.NAME,
@@ -80,6 +90,33 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
         this.rolloverService = rolloverService;
         this.client = client;
         this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool);
+        this.rolloverTaskExecutor = (currentState, tasks) -> {
+            ClusterStateTaskExecutor.ClusterTasksResult.Builder<RolloverTask> builder = ClusterStateTaskExecutor.ClusterTasksResult
+                .builder();
+            ClusterState state = currentState;
+            for (RolloverTask task : tasks) {
+                try {
+                    state = task.performRollover(state);
+                    builder.success(task);
+                } catch (Exception e) {
+                    builder.failure(task, e);
+                }
+            }
+
+            if (state != currentState) {
+                var reason = new StringBuilder();
+                Strings.collectionToDelimitedStringWithLimit(
+                    (Iterable<String>) () -> tasks.stream().map(t -> t.sourceIndex.get() + "->" + t.rolloverIndex.get()).iterator(),
+                    ",",
+                    "bulk rollover [",
+                    "]",
+                    1024,
+                    reason
+                );
+                state = allocationService.reroute(state, reason.toString());
+            }
+            return builder.build(state);
+        };
     }
 
     @Override
@@ -150,144 +187,30 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
                     return;
                 }
 
-                // Holders for what our final source and rolled over index names are as well as the
-                // conditions met to cause the rollover, these are needed so we wait on and report
-                // the correct indices and conditions in the clusterStateProcessed method
-                final SetOnce<String> sourceIndex = new SetOnce<>();
-                final SetOnce<String> rolloverIndex = new SetOnce<>();
-                final SetOnce<Map<String, Boolean>> conditionResults = new SetOnce<>();
-
                 final List<Condition<?>> trialMetConditions = rolloverRequest.getConditions()
                     .values()
                     .stream()
                     .filter(condition -> trialConditionResults.get(condition.toString()))
                     .collect(Collectors.toList());
 
+                final RolloverResponse trailRolloverResponse = new RolloverResponse(
+                    trialSourceIndexName,
+                    trialRolloverIndexName,
+                    trialConditionResults,
+                    false,
+                    false,
+                    false,
+                    false
+                );
+
                 // Pre-check the conditions to see whether we should submit a new cluster state task
                 if (trialConditionResults.size() == 0 || trialMetConditions.size() > 0) {
-
-                    // Submit the cluster state, this can be thought of as a "synchronized"
-                    // block in that it is single-threaded on the master node
-                    clusterService.submitStateUpdateTask(
-                        "rollover_index source [" + trialRolloverIndexName + "] to target [" + trialRolloverIndexName + "]",
-                        new ClusterStateUpdateTask() {
-                            @Override
-                            public ClusterState execute(ClusterState currentState) throws Exception {
-                                // Regenerate the rollover names, as a rollover could have happened
-                                // in between the pre-check and the cluster state update
-                                final MetadataRolloverService.NameResolution rolloverNames = rolloverService.resolveRolloverNames(
-                                    currentState,
-                                    rolloverRequest.getRolloverTarget(),
-                                    rolloverRequest.getNewIndexName(),
-                                    rolloverRequest.getCreateIndexRequest()
-                                );
-                                final String sourceIndexName = rolloverNames.sourceName;
-
-                                // Re-evaluate the conditions, now with our final source index name
-                                final Map<String, Boolean> postConditionResults = evaluateConditions(
-                                    rolloverRequest.getConditions().values(),
-                                    buildStats(metadata.index(sourceIndexName), statsResponse)
-                                );
-                                final List<Condition<?>> metConditions = rolloverRequest.getConditions()
-                                    .values()
-                                    .stream()
-                                    .filter(condition -> postConditionResults.get(condition.toString()))
-                                    .collect(Collectors.toList());
-                                // Update the final condition results so they can be used when returning the response
-                                conditionResults.set(postConditionResults);
-
-                                if (postConditionResults.size() == 0 || metConditions.size() > 0) {
-                                    // Perform the actual rollover
-                                    MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState(
-                                        currentState,
-                                        rolloverRequest.getRolloverTarget(),
-                                        rolloverRequest.getNewIndexName(),
-                                        rolloverRequest.getCreateIndexRequest(),
-                                        metConditions,
-                                        false,
-                                        false
-                                    );
-                                    logger.trace("rollover result [{}]", rolloverResult);
-
-                                    // Update the "final" source and resulting rollover index names.
-                                    // Note that we use the actual rollover result for these, because
-                                    // even though we're single threaded, it's possible for the
-                                    // rollover names generated before the actual rollover to be
-                                    // different due to things like date resolution
-                                    sourceIndex.set(rolloverResult.sourceIndexName);
-                                    rolloverIndex.set(rolloverResult.rolloverIndexName);
-
-                                    // Return the new rollover cluster state, which includes the changes that create the new index
-                                    return rolloverResult.clusterState;
-                                } else {
-                                    // Upon re-evaluation of the conditions, none were met, so
-                                    // therefore do not perform a rollover, returning the current
-                                    // cluster state.
-                                    return currentState;
-                                }
-                            }
-
-                            @Override
-                            public void onFailure(String source, Exception e) {
-                                listener.onFailure(e);
-                            }
-
-                            @Override
-                            public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                                // Now assuming we have a new state and the name of the rolled over index, we need to wait for the
-                                // configured number of active shards, as well as return the names of the indices that were rolled/created
-                                if (newState.equals(oldState) == false) {
-                                    assert sourceIndex.get() != null : "source index missing on successful rollover";
-                                    assert rolloverIndex.get() != null : "rollover index missing on successful rollover";
-                                    assert conditionResults.get() != null : "matching rollover conditions missing on successful rollover";
-
-                                    activeShardsObserver.waitForActiveShards(
-                                        new String[] { rolloverIndex.get() },
-                                        rolloverRequest.getCreateIndexRequest().waitForActiveShards(),
-                                        rolloverRequest.masterNodeTimeout(),
-                                        isShardsAcknowledged -> listener.onResponse(
-                                            new RolloverResponse(
-                                                sourceIndex.get(),
-                                                rolloverIndex.get(),
-                                                conditionResults.get(),
-                                                false,
-                                                true,
-                                                true,
-                                                isShardsAcknowledged
-                                            )
-                                        ),
-                                        listener::onFailure
-                                    );
-                                } else {
-                                    // We did not roll over due to conditions not being met inside the cluster state update
-                                    listener.onResponse(
-                                        new RolloverResponse(
-                                            trialSourceIndexName,
-                                            trialRolloverIndexName,
-                                            trialConditionResults,
-                                            false,
-                                            false,
-                                            false,
-                                            false
-                                        )
-                                    );
-                                }
-                            }
-                        }
-                    );
+                    String source = "rollover_index source [" + trialRolloverIndexName + "] to target [" + trialRolloverIndexName + "]";
+                    RolloverTask rolloverTask = new RolloverTask(rolloverRequest, statsResponse, trailRolloverResponse, listener);
+                    clusterService.submitStateUpdateTask(source, rolloverTask, ROLLOVER_TASK_CONFIG, rolloverTaskExecutor, rolloverTask);
                 } else {
                     // conditions not met
-                    listener.onResponse(
-                        new RolloverResponse(
-                            trialSourceIndexName,
-                            trialRolloverIndexName,
-                            trialConditionResults,
-                            false,
-                            false,
-                            false,
-                            false
-                        )
-                    );
+                    listener.onResponse(trailRolloverResponse);
                 }
             }, listener::onFailure)
         );
@@ -333,4 +256,125 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
             );
         }
     }
+
+    class RolloverTask implements ClusterStateTaskListener {
+
+        private final RolloverRequest rolloverRequest;
+        private final IndicesStatsResponse statsResponse;
+        private final RolloverResponse trialRolloverResponse;
+        private final ActionListener<RolloverResponse> listener;
+
+        private final AtomicBoolean conditionsMet = new AtomicBoolean(false);
+        // Holders for what our final source and rolled over index names are as well as the
+        // conditions met to cause the rollover, these are needed so we wait on and report
+        // the correct indices and conditions in the clusterStateProcessed method
+        private final SetOnce<String> sourceIndex = new SetOnce<>();
+        private final SetOnce<String> rolloverIndex = new SetOnce<>();
+        private final SetOnce<Map<String, Boolean>> conditionResults = new SetOnce<>();
+
+        RolloverTask(
+            RolloverRequest rolloverRequest,
+            IndicesStatsResponse statsResponse,
+            RolloverResponse trialRolloverResponse,
+            ActionListener<RolloverResponse> listener
+        ) {
+            this.rolloverRequest = rolloverRequest;
+            this.statsResponse = statsResponse;
+            this.trialRolloverResponse = trialRolloverResponse;
+            this.listener = listener;
+        }
+
+        ClusterState performRollover(ClusterState currentState) throws Exception {
+            // Regenerate the rollover names, as a rollover could have happened
+            // in between the pre-check and the cluster state update
+            final MetadataRolloverService.NameResolution rolloverNames = rolloverService.resolveRolloverNames(
+                currentState,
+                rolloverRequest.getRolloverTarget(),
+                rolloverRequest.getNewIndexName(),
+                rolloverRequest.getCreateIndexRequest()
+            );
+            final String sourceIndexName = rolloverNames.sourceName;
+
+            // Re-evaluate the conditions, now with our final source index name
+            final Map<String, Boolean> postConditionResults = evaluateConditions(
+                rolloverRequest.getConditions().values(),
+                buildStats(currentState.metadata().index(sourceIndexName), statsResponse)
+            );
+            final List<Condition<?>> metConditions = rolloverRequest.getConditions()
+                .values()
+                .stream()
+                .filter(condition -> postConditionResults.get(condition.toString()))
+                .collect(Collectors.toList());
+            // Update the final condition results so they can be used when returning the response
+            conditionResults.set(postConditionResults);
+
+            if (postConditionResults.size() == 0 || metConditions.size() > 0) {
+                conditionsMet.set(true);
+                // Perform the actual rollover
+                MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState(
+                    currentState,
+                    rolloverRequest.getRolloverTarget(),
+                    rolloverRequest.getNewIndexName(),
+                    rolloverRequest.getCreateIndexRequest(),
+                    metConditions,
+                    false,
+                    false
+                );
+                logger.trace("rollover result [{}]", rolloverResult);
+
+                // Update the "final" source and resulting rollover index names.
+                // Note that we use the actual rollover result for these, because
+                // even though we're single threaded, it's possible for the
+                // rollover names generated before the actual rollover to be
+                // different due to things like date resolution
+                sourceIndex.set(rolloverResult.sourceIndexName);
+                rolloverIndex.set(rolloverResult.rolloverIndexName);
+
+                // Return the new rollover cluster state, which includes the changes that create the new index
+                return rolloverResult.clusterState;
+            } else {
+                // Upon re-evaluation of the conditions, none were met, so
+                // therefore do not perform a rollover, returning the current
+                // cluster state.
+                return currentState;
+            }
+        }
+
+        @Override
+        public void onFailure(String source, Exception e) {
+            listener.onFailure(e);
+        }
+
+        @Override
+        public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+            // Now assuming we have a new state and the name of the rolled over index, we need to wait for the
+            // configured number of active shards, as well as return the names of the indices that were rolled/created
+            if (conditionsMet.get()) {
+                assert sourceIndex.get() != null : "source index missing on successful rollover";
+                assert rolloverIndex.get() != null : "rollover index missing on successful rollover";
+                assert conditionResults.get() != null : "matching rollover conditions missing on successful rollover";
+
+                activeShardsObserver.waitForActiveShards(
+                    new String[] { rolloverIndex.get() },
+                    rolloverRequest.getCreateIndexRequest().waitForActiveShards(),
+                    rolloverRequest.masterNodeTimeout(),
+                    isShardsAcknowledged -> listener.onResponse(
+                        new RolloverResponse(
+                            sourceIndex.get(),
+                            rolloverIndex.get(),
+                            conditionResults.get(),
+                            false,
+                            true,
+                            true,
+                            isShardsAcknowledged
+                        )
+                    ),
+                    listener::onFailure
+                );
+            } else {
+                // We did not roll over due to conditions not being met inside the cluster state update
+                listener.onResponse(trialRolloverResponse);
+            }
+        }
+    }
 }

+ 4 - 1
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java

@@ -478,7 +478,10 @@ public class MetadataCreateIndexService {
             );
 
             indexService.getIndexEventListener().beforeIndexAddedToCluster(indexMetadata.getIndex(), indexMetadata.getSettings());
-            return clusterStateCreateIndex(currentState, request.blocks(), indexMetadata, allocationService::reroute, metadataTransformer);
+            BiFunction<ClusterState, String, ClusterState> rerouteFunction = request.performReroute()
+                ? allocationService::reroute
+                : (cs, reason) -> cs;
+            return clusterStateCreateIndex(currentState, request.blocks(), indexMetadata, rerouteFunction, metadataTransformer);
         });
     }
 

+ 4 - 1
server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java

@@ -32,6 +32,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.UnassignedInfo;
+import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.settings.Settings;
@@ -231,6 +232,7 @@ public class TransportRolloverActionTests extends ESTestCase {
         final MetadataIndexAliasesService mdIndexAliasesService = mock(MetadataIndexAliasesService.class);
 
         final Client mockClient = mock(Client.class);
+        final AllocationService mockAllocationService = mock(AllocationService.class);
 
         final Map<String, IndexStats> indexStats = new HashMap<>();
         int total = randomIntBetween(500, 1000);
@@ -279,7 +281,8 @@ public class TransportRolloverActionTests extends ESTestCase {
             mockActionFilters,
             mockIndexNameExpressionResolver,
             rolloverService,
-            mockClient
+            mockClient,
+            mockAllocationService
         );
 
         // For given alias, verify that condition evaluation fails when the condition doc count is greater than the primaries doc count