Browse Source

Fix RollupJobTaskTests Leaking Threads on Slowness (#56438)

We are ensuring order in the two tests changed by waiting on latches.
The problem is, that 3s is a pretty short wait and on CI can randomly be exceeded
by pure chance. If that happened we wouldn't have visibility on it since we didn't
assert that the waits actually worked.
=> Fixed by asserting that the waits work and upping the timeout to our standard 10s
Also, moved to a per-test threadpool to make it simpler to identify which test failed,
should an unexpected task run on a closed client's pool afterall.
Armin Braun 5 years ago
parent
commit
c53f8b9795

+ 28 - 20
x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java

@@ -33,7 +33,8 @@ import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction;
 import org.elasticsearch.xpack.core.rollup.job.RollupJob;
 import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus;
 import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
-import org.junit.AfterClass;
+import org.junit.After;
+import org.junit.Before;
 
 import java.time.Clock;
 import java.util.Collections;
@@ -56,14 +57,17 @@ public class RollupJobTaskTests extends ESTestCase {
     private static final Settings SETTINGS = Settings.builder()
             .put(Node.NODE_NAME_SETTING.getKey(), "test")
             .build();
-    private static ThreadPool pool = new TestThreadPool("test");
 
-    @AfterClass
-    public static void stopThreadPool() {
-        if (pool != null) {
-            pool.shutdownNow();
-            pool = null;
-        }
+    private ThreadPool pool;
+
+    @Before
+    public void createThreadPool() {
+        pool = new TestThreadPool("test");
+    }
+
+    @After
+    public void stopThreadPool() {
+        assertThat(ThreadPool.terminate(pool, 10L, TimeUnit.SECONDS), equalTo(true));
     }
 
     public void testInitialStatusStopped() {
@@ -248,14 +252,14 @@ public class RollupJobTaskTests extends ESTestCase {
                     fail("Should not have entered onFailure");
                 }
             });
-            latch.await(3, TimeUnit.SECONDS);
+            assertUnblockIn10s(latch);
 
             task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123));
             assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING));
             assertThat(task.getStats().getNumInvocations(), equalTo(1L));
 
             // wait until the search request is send, this is unblocked in the client
-            block.await(3, TimeUnit.SECONDS);
+            assertUnblockIn10s(block);
             task.stop(new ActionListener<StopRollupJobAction.Response>() {
                 @Override
                 public void onResponse(StopRollupJobAction.Response response) {
@@ -285,7 +289,7 @@ public class RollupJobTaskTests extends ESTestCase {
                     latch2.countDown();
                 }
             });
-            latch2.await(3, TimeUnit.SECONDS);
+            assertUnblockIn10s(latch2);
 
             // the the client answer
             unblock.countDown();
@@ -749,14 +753,14 @@ public class RollupJobTaskTests extends ESTestCase {
                     fail("Should not have entered onFailure");
                 }
             });
-            latch.await(3, TimeUnit.SECONDS);
+            assertUnblockIn10s(latch);
 
             task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123));
             assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING));
             assertThat(task.getStats().getNumInvocations(), equalTo(1L));
 
             // wait until the search request is send, this is unblocked in the client
-            block.await(3, TimeUnit.SECONDS);
+            assertUnblockIn10s(block);
 
             task.stop(new ActionListener<StopRollupJobAction.Response>() {
                 @Override
@@ -786,7 +790,7 @@ public class RollupJobTaskTests extends ESTestCase {
                     fail("Should not have entered onFailure");
                 }
             });
-            latch2.await(3, TimeUnit.SECONDS);
+            assertUnblockIn10s(latch2);
             unblock.countDown();
         }
     }
@@ -833,18 +837,22 @@ public class RollupJobTaskTests extends ESTestCase {
         latch.await(3, TimeUnit.SECONDS);
     }
 
+    private static void assertUnblockIn10s(CountDownLatch latch) {
+        try {
+            assertThat(latch.await(10, TimeUnit.SECONDS), equalTo(true));
+        } catch (InterruptedException e) {
+            throw new AssertionError("Should not have been interrupted", e);
+        }
+    }
+
     private NoOpClient getEmptySearchResponseClient(CountDownLatch unblock, CountDownLatch block) {
         return new NoOpClient(getTestName()) {
             @SuppressWarnings("unchecked")
             @Override
             protected <Request extends ActionRequest, Response extends ActionResponse>
             void doExecute(ActionType<Response> action, Request request, ActionListener<Response> listener) {
-                try {
-                    unblock.countDown();
-                    block.await(3, TimeUnit.SECONDS);
-                } catch (InterruptedException e) {
-                    fail("Should not have timed out");
-                }
+                unblock.countDown();
+                assertUnblockIn10s(block);
                 listener.onResponse((Response) mock(SearchResponse.class));
             }
         };