Browse Source

Batch Index Settings Update Requests (#82896)

Joe Gallo 3 years ago
parent
commit
4bf8aecab6

+ 6 - 0
docs/changelog/82896.yaml

@@ -0,0 +1,6 @@
+pr: 82896
+summary: Batch Index Settings Update Requests
+area: Cluster Coordination
+type: enhancement
+issues:
+ - 79866

+ 146 - 130
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java

@@ -17,6 +17,7 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateTaskExecutor;
+import org.elasticsearch.cluster.ClusterStateTaskExecutor.ClusterTasksResult;
 import org.elasticsearch.cluster.block.ClusterBlock;
 import org.elasticsearch.cluster.block.ClusterBlocks;
 import org.elasticsearch.cluster.routing.RoutingTable;
@@ -50,13 +51,12 @@ public class MetadataUpdateSettingsService {
     private static final Logger logger = LogManager.getLogger(MetadataUpdateSettingsService.class);
 
     private final ClusterService clusterService;
-
     private final AllocationService allocationService;
-
     private final IndexScopedSettings indexScopedSettings;
     private final IndicesService indicesService;
     private final ShardLimitValidator shardLimitValidator;
     private final ThreadPool threadPool;
+    private final ClusterStateTaskExecutor<AckedClusterStateUpdateTask> executor;
 
     public MetadataUpdateSettingsService(
         ClusterService clusterService,
@@ -67,11 +67,28 @@ public class MetadataUpdateSettingsService {
         ThreadPool threadPool
     ) {
         this.clusterService = clusterService;
-        this.threadPool = threadPool;
         this.allocationService = allocationService;
         this.indexScopedSettings = indexScopedSettings;
         this.indicesService = indicesService;
         this.shardLimitValidator = shardLimitValidator;
+        this.threadPool = threadPool;
+        this.executor = (currentState, tasks) -> {
+            ClusterTasksResult.Builder<AckedClusterStateUpdateTask> builder = ClusterTasksResult.builder();
+            ClusterState state = currentState;
+            for (AckedClusterStateUpdateTask task : tasks) {
+                try {
+                    state = task.execute(state);
+                    builder.success(task);
+                } catch (Exception e) {
+                    builder.failure(task, e);
+                }
+            }
+            if (state != currentState) {
+                // reroute in case things change that require it (like number of replicas)
+                state = allocationService.reroute(state, "settings update");
+            }
+            return builder.build(state);
+        };
     }
 
     public void updateSettings(final UpdateSettingsClusterStateUpdateRequest request, final ActionListener<AcknowledgedResponse> listener) {
@@ -105,149 +122,149 @@ public class MetadataUpdateSettingsService {
         final Settings openSettings = settingsForOpenIndices.build();
         final boolean preserveExisting = request.isPreserveExisting();
 
-        clusterService.submitStateUpdateTask(
-            "update-settings " + Arrays.toString(request.indices()),
-            new AckedClusterStateUpdateTask(Priority.URGENT, request, wrapPreservingContext(listener, threadPool.getThreadContext())) {
+        // TODO: move this to custom class instead of AckedClusterStateUpdateTask
+        AckedClusterStateUpdateTask clusterTask = new AckedClusterStateUpdateTask(
+            Priority.URGENT,
+            request,
+            wrapPreservingContext(listener, threadPool.getThreadContext())
+        ) {
+            @Override
+            public ClusterState execute(ClusterState currentState) {
+                RoutingTable.Builder routingTableBuilder = null;
+                Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata());
 
-                @Override
-                public ClusterState execute(ClusterState currentState) {
-
-                    RoutingTable.Builder routingTableBuilder = null;
-                    Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata());
-
-                    // allow to change any settings to a close index, and only allow dynamic settings to be changed
-                    // on an open index
-                    Set<Index> openIndices = new HashSet<>();
-                    Set<Index> closeIndices = new HashSet<>();
-                    final String[] actualIndices = new String[request.indices().length];
-                    for (int i = 0; i < request.indices().length; i++) {
-                        Index index = request.indices()[i];
-                        actualIndices[i] = index.getName();
-                        final IndexMetadata metadata = currentState.metadata().getIndexSafe(index);
-                        if (metadata.getState() == IndexMetadata.State.OPEN) {
-                            openIndices.add(index);
-                        } else {
-                            closeIndices.add(index);
-                        }
+                // allow to change any settings to a closed index, and only allow dynamic settings to be changed
+                // on an open index
+                Set<Index> openIndices = new HashSet<>();
+                Set<Index> closedIndices = new HashSet<>();
+                final String[] actualIndices = new String[request.indices().length];
+                for (int i = 0; i < request.indices().length; i++) {
+                    Index index = request.indices()[i];
+                    actualIndices[i] = index.getName();
+                    final IndexMetadata metadata = currentState.metadata().getIndexSafe(index);
+                    if (metadata.getState() == IndexMetadata.State.OPEN) {
+                        openIndices.add(index);
+                    } else {
+                        closedIndices.add(index);
                     }
+                }
 
-                    if (skippedSettings.isEmpty() == false && openIndices.isEmpty() == false) {
-                        throw new IllegalArgumentException(
-                            String.format(
-                                Locale.ROOT,
-                                "Can't update non dynamic settings [%s] for open indices %s",
-                                skippedSettings,
-                                openIndices
-                            )
-                        );
-                    }
+                if (skippedSettings.isEmpty() == false && openIndices.isEmpty() == false) {
+                    throw new IllegalArgumentException(
+                        String.format(
+                            Locale.ROOT,
+                            "Can't update non dynamic settings [%s] for open indices %s",
+                            skippedSettings,
+                            openIndices
+                        )
+                    );
+                }
 
-                    if (IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.exists(openSettings)) {
-                        final int updatedNumberOfReplicas = IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(openSettings);
-                        if (preserveExisting == false) {
-                            // Verify that this won't take us over the cluster shard limit.
-                            shardLimitValidator.validateShardLimitOnReplicaUpdate(currentState, request.indices(), updatedNumberOfReplicas);
+                if (IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.exists(openSettings)) {
+                    final int updatedNumberOfReplicas = IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(openSettings);
+                    if (preserveExisting == false) {
+                        // Verify that this won't take us over the cluster shard limit.
+                        shardLimitValidator.validateShardLimitOnReplicaUpdate(currentState, request.indices(), updatedNumberOfReplicas);
 
-                            /*
-                             * We do not update the in-sync allocation IDs as they will be removed upon the first index operation
-                             * which makes these copies stale.
-                             *
-                             * TODO: should we update the in-sync allocation IDs once the data is deleted by the node?
-                             */
-                            routingTableBuilder = RoutingTable.builder(currentState.routingTable());
-                            routingTableBuilder.updateNumberOfReplicas(updatedNumberOfReplicas, actualIndices);
-                            metadataBuilder.updateNumberOfReplicas(updatedNumberOfReplicas, actualIndices);
-                            logger.info("updating number_of_replicas to [{}] for indices {}", updatedNumberOfReplicas, actualIndices);
-                        }
+                        /*
+                         * We do not update the in-sync allocation IDs as they will be removed upon the first index operation
+                         * which makes these copies stale.
+                         *
+                         * TODO: should we update the in-sync allocation IDs once the data is deleted by the node?
+                         */
+                        routingTableBuilder = RoutingTable.builder(currentState.routingTable());
+                        routingTableBuilder.updateNumberOfReplicas(updatedNumberOfReplicas, actualIndices);
+                        metadataBuilder.updateNumberOfReplicas(updatedNumberOfReplicas, actualIndices);
+                        logger.info("updating number_of_replicas to [{}] for indices {}", updatedNumberOfReplicas, actualIndices);
                     }
+                }
 
-                    updateIndexSettings(
-                        openIndices,
-                        metadataBuilder,
-                        (index, indexSettings) -> indexScopedSettings.updateDynamicSettings(
-                            openSettings,
-                            indexSettings,
-                            Settings.builder(),
-                            index.getName()
-                        ),
-                        preserveExisting,
-                        indexScopedSettings
-                    );
+                updateIndexSettings(
+                    openIndices,
+                    metadataBuilder,
+                    (index, indexSettings) -> indexScopedSettings.updateDynamicSettings(
+                        openSettings,
+                        indexSettings,
+                        Settings.builder(),
+                        index.getName()
+                    ),
+                    preserveExisting,
+                    indexScopedSettings
+                );
 
-                    updateIndexSettings(
-                        closeIndices,
-                        metadataBuilder,
-                        (index, indexSettings) -> indexScopedSettings.updateSettings(
-                            closedSettings,
-                            indexSettings,
-                            Settings.builder(),
-                            index.getName()
-                        ),
-                        preserveExisting,
-                        indexScopedSettings
-                    );
+                updateIndexSettings(
+                    closedIndices,
+                    metadataBuilder,
+                    (index, indexSettings) -> indexScopedSettings.updateSettings(
+                        closedSettings,
+                        indexSettings,
+                        Settings.builder(),
+                        index.getName()
+                    ),
+                    preserveExisting,
+                    indexScopedSettings
+                );
 
-                    if (IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.exists(normalizedSettings)
-                        || IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.exists(normalizedSettings)) {
-                        for (String index : actualIndices) {
-                            final Settings settings = metadataBuilder.get(index).getSettings();
-                            MetadataCreateIndexService.validateTranslogRetentionSettings(settings);
-                            MetadataCreateIndexService.validateStoreTypeSetting(settings);
-                        }
+                if (IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.exists(normalizedSettings)
+                    || IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.exists(normalizedSettings)) {
+                    for (String index : actualIndices) {
+                        final Settings settings = metadataBuilder.get(index).getSettings();
+                        MetadataCreateIndexService.validateTranslogRetentionSettings(settings);
+                        MetadataCreateIndexService.validateStoreTypeSetting(settings);
                     }
-                    boolean changed = false;
-                    // increment settings versions
-                    for (final String index : actualIndices) {
-                        if (same(currentState.metadata().index(index).getSettings(), metadataBuilder.get(index).getSettings()) == false) {
-                            changed = true;
-                            final IndexMetadata.Builder builder = IndexMetadata.builder(metadataBuilder.get(index));
-                            builder.settingsVersion(1 + builder.settingsVersion());
-                            metadataBuilder.put(builder);
-                        }
+                }
+                boolean changed = false;
+                // increment settings versions
+                for (final String index : actualIndices) {
+                    if (same(currentState.metadata().index(index).getSettings(), metadataBuilder.get(index).getSettings()) == false) {
+                        changed = true;
+                        final IndexMetadata.Builder builder = IndexMetadata.builder(metadataBuilder.get(index));
+                        builder.settingsVersion(1 + builder.settingsVersion());
+                        metadataBuilder.put(builder);
                     }
+                }
 
-                    final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
-                    boolean changedBlocks = false;
-                    for (IndexMetadata.APIBlock block : IndexMetadata.APIBlock.values()) {
-                        changedBlocks |= maybeUpdateClusterBlock(actualIndices, blocks, block.block, block.setting, openSettings);
-                    }
-                    changed |= changedBlocks;
+                final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
+                boolean changedBlocks = false;
+                for (IndexMetadata.APIBlock block : IndexMetadata.APIBlock.values()) {
+                    changedBlocks |= maybeUpdateClusterBlock(actualIndices, blocks, block.block, block.setting, openSettings);
+                }
+                changed |= changedBlocks;
 
-                    if (changed == false) {
-                        return currentState;
-                    }
+                if (changed == false) {
+                    return currentState;
+                }
 
-                    ClusterState updatedState = ClusterState.builder(currentState)
-                        .metadata(metadataBuilder)
-                        .routingTable(routingTableBuilder == null ? currentState.routingTable() : routingTableBuilder.build())
-                        .blocks(changedBlocks ? blocks.build() : currentState.blocks())
-                        .build();
+                ClusterState updatedState = ClusterState.builder(currentState)
+                    .metadata(metadataBuilder)
+                    .routingTable(routingTableBuilder == null ? currentState.routingTable() : routingTableBuilder.build())
+                    .blocks(changedBlocks ? blocks.build() : currentState.blocks())
+                    .build();
 
-                    // now, reroute in case things change that require it (like number of replicas)
-                    updatedState = allocationService.reroute(updatedState, "settings update");
-                    try {
-                        for (Index index : openIndices) {
-                            final IndexMetadata currentMetadata = currentState.getMetadata().getIndexSafe(index);
-                            final IndexMetadata updatedMetadata = updatedState.metadata().getIndexSafe(index);
-                            indicesService.verifyIndexMetadata(currentMetadata, updatedMetadata);
-                        }
-                        for (Index index : closeIndices) {
-                            final IndexMetadata currentMetadata = currentState.getMetadata().getIndexSafe(index);
-                            final IndexMetadata updatedMetadata = updatedState.metadata().getIndexSafe(index);
-                            // Verifies that the current index settings can be updated with the updated dynamic settings.
-                            indicesService.verifyIndexMetadata(currentMetadata, updatedMetadata);
-                            // Now check that we can create the index with the updated settings (dynamic and non-dynamic).
-                            // This step is mandatory since we allow to update non-dynamic settings on closed indices.
-                            indicesService.verifyIndexMetadata(updatedMetadata, updatedMetadata);
-                        }
-                    } catch (IOException ex) {
-                        throw ExceptionsHelper.convertToElastic(ex);
+                try {
+                    for (Index index : openIndices) {
+                        final IndexMetadata currentMetadata = currentState.metadata().getIndexSafe(index);
+                        final IndexMetadata updatedMetadata = updatedState.metadata().getIndexSafe(index);
+                        indicesService.verifyIndexMetadata(currentMetadata, updatedMetadata);
                     }
-                    return updatedState;
+                    for (Index index : closedIndices) {
+                        final IndexMetadata currentMetadata = currentState.metadata().getIndexSafe(index);
+                        final IndexMetadata updatedMetadata = updatedState.metadata().getIndexSafe(index);
+                        // Verifies that the current index settings can be updated with the updated dynamic settings.
+                        indicesService.verifyIndexMetadata(currentMetadata, updatedMetadata);
+                        // Now check that we can create the index with the updated settings (dynamic and non-dynamic).
+                        // This step is mandatory since we allow to update non-dynamic settings on closed indices.
+                        indicesService.verifyIndexMetadata(updatedMetadata, updatedMetadata);
+                    }
+                } catch (IOException ex) {
+                    throw ExceptionsHelper.convertToElastic(ex);
                 }
-            },
-            ClusterStateTaskExecutor.unbatched()
-        );
+
+                return updatedState;
+            }
+        };
+
+        clusterService.submitStateUpdateTask("update-settings " + Arrays.toString(request.indices()), clusterTask, this.executor);
     }
 
     public static void updateIndexSettings(
@@ -256,7 +273,6 @@ public class MetadataUpdateSettingsService {
         BiFunction<Index, Settings.Builder, Boolean> settingUpdater,
         Boolean preserveExisting,
         IndexScopedSettings indexScopedSettings
-
     ) {
         for (Index index : indices) {
             IndexMetadata indexMetadata = metadataBuilder.getSafe(index);

+ 15 - 5
server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java

@@ -38,6 +38,7 @@ import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateTaskExecutor;
 import org.elasticsearch.cluster.ClusterStateTaskExecutor.ClusterTasksResult;
+import org.elasticsearch.cluster.ClusterStateTaskExecutor.TaskResult;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.EmptyClusterInfoService;
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
@@ -440,7 +441,7 @@ public class ClusterStateChanges {
     private <T> ClusterState runTasks(ClusterStateTaskExecutor<T> executor, ClusterState clusterState, List<T> entries) {
         try {
             ClusterTasksResult<T> result = executor.execute(clusterState, entries);
-            for (ClusterStateTaskExecutor.TaskResult taskResult : result.executionResults().values()) {
+            for (TaskResult taskResult : result.executionResults().values()) {
                 if (taskResult.isSuccess() == false) {
                     throw taskResult.getFailure();
                 }
@@ -465,16 +466,25 @@ public class ClusterStateChanges {
         });
     }
 
+    @SuppressWarnings("unchecked")
     private ClusterState executeClusterStateUpdateTask(ClusterState state, Runnable runnable) {
-        ClusterState[] result = new ClusterState[1];
+        ClusterState[] resultingState = new ClusterState[1];
         doAnswer(invocationOnMock -> {
             ClusterStateUpdateTask task = (ClusterStateUpdateTask) invocationOnMock.getArguments()[1];
-            result[0] = task.execute(state);
+            ClusterStateTaskExecutor<ClusterStateUpdateTask> executor = (ClusterStateTaskExecutor<ClusterStateUpdateTask>) invocationOnMock
+                .getArguments()[2];
+            ClusterTasksResult<ClusterStateUpdateTask> result = executor.execute(state, List.of(task));
+            for (TaskResult taskResult : result.executionResults().values()) {
+                if (taskResult.isSuccess() == false) {
+                    throw taskResult.getFailure();
+                }
+            }
+            resultingState[0] = result.resultingState();
             return null;
         }).when(clusterService).submitStateUpdateTask(anyString(), any(ClusterStateUpdateTask.class), any());
         runnable.run();
-        assertThat(result[0], notNullValue());
-        return result[0];
+        assertThat(resultingState[0], notNullValue());
+        return resultingState[0];
     }
 
     private ActionListener<TransportResponse.Empty> createTestListener() {