Browse Source

[Transform] Scheduler concurrency fix (#89716)

fix concurrency issue in transform scheduler, by combining the empty check
and the task getter

fixes #88991
Hendrik Muhs 3 years ago
parent
commit
8694393c0a

+ 6 - 0
docs/changelog/89716.yaml

@@ -0,0 +1,6 @@
+pr: 89716
+summary: Scheduler concurrency fix
+area: Transform
+type: bug
+issues:
+ - 88991

+ 6 - 8
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTaskQueue.java

@@ -44,16 +44,14 @@ class TransformScheduledTaskQueue {
     }
 
     /**
-     * @return whether the queue is empty.
-     */
-    public synchronized boolean isEmpty() {
-        return tasks.isEmpty();
-    }
-
-    /**
-     * @return the task with the *lowest* priority.
+     * @return the task with the *lowest* priority or null if the queue is empty.
      */
     public synchronized TransformScheduledTask first() {
+        // gh#88991 concurrent access: the empty check must run within the synchronized context
+        if (tasks.isEmpty()) {
+            return null;
+        }
+
         return tasks.first();
     }
 

+ 4 - 2
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduler.java

@@ -129,12 +129,14 @@ public final class TransformScheduler {
     }
 
     private boolean processScheduledTasksInternal() {
-        if (scheduledTasks.isEmpty()) {
+        TransformScheduledTask scheduledTask = scheduledTasks.first();
+
+        if (scheduledTask == null) {
             // There are no scheduled tasks, hence, nothing to do
             return false;
         }
         long currentTimeMillis = clock.millis();
-        TransformScheduledTask scheduledTask = scheduledTasks.first();
+
         // Check if the task is eligible for processing
         if (currentTimeMillis < scheduledTask.getNextScheduledTimeMillis()) {
             // It is too early to process this task.

+ 10 - 11
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTaskQueueTests.java

@@ -17,7 +17,6 @@ import org.junit.Before;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
-import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
@@ -27,7 +26,8 @@ import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.fail;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 
 public class TransformScheduledTaskQueueTests extends ESTestCase {
 
@@ -52,14 +52,14 @@ public class TransformScheduledTaskQueueTests extends ESTestCase {
 
     public void testNonEmptyQueue() {
         queue.add(createTask("task-1", 5));
-        assertThat(queue.isEmpty(), is(false));
+        assertThat(queue.first(), is(notNullValue()));
     }
 
     public void testAddAndRemove() {
         queue.add(createTask("task-1", 5));
         queue.add(createTask("task-2", 1));
         queue.add(createTask("task-3", 9));
-        assertThat(queue.isEmpty(), is(false));
+        assertThat(queue.first(), is(notNullValue()));
         assertThat(queue.getTransformIds(), containsInAnyOrder("task-1", "task-2", "task-3"));
         assertThat(queue.first(), is(equalTo(createTask("task-2", 1))));
 
@@ -83,7 +83,7 @@ public class TransformScheduledTaskQueueTests extends ESTestCase {
                 assertThat(taskAdded, is(true));
             }
         }
-        assertThat(queue.isEmpty(), is(false));
+        assertThat(queue.first(), is(notNullValue()));
         assertThat(queue.getTransformIds(), hasSize(100));
 
         {
@@ -117,7 +117,7 @@ public class TransformScheduledTaskQueueTests extends ESTestCase {
         queue.add(createTask("task-1", 5));
         queue.remove("task-non-existent");
         // Verify that the remove operation had no effect
-        assertThat(queue.isEmpty(), is(false));
+        assertThat(queue.first(), is(notNullValue()));
         assertThat(queue.getTransformIds(), containsInAnyOrder("task-1"));
         assertThat(queue.first(), is(equalTo(createTask("task-1", 5))));
     }
@@ -126,7 +126,7 @@ public class TransformScheduledTaskQueueTests extends ESTestCase {
         queue.add(createTask("task-1", 5));
         queue.update("task-non-existent", task -> createTask(task.getTransformId(), -999));
         // Verify that the update operation had no effect
-        assertThat(queue.isEmpty(), is(false));
+        assertThat(queue.first(), is(notNullValue()));
         assertThat(queue.getTransformIds(), containsInAnyOrder("task-1"));
         assertThat(queue.first(), is(equalTo(createTask("task-1", 5))));
     }
@@ -147,7 +147,7 @@ public class TransformScheduledTaskQueueTests extends ESTestCase {
         queue.add(createTask("task-7", 0));
         queue.add(createTask("task-8", 2));
         queue.add(createTask("task-9", 4));
-        assertThat(queue.isEmpty(), is(false));
+        assertThat(queue.first(), is(notNullValue()));
         assertThat(
             queue.getTransformIds(),
             containsInAnyOrder("task-1", "task-2", "task-3", "task-4", "task-5", "task-6", "task-7", "task-8", "task-9")
@@ -155,7 +155,7 @@ public class TransformScheduledTaskQueueTests extends ESTestCase {
         assertThat(queue.first(), is(equalTo(createTask("task-7", 0))));
 
         List<TransformScheduledTask> tasksByPriority = new ArrayList<>();
-        while (queue.isEmpty() == false) {
+        while (queue.first() != null) {
             TransformScheduledTask task = queue.first();
             tasksByPriority.add(task);
             queue.remove(task.getTransformId());
@@ -210,8 +210,7 @@ public class TransformScheduledTaskQueueTests extends ESTestCase {
     }
 
     private void assertThatQueueIsEmpty() {
-        assertThat(queue.isEmpty(), is(true));
+        assertThat(queue.first(), is(nullValue()));
         assertThat(queue.getTransformIds(), is(empty()));
-        expectThrows(NoSuchElementException.class, () -> queue.first());
     }
 }