فهرست منبع

repository integrity health indicator services (#83445)

Adding implementations that will check snapshot repository corruption
flag and report health based on it.

Co-authored-by: Tanguy Leroux <tlrx.dev@gmail.com>
Ievgen Degtiarenko 3 سال پیش
والد
کامیت
6b1f748214

+ 5 - 0
docs/changelog/83445.yaml

@@ -0,0 +1,5 @@
+pr: 83445
+summary: Repository integrity health indicator services
+area: Health
+type: enhancement
+issues: []

+ 81 - 0
server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoryIntegrityHealthIndicatorServiceTestIT.java

@@ -0,0 +1,81 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.snapshots;
+
+import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.health.GetHealthAction;
+import org.elasticsearch.health.HealthStatus;
+import org.elasticsearch.repositories.RepositoryData;
+import org.elasticsearch.repositories.RepositoryException;
+import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import static org.elasticsearch.health.HealthStatus.GREEN;
+import static org.elasticsearch.health.HealthStatus.RED;
+import static org.elasticsearch.health.ServerHealthComponents.SNAPSHOT;
+import static org.elasticsearch.snapshots.RepositoryIntegrityHealthIndicatorService.NAME;
+import static org.elasticsearch.test.hamcrest.ThrowableAssertions.assertThatThrows;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+
+public class RepositoryIntegrityHealthIndicatorServiceTestIT extends AbstractSnapshotIntegTestCase {
+
+    public void testRepositoryIntegrityHealthIndicator() throws IOException, InterruptedException {
+
+        var client = client();
+
+        var repository = "test-repo";
+        var location = randomRepoPath();
+
+        createRepository(
+            repository,
+            "fs",
+            Settings.builder()
+                .put("location", location)
+                .put("compress", false)
+                // Don't cache repository data because the test manually modifies the repository data
+                .put(BlobStoreRepository.CACHE_REPOSITORY_DATA.getKey(), false)
+        );
+
+        assertSnapshotRepositoryHealth("Indicator should be green after empty repository is created", client, GREEN);
+
+        createIndex("test-index-1");
+        indexRandomDocs("test-index-1", randomIntBetween(1, 10));
+        createFullSnapshot(repository, "snapshot-1");
+
+        assertSnapshotRepositoryHealth("Indicator should be green after successful snapshot is taken", client, GREEN);
+
+        corruptRepository(repository, location);
+        // Currently, the health indicator is not proactively checking the repository and
+        // instead relies on other operations to detect and flag repository corruption
+        assertThatThrows(
+            () -> createFullSnapshot(repository, "snapshot-2"),
+            RepositoryException.class,
+            containsString("[" + repository + "] Could not read repository data")
+        );
+
+        assertSnapshotRepositoryHealth("Indicator should be red after file is deleted from the repository", client, RED);
+
+        deleteRepository(repository);
+    }
+
+    private void assertSnapshotRepositoryHealth(String message, Client client, HealthStatus status) {
+        var response = client.execute(GetHealthAction.INSTANCE, new GetHealthAction.Request()).actionGet();
+        assertThat(message, response.findComponent(SNAPSHOT).findIndicator(NAME).status(), equalTo(status));
+    }
+
+    private void corruptRepository(String name, Path location) throws IOException {
+        final RepositoryData repositoryData = getRepositoryData(name);
+        Files.delete(location.resolve("index-" + repositoryData.getGenId()));
+    }
+}

+ 3 - 0
server/src/main/java/org/elasticsearch/common/util/CollectionUtils.java

@@ -346,4 +346,7 @@ public class CollectionUtils {
         return list.isEmpty() ? List.of() : Collections.unmodifiableList(list);
     }
 
+    public static <E> List<E> limitSize(List<E> list, int size) {
+        return list.size() <= size ? list : list.subList(0, size);
+    }
 }

+ 1 - 1
server/src/main/java/org/elasticsearch/health/HealthIndicatorDetails.java

@@ -18,7 +18,7 @@ public interface HealthIndicatorDetails extends ToXContentObject {
     HealthIndicatorDetails EMPTY = new HealthIndicatorDetails() {
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
-            return builder;
+            return builder.startObject().endObject();
         }
     };
 }

+ 3 - 1
server/src/main/java/org/elasticsearch/node/Node.java

@@ -167,6 +167,7 @@ import org.elasticsearch.search.aggregations.support.AggregationUsageService;
 import org.elasticsearch.search.fetch.FetchPhase;
 import org.elasticsearch.shutdown.PluginShutdownService;
 import org.elasticsearch.snapshots.InternalSnapshotsInfoService;
+import org.elasticsearch.snapshots.RepositoryIntegrityHealthIndicatorService;
 import org.elasticsearch.snapshots.RestoreService;
 import org.elasticsearch.snapshots.SnapshotShardsService;
 import org.elasticsearch.snapshots.SnapshotsInfoService;
