
Fix Incorrect Snapshot Shard Status for DONE Shards in Running Snapshots (#58390)

Minor bugs/inconsistencies:

* If a shard hasn't changed at all, we were reporting `0` for its total size and total file count while the snapshot was ongoing.

* If a data node restarts or drops out during snapshot creation, the fallback logic did not load the correct statistics from the repository but simply created a status with `0` counts from the snapshot state in the cluster state. Added a fallback to reading from the repository in this case.
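
In essence, the second fix (see the TransportSnapshotsStatusAction change below) makes the master, for a shard that the cluster state already marks as DONE but for which no data node returned live progress, load the finalized per-shard statistics back from the repository instead of reporting zero counts. A condensed sketch of that fallback, taken from the diff below with the surrounding loop and the lazy construction of `indexIdLookup` elided:

    final SnapshotIndexShardStatus shardStatus;
    if (stage == SnapshotIndexShardStage.DONE) {
        // The shard snapshot already completed, so its exact statistics can be read from the repository.
        final ShardId shardId = shardEntry.key;
        shardStatus = new SnapshotIndexShardStatus(shardId,
            repositoriesService.repository(entry.repository())
                .getShardSnapshotStatus(entry.snapshot().getSnapshotId(),
                    indexIdLookup.get(shardId.getIndexName()), shardId)
                .asCopy());
    } else {
        // Shard snapshot still pending, running, or failed: report just the stage, as before.
        shardStatus = new SnapshotIndexShardStatus(shardEntry.key, stage);
    }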
Armin Braun committed 5 years ago · commit 84f7e54b48

+ 111 - 0
server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java

@@ -23,10 +23,14 @@ import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
 import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
+import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStage;
+import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStatus;
+import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStats;
 import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
 import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.SnapshotsInProgress;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.core.internal.io.IOUtils;
@@ -189,4 +193,111 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {
         assertThat(found.snapshotId(), is(snapshotInfo.snapshotId()));
         assertThat(found.state(), is(SnapshotState.SUCCESS));
     }
+
+    /**
+     * Tests the following sequence of steps:
+     * 1. Start snapshot of two shards (both located on separate data nodes).
+     * 2. Have one of the shards snapshot completely and the other block
+     * 3. Restart the data node that completed its shard snapshot
+     * 4. Make sure that snapshot status APIs show correct file-counts and -sizes
+     *
+     * @throws Exception on failure
+     */
+    public void testCorrectCountsForDoneShards() throws Exception {
+        final String indexOne = "index-1";
+        final String indexTwo = "index-2";
+        final List<String> dataNodes = internalCluster().startDataOnlyNodes(2);
+        final String dataNodeOne = dataNodes.get(0);
+        final String dataNodeTwo = dataNodes.get(1);
+
+        createIndex(indexOne, singleShardOneNode(dataNodeOne));
+        indexDoc(indexOne, "some_doc_id", "foo", "bar");
+        createIndex(indexTwo, singleShardOneNode(dataNodeTwo));
+        indexDoc(indexTwo, "some_doc_id", "foo", "bar");
+
+        final String repoName = "test-repo";
+        createRepository(repoName, "mock", randomRepoPath());
+
+        blockDataNode(repoName, dataNodeOne);
+
+        final String snapshotOne = "snap-1";
+        // restarting a data node below so using a master client here
+        final ActionFuture<CreateSnapshotResponse> responseSnapshotOne = internalCluster().masterClient().admin()
+            .cluster().prepareCreateSnapshot(repoName, snapshotOne).setWaitForCompletion(true).execute();
+
+        assertBusy(() -> {
+            final SnapshotStatus snapshotStatusOne = getSnapshotStatus(repoName, snapshotOne);
+            final SnapshotIndexShardStatus snapshotShardState = stateFirstShard(snapshotStatusOne, indexTwo);
+            assertThat(snapshotShardState.getStage(), is(SnapshotIndexShardStage.DONE));
+            assertThat(snapshotShardState.getStats().getTotalFileCount(), greaterThan(0));
+            assertThat(snapshotShardState.getStats().getTotalSize(), greaterThan(0L));
+        }, 30L, TimeUnit.SECONDS);
+
+        final SnapshotStats snapshotShardStats =
+            stateFirstShard(getSnapshotStatus(repoName, snapshotOne), indexTwo).getStats();
+        final int totalFiles = snapshotShardStats.getTotalFileCount();
+        final long totalFileSize = snapshotShardStats.getTotalSize();
+
+        internalCluster().restartNode(dataNodeTwo);
+
+        final SnapshotIndexShardStatus snapshotShardStateAfterNodeRestart =
+            stateFirstShard(getSnapshotStatus(repoName, snapshotOne), indexTwo);
+        assertThat(snapshotShardStateAfterNodeRestart.getStage(), is(SnapshotIndexShardStage.DONE));
+        assertThat(snapshotShardStateAfterNodeRestart.getStats().getTotalFileCount(), equalTo(totalFiles));
+        assertThat(snapshotShardStateAfterNodeRestart.getStats().getTotalSize(), equalTo(totalFileSize));
+
+        unblockAllDataNodes(repoName);
+        assertThat(responseSnapshotOne.get().getSnapshotInfo().state(), is(SnapshotState.SUCCESS));
+
+        // indexing another document to the second index so it will do writes during the snapshot and we can block on those writes
+        indexDoc(indexTwo, "some_other_doc_id", "foo", "other_bar");
+
+        blockDataNode(repoName, dataNodeTwo);
+
+        final String snapshotTwo = "snap-2";
+        final ActionFuture<CreateSnapshotResponse> responseSnapshotTwo =
+            client().admin().cluster().prepareCreateSnapshot(repoName, snapshotTwo).setWaitForCompletion(true).execute();
+
+        waitForBlock(dataNodeTwo, repoName, TimeValue.timeValueSeconds(30L));
+
+        assertBusy(() -> {
+            final SnapshotStatus snapshotStatusOne = getSnapshotStatus(repoName, snapshotOne);
+            final SnapshotStatus snapshotStatusTwo = getSnapshotStatus(repoName, snapshotTwo);
+            final SnapshotIndexShardStatus snapshotShardStateOne = stateFirstShard(snapshotStatusOne, indexOne);
+            final SnapshotIndexShardStatus snapshotShardStateTwo = stateFirstShard(snapshotStatusTwo, indexOne);
+            assertThat(snapshotShardStateOne.getStage(), is(SnapshotIndexShardStage.DONE));
+            assertThat(snapshotShardStateTwo.getStage(), is(SnapshotIndexShardStage.DONE));
+            final int totalFilesShardOne = snapshotShardStateOne.getStats().getTotalFileCount();
+            final long totalSizeShardOne = snapshotShardStateOne.getStats().getTotalSize();
+            assertThat(totalFilesShardOne, greaterThan(0));
+            assertThat(totalSizeShardOne, greaterThan(0L));
+            assertThat(totalFilesShardOne, equalTo(snapshotShardStateTwo.getStats().getTotalFileCount()));
+            assertThat(totalSizeShardOne, equalTo(snapshotShardStateTwo.getStats().getTotalSize()));
+            assertThat(snapshotShardStateTwo.getStats().getIncrementalFileCount(), equalTo(0));
+            assertThat(snapshotShardStateTwo.getStats().getIncrementalSize(), equalTo(0L));
+        }, 30L, TimeUnit.SECONDS);
+
+        unblockAllDataNodes(repoName);
+        assertThat(responseSnapshotTwo.get().getSnapshotInfo().state(), is(SnapshotState.SUCCESS));
+    }
+
+    private static SnapshotIndexShardStatus stateFirstShard(SnapshotStatus snapshotStatus, String indexName) {
+        return snapshotStatus.getIndices().get(indexName).getShards().get(0);
+    }
+
+    private static SnapshotStatus getSnapshotStatus(String repoName, String snapshotName) {
+        try {
+            return client().admin().cluster().prepareSnapshotStatus(repoName).setSnapshots(snapshotName)
+                .get().getSnapshots().get(0);
+        } catch (SnapshotMissingException e) {
+            throw new AssertionError(e);
+        }
+    }
+
+    private static Settings singleShardOneNode(String node) {
+        return Settings.builder()
+            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+            .put("index.routing.allocation.include._name", node).build();
+    }
 }
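
For orientation, the statistics the test asserts on come from the snapshot status API; a minimal usage sketch built from the helpers above (variable names such as `repoName`, `snapshotOne` and `indexTwo` follow the test):

    // Fetch the status of one snapshot and inspect the first (only) shard of an index,
    // mirroring the getSnapshotStatus and stateFirstShard helpers above.
    final SnapshotStatus status = client().admin().cluster()
        .prepareSnapshotStatus(repoName).setSnapshots(snapshotOne)
        .get().getSnapshots().get(0);
    final SnapshotStats stats = status.getIndices().get(indexTwo).getShards().get(0).getStats();
    // Before this fix these could come back as 0 for a DONE shard once its data node restarted.
    assertThat(stats.getTotalFileCount(), greaterThan(0));
    assertThat(stats.getTotalSize(), greaterThan(0L));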

+ 19 - 1
server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java

@@ -165,6 +165,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
             for (SnapshotsInProgress.Entry entry : currentSnapshotEntries) {
                 currentSnapshotNames.add(entry.snapshot().getSnapshotId().getName());
                 List<SnapshotIndexShardStatus> shardStatusBuilder = new ArrayList<>();
+                Map<String, IndexId> indexIdLookup = null;
                 for (ObjectObjectCursor<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shardEntry : entry.shards()) {
                     SnapshotsInProgress.ShardSnapshotStatus status = shardEntry.value;
                     if (status.nodeId() != null) {
@@ -182,6 +183,10 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
                             }
                         }
                     }
+                    // We failed to find the status of the shard from the responses we received from data nodes.
+                    // This can happen if nodes drop out of the cluster completely or restart during the snapshot.
+                    // We rebuild the information they would have provided from their in-memory state, using the cluster
+                    // state and the repository contents, in the logic below.
                     final SnapshotIndexShardStage stage;
                     switch (shardEntry.value.state()) {
                         case FAILED:
@@ -199,7 +204,20 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
                         default:
                             throw new IllegalArgumentException("Unknown snapshot state " + shardEntry.value.state());
                     }
-                    SnapshotIndexShardStatus shardStatus = new SnapshotIndexShardStatus(shardEntry.key, stage);
+                    final SnapshotIndexShardStatus shardStatus;
+                    if (stage == SnapshotIndexShardStage.DONE) {
+                        // Shard snapshot completed successfully so we should be able to load the exact statistics for this
+                        // shard from the repository already.
+                        if (indexIdLookup == null) {
+                            indexIdLookup = entry.indices().stream().collect(Collectors.toMap(IndexId::getName, Function.identity()));
+                        }
+                        final ShardId shardId = shardEntry.key;
+                        shardStatus = new SnapshotIndexShardStatus(shardId, repositoriesService.repository(entry.repository())
+                            .getShardSnapshotStatus(entry.snapshot().getSnapshotId(), indexIdLookup.get(shardId.getIndexName()),
+                                shardId).asCopy());
+                    } else {
+                        shardStatus = new SnapshotIndexShardStatus(shardEntry.key, stage);
+                    }
                     shardStatusBuilder.add(shardStatus);
                 }
                 builder.add(new SnapshotStatus(entry.snapshot(), entry.state(),

+ 4 - 0
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -1796,6 +1796,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                     }
                 }
             } else {
+                for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : filesFromSegmentInfos) {
+                    indexTotalNumberOfFiles++;
+                    indexTotalFileSize += fileInfo.length();
+                }
                 indexCommitPointFiles = filesFromSegmentInfos;
             }
 

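This hunk addresses the first point of the commit message: when a shard has not changed and the commit point file list is simply taken from `filesFromSegmentInfos`, the per-shard totals are now accumulated from that list instead of staying at `0`. A minimal sketch of the accounting, isolated from the surrounding snapshot logic:

    // Accumulate the totals for an unchanged shard from the reused file list so the
    // status API reports real numbers rather than 0 for total file count and size.
    int indexTotalNumberOfFiles = 0;
    long indexTotalFileSize = 0L;
    for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : filesFromSegmentInfos) {
        indexTotalNumberOfFiles++;
        indexTotalFileSize += fileInfo.length();
    }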
+ 7 - 0
test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

@@ -1606,6 +1606,13 @@ public final class InternalTestCluster extends TestCluster {
         }
     }
 
+    /**
+     * Restarts a node.
+     */
+    public void restartNode(String nodeName) throws Exception {
+        restartNode(nodeName, EMPTY_CALLBACK);
+    }
+
     /**
      * Restarts a node and calls the callback during restart.
      */