Explorar el Código

Cleaner Handling of Store Refcount in BlobStoreRepository (#47560)

If a shard gets closed we properly abort its snapshot
before closing it. We should in thise case make sure to
not throw a confusing exception about trying to increment
the reference on an already closed shard in the async tasks
if the snapshot is already aborted.
Also, added an assertion to make sure that aborts are in
fact the only situation in which we run into a concurrently
closed store.
Armin Braun hace 6 años
padre
commit
9141e05466

+ 12 - 4
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -1115,7 +1115,18 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                     protected void doRun() {
                         try {
                             if (alreadyFailed.get() == false) {
-                                snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store);
+                                if (store.tryIncRef()) {
+                                    try {
+                                        snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store);
+                                    } finally {
+                                        store.decRef();
+                                    }
+                                } else if (snapshotStatus.isAborted()) {
+                                    throw new IndexShardSnapshotFailedException(shardId, "Aborted");
+                                } else {
+                                    assert false : "Store was closed before aborting the snapshot";
+                                    throw new IllegalStateException("Store is closed already");
+                                }
                             }
                             filesListener.onResponse(null);
                         } catch (IOException e) {
@@ -1316,7 +1327,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                               IndexShardSnapshotStatus snapshotStatus, Store store) throws IOException {
         final BlobContainer shardContainer = shardContainer(indexId, shardId);
         final String file = fileInfo.physicalName();
-        store.incRef();
         try (IndexInput indexInput = store.openVerifyingInput(file, IOContext.READONCE, fileInfo.metadata())) {
             for (int i = 0; i < fileInfo.numberOfParts(); i++) {
                 final long partBytes = fileInfo.partBytes(i);
@@ -1356,8 +1366,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             failStoreIfCorrupted(store, t);
             snapshotStatus.addProcessedFile(0);
             throw t;
-        } finally {
-            store.decRef();
         }
     }