Browse Source

Ensure shard is mutable before proceeding with updates (#122392)


An update operations should make sure
that the shard is mutable before proceeding
further with a Get to retrieve the document
and then any of the create/update/delete/noop
operation.

Relates ES-10708
Tanguy Leroux 8 months ago
parent
commit
6ff8bedafd

+ 139 - 123
server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java

@@ -23,6 +23,7 @@ import org.elasticsearch.action.delete.DeleteResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.AutoCreateIndex;
+import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.action.support.TransportActions;
 import org.elasticsearch.action.support.single.instance.TransportInstanceSingleOperationAction;
 import org.elasticsearch.client.internal.node.NodeClient;
@@ -96,7 +97,10 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
 
     @Override
     protected Executor executor(ShardId shardId) {
-        final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
+        return executor(indicesService.indexServiceSafe(shardId.getIndex()));
+    }
+
+    private Executor executor(IndexService indexService) {
         return threadPool.executor(indexService.getIndexSettings().getIndexMetadata().isSystem() ? Names.SYSTEM_WRITE : Names.WRITE);
     }
 
@@ -189,136 +193,148 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
         final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
         final IndexShard indexShard = indexService.getShard(shardId.getId());
         final MappingLookup mappingLookup = indexShard.mapperService().mappingLookup();
-        final UpdateHelper.Result result = deleteInferenceResults(
-            request,
-            updateHelper.prepare(request, indexShard, threadPool::absoluteTimeInMillis),
-            indexService.getMetadata(),
-            mappingLookup
-        );
-
-        switch (result.getResponseResult()) {
-            case CREATED -> {
-                IndexRequest upsertRequest = result.action();
-                // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
-                final BytesReference upsertSourceBytes = upsertRequest.source();
-                client.bulk(
-                    toSingleItemBulkRequest(upsertRequest),
-                    unwrappingSingleItemBulkResponse(ActionListener.<DocWriteResponse>wrap(response -> {
-                        UpdateResponse update = new UpdateResponse(
-                            response.getShardInfo(),
-                            response.getShardId(),
-                            response.getId(),
-                            response.getSeqNo(),
-                            response.getPrimaryTerm(),
-                            response.getVersion(),
-                            response.getResult()
-                        );
-                        if (request.fetchSource() != null && request.fetchSource().fetchSource()) {
-                            Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(
-                                upsertSourceBytes,
-                                true,
-                                upsertRequest.getContentType()
-                            );
-                            update.setGetResult(
-                                UpdateHelper.extractGetResult(
-                                    request,
-                                    request.concreteIndex(),
-                                    mappingLookup,
+
+        var executor = executor(indexService);
+        assert ThreadPool.assertCurrentThreadPool(Names.SYSTEM_WRITE, Names.WRITE);
+
+        SubscribableListener.newForked(indexShard::ensureMutable)
+        // Make sure to fork back to a `write` thread pool if necessary
+        .<UpdateHelper.Result>andThen(executor, threadPool.getThreadContext(), (l, unused) -> ActionListener.completeWith(l, () -> {
+            assert ThreadPool.assertCurrentThreadPool(Names.SYSTEM_WRITE, Names.WRITE);
+            return deleteInferenceResults(
+                request,
+                updateHelper.prepare(request, indexShard, threadPool::absoluteTimeInMillis), // Gets the doc using the engine
+                indexService.getMetadata(),
+                mappingLookup
+            );
+        }))
+            // Proceed with a single item bulk request
+            .<UpdateResponse>andThen((l, result) -> {
+                switch (result.getResponseResult()) {
+                    case CREATED -> {
+                        IndexRequest upsertRequest = result.action();
+                        // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
+                        final BytesReference upsertSourceBytes = upsertRequest.source();
+                        client.bulk(
+                            toSingleItemBulkRequest(upsertRequest),
+                            unwrappingSingleItemBulkResponse(ActionListener.<DocWriteResponse>wrap(response -> {
+                                UpdateResponse update = new UpdateResponse(
+                                    response.getShardInfo(),
+                                    response.getShardId(),
+                                    response.getId(),
                                     response.getSeqNo(),
                                     response.getPrimaryTerm(),
                                     response.getVersion(),
-                                    sourceAndContent.v2(),
-                                    sourceAndContent.v1(),
-                                    upsertSourceBytes
-                                )
-                            );
-                        } else {
-                            update.setGetResult(null);
-                        }
-                        update.setForcedRefresh(response.forcedRefresh());
-                        listener.onResponse(update);
-                    }, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount)))
-                );
-            }
-            case UPDATED -> {
-                IndexRequest indexRequest = result.action();
-                // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
-                final BytesReference indexSourceBytes = indexRequest.source();
-                client.bulk(
-                    toSingleItemBulkRequest(indexRequest),
-                    unwrappingSingleItemBulkResponse(ActionListener.<DocWriteResponse>wrap(response -> {
-                        UpdateResponse update = new UpdateResponse(
-                            response.getShardInfo(),
-                            response.getShardId(),
-                            response.getId(),
-                            response.getSeqNo(),
-                            response.getPrimaryTerm(),
-                            response.getVersion(),
-                            response.getResult()
-                        );
-                        update.setGetResult(
-                            UpdateHelper.extractGetResult(
-                                request,
-                                request.concreteIndex(),
-                                mappingLookup,
-                                response.getSeqNo(),
-                                response.getPrimaryTerm(),
-                                response.getVersion(),
-                                result.updatedSourceAsMap(),
-                                result.updateSourceContentType(),
-                                indexSourceBytes
-                            )
+                                    response.getResult()
+                                );
+                                if (request.fetchSource() != null && request.fetchSource().fetchSource()) {
+                                    Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(
+                                        upsertSourceBytes,
+                                        true,
+                                        upsertRequest.getContentType()
+                                    );
+                                    update.setGetResult(
+                                        UpdateHelper.extractGetResult(
+                                            request,
+                                            request.concreteIndex(),
+                                            mappingLookup,
+                                            response.getSeqNo(),
+                                            response.getPrimaryTerm(),
+                                            response.getVersion(),
+                                            sourceAndContent.v2(),
+                                            sourceAndContent.v1(),
+                                            upsertSourceBytes
+                                        )
+                                    );
+                                } else {
+                                    update.setGetResult(null);
+                                }
+                                update.setForcedRefresh(response.forcedRefresh());
+                                l.onResponse(update);
+                            }, exception -> handleUpdateFailureWithRetry(l, request, exception, retryCount)))
                         );
-                        update.setForcedRefresh(response.forcedRefresh());
-                        listener.onResponse(update);
-                    }, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount)))
-                );
-            }
-            case DELETED -> {
-                DeleteRequest deleteRequest = result.action();
-                client.bulk(
-                    toSingleItemBulkRequest(deleteRequest),
-                    unwrappingSingleItemBulkResponse(ActionListener.<DeleteResponse>wrap(response -> {
-                        UpdateResponse update = new UpdateResponse(
-                            response.getShardInfo(),
-                            response.getShardId(),
-                            response.getId(),
-                            response.getSeqNo(),
-                            response.getPrimaryTerm(),
-                            response.getVersion(),
-                            response.getResult()
+                    }
+                    case UPDATED -> {
+                        IndexRequest indexRequest = result.action();
+                        // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
+                        final BytesReference indexSourceBytes = indexRequest.source();
+                        client.bulk(
+                            toSingleItemBulkRequest(indexRequest),
+                            unwrappingSingleItemBulkResponse(ActionListener.<DocWriteResponse>wrap(response -> {
+                                UpdateResponse update = new UpdateResponse(
+                                    response.getShardInfo(),
+                                    response.getShardId(),
+                                    response.getId(),
+                                    response.getSeqNo(),
+                                    response.getPrimaryTerm(),
+                                    response.getVersion(),
+                                    response.getResult()
+                                );
+                                update.setGetResult(
+                                    UpdateHelper.extractGetResult(
+                                        request,
+                                        request.concreteIndex(),
+                                        mappingLookup,
+                                        response.getSeqNo(),
+                                        response.getPrimaryTerm(),
+                                        response.getVersion(),
+                                        result.updatedSourceAsMap(),
+                                        result.updateSourceContentType(),
+                                        indexSourceBytes
+                                    )
+                                );
+                                update.setForcedRefresh(response.forcedRefresh());
+                                l.onResponse(update);
+                            }, exception -> handleUpdateFailureWithRetry(l, request, exception, retryCount)))
                         );
-                        update.setGetResult(
-                            UpdateHelper.extractGetResult(
-                                request,
-                                request.concreteIndex(),
-                                mappingLookup,
-                                response.getSeqNo(),
-                                response.getPrimaryTerm(),
-                                response.getVersion(),
-                                result.updatedSourceAsMap(),
-                                result.updateSourceContentType(),
-                                null
-                            )
+                    }
+                    case DELETED -> {
+                        DeleteRequest deleteRequest = result.action();
+                        client.bulk(
+                            toSingleItemBulkRequest(deleteRequest),
+                            unwrappingSingleItemBulkResponse(ActionListener.<DeleteResponse>wrap(response -> {
+                                UpdateResponse update = new UpdateResponse(
+                                    response.getShardInfo(),
+                                    response.getShardId(),
+                                    response.getId(),
+                                    response.getSeqNo(),
+                                    response.getPrimaryTerm(),
+                                    response.getVersion(),
+                                    response.getResult()
+                                );
+                                update.setGetResult(
+                                    UpdateHelper.extractGetResult(
+                                        request,
+                                        request.concreteIndex(),
+                                        mappingLookup,
+                                        response.getSeqNo(),
+                                        response.getPrimaryTerm(),
+                                        response.getVersion(),
+                                        result.updatedSourceAsMap(),
+                                        result.updateSourceContentType(),
+                                        null
+                                    )
+                                );
+                                update.setForcedRefresh(response.forcedRefresh());
+                                l.onResponse(update);
+                            }, exception -> handleUpdateFailureWithRetry(l, request, exception, retryCount)))
                         );
-                        update.setForcedRefresh(response.forcedRefresh());
-                        listener.onResponse(update);
-                    }, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount)))
-                );
-            }
-            case NOOP -> {
-                UpdateResponse update = result.action();
-                IndexService indexServiceOrNull = indicesService.indexService(shardId.getIndex());
-                if (indexServiceOrNull != null) {
-                    IndexShard shard = indexService.getShardOrNull(shardId.getId());
-                    if (shard != null) {
-                        shard.noopUpdate();
                     }
+                    case NOOP -> {
+                        UpdateResponse update = result.action();
+                        IndexService indexServiceOrNull = indicesService.indexService(shardId.getIndex());
+                        if (indexServiceOrNull != null) {
+                            IndexShard shard = indexService.getShardOrNull(shardId.getId());
+                            if (shard != null) {
+                                shard.noopUpdate();
+                            }
+                        }
+                        l.onResponse(update);
+                    }
+                    default -> throw new IllegalStateException("Illegal result " + result.getResponseResult());
                 }
-                listener.onResponse(update);
-            }
-            default -> throw new IllegalStateException("Illegal result " + result.getResponseResult());
-        }
+            })
+            .addListener(listener);
     }
 
     private void handleUpdateFailureWithRetry(