@@ -901,7 +902,8 @@ public class Node implements Closeable {
             );
 
             List<HealthIndicatorService> serverHealthIndicatorServices = List.of(
-                new InstanceHasMasterHealthIndicatorService(clusterService)
+                new InstanceHasMasterHealthIndicatorService(clusterService),
+                new RepositoryIntegrityHealthIndicatorService(clusterService)
             );
             List<HealthIndicatorService> pluginHealthIndicatorServices = pluginsService.filterPlugins(HealthPlugin.class)
                 .stream()

+ 103 - 0
server/src/main/java/org/elasticsearch/snapshots/RepositoryIntegrityHealthIndicatorService.java

@@ -0,0 +1,103 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.snapshots;
+
+import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
+import org.elasticsearch.cluster.metadata.RepositoryMetadata;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.health.HealthIndicatorDetails;
+import org.elasticsearch.health.HealthIndicatorResult;
+import org.elasticsearch.health.HealthIndicatorService;
+import org.elasticsearch.health.SimpleHealthIndicatorDetails;
+import org.elasticsearch.repositories.RepositoryData;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.common.Strings.collectionToDelimitedStringWithLimit;
+import static org.elasticsearch.common.util.CollectionUtils.limitSize;
+import static org.elasticsearch.health.HealthStatus.GREEN;
+import static org.elasticsearch.health.HealthStatus.RED;
+import static org.elasticsearch.health.ServerHealthComponents.SNAPSHOT;
+
+/**
+ * This indicator reports health for snapshot repositories.
+ *
+ * Indicator will report RED status when any of snapshot repositories is marked as corrupted.
+ * Data might not be backed up in such cases.
+ *
+ * Corrupted repository most likely need to be manually cleaned and a new snapshot needs to be created from scratch.
+ */
+public class RepositoryIntegrityHealthIndicatorService implements HealthIndicatorService {
+
+    public static final String NAME = "repository_integrity";
+
+    private final ClusterService clusterService;
+
+    public RepositoryIntegrityHealthIndicatorService(ClusterService clusterService) {
+        this.clusterService = clusterService;
+    }
+
+    @Override
+    public String name() {
+        return NAME;
+    }
+
+    @Override
+    public String component() {
+        return SNAPSHOT;
+    }
+
+    @Override
+    public HealthIndicatorResult calculate() {
+        var snapshotMetadata = clusterService.state().metadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY);
+
+        if (snapshotMetadata.repositories().isEmpty()) {
+            return createIndicator(GREEN, "No repositories configured.", HealthIndicatorDetails.EMPTY);
+        }
+
+        var corrupted = snapshotMetadata.repositories()
+            .stream()
+            .filter(repository -> repository.generation() == RepositoryData.CORRUPTED_REPO_GEN)
+            .map(RepositoryMetadata::name)
+            .toList();
+
+        var totalRepositories = snapshotMetadata.repositories().size();
+        var corruptedRepositories = corrupted.size();
+
+        if (corrupted.isEmpty()) {
+            return createIndicator(
+                GREEN,
+                "No corrupted repositories.",
+                new SimpleHealthIndicatorDetails(Map.of("total_repositories", totalRepositories))
+            );
+        }
+
+        return createIndicator(
+            RED,
+            createCorruptedRepositorySummary(corrupted),
+            new SimpleHealthIndicatorDetails(
+                Map.of(
+                    "total_repositories",
+                    totalRepositories,
+                    "corrupted_repositories",
+                    corruptedRepositories,
+                    "corrupted",
+                    limitSize(corrupted, 10)
+                )
+            )
+        );
+    }
+
+    private static String createCorruptedRepositorySummary(List<String> corrupted) {
+        var message = new StringBuilder().append("Detected [").append(corrupted.size()).append("] corrupted repositories: ");
+        collectionToDelimitedStringWithLimit(corrupted, ",", "[", "].", 1024, message);
+        return message.toString();
+    }
+}

+ 11 - 0
server/src/test/java/org/elasticsearch/common/util/CollectionUtilsTests.java

@@ -27,6 +27,7 @@ import java.util.TreeSet;
 
 import static java.util.Collections.emptyMap;
 import static org.elasticsearch.common.util.CollectionUtils.eagerPartition;
+import static org.elasticsearch.common.util.CollectionUtils.limitSize;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
@@ -177,4 +178,14 @@ public class CollectionUtilsTests extends ESTestCase {
         }
 
     }
+
+    public void testLimitSizeOfShortList() {
+        var shortList = randomList(0, 10, () -> "item");
+        assertThat(limitSize(shortList, 10), equalTo(shortList));
+    }
+
+    public void testLimitSizeOfLongList() {
+        var longList = randomList(10, 100, () -> "item");
+        assertThat(limitSize(longList, 10), equalTo(longList.subList(0, 10)));
+    }
 }

