Browse Source

Prepare ShardFollowNodeTask to bootstrap when it fall behind leader shard (#37562)

* Changed `LuceneSnapshot` to throw an `OperationsMissingException` if the requested ops are missing.
* Changed the shard changes api to handle the `OperationsMissingException` and wrap the exception into `ResourceNotFound` exception and include metadata to indicate the requested range can no longer be retrieved.
* Changed `ShardFollowNodeTask` to handle this `ResourceNotFound` exception with the included metdata header.

Relates to #35975
Martijn van Groningen 6 years ago
parent
commit
4e1a779773

+ 2 - 2
server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java

@@ -146,13 +146,13 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
     private void rangeCheck(Translog.Operation op) {
         if (op == null) {
             if (lastSeenSeqNo < toSeqNo) {
-                throw new IllegalStateException("Not all operations between from_seqno [" + fromSeqNo + "] " +
+                throw new MissingHistoryOperationsException("Not all operations between from_seqno [" + fromSeqNo + "] " +
                     "and to_seqno [" + toSeqNo + "] found; prematurely terminated last_seen_seqno [" + lastSeenSeqNo + "]");
             }
         } else {
             final long expectedSeqNo = lastSeenSeqNo + 1;
             if (op.seqNo() != expectedSeqNo) {
-                throw new IllegalStateException("Not all operations between from_seqno [" + fromSeqNo + "] " +
+                throw new MissingHistoryOperationsException("Not all operations between from_seqno [" + fromSeqNo + "] " +
                     "and to_seqno [" + toSeqNo + "] found; expected seqno [" + expectedSeqNo + "]; found [" + op + "]");
             }
         }

+ 31 - 0
server/src/main/java/org/elasticsearch/index/engine/MissingHistoryOperationsException.java

@@ -0,0 +1,31 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.engine;
+
+/**
+ * Exception indicating that not all requested operations from {@link LuceneChangesSnapshot}
+ * are available.
+ */
+public final class MissingHistoryOperationsException extends IllegalStateException {
+
+    MissingHistoryOperationsException(String message) {
+        super(message);
+    }
+}

+ 2 - 0
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java

@@ -118,6 +118,8 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
     public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY = "leader_index_name";
     public static final String CCR_CUSTOM_METADATA_REMOTE_CLUSTER_NAME_KEY = "remote_cluster_name";
 
+    public static final String REQUESTED_OPS_MISSING_METADATA_KEY = "es.requested_operations_missing";
+
     private final boolean enabled;
     private final Settings settings;
     private final CcrLicenseChecker ccrLicenseChecker;

+ 11 - 18
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java

@@ -6,8 +6,7 @@
 package org.elasticsearch.xpack.ccr.action;
 
 import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.Action;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRequestValidationException;
@@ -27,6 +26,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.engine.MissingHistoryOperationsException;
 import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardNotStartedException;
@@ -34,9 +34,9 @@ import org.elasticsearch.index.shard.IndexShardState;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.indices.IndicesService;
-import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.ccr.Ccr;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -392,21 +392,6 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
             }
         }
 
-        @Override
-        protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
-            ActionListener<Response> wrappedListener = ActionListener.wrap(listener::onResponse, e -> {
-                Throwable cause = ExceptionsHelper.unwrapCause(e);
-                if (cause instanceof IllegalStateException && cause.getMessage().contains("Not all operations between from_seqno [")) {
-                    String message = "Operations are no longer available for replicating. Maybe increase the retention setting [" +
-                        IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey() + "]?";
-                    listener.onFailure(new ElasticsearchException(message, e));
-                } else {
-                    listener.onFailure(e);
-                }
-            });
-            super.doExecute(task, request, wrappedListener);
-        }
-
         private void globalCheckpointAdvanced(
                 final ShardId shardId,
                 final long globalCheckpoint,
@@ -525,6 +510,14 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
                     break;
                 }
             }
+        } catch (MissingHistoryOperationsException e) {
+            String message = "Operations are no longer available for replicating. Maybe increase the retention setting [" +
+                IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey() + "]?";
+            // Make it easy to detect this error in ShardFollowNodeTask:
+            // (adding a metadata header instead of introducing a new exception that extends ElasticsearchException)
+            ResourceNotFoundException wrapper = new ResourceNotFoundException(message, e);
+            wrapper.addMetadata(Ccr.REQUESTED_OPS_MISSING_METADATA_KEY, Long.toString(fromSeqNo), Long.toString(toSeqNo));
+            throw wrapper;
         }
         return operations.toArray(EMPTY_OPERATIONS_ARRAY);
     }

+ 22 - 0
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java

@@ -13,6 +13,7 @@ import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchSecurityException;
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.NoShardAvailableActionException;
 import org.elasticsearch.action.UnavailableShardsException;
 import org.elasticsearch.cluster.block.ClusterBlockException;
@@ -30,6 +31,7 @@ import org.elasticsearch.node.NodeClosedException;
 import org.elasticsearch.persistent.AllocatedPersistentTask;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.transport.ConnectTransportException;
+import org.elasticsearch.xpack.ccr.Ccr;
 import org.elasticsearch.transport.NoSuchRemoteClusterException;
 import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
 import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
