浏览代码

Retry get_from_translog during relocations (#104579)

During a promotable relocation, a `get_from_translog` sent by the
unpromotable  shard to handle a real-time get might encounter
`ShardNotFoundException` or  `IndexNotFoundException`. In these cases,
we should retry.

This is just for `GET`. I'll open a second PR for `mGET`.  The relevant
IT is in the  Stateless PR.

Relates ES-5727
Pooya Salehi 1 年之前
父节点
当前提交
dbefb32bd7

+ 95 - 34
server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java

@@ -8,17 +8,23 @@
 
 package org.elasticsearch.action.get;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.NoShardAvailableActionException;
+import org.elasticsearch.action.UnavailableShardsException;
 import org.elasticsearch.action.admin.indices.refresh.TransportShardRefreshAction;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.replication.BasicReplicationRequest;
 import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
 import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateObserver;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.OperationRouting;
@@ -27,15 +33,17 @@ import org.elasticsearch.cluster.routing.ShardIterator;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.get.GetResult;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.ShardNotFoundException;
 import org.elasticsearch.indices.ExecutorSelector;
 import org.elasticsearch.indices.IndicesService;
-import org.elasticsearch.logging.LogManager;
-import org.elasticsearch.logging.Logger;
+import org.elasticsearch.node.NodeClosedException;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
@@ -184,8 +192,8 @@ public class TransportGetAction extends TransportSingleShardAction<GetRequest, G
     private void handleGetOnUnpromotableShard(GetRequest request, IndexShard indexShard, ActionListener<GetResponse> listener)
         throws IOException {
         ShardId shardId = indexShard.shardId();
-        var node = getCurrentNodeOfPrimary(clusterService.state(), shardId);
         if (request.refresh()) {
+            var node = getCurrentNodeOfPrimary(clusterService.state(), shardId);
             logger.trace("send refresh action for shard {} to node {}", shardId, node.getId());
             var refreshRequest = new BasicReplicationRequest(shardId);
             refreshRequest.setParentTask(request.getParentTask());
@@ -194,44 +202,97 @@ public class TransportGetAction extends TransportSingleShardAction<GetRequest, G
                 refreshRequest,
                 listener.delegateFailureAndWrap((l, replicationResponse) -> super.asyncShardOperation(request, shardId, l))
             );
-        } else if (request.realtime()) {
-            TransportGetFromTranslogAction.Request getFromTranslogRequest = new TransportGetFromTranslogAction.Request(request, shardId);
-            getFromTranslogRequest.setParentTask(request.getParentTask());
-            transportService.sendRequest(
-                node,
-                TransportGetFromTranslogAction.NAME,
-                getFromTranslogRequest,
-                new ActionListenerResponseHandler<>(listener.delegateFailure((l, r) -> {
-                    if (r.getResult() != null) {
-                        logger.debug("received result for real-time get for id '{}' from promotable shard", request.id());
-                        l.onResponse(new GetResponse(r.getResult()));
-                    } else {
-                        logger.debug(
-                            "no result for real-time get for id '{}' from promotable shard (segment generation to wait for: {})",
-                            request.id(),
-                            r.segmentGeneration()
-                        );
-                        if (r.segmentGeneration() == -1) {
-                            // Nothing to wait for (no previous unsafe generation), just handle the Get locally.
-                            ActionRunnable.supply(l, () -> shardOperation(request, shardId)).run();
-                        } else {
-                            assert r.segmentGeneration() > -1L;
-                            assert r.primaryTerm() > Engine.UNKNOWN_PRIMARY_TERM;
-                            indexShard.waitForPrimaryTermAndGeneration(
-                                r.primaryTerm(),
-                                r.segmentGeneration(),
-                                listener.delegateFailureAndWrap((ll, aLong) -> super.asyncShardOperation(request, shardId, ll))
-                            );
-                        }
-                    }
-                }), TransportGetFromTranslogAction.Response::new, getExecutor(request, shardId))
+            return;
+        }
+        if (request.realtime()) {
+            final var state = clusterService.state();
+            final var observer = new ClusterStateObserver(
+                state,
+                clusterService,
+                TimeValue.timeValueSeconds(60),
+                logger,
+                threadPool.getThreadContext()
             );
+            getFromTranslog(request, indexShard, state, observer, listener);
         } else {
             // A non-real-time get with no explicit refresh requested.
             super.asyncShardOperation(request, shardId, listener);
         }
     }
 
+    private void getFromTranslog(
+        GetRequest request,
+        IndexShard indexShard,
+        ClusterState state,
+        ClusterStateObserver observer,
+        ActionListener<GetResponse> listener
+    ) {
+        tryGetFromTranslog(request, indexShard, state, listener.delegateResponse((l, e) -> {
+            final var cause = ExceptionsHelper.unwrapCause(e);
+            logger.debug("get_from_translog failed", cause);
+            if (cause instanceof ShardNotFoundException
+                || cause instanceof IndexNotFoundException
+                || cause instanceof NoShardAvailableActionException
+                || cause instanceof UnavailableShardsException) {
+                logger.debug("retrying get_from_translog");
+                observer.waitForNextChange(new ClusterStateObserver.Listener() {
+                    @Override
+                    public void onNewClusterState(ClusterState state) {
+                        getFromTranslog(request, indexShard, state, observer, l);
+                    }
+
+                    @Override
+                    public void onClusterServiceClose() {
+                        l.onFailure(new NodeClosedException(clusterService.localNode()));
+                    }
+
+                    @Override
+                    public void onTimeout(TimeValue timeout) {
+                        l.onFailure(new ElasticsearchException("Timed out retrying get_from_translog", cause));
+                    }
+                });
+            } else {
+                l.onFailure(e);
+            }
+        }));
+    }
+
+    private void tryGetFromTranslog(GetRequest request, IndexShard indexShard, ClusterState state, ActionListener<GetResponse> listener) {
+        ShardId shardId = indexShard.shardId();
+        var node = getCurrentNodeOfPrimary(state, shardId);
+        TransportGetFromTranslogAction.Request getFromTranslogRequest = new TransportGetFromTranslogAction.Request(request, shardId);
+        getFromTranslogRequest.setParentTask(request.getParentTask());
+        transportService.sendRequest(
+            node,
+            TransportGetFromTranslogAction.NAME,
+            getFromTranslogRequest,
+            new ActionListenerResponseHandler<>(listener.delegateFailure((l, r) -> {
+                if (r.getResult() != null) {
+                    logger.debug("received result for real-time get for id '{}' from promotable shard", request.id());
+                    l.onResponse(new GetResponse(r.getResult()));
+                } else {
+                    logger.debug(
+                        "no result for real-time get for id '{}' from promotable shard (segment generation to wait for: {})",
+                        request.id(),
+                        r.segmentGeneration()
+                    );
+                    if (r.segmentGeneration() == -1) {
+                        // Nothing to wait for (no previous unsafe generation), just handle the Get locally.
+                        ActionRunnable.supply(l, () -> shardOperation(request, shardId)).run();
+                    } else {
+                        assert r.segmentGeneration() > -1L;
+                        assert r.primaryTerm() > Engine.UNKNOWN_PRIMARY_TERM;
+                        indexShard.waitForPrimaryTermAndGeneration(
+                            r.primaryTerm(),
+                            r.segmentGeneration(),
+                            listener.delegateFailureAndWrap((ll, aLong) -> super.asyncShardOperation(request, shardId, ll))
+                        );
+                    }
+                }
+            }), TransportGetFromTranslogAction.Response::new, getExecutor(request, shardId))
+        );
+    }
+
     static DiscoveryNode getCurrentNodeOfPrimary(ClusterState clusterState, ShardId shardId) {
         var shardRoutingTable = clusterState.routingTable().shardRoutingTable(shardId);
         if (shardRoutingTable.primaryShard() == null || shardRoutingTable.primaryShard().active() == false) {

+ 0 - 1
server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java

@@ -40,7 +40,6 @@ import org.elasticsearch.transport.TransportService;
 import java.io.IOException;
 import java.util.Objects;
 
-// TODO(ES-5727): add a retry mechanism to TransportGetFromTranslogAction
 public class TransportGetFromTranslogAction extends HandledTransportAction<
     TransportGetFromTranslogAction.Request,
     TransportGetFromTranslogAction.Response> {