
Fix Source Only Snapshot Permanently Broken on Broken _snapshot Directory (#71459)

Best-effort fix: prune the directory in case of any trouble syncing the snapshot to it,
as would be the case with e.g. existing dangling files from a previous aborted sync.
Armin Braun 4 years ago
parent commit 51a1dcfe21
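
In short, the fix wraps the snapshot sync in a try/catch: if syncing into the
shard-local _snapshot staging directory fails, the directory is wiped and the
sync is retried from scratch. Below is a minimal standalone sketch of that
best-effort pattern in plain java.nio terms. The Sync interface and the
syncWithFallback method are hypothetical names used for illustration only, and
the sketch catches IOException broadly where the actual change catches only
NoSuchFileException, CorruptIndexException, and FileAlreadyExistsException
(all of which are IOException subtypes).

    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.stream.Stream;

    public class BestEffortSync {

        // Hypothetical stand-in for SourceOnlySnapshot#syncSnapshot: any step that
        // incrementally syncs into a staging directory and can trip over leftovers
        // from a previous aborted run.
        interface Sync {
            void run() throws IOException;
        }

        // Try the sync once; if it fails, prune the staging directory and retry
        // from scratch on a clean slate.
        static void syncWithFallback(Path stagingDir, Sync sync) throws IOException {
            try {
                sync.run();
            } catch (IOException e) {
                try (Stream<Path> files = Files.list(stagingDir)) {
                    files.forEach(file -> {
                        try {
                            Files.delete(file); // drop dangling files
                        } catch (IOException inner) {
                            throw new UncheckedIOException(inner);
                        }
                    });
                }
                sync.run(); // second attempt into the now-empty directory
            }
        }
    }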

+ 39 - 0
x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/snapshots/SourceOnlySnapshotIT.java

@@ -30,6 +30,7 @@ import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.engine.EngineFactory;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.recovery.RecoverySettings;
 import org.elasticsearch.plugins.EnginePlugin;
 import org.elasticsearch.plugins.Plugin;
@@ -43,6 +44,9 @@ import org.elasticsearch.test.ESIntegTestCase;
 import org.hamcrest.Matchers;
 
 import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -147,6 +151,41 @@ public class SourceOnlySnapshotIT extends ESIntegTestCase {
         assertHits(sourceIdx, builders.length, true);
     }
 
+    public void testSnapshotWithDanglingLocalSegment() throws IOException {
+        logger.info("-->  starting a master node and a data node");
+        internalCluster().startMasterOnlyNode();
+        final String dataNode = internalCluster().startDataOnlyNode();
+
+        final String repo = "test-repo";
+        logger.info("-->  creating repository");
+        assertAcked(client().admin().cluster().preparePutRepository(repo).setType("source")
+                .setSettings(Settings.builder().put("location", randomRepoPath()).put("delegate_type", "fs")
+                        .put("compress", randomBoolean())));
+
+        final String indexName = "test-idx";
+        createIndex(indexName);
+        client().prepareIndex(indexName).setSource("foo", "bar").get();
+        client().admin().cluster().prepareCreateSnapshot(repo, "snapshot-1").setWaitForCompletion(true).get();
+
+        client().prepareIndex(indexName).setSource("foo", "baz").get();
+        client().admin().cluster().prepareCreateSnapshot(repo, "snapshot-2").setWaitForCompletion(true).get();
+
+        logger.info("--> randomly deleting files from the local _snapshot path to simulate corruption");
+        Path snapshotShardPath = internalCluster().getInstance(IndicesService.class, dataNode).indexService(
+                clusterService().state().metadata().index(indexName).getIndex()).getShard(0).shardPath().getDataPath()
+                .resolve("_snapshot");
+        try (DirectoryStream<Path> localFiles = Files.newDirectoryStream(snapshotShardPath)) {
+            for (Path localFile : localFiles) {
+                if (randomBoolean()) {
+                    Files.delete(localFile);
+                }
+            }
+        }
+
+        assertEquals(SnapshotState.SUCCESS, client().admin().cluster().prepareCreateSnapshot(repo, "snapshot-3")
+                .setWaitForCompletion(true).get().getSnapshotInfo().state());
+    }
+
     private void assertMappings(String sourceIdx, boolean requireRouting, boolean useNested) throws IOException {
         GetMappingsResponse getMappingsResponse = client().admin().indices().prepareGetMappings(sourceIdx).get();
         MappingMetadata mapping = getMappingsResponse.getMappings().get(sourceIdx);

+ 19 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java

@@ -6,6 +6,10 @@
  */
 package org.elasticsearch.snapshots;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.SegmentInfos;
@@ -22,6 +26,7 @@ import org.elasticsearch.cluster.metadata.MappingMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.search.Queries;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
@@ -43,6 +48,8 @@ import org.elasticsearch.repositories.ShardGenerations;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -73,6 +80,8 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
     public static final Setting<Boolean> SOURCE_ONLY = Setting.boolSetting("index.source_only", false, Setting
         .Property.IndexScope, Setting.Property.Final, Setting.Property.PrivateIndex);
 
+    private static final Logger logger = LogManager.getLogger(SourceOnlySnapshotRepository.class);
+
     private static final String SNAPSHOT_DIR_NAME = "_snapshot";
 
     SourceOnlySnapshotRepository(Repository in) {
@@ -146,8 +155,16 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
             }, Store.OnClose.EMPTY);
             Supplier<Query> querySupplier = mapperService.hasNested() ? Queries::newNestedFilter : null;
             // SourceOnlySnapshot will take care of soft- and hard-deletes no special casing needed here
-            SourceOnlySnapshot snapshot = new SourceOnlySnapshot(overlayDir, querySupplier);
-            snapshot.syncSnapshot(snapshotIndexCommit);
+            SourceOnlySnapshot snapshot =
+                new SourceOnlySnapshot(overlayDir, querySupplier);
+            try {
+                snapshot.syncSnapshot(snapshotIndexCommit);
+            } catch (NoSuchFileException | CorruptIndexException | FileAlreadyExistsException e) {
+                logger.warn(() -> new ParameterizedMessage(
+                        "Existing staging directory [{}] appears corrupted and will be pruned and recreated.", snapPath), e);
+                Lucene.cleanLuceneIndex(overlayDir);
+                snapshot.syncSnapshot(snapshotIndexCommit);
+            }
             // we will use the lucene doc ID as the seq ID so we set the local checkpoint to maxDoc with a new index UUID
             SegmentInfos segmentInfos = tempStore.readLastCommittedSegmentsInfo();
             final long maxDoc = segmentInfos.totalMaxDoc();
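
For context, Lucene.cleanLuceneIndex(overlayDir) in the catch block above
removes the files of the existing (and possibly corrupted) Lucene index from
the staging directory, so the second syncSnapshot call starts from a clean
slate. A rough standalone approximation of that idea, not the helper's actual
implementation: opening an IndexWriter with OpenMode.CREATE re-creates an
empty index over whatever was there and lets Lucene delete the stale files on
commit.

    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.IndexWriterConfig.OpenMode;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FSDirectory;

    import java.io.IOException;
    import java.nio.file.Path;

    public class WipeLuceneDir {

        // Approximate the "prune and recreate" step: CREATE mode ignores the
        // existing (possibly broken) index and writes a fresh empty one, after
        // which Lucene's file deleter removes the stale index files.
        static void wipe(Path path) throws IOException {
            try (Directory dir = FSDirectory.open(path);
                 IndexWriter writer = new IndexWriter(dir,
                         new IndexWriterConfig().setOpenMode(OpenMode.CREATE))) {
                writer.commit(); // commit the new empty index
            }
        }
    }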