
Tweak reindex cancel logic and add many debug logs (#25256)

I'm still trying to hunt down rare failures in the cancellation tests
for reindex and friends. Here is the latest:
https://elasticsearch-ci.elastic.co/job/elastic+elasticsearch+5.x+multijob-unix-compatibility/os=ubuntu/876/console

It doesn't show much, other than that one of the tasks didn't kill
itself when asked to cancel.

So I'm going a bit crazy with debug logging so that the next time this
comes up I can trace exactly what happened.

Additionally, this tweaks the logic around how the task is
rethrottled on cancel. Previously we set the `requestsPerSecond`
to `0` when we cancelled the task. That was the "old way" to set it
to infinity, which was the intent. This switches that from `0` to
`Float.MAX_VALUE`, which is the "new way" to set the
`requestsPerSecond` to infinity. I don't know that this is much
better, but it feels better.
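
For context, here is a minimal, self-contained sketch (not the actual
`WorkingBulkByScrollTask` code; the class and method names below are
illustrative) of why an effectively infinite `requestsPerSecond` wakes a
throttled task up immediately: the remaining delay is rescaled by
`oldRate / newRate`, and a finite delay divided by infinity is zero. That
is also why the old `newRequestsPerSecond == 0` special case in `newDelay`
can go away.

```java
import static java.lang.Math.round;

/** Illustrative sketch only -- not the Elasticsearch implementation. */
public class RethrottleSketch {
    /**
     * Scale the nanoseconds left on the current schedule to fit a new throttle.
     * A finite remaining delay divided by an infinite rate is 0, so rethrottling
     * to "infinity" reschedules the pending work to run immediately, where it
     * can notice that the task was cancelled.
     */
    static long newDelayNanos(long remainingDelayNanos, float oldRequestsPerSecond, float newRequestsPerSecond) {
        if (remainingDelayNanos < 0) {
            return 0;
        }
        return round((double) remainingDelayNanos * oldRequestsPerSecond / newRequestsPerSecond);
    }

    public static void main(String[] args) {
        long fiveSeconds = 5_000_000_000L;
        System.out.println(newDelayNanos(fiveSeconds, 10f, 20f));                     // 2500000000: twice the rate, half the wait
        System.out.println(newDelayNanos(fiveSeconds, 10f, Float.POSITIVE_INFINITY)); // 0: wake up now
    }
}
```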
Nik Everett · 8 years ago · commit 3261586cac

+ 18 - 13
core/src/main/java/org/elasticsearch/index/reindex/WorkingBulkByScrollTask.java

@@ -90,8 +90,9 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success
 
     @Override
     protected void onCancelled() {
-        // Drop the throttle to 0, immediately rescheduling all outstanding tasks so the task will wake up and cancel itself.
-        rethrottle(0);
+        /* Drop the throttle to infinity, immediately rescheduling any throttled
+         * operation so it will wake up and cancel itself. */
+        rethrottle(Float.POSITIVE_INFINITY);
     }
 
     @Override
@@ -179,6 +180,7 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success
         // Synchronize so we are less likely to schedule the same request twice.
         synchronized (delayedPrepareBulkRequestReference) {
             TimeValue delay = throttleWaitTime(lastBatchStartTime, timeValueNanos(System.nanoTime()), lastBatchSize);
+            logger.debug("[{}]: preparing bulk request for [{}]", getId(), delay);
             delayedPrepareBulkRequestReference.set(new DelayedPrepareBulkRequest(threadPool, getRequestsPerSecond(),
                     delay, new RunOnce(prepareBulkRequestRunnable)));
         }
@@ -205,6 +207,9 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success
     }
 
     private void setRequestsPerSecond(float requestsPerSecond) {
+        if (requestsPerSecond <= 0) {
+            throw new IllegalArgumentException("requests per second must be more than 0 but was [" + requestsPerSecond + "]");
+        }
         this.requestsPerSecond = requestsPerSecond;
     }
 
@@ -216,8 +221,8 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success
 
             DelayedPrepareBulkRequest delayedPrepareBulkRequest = this.delayedPrepareBulkRequestReference.get();
             if (delayedPrepareBulkRequest == null) {
+                // No request has been queued so nothing to reschedule.
                 logger.debug("[{}]: skipping rescheduling because there is no scheduled task", getId());
-                // No request has been queued yet so nothing to reschedule.
                 return;
             }
 
@@ -250,11 +255,11 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success
         }
 
         DelayedPrepareBulkRequest rethrottle(float newRequestsPerSecond) {
-            if (newRequestsPerSecond != 0 && newRequestsPerSecond < requestsPerSecond) {
-                /*
-                 * The user is attempting to slow the request down. We'll let the change in throttle take effect the next time we delay
-                 * prepareBulkRequest. We can't just reschedule the request further out in the future the bulk context might time out.
-                 */
+            if (newRequestsPerSecond < requestsPerSecond) {
+                /* The user is attempting to slow the request down. We'll let the
+                 * change in throttle take effect the next time we delay
+                 * prepareBulkRequest. We can't just reschedule the request further
+                 * out in the future because the bulk context might time out. */
                 logger.debug("[{}]: skipping rescheduling because the new throttle [{}] is slower than the old one [{}]", getId(),
                         newRequestsPerSecond, requestsPerSecond);
                 return this;
@@ -268,10 +273,10 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success
                 return this;
             }
 
-            /*
-             * Strangely enough getting here doesn't mean that you actually cancelled the request, just that you probably did. If you stress
-             * test it you'll find that requests sneak through. So each request is given a runOnce boolean to prevent that.
-             */
+            /* Strangely enough getting here doesn't mean that you actually
+             * cancelled the request, just that you probably did. If you stress
+             * test it you'll find that requests sneak through. So each request
+             * is given a runOnce boolean to prevent that. */
             TimeValue newDelay = newDelay(remainingDelay, newRequestsPerSecond);
             logger.debug("[{}]: rescheduling for [{}] in the future", getId(), newDelay);
             return new DelayedPrepareBulkRequest(threadPool, requestsPerSecond, newDelay, command);
@@ -281,7 +286,7 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success
          * Scale back remaining delay to fit the new delay.
          */
         TimeValue newDelay(long remainingDelay, float newRequestsPerSecond) {
-            if (remainingDelay < 0 || newRequestsPerSecond == 0) {
+            if (remainingDelay < 0) {
                 return timeValueNanos(0);
             }
             return timeValueNanos(round(remainingDelay * requestsPerSecond / newRequestsPerSecond));

+ 1 - 1
core/src/main/java/org/elasticsearch/tasks/CancellableTask.java

@@ -52,7 +52,7 @@ public abstract class CancellableTask extends Task {
     }
 
     /**
-     * Returns true if this task should can potentially have children that needs to be cancelled when the parent is cancelled.
+     * Returns true if this task can potentially have children that need to be cancelled when it is cancelled.
      */
     public abstract boolean shouldCancelChildrenOnCancellation();
 

+ 17 - 4
modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java

@@ -20,6 +20,7 @@
 package org.elasticsearch.index.reindex;
 
 import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.DocWriteResponse;
@@ -232,7 +233,9 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
      * Start the action by firing the initial search request.
      */
     public void start() {
+        logger.debug("[{}]: starting", task.getId());
         if (task.isCancelled()) {
+            logger.debug("[{}]: finishing early because the task was cancelled", task.getId());
             finishHim(null);
             return;
         }
@@ -251,7 +254,9 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
      * @param response the scroll response to process
      */
     void onScrollResponse(TimeValue lastBatchStartTime, int lastBatchSize, ScrollableHitSource.Response response) {
+        logger.debug("[{}]: got scroll response with [{}] hits", task.getId(), response.getHits().size());
         if (task.isCancelled()) {
+            logger.debug("[{}]: finishing early because the task was cancelled", task.getId());
             finishHim(null);
             return;
         }
@@ -293,7 +298,9 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
      * thread may be blocked by the user script.
      */
     void prepareBulkRequest(TimeValue thisBatchStartTime, ScrollableHitSource.Response response) {
+        logger.debug("[{}]: preparing bulk request", task.getId());
         if (task.isCancelled()) {
+            logger.debug("[{}]: finishing early because the task was cancelled", task.getId());
             finishHim(null);
             return;
         }
@@ -320,10 +327,6 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
         }
         request.timeout(mainRequest.getTimeout());
         request.waitForActiveShards(mainRequest.getWaitForActiveShards());
-        if (logger.isDebugEnabled()) {
-            logger.debug("sending [{}] entry, [{}] bulk request", request.requests().size(),
-                    new ByteSizeValue(request.estimatedSizeInBytes()));
-        }
         sendBulkRequest(thisBatchStartTime, request);
     }
 
@@ -331,7 +334,12 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
      * Send a bulk request, handling retries.
      */
     void sendBulkRequest(TimeValue thisBatchStartTime, BulkRequest request) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("[{}]: sending [{}] entry, [{}] bulk request", task.getId(), request.requests().size(),
+                    new ByteSizeValue(request.estimatedSizeInBytes()));
+        }
         if (task.isCancelled()) {
+            logger.debug("[{}]: finishing early because the task was cancelled", task.getId());
             finishHim(null);
             return;
         }
@@ -381,6 +389,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
             }
 
             if (task.isCancelled()) {
+                logger.debug("[{}]: Finishing early because the task was cancelled", task.getId());
                 finishHim(null);
                 return;
             }
@@ -412,6 +421,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
      */
     void startNextScroll(TimeValue lastBatchStartTime, TimeValue now, int lastBatchSize) {
         if (task.isCancelled()) {
+            logger.debug("[{}]: finishing early because the task was cancelled", task.getId());
             finishHim(null);
             return;
         }
@@ -442,6 +452,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
         }
         RefreshRequest refresh = new RefreshRequest();
         refresh.indices(destinationIndices.toArray(new String[destinationIndices.size()]));
+        logger.debug("[{}]: refreshing", task.getId());
         client.admin().indices().refresh(refresh, new ActionListener<RefreshResponse>() {
             @Override
             public void onResponse(RefreshResponse response) {
@@ -461,6 +472,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
      * @param failure if non null then the request failed catastrophically with this exception
      */
     protected void finishHim(Exception failure) {
+        logger.debug(() -> new ParameterizedMessage("[{}]: finishing with a catastrophic failure", task.getId()), failure);
         finishHim(failure, emptyList(), emptyList(), false);
     }
 
@@ -473,6 +485,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
      */
     protected void finishHim(Exception failure, List<Failure> indexingFailures,
             List<SearchFailure> searchFailures, boolean timedOut) {
+        logger.debug("[{}]: finishing without any catastrophic failures", task.getId());
         scrollSource.close(() -> {
             if (failure == null) {
                 BulkByScrollResponse response = buildResponse(

+ 2 - 3
modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java

@@ -561,13 +561,12 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
     }
 
     public void testCancelBeforeScrollResponse() throws Exception {
-        // We bail so early we don't need to pass in a half way valid response.
         cancelTaskCase((DummyAsyncBulkByScrollAction action) -> simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 1,
-                null));
+                new ScrollableHitSource.Response(false, emptyList(), between(1, 100000), emptyList(), null)));
     }
 
     public void testCancelBeforeSendBulkRequest() throws Exception {
-        // We bail so early we don't need to pass in a half way valid response.
+        // We bail so early we don't need to pass in a half way valid request.
         cancelTaskCase((DummyAsyncBulkByScrollAction action) -> action.sendBulkRequest(timeValueNanos(System.nanoTime()), null));
     }
 

+ 11 - 9
modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTests.java

@@ -56,9 +56,9 @@ import static org.hamcrest.Matchers.hasSize;
 /**
  * Test that you can actually cancel a reindex/update-by-query/delete-by-query request and all the plumbing works. Doesn't test all of the
  * different cancellation places - that is the responsibility of AsyncBulkByScrollActionTests which have more precise control to
- * simulate failures but do not exercise important portion of the stack like transport and task management.
+ * simulate failures but do not exercise important portions of the stack like transport and task management.
  */
-@TestLogging("org.elasticsearch.action.bulk.byscroll:DEBUG,org.elasticsearch.index.reindex:DEBUG")
+@TestLogging("org.elasticsearch.index.reindex:DEBUG,org.elasticsearch.action.bulk:DEBUG")
 public class CancelTests extends ReindexTestCase {
 
     protected static final String INDEX = "reindex-cancel-index";
@@ -87,7 +87,7 @@ public class CancelTests extends ReindexTestCase {
                             Matcher<String> taskDescriptionMatcher) throws Exception {
         createIndex(INDEX);
 
-        // Total number of documents created for this test (~10 per primary shard per shard)
+        // Total number of documents created for this test (~10 per primary shard per slice)
         int numDocs = getNumShards(INDEX).numPrimaries * 10 * builder.request().getSlices();
         ALLOWED_OPERATIONS.release(numDocs);
 
@@ -231,12 +231,14 @@ public class CancelTests extends ReindexTestCase {
     }
 
     public void testReindexCancelWithWorkers() throws Exception {
-        testCancel(ReindexAction.NAME, reindex().source(INDEX).destination("dest", TYPE).setSlices(5), (response, total, modified) -> {
-            assertThat(response, matcher().created(modified).reasonCancelled(equalTo("by user request")).slices(hasSize(5)));
-
-            refresh("dest");
-            assertHitCount(client().prepareSearch("dest").setTypes(TYPE).setSize(0).get(), modified);
-        }, equalTo("reindex from [" + INDEX + "] to [dest][" + TYPE + "]"));
+        testCancel(ReindexAction.NAME,
+                reindex().source(INDEX).filter(QueryBuilders.matchAllQuery()).destination("dest", TYPE).setSlices(5),
+                (response, total, modified) -> {
+                    assertThat(response, matcher().created(modified).reasonCancelled(equalTo("by user request")).slices(hasSize(5)));
+                    refresh("dest");
+                    assertHitCount(client().prepareSearch("dest").setTypes(TYPE).setSize(0).get(), modified);
+                },
+                equalTo("reindex from [" + INDEX + "] to [dest][" + TYPE + "]"));
     }
 
     public void testUpdateByQueryCancelWithWorkers() throws Exception {

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollActionTestCase.java

@@ -37,7 +37,7 @@ public abstract class AbstractAsyncBulkByScrollActionTestCase<
     @Before
     public void setupForTest() {
         threadPool = new TestThreadPool(getTestName());
-        task = new WorkingBulkByScrollTask(1, "test", "test", "test", TaskId.EMPTY_TASK_ID, null, 0);
+        task = new WorkingBulkByScrollTask(1, "test", "test", "test", TaskId.EMPTY_TASK_ID, null, Float.MAX_VALUE);
     }
 
     @After