Sfoglia il codice sorgente

Reindex: wait for cleanup before responding (#23677)

Changes reindex and friends to wait until the entire request has
been "cleaned up" before responding. "Clean up" in this context
is clearing the scroll and (for reindex-from-remote) shutting
down the client. Failures to clean up are still only logged, not
returned to the user.

Closes #23653
Nik Everett 8 anni fa
parent
commit
bc65be2a65

+ 12 - 8
core/src/main/java/org/elasticsearch/action/bulk/byscroll/AbstractAsyncBulkByScrollAction.java

@@ -465,14 +465,18 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
      * @param searchFailures any search failures accumulated during the request
      * @param timedOut have any of the sub-requests timed out?
      */
-    protected void finishHim(Exception failure, List<Failure> indexingFailures, List<SearchFailure> searchFailures, boolean timedOut) {
-        scrollSource.close();
-        if (failure == null) {
-            listener.onResponse(
-                    buildResponse(timeValueNanos(System.nanoTime() - startTime.get()), indexingFailures, searchFailures, timedOut));
-        } else {
-            listener.onFailure(failure);
-        }
+    protected void finishHim(Exception failure, List<Failure> indexingFailures,
+            List<SearchFailure> searchFailures, boolean timedOut) {
+        scrollSource.close(() -> {
+            if (failure == null) {
+                BulkByScrollResponse response = buildResponse(
+                        timeValueNanos(System.nanoTime() - startTime.get()),
+                        indexingFailures, searchFailures, timedOut);
+                listener.onResponse(response);
+            } else {
+                listener.onFailure(failure);
+            }
+        });
     }
 
     /**

+ 2 - 2
core/src/main/java/org/elasticsearch/action/bulk/byscroll/ClientScrollableHitSource.java

@@ -113,8 +113,8 @@ public class ClientScrollableHitSource extends ScrollableHitSource {
     }
 
     @Override
-    protected void cleanup() {
-        // Nothing to do
+    protected void cleanup(Runnable onCompletion) {
+        onCompletion.run();
     }
 
     /**

+ 14 - 8
core/src/main/java/org/elasticsearch/action/bulk/byscroll/ScrollableHitSource.java

@@ -47,7 +47,7 @@ import static java.util.Objects.requireNonNull;
 /**
  * A scrollable source of results.
  */
-public abstract class ScrollableHitSource implements Closeable {
+public abstract class ScrollableHitSource {
     private final AtomicReference<String> scrollId = new AtomicReference<>();
 
     protected final Logger logger;
@@ -82,25 +82,31 @@ public abstract class ScrollableHitSource implements Closeable {
     }
     protected abstract void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, Consumer<? super Response> onResponse);
 
-    @Override
-    public final void close() {
+    public final void close(Runnable onCompletion) {
         String scrollId = this.scrollId.get();
         if (Strings.hasLength(scrollId)) {
-            clearScroll(scrollId, this::cleanup);
+            clearScroll(scrollId, () -> cleanup(onCompletion));
         } else {
-            cleanup();
+            cleanup(onCompletion);
         }
     }
+
     /**
      * Called to clear a scroll id.
+     *
      * @param scrollId the id to clear
-     * @param onCompletion implementers must call this after completing the clear whether they are successful or not
+     * @param onCompletion implementers must call this after completing the clear whether they are
+     *        successful or not
      */
     protected abstract void clearScroll(String scrollId, Runnable onCompletion);
     /**
-     * Called after the process has been totally finished to clean up any resources the process needed like remote connections.
+     * Called after the process has been totally finished to clean up any resources the process
+     * needed like remote connections.
+     *
+     * @param onCompletion implementers must call this after completing the cleanup whether they are
+     *        successful or not
      */
-    protected abstract void cleanup();
+    protected abstract void cleanup(Runnable onCompletion);
 
     /**
      * Set the id of the last scroll. Used for debugging.

+ 6 - 3
modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java

@@ -141,15 +141,18 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
     }
 
     @Override
-    protected void cleanup() {
-        /* This is called on the RestClient's thread pool and attempting to close the client on its own threadpool causes it to fail to
-         * close. So we always shutdown the RestClient asynchronously on a thread in Elasticsearch's generic thread pool. */
+    protected void cleanup(Runnable onCompletion) {
+        /* This is called on the RestClient's thread pool and attempting to close the client on its
+         * own threadpool causes it to fail to close. So we always shutdown the RestClient
+         * asynchronously on a thread in Elasticsearch's generic thread pool. */
         threadPool.generic().submit(() -> {
             try {
                 client.close();
                 logger.debug("Shut down remote connection");
             } catch (IOException e) {
                 logger.error("Failed to shutdown the remote connection", e);
+            } finally {
+                onCompletion.run();
             }
         });
     }

+ 21 - 0
modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java

@@ -80,7 +80,9 @@ import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class RemoteScrollableHitSourceTests extends ESTestCase {
@@ -478,6 +480,25 @@ public class RemoteScrollableHitSourceTests extends ESTestCase {
                 e.getCause().getCause().getCause().getMessage());
     }
 
+    public void testCleanupSuccessful() throws Exception {
+        AtomicBoolean cleanupCallbackCalled = new AtomicBoolean();
+        RestClient client = mock(RestClient.class);
+        TestRemoteScrollableHitSource hitSource = new TestRemoteScrollableHitSource(client);
+        hitSource.cleanup(() -> cleanupCallbackCalled.set(true));
+        verify(client).close();
+        assertTrue(cleanupCallbackCalled.get());
+    }
+
+    public void testCleanupFailure() throws Exception {
+        AtomicBoolean cleanupCallbackCalled = new AtomicBoolean();
+        RestClient client = mock(RestClient.class);
+        doThrow(new RuntimeException("test")).when(client).close();
+        TestRemoteScrollableHitSource hitSource = new TestRemoteScrollableHitSource(client);
+        hitSource.cleanup(() -> cleanupCallbackCalled.set(true));
+        verify(client).close();
+        assertTrue(cleanupCallbackCalled.get());
+    }
+
     private RemoteScrollableHitSource sourceWithMockedRemoteCall(String... paths) throws Exception {
         return sourceWithMockedRemoteCall(true, ContentType.APPLICATION_JSON, paths);
     }