Просмотр исходного кода

Recycle buffers used for file-based recovery (#74117)

Adds a missing `decRef()` that prevents buffer recycling during
recovery. Relates #65921 which introduced the extra retention.
David Turner 4 лет назад
Родитель
Сommit
e4e4393f70

+ 5 - 0
server/src/main/java/org/elasticsearch/indices/recovery/MultiChunkTransfer.java

@@ -123,11 +123,16 @@ public abstract class MultiChunkTransfer<Source, Request extends MultiChunkTrans
         }
     }
 
+    protected boolean assertOnSuccess() {
+        return true;
+    }
+
     private void onCompleted(Exception failure) {
         if (Assertions.ENABLED && status != Status.PROCESSING) {
             throw new AssertionError("invalid status: expected [" + Status.PROCESSING + "] actual [" + status + "]", failure);
         }
         status = failure == null ? Status.SUCCESS : Status.FAILED;
+        assert status != Status.SUCCESS || assertOnSuccess();
         try {
             IOUtils.close(failure, this);
         } catch (Exception e) {

+ 12 - 1
server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

@@ -890,6 +890,7 @@ public class RecoverySourceHandler {
                 new MultiChunkTransfer<>(logger, threadPool.getThreadContext(), listener, maxConcurrentFileChunks, Arrays.asList(files)) {
 
                     final Deque<byte[]> buffers = new ConcurrentLinkedDeque<>();
+                    final AtomicInteger liveBufferCount = new AtomicInteger(); // only used in assertions to verify proper recycling
                     IndexInput currentInput = null;
                     long offset = 0;
 
@@ -905,11 +906,15 @@ public class RecoverySourceHandler {
                         assert Transports.assertNotTransportThread("read file chunk");
                         cancellableThreads.checkForCancel();
                         final byte[] buffer = Objects.requireNonNullElseGet(buffers.pollFirst(), () -> new byte[chunkSizeInBytes]);
+                        assert liveBufferCount.incrementAndGet() > 0;
                         final int toRead = Math.toIntExact(Math.min(md.length() - offset, buffer.length));
                         currentInput.readBytes(buffer, 0, toRead, false);
                         final boolean lastChunk = offset + toRead == md.length();
                         final FileChunk chunk = new FileChunk(md, new BytesArray(buffer, 0, toRead), offset, lastChunk,
-                            () -> buffers.addFirst(buffer));
+                            () -> {
+                                assert liveBufferCount.decrementAndGet() >= 0;
+                                buffers.addFirst(buffer);
+                            });
                         offset += toRead;
                         return chunk;
                     }
@@ -932,6 +937,12 @@ public class RecoverySourceHandler {
                     public void close() throws IOException {
                         IOUtils.close(currentInput, storeRef);
                     }
+
+                    @Override
+                    protected boolean assertOnSuccess() {
+                        assert liveBufferCount.get() == 0 : "leaked [" + liveBufferCount + "] buffers";
+                        return true;
+                    }
                 };
             resources.add(multiFileSender);
             temporaryStoreRef = null; // now owned by multiFileSender, tracked in resources, so won't be leaked

+ 6 - 1
server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java

@@ -202,7 +202,12 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
         final RecoveryFileChunkRequest request = new RecoveryFileChunkRequest(
             recoveryId, requestSeqNo, shardId, fileMetadata, position, content, lastChunk, totalTranslogOps, throttleTimeInNanos);
         final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
-        executeRetryableAction(action, request, fileChunkRequestOptions, listener.map(r -> null), reader);
+        executeRetryableAction(
+            action,
+            request,
+            fileChunkRequestOptions,
+            ActionListener.runBefore(listener.map(r -> null), request::decRef),
+            reader);
     }
 
     @Override