+ 109 - 0
server/src/test/java/org/elasticsearch/snapshots/RepositoryIntegrityHealthIndicatorServiceTests.java

@@ -0,0 +1,109 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.snapshots;
+
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
+import org.elasticsearch.cluster.metadata.RepositoryMetadata;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.health.HealthIndicatorDetails;
+import org.elasticsearch.health.HealthIndicatorResult;
+import org.elasticsearch.health.SimpleHealthIndicatorDetails;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.common.util.CollectionUtils.appendToCopy;
+import static org.elasticsearch.health.HealthStatus.GREEN;
+import static org.elasticsearch.health.HealthStatus.RED;
+import static org.elasticsearch.health.ServerHealthComponents.SNAPSHOT;
+import static org.elasticsearch.repositories.RepositoryData.CORRUPTED_REPO_GEN;
+import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN;
+import static org.elasticsearch.snapshots.RepositoryIntegrityHealthIndicatorService.NAME;
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class RepositoryIntegrityHealthIndicatorServiceTests extends ESTestCase {
+
+    public void testIsGreenWhenAllRepositoriesAreNotCorrupted() {
+        var repos = randomList(1, 10, () -> createRepositoryMetadata("healthy-repo", false));
+        var clusterState = createClusterStateWith(new RepositoriesMetadata(repos));
+        var service = createRepositoryCorruptionHealthIndicatorService(clusterState);
+
+        assertThat(
+            service.calculate(),
+            equalTo(
+                new HealthIndicatorResult(
+                    NAME,
+                    SNAPSHOT,
+                    GREEN,
+                    "No corrupted repositories.",
+                    new SimpleHealthIndicatorDetails(Map.of("total_repositories", repos.size()))
+                )
+            )
+        );
+    }
+
+    public void testIsRedWhenAtLeastOneRepoIsCorrupted() {
+        var repos = appendToCopy(
+            randomList(1, 10, () -> createRepositoryMetadata("healthy-repo", false)),
+            createRepositoryMetadata("corrupted-repo", true)
+        );
+        var clusterState = createClusterStateWith(new RepositoriesMetadata(repos));
+        var service = createRepositoryCorruptionHealthIndicatorService(clusterState);
+
+        assertThat(
+            service.calculate(),
+            equalTo(
+                new HealthIndicatorResult(
+                    NAME,
+                    SNAPSHOT,
+                    RED,
+                    "Detected [1] corrupted repositories: [corrupted-repo].",
+                    new SimpleHealthIndicatorDetails(
+                        Map.of("total_repositories", repos.size(), "corrupted_repositories", 1, "corrupted", List.of("corrupted-repo"))
+                    )
+                )
+            )
+        );
+    }
+
+    public void testIsGreenWhenNoMetadata() {
+        var clusterState = createClusterStateWith(null);
+        var service = createRepositoryCorruptionHealthIndicatorService(clusterState);
+
+        assertThat(
+            service.calculate(),
+            equalTo(new HealthIndicatorResult(NAME, SNAPSHOT, GREEN, "No repositories configured.", HealthIndicatorDetails.EMPTY))
+        );
+    }
+
+    private static ClusterState createClusterStateWith(RepositoriesMetadata metadata) {
+        var builder = ClusterState.builder(new ClusterName("test-cluster"));
+        if (metadata != null) {
+            builder.metadata(Metadata.builder().putCustom(RepositoriesMetadata.TYPE, metadata));
+        }
+        return builder.build();
+    }
+
+    private static RepositoryMetadata createRepositoryMetadata(String name, boolean corrupted) {
+        return new RepositoryMetadata(name, "uuid", "s3", Settings.EMPTY, corrupted ? CORRUPTED_REPO_GEN : EMPTY_REPO_GEN, EMPTY_REPO_GEN);
+    }
+
+    private static RepositoryIntegrityHealthIndicatorService createRepositoryCorruptionHealthIndicatorService(ClusterState clusterState) {
+        var clusterService = mock(ClusterService.class);
+        when(clusterService.state()).thenReturn(clusterState);
+        return new RepositoryIntegrityHealthIndicatorService(clusterService);
+    }
+}

+ 4 - 0
test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java

@@ -334,6 +334,10 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
         createRepository(logger, repoName, type, randomRepositorySettings(), true);
     }
 
+    protected void deleteRepository(String repoName) {
+        assertAcked(client().admin().cluster().prepareDeleteRepository(repoName));
+    }
+
     public static Settings.Builder randomRepositorySettings() {
         final Settings.Builder settings = Settings.builder();
         settings.put("location", randomRepoPath()).put("compress", randomBoolean());