Ver código fonte

Share IT Infrastructure between Core Snapshot and SLM ITs (#59082)

For #58994 it would be useful to be able to share test infrastructure.
This PR shares `AbstractSnapshotIntegTestCase` for that purpose, dries up SLM tests
accordingly and adds a shared and efficient (compared to the previous implementations)
way of waiting for no running snapshot operations to the test infrastructure to dry things up further.
Armin Braun 5 anos atrás
pai
commit
4ed6c0e05d

+ 4 - 25
server/src/internalClusterTest/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java

@@ -24,9 +24,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRes
 import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.cluster.ClusterChangedEvent;
-import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateListener;
-import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
 import org.elasticsearch.cluster.SnapshotsInProgress;
 import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
@@ -77,7 +75,7 @@ public class SnapshotDisruptionIT extends AbstractSnapshotIntegTestCase {
     public void testDisruptionAfterFinalization() throws Exception {
         final String idxName = "test";
         internalCluster().startMasterOnlyNodes(3);
-        internalCluster().startDataOnlyNode();
+        final String dataNode = internalCluster().startDataOnlyNode();
         ensureStableCluster(4);
 
         createRandomIndex(idxName);
@@ -122,7 +120,7 @@ public class SnapshotDisruptionIT extends AbstractSnapshotIntegTestCase {
         logger.info("--> waiting for disruption to start");
         assertTrue(disruptionStarted.await(1, TimeUnit.MINUTES));
 
-        assertAllSnapshotsCompleted();
+        awaitNoMoreRunningOperations(dataNode);
 
         logger.info("--> verify that snapshot was successful or no longer exist");
         assertBusy(() -> {
@@ -150,7 +148,7 @@ public class SnapshotDisruptionIT extends AbstractSnapshotIntegTestCase {
             assertThat(sne.getSnapshotName(), is(snapshot));
         }
 
-        assertAllSnapshotsCompleted();
+        awaitNoMoreRunningOperations(dataNode);
     }
 
     public void testDisruptionAfterShardFinalization() throws Exception {
@@ -239,7 +237,7 @@ public class SnapshotDisruptionIT extends AbstractSnapshotIntegTestCase {
         unblockNode(repoName, dataNode);
 
         networkDisruption.stopDisrupting();
-        assertAllSnapshotsCompleted();
+        awaitNoMoreRunningOperations(dataNode);
 
         logger.info("--> make sure isolated master responds to snapshot request");
         final SnapshotException sne =
@@ -247,25 +245,6 @@ public class SnapshotDisruptionIT extends AbstractSnapshotIntegTestCase {
         assertThat(sne.getMessage(), endsWith("no longer master"));
     }
 
-    private void assertAllSnapshotsCompleted() throws Exception {
-        logger.info("--> wait until the snapshot is done");
-        assertBusy(() -> {
-            ClusterState state = dataNodeClient().admin().cluster().prepareState().get().getState();
-            SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
-            SnapshotDeletionsInProgress snapshotDeletionsInProgress =
-                state.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY);
-            if (snapshots.entries().isEmpty() == false) {
-                logger.info("Current snapshot state [{}]", snapshots.entries().get(0).state());
-                fail("Snapshot is still running");
-            } else if (snapshotDeletionsInProgress.hasDeletionsInProgress()) {
-                logger.info("Current snapshot deletion state [{}]", snapshotDeletionsInProgress);
-                fail("Snapshot deletion is still running");
-            } else {
-                logger.info("Snapshot is no longer in the cluster state");
-            }
-        }, 1L, TimeUnit.MINUTES);
-    }
-
     private void assertSnapshotExists(String repository, String snapshot) {
         GetSnapshotsResponse snapshotsStatusResponse = dataNodeClient().admin().cluster().prepareGetSnapshots(repository)
                 .setSnapshots(snapshot).get();

+ 40 - 0
server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java → test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java

@@ -24,9 +24,13 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateObserver;
+import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
 import org.elasticsearch.cluster.SnapshotsInProgress;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
@@ -37,6 +41,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
+import org.elasticsearch.node.NodeClosedException;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.Repository;
@@ -47,6 +52,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.snapshots.mockstore.MockRepository;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.VersionUtils;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.junit.After;
 
 import java.io.IOException;
@@ -61,7 +67,9 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.empty;
@@ -366,4 +374,36 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
     protected void assertDocCount(String index, long count) {
         assertEquals(getCountForIndex(index), count);
     }
+
+    protected void awaitNoMoreRunningOperations(String viaNode) throws Exception {
+        logger.info("--> verify no more operations in the cluster state");
+        awaitClusterState(viaNode, state -> state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries().isEmpty() &&
+                state.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY).hasDeletionsInProgress() == false);
+    }
+
+    private void awaitClusterState(String viaNode, Predicate<ClusterState> statePredicate) throws Exception {
+        final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, viaNode);
+        final ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, viaNode);
+        final ClusterStateObserver observer = new ClusterStateObserver(clusterService, logger, threadPool.getThreadContext());
+        if (statePredicate.test(observer.setAndGetObservedState()) == false) {
+            final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
+            observer.waitForNextChange(new ClusterStateObserver.Listener() {
+                @Override
+                public void onNewClusterState(ClusterState state) {
+                    future.onResponse(null);
+                }
+
+                @Override
+                public void onClusterServiceClose() {
+                    future.onFailure(new NodeClosedException(clusterService.localNode()));
+                }
+
+                @Override
+                public void onTimeout(TimeValue timeout) {
+                    future.onFailure(new TimeoutException());
+                }
+            }, statePredicate);
+            future.get(30L, TimeUnit.SECONDS);
+        }
+    }
 }

+ 17 - 97
x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java

@@ -13,9 +13,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotR
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
 import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
-import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.SnapshotsInProgress;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.common.Strings;
@@ -23,8 +21,8 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.plugins.Plugin;
-import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
 import org.elasticsearch.snapshots.ConcurrentSnapshotExecutionException;
 import org.elasticsearch.snapshots.SnapshotInfo;
 import org.elasticsearch.snapshots.SnapshotMissingException;
@@ -63,7 +61,7 @@ import static org.hamcrest.Matchers.greaterThan;
  * Tests for Snapshot Lifecycle Management that require a slow or blocked snapshot repo (using {@link MockRepository}
  */
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
-public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
+public class SLMSnapshotBlockingIntegTests extends AbstractSnapshotIntegTestCase {
     private static final String NEVER_EXECUTE_CRON_SCHEDULE = "* * * 31 FEB ? *";
 
     static final String REPO = "my-repo";
@@ -96,9 +94,7 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
         for (int i = 0; i < docCount; i++) {
             index(indexName, i + "", Collections.singletonMap("foo", "bar"));
         }
-
-        // Create a snapshot repo
-        initializeRepo(REPO);
+        createRepository(REPO, "mock");
 
         logger.info("--> creating policy {}", policyName);
         createSnapshotPolicy(policyName, "snap", NEVER_EXECUTE_CRON_SCHEDULE, REPO, indexName, true);
@@ -127,7 +123,7 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
         });
 
         logger.info("--> unblocking snapshots");
-        unblockRepo(REPO);
+        unblockNode(REPO, internalCluster().getMasterName());
 
         // Cancel/delete the snapshot
         try {
@@ -144,8 +140,7 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
         for (int i = 0; i < docCount; i++) {
             index(indexName, null, Collections.singletonMap("foo", "bar"));
         }
-
-        initializeRepo(REPO);
+        createRepository(REPO, "mock");
 
         logger.info("--> creating policy {}", policyId);
         createSnapshotPolicy(policyId, "snap", NEVER_EXECUTE_CRON_SCHEDULE, REPO, indexName, true,
@@ -167,12 +162,7 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
             }
         });
 
-        // Wait for all running snapshots to be cleared from cluster state
-        assertBusy(() -> {
-            logger.info("--> waiting for cluster state to be clear of snapshots");
-            ClusterState state = client().admin().cluster().prepareState().setCustoms(true).get().getState();
-            assertTrue("cluster state was not ready for deletion " + state, SnapshotRetentionTask.okayToDeleteSnapshots(state));
-        });
+        awaitNoMoreRunningOperations(randomFrom(dataNodeNames));
 
         logger.info("--> indexing more docs to force new segment files");
         for (int i = 0; i < docCount; i++) {
@@ -188,11 +178,10 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
             final String secondSnapName = executePolicy(policyId);
             logger.info("--> executed policy, got snapname [{}]", secondSnapName);
 
-
             // Check that the executed snapshot shows up in the SLM output as in_progress
+            logger.info("--> Waiting for at least one data node to hit the block");
+            waitForBlockOnAnyDataNode(REPO, TimeValue.timeValueSeconds(30L));
             assertBusy(() -> {
-                logger.info("--> Waiting for at least one data node to hit the block");
-                assertTrue(dataNodeNames.stream().anyMatch(node -> checkBlocked(node, REPO)));
                 logger.info("--> at least one data node has hit the block");
                 GetSnapshotLifecycleAction.Response getResp =
                     client().execute(GetSnapshotLifecycleAction.INSTANCE, new GetSnapshotLifecycleAction.Request(policyId)).get();
@@ -215,7 +204,7 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
                 new ExecuteSnapshotRetentionAction.Request()).get().isAcknowledged());
 
             logger.info("--> unblocking snapshots");
-            unblockRepo(REPO);
+            unblockNode(REPO, internalCluster().getMasterName());
             unblockAllDataNodes(REPO);
 
             // Check that the snapshot created by the policy has been removed by retention
@@ -252,7 +241,7 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
                 assertThat(resp.getHits().getTotalHits().value, equalTo(2L));
             });
         } finally {
-            unblockRepo(REPO);
+            unblockNode(REPO, internalCluster().getMasterName());
             unblockAllDataNodes(REPO);
         }
     }
