Browse Source

Fix stalled send translog ops request (#57859)

Currently, the translog ops request is reentrent when there is a mapping
update. The impact of this is that a translog ops ends up waiting on the
pre-existing listener and it is never completed. This commit fixes this
by introducing a new code path to avoid the idempotency logic.
Tim Brooks 5 years ago
parent
commit
db584c07a2

+ 57 - 48
server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

@@ -348,59 +348,68 @@ public class PeerRecoveryTargetService implements IndexEventListener {
                     return;
                 }
 
-                final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
-                final Consumer<Exception> retryOnMappingException = exception -> {
-                    // in very rare cases a translog replay from primary is processed before a mapping update on this node
-                    // which causes local mapping changes since the mapping (clusterstate) might not have arrived on this node.
-                    logger.debug("delaying recovery due to missing mapping changes", exception);
-                    // we do not need to use a timeout here since the entire recovery mechanism has an inactivity protection (it will be
-                    // canceled)
-                    observer.waitForNextChange(new ClusterStateObserver.Listener() {
-                        @Override
-                        public void onNewClusterState(ClusterState state) {
-                            try {
-                                messageReceived(request, channel, task);
-                            } catch (Exception e) {
-                                listener.onFailure(e);
+                performTranslogOps(request, listener, recoveryRef);
+            }
+        }
+
+        private void performTranslogOps(final RecoveryTranslogOperationsRequest request, final ActionListener<Void> listener,
+                                        final RecoveryRef recoveryRef) {
+            final RecoveryTarget recoveryTarget = recoveryRef.target();
+
+            final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
+            final Consumer<Exception> retryOnMappingException = exception -> {
+                // in very rare cases a translog replay from primary is processed before a mapping update on this node
+                // which causes local mapping changes since the mapping (clusterstate) might not have arrived on this node.
+                logger.debug("delaying recovery due to missing mapping changes", exception);
+                // we do not need to use a timeout here since the entire recovery mechanism has an inactivity protection (it will be
+                // canceled)
+                observer.waitForNextChange(new ClusterStateObserver.Listener() {
+                    @Override
+                    public void onNewClusterState(ClusterState state) {
+                        try {
+                            try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
+                                performTranslogOps(request, listener, recoveryRef);
                             }
+                        } catch (Exception e) {
+                            listener.onFailure(e);
                         }
+                    }
 
-                        @Override
-                        public void onClusterServiceClose() {
-                            listener.onFailure(new ElasticsearchException(
-                                "cluster service was closed while waiting for mapping updates"));
-                        }
+                    @Override
+                    public void onClusterServiceClose() {
+                        listener.onFailure(new ElasticsearchException(
+                            "cluster service was closed while waiting for mapping updates"));
+                    }
 
-                        @Override
-                        public void onTimeout(TimeValue timeout) {
-                            // note that we do not use a timeout (see comment above)
-                            listener.onFailure(new ElasticsearchTimeoutException("timed out waiting for mapping updates " +
-                                "(timeout [" + timeout + "])"));
+                    @Override
+                    public void onTimeout(TimeValue timeout) {
+                        // note that we do not use a timeout (see comment above)
+                        listener.onFailure(new ElasticsearchTimeoutException("timed out waiting for mapping updates " +
+                            "(timeout [" + timeout + "])"));
+                    }
+                });
+            };
+            final IndexMetadata indexMetadata = clusterService.state().metadata().index(request.shardId().getIndex());
+            final long mappingVersionOnTarget = indexMetadata != null ? indexMetadata.getMappingVersion() : 0L;
+            recoveryTarget.indexTranslogOperations(
+                request.operations(),
+                request.totalTranslogOps(),
+                request.maxSeenAutoIdTimestampOnPrimary(),
+                request.maxSeqNoOfUpdatesOrDeletesOnPrimary(),
+                request.retentionLeases(),
+                request.mappingVersionOnPrimary(),
+                ActionListener.wrap(
+                    checkpoint -> listener.onResponse(null),
+                    e -> {
+                        // do not retry if the mapping on replica is at least as recent as the mapping
+                        // that the primary used to index the operations in the request.
+                        if (mappingVersionOnTarget < request.mappingVersionOnPrimary() && e instanceof MapperException) {
+                            retryOnMappingException.accept(e);
+                        } else {
+                            listener.onFailure(e);
                         }
-                    });
-                };
-                final IndexMetadata indexMetadata = clusterService.state().metadata().index(request.shardId().getIndex());
-                final long mappingVersionOnTarget = indexMetadata != null ? indexMetadata.getMappingVersion() : 0L;
-                recoveryTarget.indexTranslogOperations(
-                        request.operations(),
-                        request.totalTranslogOps(),
-                        request.maxSeenAutoIdTimestampOnPrimary(),
-                        request.maxSeqNoOfUpdatesOrDeletesOnPrimary(),
-                        request.retentionLeases(),
-                        request.mappingVersionOnPrimary(),
-                        ActionListener.wrap(
-                                checkpoint -> listener.onResponse(null),
-                                e -> {
-                                    // do not retry if the mapping on replica is at least as recent as the mapping
-                                    // that the primary used to index the operations in the request.
-                                    if (mappingVersionOnTarget < request.mappingVersionOnPrimary() && e instanceof MapperException) {
-                                        retryOnMappingException.accept(e);
-                                    } else {
-                                        listener.onFailure(e);
-                                    }
-                                })
-                );
-            }
+                    })
+            );
         }
     }