Bläddra i källkod

Fix Concurrent Index Auto Create Failing Bulk Requests (#82541)

Batching these requests introduced a bug where auto-create requests for system
indices would fail because system indices are always auto-created and thus
throw resource-already-exists if multiple equal ones are batched together
even though the index doesn't yet exist in the cluster state but only
in the intermediary task executor state.
This leads to bulk requests ignoring the exeception (thinking that the index
already exists) in their auto-create callback when the request doesn't yet
exist.
Fixed by deduplicating these requests for now, added a TODO to do it a little
nicer down the road but this fix is somewhat urgent as it breaks ML integ
tests.
Armin Braun 3 år sedan
förälder
incheckning
584df17c53

+ 32 - 0
server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/create/CreateSystemIndicesIT.java

@@ -8,6 +8,7 @@
 
 package org.elasticsearch.action.admin.indices.create;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.alias.Alias;
 import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
 import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
@@ -17,7 +18,11 @@ import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
 import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
 import org.elasticsearch.action.admin.indices.template.delete.DeleteComposableIndexTemplateAction;
 import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.metadata.AliasMetadata;
 import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
 import org.elasticsearch.cluster.metadata.MappingMetadata;
@@ -40,6 +45,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.indices.TestSystemIndexDescriptor.INDEX_NAME;
@@ -201,6 +208,31 @@ public class CreateSystemIndicesIT extends ESIntegTestCase {
         assertAliases(concreteIndex);
     }
 
+    public void testConcurrentAutoCreates() throws InterruptedException {
+        internalCluster().startNodes(3);
+
+        final Client client = client();
+        final int count = randomIntBetween(5, 30);
+        final CountDownLatch latch = new CountDownLatch(count);
+        final ActionListener<BulkResponse> listener = new ActionListener<>() {
+            @Override
+            public void onResponse(BulkResponse o) {
+                latch.countDown();
+                assertFalse(o.hasFailures());
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                latch.countDown();
+                throw new AssertionError(e);
+            }
+        };
+        for (int i = 0; i < count; i++) {
+            client.bulk(new BulkRequest().add(new IndexRequest(INDEX_NAME).source(Map.of("foo", "bar"))), listener);
+        }
+        assertTrue(latch.await(30L, TimeUnit.SECONDS));
+    }
+
     /**
      * Make sure that aliases are created hidden
      */

+ 157 - 133
server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java

@@ -44,6 +44,8 @@ import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
@@ -72,7 +74,7 @@ public final class AutoCreateAction extends ActionType<CreateIndexResponse> {
         private final AutoCreateIndex autoCreateIndex;
         private final SystemIndices systemIndices;
 
-        private final ClusterStateTaskExecutor<AckedClusterStateUpdateTask> executor;
+        private final ClusterStateTaskExecutor<CreateIndexTask> executor;
 
         @Inject
         public TransportAction(
@@ -104,17 +106,28 @@ public final class AutoCreateAction extends ActionType<CreateIndexResponse> {
             this.metadataCreateDataStreamService = metadataCreateDataStreamService;
             this.autoCreateIndex = autoCreateIndex;
             executor = (currentState, tasks) -> {
-                ClusterTasksResult.Builder<AckedClusterStateUpdateTask> builder = ClusterTasksResult.builder();
+                ClusterTasksResult.Builder<CreateIndexTask> builder = ClusterTasksResult.builder();
                 ClusterState state = currentState;
-                for (AckedClusterStateUpdateTask task : tasks) {
+                final Map<CreateIndexRequest, CreateIndexTask> successfulRequests = new HashMap<>(tasks.size());
+                for (CreateIndexTask task : tasks) {
                     try {
-                        state = task.execute(state);
+                        final CreateIndexTask successfulBefore = successfulRequests.putIfAbsent(task.request, task);
+                        if (successfulBefore == null) {
+                            state = task.execute(state);
+                        } else {
+                            // TODO: clean this up to just deduplicate the task listener instead of setting the generated name from
+                            // duplicate tasks here and then waiting for shards to become available multiple times in parallel for
+                            // each duplicate task
+                            task.indexNameRef.set(successfulBefore.indexNameRef.get());
+                        }
                         builder.success(task);
                     } catch (Exception e) {
                         builder.failure(task, e);
                     }
                 }
-                state = allocationService.reroute(state, "auto-create");
+                if (state != currentState) {
+                    state = allocationService.reroute(state, "auto-create");
+                }
                 return builder.build(state);
             };
         }
@@ -135,157 +148,168 @@ public final class AutoCreateAction extends ActionType<CreateIndexResponse> {
                         new String[] { indexName },
                         ActiveShardCount.DEFAULT,
                         request.timeout(),
-                        shardsAcked -> { finalListener.onResponse(new CreateIndexResponse(true, shardsAcked, indexName)); },
+                        shardsAcked -> finalListener.onResponse(new CreateIndexResponse(true, shardsAcked, indexName)),
                         finalListener::onFailure
                     );
                 } else {
                     finalListener.onResponse(new CreateIndexResponse(false, false, indexName));
                 }
             }, finalListener::onFailure);
-            // TODO: move this to custom class instead of AckedClusterStateUpdateTask
-            AckedClusterStateUpdateTask clusterTask = new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) {
+            CreateIndexTask clusterTask = new CreateIndexTask(request, listener, indexNameRef);
+            clusterService.submitStateUpdateTask("auto create [" + request.index() + "]", clusterTask, clusterTask, executor, clusterTask);
+        }
 