@@ -271,9 +260,7 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
         final SnapshotState expectedUnsuccessfulState = partialSuccess ? SnapshotState.PARTIAL : SnapshotState.FAILED;
         // Setup
         createAndPopulateIndex(indexName);
-
-        // Create a snapshot repo
-        initializeRepo(REPO);
+        createRepository(REPO, "mock");
 
         createSnapshotPolicy(policyId, "snap", NEVER_EXECUTE_CRON_SCHEDULE, REPO, indexName, true,
             partialSuccess, new SnapshotRetentionConfiguration(null, 1, 2));
@@ -331,7 +318,7 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
             createAndPopulateIndex(indexName);
 
             logger.info("--> unblocking snapshots");
-            unblockRepo(REPO);
+            unblockNode(REPO, internalCluster().getMasterName());
             unblockAllDataNodes(REPO);
 
             logger.info("--> taking new snapshot");
@@ -381,18 +368,14 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
                 assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
             });
         }
+        awaitNoMoreRunningOperations(internalCluster().getMasterName());
     }
 
     public void testSLMRetentionAfterRestore() throws Exception {
         final String indexName = "test";
         final String policyName = "test-policy";
-        int docCount = 20;
-        for (int i = 0; i < docCount; i++) {
-            index(indexName, i + "", Collections.singletonMap("foo", "bar"));
-        }
-
-        // Create a snapshot repo
-        initializeRepo(REPO);
+        indexRandomDocs(indexName, 20);
+        createRepository(REPO, "mock");
 
         logger.info("--> creating policy {}", policyName);
         createSnapshotPolicy(policyName, "snap", NEVER_EXECUTE_CRON_SCHEDULE, REPO, indexName, true, false,
@@ -451,27 +434,9 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
 
     private void createAndPopulateIndex(String indexName) throws InterruptedException {
         logger.info("--> creating and populating index [{}]", indexName);
-        assertAcked(prepareCreate(indexName, 0, Settings.builder()
-            .put("number_of_shards", 6).put("number_of_replicas", 0)));
+        assertAcked(prepareCreate(indexName, 0, indexSettingsNoReplicas(6)));
         ensureGreen();
-
-        final int numdocs = randomIntBetween(50, 100);
-        IndexRequestBuilder[] builders = new IndexRequestBuilder[numdocs];
-        for (int i = 0; i < builders.length; i++) {
-            builders[i] = client().prepareIndex(indexName).setId(Integer.toString(i)).setSource("field1", "bar " + i);
-        }
-        indexRandom(true, builders);
-        flushAndRefresh();
-    }
-
-    private void initializeRepo(String repoName) {
-        client().admin().cluster().preparePutRepository(repoName)
-            .setType("mock")
-            .setSettings(Settings.builder()
-                .put("compress", randomBoolean())
-                .put("location", randomAlphaOfLength(6))
-                .build())
-            .get();
+        indexRandomDocs(indexName, randomIntBetween(50, 100));
     }
 
     private void createSnapshotPolicy(String policyName, String snapshotNamePattern, String schedule, String repoId,
@@ -522,49 +487,4 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
             return "bad";
         }
     }
-
-    public static String blockMasterFromFinalizingSnapshotOnIndexFile(final String repositoryName) {
-        final String masterName = internalCluster().getMasterName();
-        ((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterName)
-            .repository(repositoryName)).setBlockOnWriteIndexFile(true);
-        return masterName;
-    }
-
-    public static String unblockRepo(final String repositoryName) {
-        final String masterName = internalCluster().getMasterName();
-        ((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterName)
-            .repository(repositoryName)).unblock();
-        return masterName;
-    }
-
-    public static void blockAllDataNodes(String repository) {
-        for(RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
-            ((MockRepository)repositoriesService.repository(repository)).blockOnDataFiles(true);
-        }
-    }
-
-    public static void unblockAllDataNodes(String repository) {
-        for(RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
-            ((MockRepository)repositoriesService.repository(repository)).unblock();
-        }
-    }
-
-    public void waitForBlock(String node, String repository, TimeValue timeout) throws InterruptedException {
-        long start = System.currentTimeMillis();
-        RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, node);
-        MockRepository mockRepository = (MockRepository) repositoriesService.repository(repository);
-        while (System.currentTimeMillis() - start < timeout.millis()) {
-            if (mockRepository.blocked()) {
-                return;
-            }
-            Thread.sleep(100);
-        }
-        fail("Timeout waiting for node [" + node + "] to be blocked");
-    }
-
-    public boolean checkBlocked(String node, String repository) {
-        RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, node);
-        MockRepository mockRepository = (MockRepository) repositoriesService.repository(repository);
-        return mockRepository.blocked();
-    }
 }