|
|
@@ -112,27 +112,35 @@ public final class BlobStoreTestUtil {
|
|
|
* of this assertion must pass an executor on those when using such an implementation.
|
|
|
*/
|
|
|
public static void assertConsistency(BlobStoreRepository repository, Executor executor) {
|
|
|
- final PlainActionFuture<Void> listener = PlainActionFuture.newFuture();
|
|
|
- executor.execute(ActionRunnable.run(listener, () -> {
|
|
|
- final BlobContainer blobContainer = repository.blobContainer();
|
|
|
- final long latestGen;
|
|
|
- try (DataInputStream inputStream = new DataInputStream(blobContainer.readBlob("index.latest"))) {
|
|
|
- latestGen = inputStream.readLong();
|
|
|
- } catch (NoSuchFileException e) {
|
|
|
- throw new AssertionError("Could not find index.latest blob for repo [" + repository + "]");
|
|
|
- }
|
|
|
- assertIndexGenerations(blobContainer, latestGen);
|
|
|
- final RepositoryData repositoryData;
|
|
|
- try (InputStream blob = blobContainer.readBlob("index-" + latestGen);
|
|
|
- XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY,
|
|
|
- LoggingDeprecationHandler.INSTANCE, blob)) {
|
|
|
- repositoryData = RepositoryData.snapshotsFromXContent(parser, latestGen);
|
|
|
+ final PlainActionFuture<AssertionError> listener = PlainActionFuture.newFuture();
|
|
|
+ executor.execute(ActionRunnable.supply(listener, () -> {
|
|
|
+ try {
|
|
|
+ final BlobContainer blobContainer = repository.blobContainer();
|
|
|
+ final long latestGen;
|
|
|
+ try (DataInputStream inputStream = new DataInputStream(blobContainer.readBlob("index.latest"))) {
|
|
|
+ latestGen = inputStream.readLong();
|
|
|
+ } catch (NoSuchFileException e) {
|
|
|
+ throw new AssertionError("Could not find index.latest blob for repo [" + repository + "]");
|
|
|
+ }
|
|
|
+ assertIndexGenerations(blobContainer, latestGen);
|
|
|
+ final RepositoryData repositoryData;
|
|
|
+ try (InputStream blob = blobContainer.readBlob("index-" + latestGen);
|
|
|
+ XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY,
|
|
|
+ LoggingDeprecationHandler.INSTANCE, blob)) {
|
|
|
+ repositoryData = RepositoryData.snapshotsFromXContent(parser, latestGen);
|
|
|
+ }
|
|
|
+ assertIndexUUIDs(blobContainer, repositoryData);
|
|
|
+ assertSnapshotUUIDs(repository, repositoryData);
|
|
|
+ assertShardIndexGenerations(blobContainer, repositoryData.shardGenerations());
|
|
|
+ return null;
|
|
|
+ } catch (AssertionError e) {
|
|
|
+ return e;
|
|
|
}
|
|
|
- assertIndexUUIDs(blobContainer, repositoryData);
|
|
|
- assertSnapshotUUIDs(repository, repositoryData);
|
|
|
- assertShardIndexGenerations(blobContainer, repositoryData.shardGenerations());
|
|
|
}));
|
|
|
- listener.actionGet(TimeValue.timeValueMinutes(1L));
|
|
|
+ final AssertionError err = listener.actionGet(TimeValue.timeValueMinutes(1L));
|
|
|
+ if (err != null) {
|
|
|
+ throw new AssertionError(err);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private static void assertIndexGenerations(BlobContainer repoRoot, long latestGen) throws IOException {
|