@@ -275,6 +277,14 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
                         failedReadRequests++;
                         fetchExceptions.put(from, Tuple.tuple(retryCounter, ExceptionsHelper.convertToElastic(e)));
                     }
+                    Throwable cause = ExceptionsHelper.unwrapCause(e);
+                    if (cause instanceof ResourceNotFoundException) {
+                        ResourceNotFoundException resourceNotFoundException = (ResourceNotFoundException) cause;
+                        if (resourceNotFoundException.getMetadataKeys().contains(Ccr.REQUESTED_OPS_MISSING_METADATA_KEY)) {
+                            handleFallenBehindLeaderShard(e, from, maxOperationCount, maxRequiredSeqNo, retryCounter);
+                            return;
+                        }
+                    }
                     handleFailure(e, retryCounter, () -> sendShardChangesRequest(from, maxOperationCount, maxRequiredSeqNo, retryCounter));
                 });
     }
@@ -291,6 +301,18 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
         maybeUpdateSettings(response.getSettingsVersion(), updateMappingsTask);
     }
 
+    void handleFallenBehindLeaderShard(Exception e, long from, int maxOperationCount, long maxRequiredSeqNo, AtomicInteger retryCounter) {
+        // Do restore from repository here and after that
+        // start() should be invoked and stats should be reset
+
+        // For now handle like any other failure:
+        // need a more robust approach to avoid the scenario where an outstanding request
+        // can trigger another restore while the shard was restored already.
+        // https://github.com/elastic/elasticsearch/pull/37562#discussion_r250009367
+
+        handleFailure(e, retryCounter, () -> sendShardChangesRequest(from, maxOperationCount, maxRequiredSeqNo, retryCounter));
+    }
+
     /** Called when some operations are fetched from the leading */
     protected void onOperationsFetched(Translog.Operation[] operations) {
 

+ 37 - 4
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesTests.java

@@ -6,6 +6,9 @@
 package org.elasticsearch.xpack.ccr.action;
 
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.LatchedActionListener;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
 import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
@@ -16,12 +19,17 @@ import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESSingleNodeTestCase;
+import org.elasticsearch.xpack.ccr.Ccr;
 import org.elasticsearch.xpack.ccr.LocalStateCcr;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
 
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.notNullValue;
 
 public class ShardChangesTests extends ESSingleNodeTestCase {
 
@@ -88,7 +96,7 @@ public class ShardChangesTests extends ESSingleNodeTestCase {
         assertThat(operation.id(), equalTo("5"));
     }
 
-    public void testMissingOperations() {
+    public void testMissingOperations() throws Exception {
         client().admin().indices().prepareCreate("index")
             .setSettings(Settings.builder()
                 .put("index.soft_deletes.enabled", true)
@@ -113,9 +121,34 @@ public class ShardChangesTests extends ESSingleNodeTestCase {
         request.setFromSeqNo(0L);
         request.setMaxOperationCount(1);
 
-        Exception e = expectThrows(ElasticsearchException.class, () -> client().execute(ShardChangesAction.INSTANCE, request).actionGet());
-        assertThat(e.getMessage(), equalTo("Operations are no longer available for replicating. Maybe increase the retention setting " +
-            "[index.soft_deletes.retention.operations]?"));
+        {
+            ResourceNotFoundException e =
+                expectThrows(ResourceNotFoundException.class, () -> client().execute(ShardChangesAction.INSTANCE, request).actionGet());
+            assertThat(e.getMessage(), equalTo("Operations are no longer available for replicating. Maybe increase the retention setting " +
+                "[index.soft_deletes.retention.operations]?"));
+
+            assertThat(e.getMetadataKeys().size(), equalTo(1));
+            assertThat(e.getMetadata(Ccr.REQUESTED_OPS_MISSING_METADATA_KEY), notNullValue());
+            assertThat(e.getMetadata(Ccr.REQUESTED_OPS_MISSING_METADATA_KEY), contains("0", "0"));
+        }
+        {
+            AtomicReference<Exception> holder = new AtomicReference<>();
+            CountDownLatch latch = new CountDownLatch(1);
+            client().execute(ShardChangesAction.INSTANCE, request,
+                new LatchedActionListener<>(ActionListener.wrap(r -> fail("expected an exception"), holder::set), latch));
+            latch.await();
+
+            ElasticsearchException e = (ElasticsearchException) holder.get();
+            assertThat(e, notNullValue());
+            assertThat(e.getMetadataKeys().size(), equalTo(0));
+
+            ResourceNotFoundException cause = (ResourceNotFoundException) e.getCause();
+            assertThat(cause.getMessage(), equalTo("Operations are no longer available for replicating. " +
+                "Maybe increase the retention setting [index.soft_deletes.retention.operations]?"));
+            assertThat(cause.getMetadataKeys().size(), equalTo(1));
+            assertThat(cause.getMetadata(Ccr.REQUESTED_OPS_MISSING_METADATA_KEY), notNullValue());
+            assertThat(cause.getMetadata(Ccr.REQUESTED_OPS_MISSING_METADATA_KEY), contains("0", "0"));
+        }
     }
 
 }