瀏覽代碼

Simplify Blobstore Consistency Check in Tests (#73992)

With work to make repo APIs more async incoming in #73570
we need a non-blocking way to run this check. This adds that async
check and removes the need to manually pass executors around as well.
Armin Braun 4 年之前
父節點
當前提交
5249540a5c

+ 1 - 2
plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsTests.java

@@ -22,7 +22,6 @@ import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
 import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
 import org.elasticsearch.snapshots.SnapshotState;
 import org.elasticsearch.test.ESSingleNodeTestCase;
-import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.Collection;
 
@@ -140,7 +139,7 @@ public class HdfsTests extends ESSingleNodeTestCase {
         assertThat(clusterState.getMetadata().hasIndex("test-idx-2"), equalTo(false));
         final BlobStoreRepository repo =
             (BlobStoreRepository) getInstanceFromNode(RepositoriesService.class).repository("test-repo");
-        BlobStoreTestUtil.assertConsistency(repo, repo.threadPool().executor(ThreadPool.Names.GENERIC));
+        BlobStoreTestUtil.assertConsistency(repo);
     }
 
     public void testMissingUri() {

+ 2 - 2
plugins/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java

@@ -77,10 +77,10 @@ public class S3RepositoryThirdPartyTests extends AbstractThirdPartyRepositoryTes
     }
 
     @Override
-    protected void assertConsistentRepository(BlobStoreRepository repo, Executor executor) throws Exception {
+    protected void assertConsistentRepository(BlobStoreRepository repo) throws Exception {
         // S3 is only eventually consistent for the list operations used by this assertions so we retry for 10 minutes assuming that
         // listing operations will become consistent within these 10 minutes.
-        assertBusy(() -> super.assertConsistentRepository(repo, executor), 10L, TimeUnit.MINUTES);
+        assertBusy(() -> super.assertConsistentRepository(repo), 10L, TimeUnit.MINUTES);
     }
 
     protected void assertBlobsByPrefix(BlobPath path, String prefix, Map<String, BlobMetadata> blobs) throws Exception {

+ 1 - 1
server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java

@@ -167,6 +167,6 @@ public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase
         logger.info("--> cleanup repository");
         client().admin().cluster().prepareCleanupRepository(repoName).get();
 
-        BlobStoreTestUtil.assertConsistency(repository, repository.threadPool().generic());
+        BlobStoreTestUtil.assertConsistency(repository);
     }
 }

+ 4 - 3
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -272,10 +272,11 @@ public class SnapshotResiliencyTests extends ESTestCase {
             if (blobStoreContext != null) {
                 blobStoreContext.forceConsistent();
             }
-            BlobStoreTestUtil.assertConsistency(
-                (BlobStoreRepository) testClusterNodes.randomMasterNodeSafe().repositoriesService.repository("repo"),
-                Runnable::run
+            final PlainActionFuture<AssertionError> future = BlobStoreTestUtil.assertConsistencyAsync(
+                (BlobStoreRepository) testClusterNodes.randomMasterNodeSafe().repositoriesService.repository("repo")
             );
+            deterministicTaskQueue.runAllRunnableTasks();
+            assertNull(future.actionGet(0));
         } finally {
             testClusterNodes.nodes.values().forEach(TestClusterNodes.TestClusterNode::stop);
         }

+ 3 - 3
test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java

@@ -206,7 +206,7 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT
         logger.info("--> deleting a snapshot to trigger repository cleanup");
         client().admin().cluster().deleteSnapshot(new DeleteSnapshotRequest("test-repo", snapshotName)).actionGet();
 
-        assertConsistentRepository(repo, genericExec);
+        assertConsistentRepository(repo);
 
         logger.info("--> Create dangling index");
         createDanglingIndex(repo, genericExec);
@@ -247,8 +247,8 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT
         return future.actionGet();
     }
 
-    protected void assertConsistentRepository(BlobStoreRepository repo, Executor executor) throws Exception {
-        BlobStoreTestUtil.assertConsistency(repo, executor);
+    protected void assertConsistentRepository(BlobStoreRepository repo) throws Exception {
+        BlobStoreTestUtil.assertConsistency(repo);
     }
 
     protected void assertDeleted(BlobPath path, String name) throws Exception {

+ 17 - 21
test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java

@@ -34,12 +34,10 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.repositories.IndexId;
-import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.ShardGenerations;
 import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.snapshots.SnapshotInfo;
-import org.elasticsearch.test.InternalTestCluster;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.DataInputStream;
@@ -55,7 +53,6 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
@@ -78,23 +75,25 @@ import static org.mockito.Mockito.when;
 
 public final class BlobStoreTestUtil {
 
-    public static void assertRepoConsistency(InternalTestCluster testCluster, String repoName) {
-        final BlobStoreRepository repo =
-            (BlobStoreRepository) testCluster.getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);
-        BlobStoreTestUtil.assertConsistency(repo, repo.threadPool().executor(ThreadPool.Names.GENERIC));
-    }
-
     /**
      * Assert that there are no unreferenced indices or unreferenced root-level metadata blobs in any repository.
      * TODO: Expand the logic here to also check for unreferenced segment blobs and shard level metadata
      * @param repository BlobStoreRepository to check
-     * @param executor Executor to run all repository calls on. This is needed since the production {@link BlobStoreRepository}
-     *                 implementations assert that all IO inducing calls happen on the generic or snapshot thread-pools and hence callers
-     *                 of this assertion must pass an executor on those when using such an implementation.
      */
-    public static void assertConsistency(BlobStoreRepository repository, Executor executor) {
-        final PlainActionFuture<AssertionError> listener = PlainActionFuture.newFuture();
-        executor.execute(ActionRunnable.supply(listener, () -> {
+    public static void assertConsistency(BlobStoreRepository repository) {
+        final PlainActionFuture<AssertionError> listener = assertConsistencyAsync(repository);
+        final AssertionError err = listener.actionGet(TimeValue.timeValueMinutes(1L));
+        if (err != null) {
+            throw new AssertionError(err);
+        }
+    }
+
+    /**
+     * Same as {@link #assertConsistency(BlobStoreRepository)} but async so it can be used in tests that don't allow blocking.
+     */
+    public static PlainActionFuture<AssertionError> assertConsistencyAsync(BlobStoreRepository repository) {
+        final PlainActionFuture<AssertionError> future = PlainActionFuture.newFuture();
+        repository.threadPool().generic().execute(ActionRunnable.wrap(future, listener -> {
             try {
                 final BlobContainer blobContainer = repository.blobContainer();
                 final long latestGen;
@@ -113,15 +112,12 @@ public final class BlobStoreTestUtil {
                 assertIndexUUIDs(repository, repositoryData);
                 assertSnapshotUUIDs(repository, repositoryData);
                 assertShardIndexGenerations(blobContainer, repositoryData.shardGenerations());
-                return null;
+                listener.onResponse(null);
             } catch (AssertionError e) {
-                return e;
+                listener.onResponse(e);
             }
         }));
-        final AssertionError err = listener.actionGet(TimeValue.timeValueMinutes(1L));
-        if (err != null) {
-            throw new AssertionError(err);
-        }
+        return future;
     }
 
     private static void assertIndexGenerations(BlobContainer repoRoot, long latestGen) throws IOException {

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java

@@ -135,7 +135,7 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
                     clusterAdmin().prepareDeleteSnapshot(name, OLD_VERSION_SNAPSHOT_PREFIX + "*").get();
                     clusterAdmin().prepareCleanupRepository(name).get();
                 }
-                BlobStoreTestUtil.assertRepoConsistency(internalCluster(), name);
+                BlobStoreTestUtil.assertConsistency(getRepositoryOnMaster(name));
             });
         } else {
             logger.info("--> skipped repo consistency checks because [{}]", skipRepoConsistencyCheckReason);