Răsfoiți Sursa

[CCR] Retry when no index shard stats can be found (#34852)

Index shard stats for the follower shard are fetched, when a shard follow task is started.
This is needed in order to bootstrap the shard follow task with the follower global checkpoint.

Sometimes index shard stats are not available (e.g. during a restart) and
we currently fail, even though it is very likely that these stats will become available some time later.
Martijn van Groningen 7 ani în urmă
părinte
comite
306f1d78f8

+ 6 - 1
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java

@@ -205,7 +205,12 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
         client.admin().indices().stats(new IndicesStatsRequest().indices(shardId.getIndexName()), ActionListener.wrap(r -> {
             IndexStats indexStats = r.getIndex(shardId.getIndexName());
             if (indexStats == null) {
-                errorHandler.accept(new IndexNotFoundException(shardId.getIndex()));
+                IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex());
+                if (indexMetaData != null) {
+                    errorHandler.accept(new ShardNotFoundException(shardId));
+                } else {
+                    errorHandler.accept(new IndexNotFoundException(shardId.getIndex()));
+                }
                 return;
             }
 

+ 94 - 0
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java

@@ -8,6 +8,9 @@ package org.elasticsearch.xpack;
 
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
+import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
+import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
 import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
 import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
 import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
@@ -24,9 +27,11 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.network.NetworkModule;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.index.Index;
@@ -35,6 +40,7 @@ import org.elasticsearch.license.LicenseService;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.tasks.TaskInfo;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.InternalTestCluster;
@@ -48,6 +54,9 @@ import org.elasticsearch.xpack.core.XPackSettings;
 import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
 import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
 import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
+import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
+import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
+import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -59,14 +68,17 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Locale;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING;
 import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 
@@ -279,6 +291,88 @@ public abstract class CcrIntegTestCase extends ESTestCase {
         });
     }
 
+    protected void pauseFollow(String... indices) throws Exception {
+        for (String index : indices) {
+            final PauseFollowAction.Request unfollowRequest = new PauseFollowAction.Request(index);
+            followerClient().execute(PauseFollowAction.INSTANCE, unfollowRequest).get();
+        }
+        ensureNoCcrTasks();
+    }
+
+    protected void ensureNoCcrTasks() throws Exception {
+        assertBusy(() -> {
+            final ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState();
+            final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
+            assertThat(tasks.tasks(), empty());
+
+            ListTasksRequest listTasksRequest = new ListTasksRequest();
+            listTasksRequest.setDetailed(true);
+            ListTasksResponse listTasksResponse = followerClient().admin().cluster().listTasks(listTasksRequest).get();
+            int numNodeTasks = 0;
+            for (TaskInfo taskInfo : listTasksResponse.getTasks()) {
+                if (taskInfo.getAction().startsWith(ListTasksAction.NAME) == false) {
+                    numNodeTasks++;
+                }
+            }
+            assertThat(numNodeTasks, equalTo(0));
+        }, 30, TimeUnit.SECONDS);
+    }
+
+    protected String getIndexSettings(final int numberOfShards, final int numberOfReplicas,
+                                    final Map<String, String> additionalIndexSettings) throws IOException {
+        final String settings;
+        try (XContentBuilder builder = jsonBuilder()) {
+            builder.startObject();
+            {
+                builder.startObject("settings");
+                {
+                    builder.field("index.number_of_shards", numberOfShards);
+                    builder.field("index.number_of_replicas", numberOfReplicas);
+                    for (final Map.Entry<String, String> additionalSetting : additionalIndexSettings.entrySet()) {
+                        builder.field(additionalSetting.getKey(), additionalSetting.getValue());
+                    }
+                }
+                builder.endObject();
+                builder.startObject("mappings");
+                {
+                    builder.startObject("doc");
+                    {
+                        builder.startObject("properties");
+                        {
+                            builder.startObject("f");
+                            {
+                                builder.field("type", "integer");
+                            }
+                            builder.endObject();
+                        }
+                        builder.endObject();
+                    }
+                    builder.endObject();
+                }
+                builder.endObject();
+            }
+            builder.endObject();
+            settings = BytesReference.bytes(builder).utf8ToString();
+        }
+        return settings;
+    }
+
+    public static PutFollowAction.Request putFollow(String leaderIndex, String followerIndex) {
+        PutFollowAction.Request request = new PutFollowAction.Request();
+        request.setRemoteCluster("leader_cluster");
+        request.setLeaderIndex(leaderIndex);
+        request.setFollowRequest(resumeFollow(followerIndex));
+        return request;
+    }
+
+    public static ResumeFollowAction.Request resumeFollow(String followerIndex) {
+        ResumeFollowAction.Request request = new ResumeFollowAction.Request();
+        request.setFollowerIndex(followerIndex);
+        request.setMaxRetryDelay(TimeValue.timeValueMillis(10));
+        request.setReadPollTimeout(TimeValue.timeValueMillis(10));
+        return request;
+    }
+
     static void removeCCRRelatedMetadataFromClusterState(ClusterService clusterService) throws Exception {
         CountDownLatch latch = new CountDownLatch(1);
         clusterService.submitStateUpdateTask("remove-ccr-related-metadata", new ClusterStateUpdateTask() {

+ 0 - 82
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java

@@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ccr;
 
 import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
 import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
 import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
 import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
@@ -756,33 +755,6 @@ public class IndexFollowingIT extends CcrIntegTestCase {
         };
     }
 
-    private void pauseFollow(String... indices) throws Exception {
-        for (String index : indices) {
-            final PauseFollowAction.Request unfollowRequest = new PauseFollowAction.Request(index);
-            followerClient().execute(PauseFollowAction.INSTANCE, unfollowRequest).get();
-        }
-        ensureNoCcrTasks();
-    }
-
-    private void ensureNoCcrTasks() throws Exception {
-        assertBusy(() -> {
-            final ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState();
-            final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
-            assertThat(tasks.tasks(), empty());
-
-            ListTasksRequest listTasksRequest = new ListTasksRequest();
-            listTasksRequest.setDetailed(true);
-            ListTasksResponse listTasksResponse = followerClient().admin().cluster().listTasks(listTasksRequest).get();
-            int numNodeTasks = 0;
-            for (TaskInfo taskInfo : listTasksResponse.getTasks()) {
-                if (taskInfo.getAction().startsWith(ListTasksAction.NAME) == false) {
-                    numNodeTasks++;
-                }
-            }
-            assertThat(numNodeTasks, equalTo(0));
-        }, 30, TimeUnit.SECONDS);
-    }
-
     private CheckedRunnable<Exception> assertExpectedDocumentRunnable(final int value) {
         return () -> {
             final GetResponse getResponse = followerClient().prepareGet("index2", "doc", Integer.toString(value)).get();
@@ -792,45 +764,6 @@ public class IndexFollowingIT extends CcrIntegTestCase {
         };
     }
 
-    private String getIndexSettings(final int numberOfShards, final int numberOfReplicas,
-                                    final Map<String, String> additionalIndexSettings) throws IOException {
-        final String settings;
-        try (XContentBuilder builder = jsonBuilder()) {
-            builder.startObject();
-            {
-                builder.startObject("settings");
-                {
-                    builder.field("index.number_of_shards", numberOfShards);
-                    builder.field("index.number_of_replicas", numberOfReplicas);
-                    for (final Map.Entry<String, String> additionalSetting : additionalIndexSettings.entrySet()) {
-                        builder.field(additionalSetting.getKey(), additionalSetting.getValue());
-                    }
-                }
-                builder.endObject();
-                builder.startObject("mappings");
-                {
-                    builder.startObject("doc");
-                    {
-                        builder.startObject("properties");
-                        {
-                            builder.startObject("f");
-                            {
-                                builder.field("type", "integer");
-                            }
-                            builder.endObject();
-                        }
-                        builder.endObject();
-                    }
-                    builder.endObject();
-                }
-                builder.endObject();
-            }
-            builder.endObject();
-            settings = BytesReference.bytes(builder).utf8ToString();
-        }
-        return settings;
-    }
-
     private String getIndexSettingsWithNestedMapping(final int numberOfShards, final int numberOfReplicas,
                                                      final Map<String, String> additionalIndexSettings) throws IOException {
         final String settings;
@@ -968,19 +901,4 @@ public class IndexFollowingIT extends CcrIntegTestCase {
         });
     }
 
-    public static PutFollowAction.Request putFollow(String leaderIndex, String followerIndex) {
-        PutFollowAction.Request request = new PutFollowAction.Request();
-        request.setRemoteCluster("leader_cluster");
-        request.setLeaderIndex(leaderIndex);
-        request.setFollowRequest(resumeFollow(followerIndex));
-        return request;
-    }
-
-    public static ResumeFollowAction.Request resumeFollow(String followerIndex) {
-        ResumeFollowAction.Request request = new ResumeFollowAction.Request();
-        request.setFollowerIndex(followerIndex);
-        request.setMaxRetryDelay(TimeValue.timeValueMillis(10));
-        request.setReadPollTimeout(TimeValue.timeValueMillis(10));
-        return request;
-    }
 }

+ 62 - 0
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java

@@ -0,0 +1,62 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.ccr;
+
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.xpack.CcrIntegTestCase;
+import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
+
+import java.util.Locale;
+
+import static java.util.Collections.singletonMap;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.equalTo;
+
+public class RestartIndexFollowingIT extends CcrIntegTestCase {
+
+    @Override
+    protected int numberOfNodesPerCluster() {
+        return 1;
+    }
+
+    // Checks that index2 still receives docs indexed into index1 after a full restart of the follower cluster.
+    public void testFollowIndex() throws Exception {
+        final String leaderIndexSettings = getIndexSettings(1, 0,
+            singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
+        assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
+        ensureLeaderGreen("index1");
+
+        final PutFollowAction.Request followRequest = putFollow("index1", "index2");
+        followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
+
+        final long firstBatchNumDocs = randomIntBetween(2, 64);
+        logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs);
+        for (int i = 0; i < firstBatchNumDocs; i++) {
+            final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
+            leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
+        }
+
+        assertBusy(() -> {
+            assertThat(followerClient().prepareSearch("index2").get().getHits().totalHits, equalTo(firstBatchNumDocs));
+        });
+
+        getFollowerCluster().fullRestart();
+        ensureFollowerGreen("index2");
+
+        final long secondBatchNumDocs = randomIntBetween(2, 64);
+        for (int i = 0; i < secondBatchNumDocs; i++) {
+            leaderClient().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get();
+        }
+
+        assertBusy(() -> {
+            assertThat(followerClient().prepareSearch("index2").get().getHits().totalHits,
+                equalTo(firstBatchNumDocs + secondBatchNumDocs));
+        });
+    }
+
+}