Browse Source

Introduce `SubscribableListener#andThenCompleteWith` (#103990)

Sometimes we want a step in a `SubscribableListener` chain which is
fully synchronous. It's a little awkward to write this using `andThen`,
so this commit introduces a more natural utility for this situation.
David Turner 1 year ago
parent
commit
cdb3439c33

+ 2 - 2
modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

@@ -628,7 +628,7 @@ class S3BlobContainer extends AbstractBlobContainer {
 
                 // Step 5: Perform the compare-and-swap by completing our upload iff the witnessed value matches the expected value.
 
-                .<OptionalBytesReference>andThen((l, currentValue) -> ActionListener.completeWith(l, () -> {
+                .andThenApply(currentValue -> {
                     if (currentValue.isPresent() && currentValue.bytesReference().equals(expected)) {
                         logger.trace("[{}] completing upload [{}]", blobKey, uploadId);
                         completeMultipartUpload(uploadId, partETag);
@@ -638,7 +638,7 @@ class S3BlobContainer extends AbstractBlobContainer {
                         safeAbortMultipartUpload(uploadId);
                     }
                     return currentValue;
-                }))
+                })
 
                 // Step 6: Complete the listener.
 

+ 36 - 0
server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java

@@ -19,6 +19,7 @@ import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
 import org.elasticsearch.core.CheckedConsumer;
+import org.elasticsearch.core.CheckedFunction;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -379,6 +380,41 @@ public class SubscribableListener<T> implements ActionListener<T> {
         return newForked(l -> addListener(l.delegateFailureAndWrap(nextStep), executor, threadContext));
     }
 
+    /**
+     * Creates and returns a new {@link SubscribableListener} {@code L} such that if this listener is completed successfully with result
+     * {@code R} then {@code fn} is invoked with argument {@code R}, and {@code L} is completed with the result of that invocation. If this
+     * listener is completed exceptionally, or {@code fn} throws an exception, then {@code L} is completed with that exception.
+     * <p>
+     * This is essentially a shorthand for a call to {@link #andThen} with a {@code nextStep} argument that is fully synchronous.
+     * <p>
+     * The threading of the {@code fn} invocation is the same as for listeners added with {@link #addListener}: if this listener is
+     * already complete then {@code fn} is invoked on the thread calling {@link #andThenApply} and in its thread context, but if this
+     * listener is incomplete then {@code fn} is invoked on the thread, and in the thread context, on which this listener is completed.
+     */
+    public <U> SubscribableListener<U> andThenApply(CheckedFunction<T, U, Exception> fn) {
+        return newForked(l -> addListener(l.map(fn)));
+    }
+
+    /**
+     * Creates and returns a new {@link SubscribableListener} {@code L} such that if this listener is completed successfully with result
+     * {@code R} then {@code consumer} is applied to argument {@code R}, and {@code L} is completed with {@code null} when {@code
+     * consumer} returns. If this listener is completed exceptionally, or {@code consumer} throws an exception, then {@code L} is
+     * completed with that exception.
+     * <p>
+     * This is essentially a shorthand for a call to {@link #andThen} with a {@code nextStep} argument that is fully synchronous.
+     * <p>
+     * The threading of the {@code consumer} invocation is the same as for listeners added with {@link #addListener}: if this listener is
+     * already complete then {@code consumer} is invoked on the thread calling {@link #andThenAccept} and in its thread context, but if
+     * this listener is incomplete then {@code consumer} is invoked on the thread, and in the thread context, on which this listener is
+     * completed.
+     */
+    public SubscribableListener<Void> andThenAccept(CheckedConsumer<T, Exception> consumer) {
+        return newForked(l -> addListener(l.map(r -> {
+            consumer.accept(r);
+            return null;
+        })));
+    }
+
     /**
      * Adds a timeout to this listener, such that if the timeout elapses before the listener is completed then it will be completed with an
      * {@link ElasticsearchTimeoutException}.

+ 18 - 22
server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

@@ -565,19 +565,17 @@ public class RecoverySourceHandler {
                     // but we must still create a retention lease
                     .<RetentionLease>newForked(leaseListener -> createRetentionLease(startingSeqNo, leaseListener))
                     // and then compute the result of sending no files
-                    .<SendFileResult>andThen((l, ignored) -> {
+                    .andThenApply(ignored -> {
                         final TimeValue took = stopWatch.totalTime();
                         logger.trace("recovery [phase1]: took [{}]", took);
-                        l.onResponse(
-                            new SendFileResult(
-                                Collections.emptyList(),
-                                Collections.emptyList(),
-                                0L,
-                                Collections.emptyList(),
-                                Collections.emptyList(),
-                                0L,
-                                took
-                            )
+                        return new SendFileResult(
+                            Collections.emptyList(),
+                            Collections.emptyList(),
+                            0L,
+                            Collections.emptyList(),
+                            Collections.emptyList(),
+                            0L,
+                            took
                         );
                     })
                     // and finally respond
@@ -751,19 +749,17 @@ public class RecoverySourceHandler {
                     cleanFiles(store, recoverySourceMetadata, () -> translogOps, lastKnownGlobalCheckpoint, finalRecoveryPlanListener);
                 })
                 // compute the result
-                .<SendFileResult>andThen((resultListener, ignored) -> {
+                .andThenApply(ignored -> {
                     final TimeValue took = stopWatch.totalTime();
                     logger.trace("recovery [phase1]: took [{}]", took);
-                    resultListener.onResponse(
-                        new SendFileResult(
-                            shardRecoveryPlan.getFilesToRecoverNames(),
-                            shardRecoveryPlan.getFilesToRecoverSizes(),
-                            shardRecoveryPlan.getTotalSize(),
-                            shardRecoveryPlan.getFilesPresentInTargetNames(),
-                            shardRecoveryPlan.getFilesPresentInTargetSizes(),
-                            shardRecoveryPlan.getExistingSize(),
-                            took
-                        )
+                    return new SendFileResult(
+                        shardRecoveryPlan.getFilesToRecoverNames(),
+                        shardRecoveryPlan.getFilesToRecoverSizes(),
+                        shardRecoveryPlan.getTotalSize(),
+                        shardRecoveryPlan.getFilesPresentInTargetNames(),
+                        shardRecoveryPlan.getFilesPresentInTargetSizes(),
+                        shardRecoveryPlan.getExistingSize(),
+                        took
                     );
                 })
                 // and finally respond

+ 105 - 1
server/src/test/java/org/elasticsearch/action/support/SubscribableListenerTests.java

@@ -409,6 +409,26 @@ public class SubscribableListenerTests extends ESTestCase {
         assertFalse(chainedListener.isDone());
     }
 
+    public void testAndThenThrowException() {
+        final var initialListener = new SubscribableListener<>();
+        final var forked = new AtomicReference<ActionListener<Object>>();
+        final var result = new AtomicReference<>();
+
+        final var chainedListener = initialListener.andThen((l, o) -> {
+            forked.set(l);
+            result.set(o);
+            throw new ElasticsearchException("simulated");
+        });
+        assertNull(forked.get());
+        assertNull(result.get());
+
+        final var o1 = new Object();
+        initialListener.onResponse(o1);
+        assertSame(o1, result.get());
+        assertSame(chainedListener, forked.get());
+        assertComplete(chainedListener, "simulated");
+    }
+
     public void testAndThenFailure() {
         final var initialListener = new SubscribableListener<>();
 
@@ -488,7 +508,7 @@ public class SubscribableListenerTests extends ESTestCase {
         assertTrue(isComplete.get());
     }
 
-    private static void assertComplete(SubscribableListener<Object> listener, @Nullable String expectedFailureMessage) {
+    private static <T> void assertComplete(SubscribableListener<T> listener, @Nullable String expectedFailureMessage) {
         assertTrue(listener.isDone());
         if (expectedFailureMessage == null) {
             try {
@@ -500,4 +520,88 @@ public class SubscribableListenerTests extends ESTestCase {
             assertEquals(expectedFailureMessage, expectThrows(ElasticsearchException.class, listener::rawResult).getMessage());
         }
     }
+
+    public void testAndThenApplySuccess() throws Exception {
+        final var initialListener = new SubscribableListener<>();
+        final var result = new AtomicReference<>();
+
+        final var oResult = new Object();
+        final var chainedListener = initialListener.andThenApply(o -> {
+            result.set(o);
+            return oResult;
+        });
+        assertNull(result.get());
+
+        final var o1 = new Object();
+        initialListener.onResponse(o1);
+        assertSame(o1, result.get());
+        assertTrue(chainedListener.isDone());
+        assertSame(oResult, chainedListener.rawResult());
+    }
+
+    public void testAndThenApplyThrowException() {
+        final var initialListener = new SubscribableListener<>();
+        final var result = new AtomicReference<>();
+
+        final var chainedListener = initialListener.andThenApply(o -> {
+            result.set(o);
+            throw new ElasticsearchException("simulated exception in fn");
+        });
+        assertNull(result.get());
+
+        final var o1 = new Object();
+        initialListener.onResponse(o1);
+        assertSame(o1, result.get());
+        assertComplete(chainedListener, "simulated exception in fn");
+    }
+
+    public void testAndThenApplyFailure() {
+        final var initialListener = new SubscribableListener<>();
+
+        final var chainedListener = initialListener.andThenApply(o -> fail(null, "should not be called"));
+        assertFalse(chainedListener.isDone());
+
+        initialListener.onFailure(new ElasticsearchException("simulated"));
+        assertComplete(chainedListener, "simulated");
+    }
+
+    public void testAndThenAcceptSuccess() throws Exception {
+        final var initialListener = new SubscribableListener<>();
+        final var result = new AtomicReference<>();
+
+        final var chainedListener = initialListener.andThenAccept(result::set);
+        assertNull(result.get());
+
+        final var o1 = new Object();
+        initialListener.onResponse(o1);
+        assertSame(o1, result.get());
+        assertTrue(chainedListener.isDone());
+        assertNull(chainedListener.rawResult());
+    }
+
+    public void testAndThenAcceptThrowException() {
+        final var initialListener = new SubscribableListener<>();
+        final var result = new AtomicReference<>();
+
+        final var chainedListener = initialListener.andThenAccept(o -> {
+            result.set(o);
+            throw new ElasticsearchException("simulated exception in fn");
+        });
+        assertNull(result.get());
+
+        final var o1 = new Object();
+        initialListener.onResponse(o1);
+        assertSame(o1, result.get());
+        assertComplete(chainedListener, "simulated exception in fn");
+    }
+
+    public void testAndThenAcceptFailure() {
+        final var initialListener = new SubscribableListener<>();
+
+        final var chainedListener = initialListener.andThenAccept(o -> fail(null, "should not be called"));
+        assertFalse(chainedListener.isDone());
+
+        initialListener.onFailure(new ElasticsearchException("simulated"));
+        assertComplete(chainedListener, "simulated");
+    }
 }

+ 2 - 3
x-pack/plugin/slm/src/main/java/org/elasticsearch/xpack/slm/SLMGetExpiredSnapshotsAction.java

@@ -134,13 +134,12 @@ public class SLMGetExpiredSnapshotsAction extends ActionType<SLMGetExpiredSnapsh
                                 )
 
                                 // Compute snapshots to delete for each (relevant) policy
-                                .<Void>andThen((l, snapshotDetailsByPolicy) -> ActionListener.completeWith(l, () -> {
+                                .andThenAccept(snapshotDetailsByPolicy -> {
                                     resultsBuilder.addResult(
                                         repositoryName,
                                         getSnapshotsToDelete(repositoryName, request.policies(), snapshotDetailsByPolicy)
                                     );
-                                    return null;
-                                }))
+                                })
 
                                 // And notify this repository's listener on completion
                                 .addListener(perRepositoryListener.delegateResponse((l, e) -> {

+ 3 - 2
x-pack/plugin/slm/src/test/java/org/elasticsearch/xpack/slm/SLMGetExpiredSnapshotsActionTests.java

@@ -175,7 +175,7 @@ public class SLMGetExpiredSnapshotsActionTests extends ESTestCase {
             .map(si -> new SeenSnapshotInfo(si.snapshotId(), RepositoryData.SnapshotDetails.fromSnapshotInfo(si).getSlmPolicy()))
             .collect(Collectors.toSet());
 
-        SubscribableListener
+        final var testListener = SubscribableListener
 
             .<RepositoryData>newForked(l -> repository.getRepositoryData(EsExecutors.DIRECT_EXECUTOR_SERVICE, l))
 
@@ -183,7 +183,7 @@ public class SLMGetExpiredSnapshotsActionTests extends ESTestCase {
                 (l, rd) -> SLMGetExpiredSnapshotsAction.getSnapshotDetailsByPolicy(EsExecutors.DIRECT_EXECUTOR_SERVICE, repository, rd, l)
             )
 
-            .andThen((l, snapshotDetailsByPolicy) -> {
+            .andThenAccept(snapshotDetailsByPolicy -> {
                 snapshotDetailsByPolicy.flatMap((policyId, snapshotsMap) -> snapshotsMap.entrySet().stream().map(entry -> {
                     assertThat(policyId, oneOf(policyNames));
                     assertEquals(policyId, entry.getValue().getSlmPolicy());
@@ -192,6 +192,7 @@ public class SLMGetExpiredSnapshotsActionTests extends ESTestCase {
             });
 
         deterministicTaskQueue.runAllTasks();
+        assertTrue(testListener.isDone());
         assertThat(seenSnapshotInfos, empty());
     }