Browse Source

SNAPSHOT: Improve Resilience SnapshotShardService (#36113)

* Resolve the index in the snapshotting thread
* Added test for routing table - snapshot state mismatch
Armin Braun 6 years ago
parent
commit
433a506d06

+ 2 - 2
server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java

@@ -323,15 +323,15 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
 
                 for (final Map.Entry<ShardId, IndexShardSnapshotStatus> shardEntry : entry.getValue().entrySet()) {
                     final ShardId shardId = shardEntry.getKey();
-                    final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id());
                     final IndexId indexId = indicesMap.get(shardId.getIndexName());
-                    assert indexId != null;
                     executor.execute(new AbstractRunnable() {
 
                         final SetOnce<Exception> failure = new SetOnce<>();
 
                         @Override
                         public void doRun() {
+                            final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id());
+                            assert indexId != null;
                             snapshot(indexShard, snapshot, indexId, shardEntry.getValue());
                         }
 

+ 46 - 0
server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java

@@ -78,6 +78,8 @@ import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
 import org.elasticsearch.test.ESIntegTestCase.Scope;
 import org.elasticsearch.test.InternalTestCluster;
 import org.elasticsearch.test.TestCustomMetaData;
+import org.elasticsearch.test.disruption.BusyMasterServiceDisruption;
+import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
 import org.elasticsearch.test.rest.FakeRestRequest;
 
 import java.io.IOException;
@@ -1151,6 +1153,50 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
         assertThat(anotherStats.getTotalSize(), is(snapshot1FileSize));
     }
 
+    public void testDataNodeRestartWithBusyMasterDuringSnapshot() throws Exception {
+        logger.info("-->  starting a master node and two data nodes");
+        internalCluster().startMasterOnlyNode();
+        internalCluster().startDataOnlyNodes(2);
+        logger.info("-->  creating repository");
+        assertAcked(client().admin().cluster().preparePutRepository("test-repo")
+            .setType("mock").setSettings(Settings.builder()
+                .put("location", randomRepoPath())
+                .put("compress", randomBoolean())
+                .put("max_snapshot_bytes_per_sec", "1000b")
+                .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
+        assertAcked(prepareCreate("test-idx", 0, Settings.builder()
+            .put("number_of_shards", 5).put("number_of_replicas", 0)));
+        ensureGreen();
+        logger.info("-->  indexing some data");
+        final int numdocs = randomIntBetween(50, 100);
+        IndexRequestBuilder[] builders = new IndexRequestBuilder[numdocs];
+        for (int i = 0; i < builders.length; i++) {
+            builders[i] = client().prepareIndex("test-idx", "type1",
+                Integer.toString(i)).setSource("field1", "bar " + i);
+        }
+        indexRandom(true, builders);
+        flushAndRefresh();
+        final String dataNode = blockNodeWithIndex("test-repo", "test-idx");
+        logger.info("-->  snapshot");
+        client(internalCluster().getMasterName()).admin().cluster()
+            .prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get();
+        ServiceDisruptionScheme disruption = new BusyMasterServiceDisruption(random(), Priority.HIGH);
+        setDisruptionScheme(disruption);
+        disruption.startDisrupting();
+        logger.info("-->  restarting data node, which should cause primary shards to be failed");
+        internalCluster().restartNode(dataNode, InternalTestCluster.EMPTY_CALLBACK);
+        unblockNode("test-repo", dataNode);
+        disruption.stopDisrupting();
+        // check that snapshot completes
+        assertBusy(() -> {
+            GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster()
+                .prepareGetSnapshots("test-repo").setSnapshots("test-snap").setIgnoreUnavailable(true).get();
+            assertEquals(1, snapshotsStatusResponse.getSnapshots().size());
+            SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
+            assertTrue(snapshotInfo.state().toString(), snapshotInfo.state().completed());
+        }, 30, TimeUnit.SECONDS);
+    }
+
     private long calculateTotalFilesSize(List<Path> files) {
         return files.stream().mapToLong(f -> {
             try {

+ 89 - 0
test/framework/src/main/java/org/elasticsearch/test/disruption/BusyMasterServiceDisruption.java

@@ -0,0 +1,89 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.test.disruption;
+
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.test.InternalTestCluster;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class BusyMasterServiceDisruption extends SingleNodeDisruption {
+    private final AtomicBoolean active = new AtomicBoolean();
+    private final Priority priority;
+
+    public BusyMasterServiceDisruption(Random random, Priority priority) {
+        super(random);
+        this.priority = priority;
+    }
+
+    @Override
+    public void startDisrupting() {
+        disruptedNode = cluster.getMasterName();
+        final String disruptionNodeCopy = disruptedNode;
+        if (disruptionNodeCopy == null) {
+            return;
+        }
+        ClusterService clusterService = cluster.getInstance(ClusterService.class, disruptionNodeCopy);
+        if (clusterService == null) {
+            return;
+        }
+        logger.info("making master service busy on node [{}] at priority [{}]", disruptionNodeCopy, priority);
+        active.set(true);
+        submitTask(clusterService);
+    }
+
+    private void submitTask(ClusterService clusterService) {
+        clusterService.getMasterService().submitStateUpdateTask(
+            "service_disruption_block",
+            new ClusterStateUpdateTask(priority) {
+                @Override
+                public ClusterState execute(ClusterState currentState) {
+                    if (active.get()) {
+                        submitTask(clusterService);
+                    }
+                    return currentState;
+                }
+
+                @Override
+                public void onFailure(String source, Exception e) {
+                    logger.error("unexpected error during disruption", e);
+                }
+            }
+        );
+    }
+
+    @Override
+    public void stopDisrupting() {
+        active.set(false);
+    }
+
+    @Override
+    public void removeAndEnsureHealthy(InternalTestCluster cluster) {
+        removeFromCluster(cluster);
+    }
+
+    @Override
+    public TimeValue expectedTimeToHeal() {
+        return TimeValue.timeValueMinutes(0);
+    }
+}