瀏覽代碼

Prevent concurrent rollover race conditions (#67953)

We recently rewrote the `TransportRolloverAction` code to encapsulate the rollover into a single
cluster state. This is great, but we can do a bit better when there are concurrent rollover requests
that occur, specifically, we should meet both of the following criteria:

- Multiple concurrent unconditional rollovers should generate multiple rollovers without any concurrent
modification exceptions.
- Multiple concurrent rollovers with conditions should roll over exactly once, assuming the
condition is met only once

This commit changes the `TransportRolloverAction` code to reach these goals. It does it by treating
the cluster state submittion as a pseudo-"synchronized" block. This means that we calculate the
source, destination, and conditions met *prior* to performing the cluster state update. If the
conditions are met, we invoke a cluster state update. Inside the cluster state update (serialized,
so essentially inside our "synchronized" block) we recalculate the source, destination, and
contitions met, and only proceed with the rollover if the conditions on met.

As a byproduct of this, it also fixes issues where rollover could happen but the response returned
incorrect information such as conditions that weren't actually met or an incorrect
source/destination index.

I've also tried to document (with comments) the thought process and how this code works for future
generations.

This also adds a test with multiple threads all trying to invoke a rollover that ensures that only a
single rollover happens in the case conditions are specified.

Resolves #67836
Resloves #64921
Lee Hinman 4 年之前
父節點
當前提交
6edd2e47cc

+ 75 - 5
server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/rollover/RolloverIT.java

@@ -22,6 +22,7 @@ package org.elasticsearch.action.admin.indices.rollover;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.action.admin.indices.alias.Alias;
 import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
@@ -30,6 +31,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.AutoExpandReplicas;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.time.DateFormatter;
@@ -47,6 +49,10 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -197,7 +203,7 @@ public class RolloverIT extends ESIntegTestCase {
         indexDoc("test_index-2", "1", "field", "value");
         flush("test_index-2");
         final Settings settings = Settings.builder()
-            .put("number_of_shards", 1) 
+            .put("number_of_shards", 1)
             .put("number_of_replicas", 0)
             .build();
         final RolloverResponse response = client().admin().indices().prepareRolloverIndex("test_alias")
@@ -445,13 +451,12 @@ public class RolloverIT extends ESIntegTestCase {
         assertAcked(client().admin().indices().prepareCreate("logs-000001").get());
         ensureYellow("logs-write");
         final IllegalArgumentException error = expectThrows(IllegalArgumentException.class,
-            () -> client().admin().indices().prepareRolloverIndex("logs-write").addMaxIndexSizeCondition(new ByteSizeValue(1)).get());
+            () -> client().admin().indices().prepareRolloverIndex("logs-write").get());
         assertThat(error.getMessage(), equalTo(
             "Rollover alias [logs-write] can point to multiple indices, found duplicated alias [[logs-write]] in index template [logs]"));
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/64921")
-    public void testRolloverWithClosedIndexInAlias() throws Exception {
+    public void testRolloverWithClosedIndexInAlias() {
         final String aliasName = "alias";
         final String openNonwriteIndex = "open-index-nonwrite";
         final String closedIndex = "closed-index-nonwrite";
@@ -459,13 +464,14 @@ public class RolloverIT extends ESIntegTestCase {
         assertAcked(prepareCreate(openNonwriteIndex).addAlias(new Alias(aliasName)).get());
         assertAcked(prepareCreate(closedIndex).addAlias(new Alias(aliasName)).get());
         assertAcked(prepareCreate(writeIndexPrefix + "000001").addAlias(new Alias(aliasName).writeIndex(true)).get());
+        ensureGreen();
 
         index(closedIndex, null, "{\"foo\": \"bar\"}");
         index(aliasName, null, "{\"foo\": \"bar\"}");
         index(aliasName, null, "{\"foo\": \"bar\"}");
         refresh(aliasName);
 
-        assertAcked(client().admin().indices().prepareClose(closedIndex).get());
+        assertAcked(client().admin().indices().prepareClose(closedIndex).setTimeout(TimeValue.timeValueSeconds(60)).get());
 
         RolloverResponse rolloverResponse = client().admin().indices().prepareRolloverIndex(aliasName)
             .addMaxIndexDocsCondition(1)
@@ -562,4 +568,68 @@ public class RolloverIT extends ESIntegTestCase {
         assertThat(oldIndex.getRolloverInfos().get(aliasName).getTime(),
             is(both(greaterThanOrEqualTo(beforeTime)).and(lessThanOrEqualTo(client().threadPool().absoluteTimeInMillis() + 1000L))));
     }
+
+    /**
+     * Tests that multiple threads all racing to rollover based on a condition trigger one and only one rollover
+     */
+    public void testMultiThreadedRollover() throws Exception {
+        final String aliasName = "alias";
+        final String writeIndexPrefix = "tt-";
+        assertAcked(prepareCreate(writeIndexPrefix + "000001").addAlias(new Alias(aliasName).writeIndex(true)).get());
+        ensureGreen();
+
+        final int threadCount = randomIntBetween(5, 10);
+        final CyclicBarrier barrier = new CyclicBarrier(threadCount + 1);
+        final AtomicBoolean running = new AtomicBoolean(true);
+        Set<Thread> threads = IntStream.range(0, threadCount)
+            .mapToObj(i -> new Thread(() -> {
+                try {
+                    logger.info("--> [{}] waiting for all the other threads before starting", i);
+                    barrier.await();
+                    while (running.get()) {
+                        RolloverResponse resp = client().admin().indices().prepareRolloverIndex(aliasName).
+                            addMaxIndexDocsCondition(1).get();
+                        if (resp.isRolledOver()) {
+                            logger.info("--> thread [{}] successfully rolled over: {}", i, Strings.toString(resp));
+                            assertThat(resp.getOldIndex(), equalTo(writeIndexPrefix + "000001"));
+                            assertThat(resp.getNewIndex(), equalTo(writeIndexPrefix + "000002"));
+                        }
+                    }
+                } catch (Exception e) {
+                    logger.error(new ParameterizedMessage("thread [{}] encountered unexpected exception", i), e);
+                    fail("we should not encounter unexpected exceptions");
+                }
+            }, "rollover-thread-" + i))
+            .collect(Collectors.toSet());
+
+        threads.forEach(Thread::start);
+
+        // Okay, signal the floodgates to open
+        barrier.await();
+
+        index(aliasName, null, "{\"foo\": \"bar\"}");
+
+        assertBusy(() -> {
+            try {
+                client().admin().indices().prepareGetIndex().addIndices(writeIndexPrefix + "000002").get();
+            } catch (Exception e) {
+                logger.info("--> expecting second index to be created but it has not yet been created");
+                fail("expecting second index to exist");
+            }
+        });
+
+        // Tell everyone to stop trying to roll over
+        running.set(false);
+
+        threads.forEach(thread -> {
+            try {
+                thread.join(1000);
+            } catch (Exception e) {
+                logger.warn("expected thread to be stopped, but got", e);
+            }
+        });
+
+        // We should *NOT* have a third index, it should have rolled over *exactly* once
+        expectThrows(Exception.class, () -> client().admin().indices().prepareGetIndex().addIndices(writeIndexPrefix + "000003").get());
+    }
 }

+ 58 - 6
server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java

@@ -107,7 +107,7 @@ public class MetadataRolloverService {
                 return rolloverAlias(currentState, (IndexAbstraction.Alias) indexAbstraction, rolloverTarget, newIndexName,
                     createIndexRequest, metConditions, silent, onlyValidate);
             case DATA_STREAM:
-                return  rolloverDataStream(currentState, (IndexAbstraction.DataStream) indexAbstraction, rolloverTarget,
+                return rolloverDataStream(currentState, (IndexAbstraction.DataStream) indexAbstraction, rolloverTarget,
                     createIndexRequest, metConditions, silent, onlyValidate);
             default:
                 // the validate method above prevents this case
@@ -115,12 +115,43 @@ public class MetadataRolloverService {
         }
     }
 
-    private RolloverResult rolloverAlias(ClusterState currentState, IndexAbstraction.Alias alias, String aliasName,
-                                         String newIndexName, CreateIndexRequest createIndexRequest, List<Condition<?>> metConditions,
-                                         boolean silent, boolean onlyValidate) throws Exception {
-        final Metadata metadata = currentState.metadata();
+    public void validateIndexName(ClusterState state, String index) {
+        createIndexService.validateIndexName(index, state);
+    }
+
+    /**
+     * Returns the names that rollover would use, but does not perform the actual rollover
+     */
+    public NameResolution resolveRolloverNames(ClusterState currentState, String rolloverTarget, String newIndexName,
+                                               CreateIndexRequest createIndexRequest) {
+        validate(currentState.metadata(), rolloverTarget, newIndexName, createIndexRequest);
+        final IndexAbstraction indexAbstraction = currentState.metadata().getIndicesLookup().get(rolloverTarget);
+        switch (indexAbstraction.getType()) {
+            case ALIAS:
+                return resolveAliasRolloverNames((IndexAbstraction.Alias) indexAbstraction, newIndexName);
+            case DATA_STREAM:
+                return resolveDataStreamRolloverNames(currentState, (IndexAbstraction.DataStream) indexAbstraction);
+            default:
+                // the validate method above prevents this case
+                throw new IllegalStateException("unable to roll over type [" + indexAbstraction.getType().getDisplayName() + "]");
+        }
+    }
+
+    public static class NameResolution {
+        final String sourceName;
+        @Nullable
+        final String unresolvedName;
+        final String rolloverName;
+
+        NameResolution(String sourceName, String unresolvedName, String rolloverName) {
+            this.sourceName = sourceName;
+            this.unresolvedName = unresolvedName;
+            this.rolloverName = rolloverName;
+        }
+    }
+
+    private NameResolution resolveAliasRolloverNames(IndexAbstraction.Alias alias, String newIndexName) {
         final IndexMetadata writeIndex = alias.getWriteIndex();
-        final AliasMetadata aliasMetadata = writeIndex.getAliases().get(alias.getName());
         final String sourceProvidedName = writeIndex.getSettings().get(IndexMetadata.SETTING_INDEX_PROVIDED_NAME,
             writeIndex.getIndex().getName());
         final String sourceIndexName = writeIndex.getIndex().getName();
@@ -128,6 +159,27 @@ public class MetadataRolloverService {
             ? newIndexName
             : generateRolloverIndexName(sourceProvidedName, indexNameExpressionResolver);
         final String rolloverIndexName = indexNameExpressionResolver.resolveDateMathExpression(unresolvedName);
+        return new NameResolution(sourceIndexName, unresolvedName, rolloverIndexName);
+    }
+
+    private NameResolution resolveDataStreamRolloverNames(ClusterState currentState, IndexAbstraction.DataStream dataStream) {
+        final Version minNodeVersion = currentState.nodes().getMinNodeVersion();
+        final DataStream ds = dataStream.getDataStream();
+        final IndexMetadata originalWriteIndex = dataStream.getWriteIndex();
+        final DataStream rolledDataStream = ds.rollover("uuid", minNodeVersion);
+        return new NameResolution(originalWriteIndex.getIndex().getName(), null, rolledDataStream.getWriteIndex().getName());
+    }
+
+    private RolloverResult rolloverAlias(ClusterState currentState, IndexAbstraction.Alias alias, String aliasName,
+                                         String newIndexName, CreateIndexRequest createIndexRequest, List<Condition<?>> metConditions,
+                                         boolean silent, boolean onlyValidate) throws Exception {
+        final NameResolution names = resolveAliasRolloverNames(alias, newIndexName);
+        final String sourceIndexName = names.sourceName;
+        final String rolloverIndexName = names.rolloverName;
+        final String unresolvedName = names.unresolvedName;
+        final Metadata metadata = currentState.metadata();
+        final IndexMetadata writeIndex = alias.getWriteIndex();
+        final AliasMetadata aliasMetadata = writeIndex.getAliases().get(alias.getName());
         final boolean explicitWriteIndex = Boolean.TRUE.equals(aliasMetadata.writeIndex());
         final Boolean isHidden = IndexMetadata.INDEX_HIDDEN_SETTING.exists(createIndexRequest.settings()) ?
             IndexMetadata.INDEX_HIDDEN_SETTING.get(createIndexRequest.settings()) : null;

+ 120 - 68
server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java

@@ -21,7 +21,7 @@ package org.elasticsearch.action.admin.indices.rollover;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.elasticsearch.ElasticsearchException;
+import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
@@ -48,7 +48,6 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -85,85 +84,138 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
     }
 
     @Override
-    protected void masterOperation(Task task, final RolloverRequest rolloverRequest, final ClusterState state,
+    protected void masterOperation(Task task, final RolloverRequest rolloverRequest, final ClusterState oldState,
                                    final ActionListener<RolloverResponse> listener) throws Exception {
 
-        MetadataRolloverService.RolloverResult preResult =
-            rolloverService.rolloverClusterState(state,
-                rolloverRequest.getRolloverTarget(), rolloverRequest.getNewIndexName(), rolloverRequest.getCreateIndexRequest(),
-                Collections.emptyList(), true, true);
-        logger.trace("rollover pre-result [{}]", preResult);
-        Metadata metadata = state.metadata();
-        String sourceIndexName = preResult.sourceIndexName;
-        String rolloverIndexName = preResult.rolloverIndexName;
+        Metadata metadata = oldState.metadata();
+
         IndicesStatsRequest statsRequest = new IndicesStatsRequest().indices(rolloverRequest.getRolloverTarget())
             .clear()
             .indicesOptions(IndicesOptions.fromOptions(true, false, true, true))
             .docs(true);
         statsRequest.setParentTask(clusterService.localNode().getId(), task.getId());
+        // Rollover can sometimes happen concurrently, to handle these cases, we treat rollover in the same way we would treat a
+        // "synchronized" block, in that we have a "before" world, where we calculate naming and condition matching, we then enter our
+        // synchronization (in this case, the submitStateUpdateTask which is serialized on the master node), where we then regenerate the
+        // names and re-check conditions. More explanation follows inline below.
         client.execute(IndicesStatsAction.INSTANCE, statsRequest,
-            new ActionListener<>() {
-                @Override
-                public void onResponse(IndicesStatsResponse statsResponse) {
-                    final Map<String, Boolean> conditionResults = evaluateConditions(rolloverRequest.getConditions().values(),
-                        metadata.index(sourceIndexName), statsResponse);
-
-                    if (rolloverRequest.isDryRun()) {
-                        listener.onResponse(
-                            new RolloverResponse(sourceIndexName, rolloverIndexName, conditionResults, true, false, false, false));
-                        return;
-                    }
-                    List<Condition<?>> metConditions = rolloverRequest.getConditions().values().stream()
-                        .filter(condition -> conditionResults.get(condition.toString())).collect(Collectors.toList());
-                    if (conditionResults.size() == 0 || metConditions.size() > 0) {
-                        clusterService.submitStateUpdateTask("rollover_index source [" + sourceIndexName + "] to target ["
-                            + rolloverIndexName + "]", new ClusterStateUpdateTask() {
-                            @Override
-                            public ClusterState execute(ClusterState currentState) throws Exception {
-                                MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState(currentState,
-                                    rolloverRequest.getRolloverTarget(), rolloverRequest.getNewIndexName(),
-                                    rolloverRequest.getCreateIndexRequest(), metConditions, false, false);
+            ActionListener.wrap(statsResponse -> {
+                // Now that we have the stats for the cluster, we need to know the
+                // names of the index for which we should evaluate
+                // conditions, as well as what our newly created index *would* be.
+                final MetadataRolloverService.NameResolution trialRolloverNames =
+                    rolloverService.resolveRolloverNames(oldState, rolloverRequest.getRolloverTarget(),
+                        rolloverRequest.getNewIndexName(), rolloverRequest.getCreateIndexRequest());
+                final String trialSourceIndexName = trialRolloverNames.sourceName;
+                final String trialRolloverIndexName = trialRolloverNames.rolloverName;
+
+                rolloverService.validateIndexName(oldState, trialRolloverIndexName);
+
+                // Evaluate the conditions, so that we can tell without a cluster state update whether a rollover would occur.
+                final Map<String, Boolean> trialConditionResults = evaluateConditions(rolloverRequest.getConditions().values(),
+                    metadata.index(trialSourceIndexName), statsResponse);
+
+                // If this is a dry run, return with the results without invoking a cluster state update
+                if (rolloverRequest.isDryRun()) {
+                    listener.onResponse(new RolloverResponse(trialSourceIndexName, trialRolloverIndexName,
+                        trialConditionResults, true, false, false, false));
+                    return;
+                }
+
+                // Holders for what our final source and rolled over index names are as well as the
+                // conditions met to cause the rollover, these are needed so we wait on and report
+                // the correct indices and conditions in the clusterStateProcessed method
+                final SetOnce<String> sourceIndex = new SetOnce<>();
+                final SetOnce<String> rolloverIndex = new SetOnce<>();
+                final SetOnce<Map<String, Boolean>> conditionResults = new SetOnce<>();
+
+                final List<Condition<?>> trialMetConditions = rolloverRequest.getConditions().values().stream()
+                    .filter(condition -> trialConditionResults.get(condition.toString())).collect(Collectors.toList());
+
+                // Pre-check the conditions to see whether we should submit a new cluster state task
+                if (trialConditionResults.size() == 0 || trialMetConditions.size() > 0) {
+
+                    // Submit the cluster state, this can be thought of as a "synchronized"
+                    // block in that it is single-threaded on the master node
+                    clusterService.submitStateUpdateTask("rollover_index source [" + trialRolloverIndexName + "] to target ["
+                        + trialRolloverIndexName + "]", new ClusterStateUpdateTask() {
+                        @Override
+                        public ClusterState execute(ClusterState currentState) throws Exception {
+                            // Regenerate the rollover names, as a rollover could have happened
+                            // in between the pre-check and the cluster state update
+                            final MetadataRolloverService.NameResolution rolloverNames =
+                                rolloverService.resolveRolloverNames(currentState, rolloverRequest.getRolloverTarget(),
+                                    rolloverRequest.getNewIndexName(), rolloverRequest.getCreateIndexRequest());
+                            final String sourceIndexName = rolloverNames.sourceName;
+
+                            // Re-evaluate the conditions, now with our final source index name
+                            final Map<String, Boolean> postConditionResults = evaluateConditions(rolloverRequest.getConditions().values(),
+                                metadata.index(sourceIndexName), statsResponse);
+                            final List<Condition<?>> metConditions = rolloverRequest.getConditions().values().stream()
+                                .filter(condition -> postConditionResults.get(condition.toString())).collect(Collectors.toList());
+                            // Update the final condition results so they can be used when returning the response
+                            conditionResults.set(postConditionResults);
+
+                            if (postConditionResults.size() == 0 || metConditions.size() > 0) {
+                                // Perform the actual rollover
+                                MetadataRolloverService.RolloverResult rolloverResult =
+                                    rolloverService.rolloverClusterState(currentState, rolloverRequest.getRolloverTarget(),
+                                        rolloverRequest.getNewIndexName(),
+                                        rolloverRequest.getCreateIndexRequest(), metConditions, false, false);
                                 logger.trace("rollover result [{}]", rolloverResult);
-                                if (rolloverResult.sourceIndexName.equals(sourceIndexName) == false) {
-                                    throw new ElasticsearchException("Concurrent modification of alias or data stream [{}] during " +
-                                        "rollover (expected [{}] but found [{}])",
-                                        rolloverRequest.getRolloverTarget(), sourceIndexName, rolloverResult.sourceIndexName);
-                                }
-                                return rolloverResult.clusterState;
-                            }
 
-                            @Override
-                            public void onFailure(String source, Exception e) {
-                                listener.onFailure(e);
-                            }
+                                // Update the "final" source and resulting rollover index names.
+                                // Note that we use the actual rollover result for these, because
+                                // even though we're single threaded, it's possible for the
+                                // rollover names generated before the actual rollover to be
+                                // different due to things like date resolution
+                                sourceIndex.set(rolloverResult.sourceIndexName);
+                                rolloverIndex.set(rolloverResult.rolloverIndexName);
 
-                            @Override
-                            public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                                if (newState.equals(oldState) == false) {
-                                    activeShardsObserver.waitForActiveShards(new String[]{rolloverIndexName},
-                                        rolloverRequest.getCreateIndexRequest().waitForActiveShards(),
-                                        rolloverRequest.masterNodeTimeout(),
-                                        isShardsAcknowledged -> listener.onResponse(new RolloverResponse(
-                                            sourceIndexName, rolloverIndexName, conditionResults, false, true, true,
-                                            isShardsAcknowledged)),
-                                        listener::onFailure);
-                                }
+                                // Return the new rollover cluster state, which includes the changes that create the new index
+                                return rolloverResult.clusterState;
+                            } else {
+                                // Upon re-evaluation of the conditions, none were met, so
+                                // therefore do not perform a rollover, returning the current
+                                // cluster state.
+                                return currentState;
                             }
-                        });
-                    } else {
-                        // conditions not met
-                        listener.onResponse(
-                            new RolloverResponse(sourceIndexName, rolloverIndexName, conditionResults, false, false, false, false)
-                        );
-                    }
-                }
-
-                @Override
-                public void onFailure(Exception e) {
-                    listener.onFailure(e);
+                        }
+
+                        @Override
+                        public void onFailure(String source, Exception e) {
+                            listener.onFailure(e);
+                        }
+
+                        @Override
+                        public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                            // Now assuming we have a new state and the name of the rolled over index, we need to wait for the
+                            // configured number of active shards, as well as return the names of the indices that were rolled/created
+                            if (newState.equals(oldState) == false) {
+                                assert sourceIndex.get() != null : "source index missing on successful rollover";
+                                assert rolloverIndex.get() != null : "rollover index missing on successful rollover";
+                                assert conditionResults.get() != null : "matching rollover conditions missing on successful rollover";
+
+                                activeShardsObserver.waitForActiveShards(new String[]{rolloverIndex.get()},
+                                    rolloverRequest.getCreateIndexRequest().waitForActiveShards(),
+                                    rolloverRequest.masterNodeTimeout(),
+                                    isShardsAcknowledged -> listener.onResponse(new RolloverResponse(
+                                        sourceIndex.get(), rolloverIndex.get(), conditionResults.get(), false, true, true,
+                                        isShardsAcknowledged)),
+                                    listener::onFailure);
+                            } else {
+                                // We did not roll over due to conditions not being met inside the cluster state update
+                                listener.onResponse(new RolloverResponse(
+                                    trialSourceIndexName, trialRolloverIndexName, trialConditionResults, false, false, false, false));
+                            }
+                        }
+                    });
+                } else {
+                    // conditions not met
+                    listener.onResponse(new RolloverResponse(trialSourceIndexName, trialRolloverIndexName,
+                        trialConditionResults, false, false, false, false));
                 }
-            }
-        );
+            }, listener::onFailure));
     }
 
     static Map<String, Boolean> evaluateConditions(final Collection<Condition<?>> conditions,

+ 72 - 0
x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java

@@ -5,6 +5,7 @@
  */
 package org.elasticsearch.datastreams;
 
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.search.TotalHits;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ExceptionsHelper;
@@ -71,6 +72,11 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.elasticsearch.action.DocWriteRequest.OpType.CREATE;
 import static org.elasticsearch.cluster.DataStreamTestHelper.generateMapping;
@@ -1147,6 +1153,72 @@ public class DataStreamIT extends ESIntegTestCase {
         assertThat(state.metadata().dataStreams().get("metrics-foo").getName(), equalTo("metrics-foo"));
     }
 
+    /**
+     * Tests that multiple threads all racing to rollover based on a condition trigger one and only one rollover
+     */
+    public void testMultiThreadedRollover() throws Exception {
+        final String dsName = "potato-biscuit";
+        putComposableIndexTemplate("id1", List.of("potato-*"));
+
+        ensureGreen();
+
+        final int threadCount = randomIntBetween(5, 10);
+        final CyclicBarrier barrier = new CyclicBarrier(threadCount + 1);
+        final AtomicBoolean running = new AtomicBoolean(true);
+        Set<Thread> threads = IntStream.range(0, threadCount).mapToObj(i -> new Thread(() -> {
+            try {
+                logger.info("--> [{}] waiting for all the other threads before starting", i);
+                barrier.await();
+                while (running.get()) {
+                    RolloverResponse resp = client().admin().indices().prepareRolloverIndex(dsName).addMaxIndexDocsCondition(2).get();
+                    if (resp.isRolledOver()) {
+                        logger.info("--> thread [{}] successfully rolled over: {}", i, Strings.toString(resp));
+                        assertThat(resp.getOldIndex(), equalTo(DataStream.getDefaultBackingIndexName("potato-biscuit", 1)));
+                        assertThat(resp.getNewIndex(), equalTo(DataStream.getDefaultBackingIndexName("potato-biscuit", 2)));
+                    }
+                }
+            } catch (Exception e) {
+                logger.error(new ParameterizedMessage("thread [{}] encountered unexpected exception", i), e);
+                fail("we should not encounter unexpected exceptions");
+            }
+        }, "rollover-thread-" + i)).collect(Collectors.toSet());
+
+        threads.forEach(Thread::start);
+
+        indexDocs(dsName, 1);
+
+        // Okay, signal the floodgates to open
+        barrier.await();
+
+        indexDocs(dsName, 1);
+
+        assertBusy(() -> {
+            try {
+                client().admin().indices().prepareGetIndex().addIndices(DataStream.getDefaultBackingIndexName("potato-biscuit", 2)).get();
+            } catch (Exception e) {
+                logger.info("--> expecting second index to be created but it has not yet been created");
+                fail("expecting second index to exist");
+            }
+        });
+
+        // Tell everyone to stop trying to roll over
+        running.set(false);
+
+        threads.forEach(thread -> {
+            try {
+                thread.join(1000);
+            } catch (Exception e) {
+                logger.warn("expected thread to be stopped, but got", e);
+            }
+        });
+
+        // We should *NOT* have a third index, it should have rolled over *exactly* once
+        expectThrows(
+            Exception.class,
+            () -> client().admin().indices().prepareGetIndex().addIndices(DataStream.getDefaultBackingIndexName("potato-biscuit", 3)).get()
+        );
+    }
+
     private static void verifyResolvability(String dataStream, ActionRequestBuilder<?, ?> requestBuilder, boolean fail) {
         verifyResolvability(dataStream, requestBuilder, fail, 0);
     }