-                @Override
-                public ClusterState execute(ClusterState currentState) throws Exception {
-                    final SystemDataStreamDescriptor dataStreamDescriptor = systemIndices.validateDataStreamAccess(
-                        request.index(),
-                        threadPool.getThreadContext()
-                    );
-                    final boolean isSystemDataStream = dataStreamDescriptor != null;
-                    final boolean isSystemIndex = isSystemDataStream == false && systemIndices.isSystemIndex(request.index());
-                    final ComposableIndexTemplate template = resolveTemplate(request, currentState.metadata());
-                    final boolean isDataStream = isSystemIndex == false
-                        && (isSystemDataStream || (template != null && template.getDataStreamTemplate() != null));
-
-                    if (isDataStream) {
-                        // This expression only evaluates to true when the argument is non-null and false
-                        if (isSystemDataStream == false && Boolean.FALSE.equals(template.getAllowAutoCreate())) {
-                            throw new IndexNotFoundException(
-                                "composable template " + template.indexPatterns() + " forbids index auto creation"
-                            );
-                        }
+        @Override
+        protected ClusterBlockException checkBlock(CreateIndexRequest request, ClusterState state) {
+            return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.index());
+        }
 
-                        CreateDataStreamClusterStateUpdateRequest createRequest = new CreateDataStreamClusterStateUpdateRequest(
-                            request.index(),
-                            dataStreamDescriptor,
-                            request.masterNodeTimeout(),
-                            request.timeout(),
-                            false
+        // TODO: split the listner out of this task and use AckedClusterStateTaskListener directly to avoid the complicated listener
+        // construction upstream when instantiating these
+        private final class CreateIndexTask extends AckedClusterStateUpdateTask {
+
+            final CreateIndexRequest request;
+            final AtomicReference<String> indexNameRef;
+
+            CreateIndexTask(
+                CreateIndexRequest request,
+                ActionListener<AcknowledgedResponse> listener,
+                AtomicReference<String> indexNameRef
+            ) {
+                super(Priority.URGENT, request, listener);
+                this.request = request;
+                this.indexNameRef = indexNameRef;
+            }
+
+            @Override
+            public ClusterState execute(ClusterState currentState) throws Exception {
+                final SystemDataStreamDescriptor dataStreamDescriptor = systemIndices.validateDataStreamAccess(
+                    request.index(),
+                    threadPool.getThreadContext()
+                );
+                final boolean isSystemDataStream = dataStreamDescriptor != null;
+                final boolean isSystemIndex = isSystemDataStream == false && systemIndices.isSystemIndex(request.index());
+                final ComposableIndexTemplate template = resolveTemplate(request, currentState.metadata());
+                final boolean isDataStream = isSystemIndex == false
+                    && (isSystemDataStream || (template != null && template.getDataStreamTemplate() != null));
+
+                if (isDataStream) {
+                    // This expression only evaluates to true when the argument is non-null and false
+                    if (isSystemDataStream == false && Boolean.FALSE.equals(template.getAllowAutoCreate())) {
+                        throw new IndexNotFoundException(
+                            "composable template " + template.indexPatterns() + " forbids index auto creation"
                         );
-                        ClusterState clusterState = metadataCreateDataStreamService.createDataStream(createRequest, currentState);
-                        indexNameRef.set(clusterState.metadata().dataStreams().get(request.index()).getIndices().get(0).getName());
-                        return clusterState;
-                    } else {
-                        String indexName = indexNameExpressionResolver.resolveDateMathExpression(request.index());
-                        indexNameRef.set(indexName);
-                        if (isSystemIndex) {
-                            if (indexName.equals(request.index()) == false) {
-                                throw new IllegalStateException("system indices do not support date math expressions");
-                            }
-                        } else {
-                            // This will throw an exception if the index does not exist and creating it is prohibited
-                            final boolean shouldAutoCreate = autoCreateIndex.shouldAutoCreate(indexName, currentState);
+                    }
 
-                            if (shouldAutoCreate == false) {
-                                // The index already exists.
-                                return currentState;
-                            }
+                    CreateDataStreamClusterStateUpdateRequest createRequest = new CreateDataStreamClusterStateUpdateRequest(
+                        request.index(),
+                        dataStreamDescriptor,
+                        request.masterNodeTimeout(),
+                        request.timeout(),
+                        false
+                    );
+                    ClusterState clusterState = metadataCreateDataStreamService.createDataStream(createRequest, currentState);
+                    indexNameRef.set(clusterState.metadata().dataStreams().get(request.index()).getIndices().get(0).getName());
+                    return clusterState;
+                } else {
+                    String indexName = indexNameExpressionResolver.resolveDateMathExpression(request.index());
+                    indexNameRef.set(indexName);
+                    if (isSystemIndex) {
+                        if (indexName.equals(request.index()) == false) {
+                            throw new IllegalStateException("system indices do not support date math expressions");
                         }
+                    } else {
+                        // This will throw an exception if the index does not exist and creating it is prohibited
+                        final boolean shouldAutoCreate = autoCreateIndex.shouldAutoCreate(indexName, currentState);
 
-                        final SystemIndexDescriptor mainDescriptor = isSystemIndex ? systemIndices.findMatchingDescriptor(indexName) : null;
-                        final boolean isManagedSystemIndex = mainDescriptor != null && mainDescriptor.isAutomaticallyManaged();
-
-                        final CreateIndexClusterStateUpdateRequest updateRequest;
-
-                        if (isManagedSystemIndex) {
-                            final SystemIndexDescriptor descriptor = mainDescriptor.getDescriptorCompatibleWith(
-                                state.nodes().getSmallestNonClientNodeVersion()
-                            );
-                            if (descriptor == null) {
-                                final String message = mainDescriptor.getMinimumNodeVersionMessage("auto-create index");
-                                logger.warn(message);
-                                throw new IllegalStateException(message);
-                            }
-
-                            updateRequest = buildSystemIndexUpdateRequest(indexName, descriptor);
-                        } else if (isSystemIndex) {
-                            updateRequest = buildUpdateRequest(indexName);
-
-                            if (Objects.isNull(request.settings())) {
-                                updateRequest.settings(SystemIndexDescriptor.DEFAULT_SETTINGS);
-                            } else if (false == request.settings().hasValue(SETTING_INDEX_HIDDEN)) {
-                                updateRequest.settings(Settings.builder().put(request.settings()).put(SETTING_INDEX_HIDDEN, true).build());
-                            } else if ("false".equals(request.settings().get(SETTING_INDEX_HIDDEN))) {
-                                final String message = "Cannot auto-create system index ["
-                                    + indexName
-                                    + "] with [index.hidden] set to 'false'";
-                                logger.warn(message);
-                                throw new IllegalStateException(message);
-                            }
-                        } else {
-                            updateRequest = buildUpdateRequest(indexName);
+                        if (shouldAutoCreate == false) {
+                            // The index already exists.
+                            return currentState;
                         }
-
-                        return createIndexService.applyCreateIndexRequest(currentState, updateRequest, false);
                     }
-                }
 
-                private CreateIndexClusterStateUpdateRequest buildUpdateRequest(String indexName) {
-                    CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(
-                        request.cause(),
-                        indexName,
-                        request.index()
-                    ).ackTimeout(request.timeout()).performReroute(false).masterNodeTimeout(request.masterNodeTimeout());
-                    logger.debug("Auto-creating index {}", indexName);
-                    return updateRequest;
-                }
+                    final SystemIndexDescriptor mainDescriptor = isSystemIndex ? systemIndices.findMatchingDescriptor(indexName) : null;
+                    final boolean isManagedSystemIndex = mainDescriptor != null && mainDescriptor.isAutomaticallyManaged();
 
-                private CreateIndexClusterStateUpdateRequest buildSystemIndexUpdateRequest(
-                    String indexName,
-                    SystemIndexDescriptor descriptor
-                ) {
-                    String mappings = descriptor.getMappings();
-                    Settings settings = descriptor.getSettings();
-                    String aliasName = descriptor.getAliasName();
+                    final CreateIndexClusterStateUpdateRequest updateRequest;
 
-                    // if we are writing to the alias name, we should create the primary index here
-                    String concreteIndexName = indexName.equals(aliasName) ? descriptor.getPrimaryIndex() : indexName;
-
-                    CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(
-                        request.cause(),
-                        concreteIndexName,
-                        request.index()
-                    ).ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()).performReroute(false);
-
-                    updateRequest.waitForActiveShards(ActiveShardCount.ALL);
-
-                    if (mappings != null) {
-                        updateRequest.mappings(mappings);
-                    }
-                    if (settings != null) {
-                        updateRequest.settings(settings);
-                    }
-                    if (aliasName != null) {
-                        updateRequest.aliases(Set.of(new Alias(aliasName).isHidden(true)));
-                    }
+                    if (isManagedSystemIndex) {
+                        final SystemIndexDescriptor descriptor = mainDescriptor.getDescriptorCompatibleWith(
+                            currentState.nodes().getSmallestNonClientNodeVersion()
+                        );
+                        if (descriptor == null) {
+                            final String message = mainDescriptor.getMinimumNodeVersionMessage("auto-create index");
+                            logger.warn(message);
+                            throw new IllegalStateException(message);
+                        }
 
-                    if (logger.isDebugEnabled()) {
-                        if (concreteIndexName.equals(indexName) == false) {
-                            logger.debug("Auto-creating backing system index {} for alias {}", concreteIndexName, indexName);
-                        } else {
-                            logger.debug("Auto-creating system index {}", concreteIndexName);
+                        updateRequest = buildSystemIndexUpdateRequest(indexName, descriptor);
+                    } else if (isSystemIndex) {
+                        updateRequest = buildUpdateRequest(indexName);
+
+                        if (Objects.isNull(request.settings())) {
+                            updateRequest.settings(SystemIndexDescriptor.DEFAULT_SETTINGS);
+                        } else if (false == request.settings().hasValue(SETTING_INDEX_HIDDEN)) {
+                            updateRequest.settings(Settings.builder().put(request.settings()).put(SETTING_INDEX_HIDDEN, true).build());
+                        } else if ("false".equals(request.settings().get(SETTING_INDEX_HIDDEN))) {
+                            final String message = "Cannot auto-create system index [" + indexName + "] with [index.hidden] set to 'false'";
+                            logger.warn(message);
+                            throw new IllegalStateException(message);
                         }
+                    } else {
+                        updateRequest = buildUpdateRequest(indexName);
                     }
 
-                    return updateRequest;
+                    return createIndexService.applyCreateIndexRequest(currentState, updateRequest, false);
+                }
+            }
+
+            private CreateIndexClusterStateUpdateRequest buildUpdateRequest(String indexName) {
+                CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(
+                    request.cause(),
+                    indexName,
+                    request.index()
+                ).ackTimeout(request.timeout()).performReroute(false).masterNodeTimeout(request.masterNodeTimeout());
+                logger.debug("Auto-creating index {}", indexName);
+                return updateRequest;
+            }
+
+            private CreateIndexClusterStateUpdateRequest buildSystemIndexUpdateRequest(String indexName, SystemIndexDescriptor descriptor) {
+                String mappings = descriptor.getMappings();
+                Settings settings = descriptor.getSettings();
+                String aliasName = descriptor.getAliasName();
+
+                // if we are writing to the alias name, we should create the primary index here
+                String concreteIndexName = indexName.equals(aliasName) ? descriptor.getPrimaryIndex() : indexName;
+
+                CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(
+                    request.cause(),
+                    concreteIndexName,
+                    request.index()
+                ).ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()).performReroute(false);
+
+                updateRequest.waitForActiveShards(ActiveShardCount.ALL);
+
+                if (mappings != null) {
+                    updateRequest.mappings(mappings);
+                }
+                if (settings != null) {
+                    updateRequest.settings(settings);
+                }
+                if (aliasName != null) {
+                    updateRequest.aliases(Set.of(new Alias(aliasName).isHidden(true)));
                 }
-            };
-            clusterService.submitStateUpdateTask("auto create [" + request.index() + "]", clusterTask, clusterTask, executor, clusterTask);
-        }
 
-        @Override
-        protected ClusterBlockException checkBlock(CreateIndexRequest request, ClusterState state) {
-            return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.index());
+                if (logger.isDebugEnabled()) {
+                    if (concreteIndexName.equals(indexName) == false) {
+                        logger.debug("Auto-creating backing system index {} for alias {}", concreteIndexName, indexName);
+                    } else {
+                        logger.debug("Auto-creating system index {}", concreteIndexName);
+                    }
+                }
+
+                return updateRequest;
+            }
         }
     }