Browse Source

Revert "Refactor AllocatedPersistentTask#init(), move rollup logic out of ctor (#46288)"

This reverts commit d999942c6dfd931266d01db24d3fb26b29cf8f64.
Zachary Tong 6 years ago
parent
commit
f336c74788

+ 4 - 4
server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java

@@ -18,7 +18,6 @@
  */
  */
 package org.elasticsearch.persistent;
 package org.elasticsearch.persistent;
 
 
-import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListener;
@@ -38,13 +37,13 @@ import java.util.function.Predicate;
  */
  */
 public class AllocatedPersistentTask extends CancellableTask {
 public class AllocatedPersistentTask extends CancellableTask {
 
 
-    private static final Logger logger = LogManager.getLogger(AllocatedPersistentTask.class);
     private final AtomicReference<State> state;
     private final AtomicReference<State> state;
 
 
     private volatile String persistentTaskId;
     private volatile String persistentTaskId;
     private volatile long allocationId;
     private volatile long allocationId;
     private volatile @Nullable Exception failure;
     private volatile @Nullable Exception failure;
     private volatile PersistentTasksService persistentTasksService;
     private volatile PersistentTasksService persistentTasksService;
+    private volatile Logger logger;
     private volatile TaskManager taskManager;
     private volatile TaskManager taskManager;
 
 
     public AllocatedPersistentTask(long id, String type, String action, String description, TaskId parentTask,
     public AllocatedPersistentTask(long id, String type, String action, String description, TaskId parentTask,
@@ -86,9 +85,10 @@ public class AllocatedPersistentTask extends CancellableTask {
         return persistentTaskId;
         return persistentTaskId;
     }
     }
 
 
-    protected void init(PersistentTasksService persistentTasksService, TaskManager taskManager,
-                        String persistentTaskId, long allocationId) {
+    void init(PersistentTasksService persistentTasksService, TaskManager taskManager, Logger logger, String persistentTaskId, long
+            allocationId) {
         this.persistentTasksService = persistentTasksService;
         this.persistentTasksService = persistentTasksService;
+        this.logger = logger;
         this.taskManager = taskManager;
         this.taskManager = taskManager;
         this.persistentTaskId = persistentTaskId;
         this.persistentTaskId = persistentTaskId;
         this.allocationId = allocationId;
         this.allocationId = allocationId;

+ 1 - 1
server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java

@@ -183,7 +183,7 @@ public class PersistentTasksNodeService implements ClusterStateListener {
 
 
         boolean processed = false;
         boolean processed = false;
         try {
         try {
-            task.init(persistentTasksService, taskManager, taskInProgress.getId(), taskInProgress.getAllocationId());
+            task.init(persistentTasksService, taskManager, logger, taskInProgress.getId(), taskInProgress.getAllocationId());
             logger.trace("Persistent task [{}] with id [{}] and allocation id [{}] was created", task.getAction(),
             logger.trace("Persistent task [{}] with id [{}] and allocation id [{}] was created", task.getAction(),
                     task.getPersistentTaskId(), task.getAllocationId());
                     task.getPersistentTaskId(), task.getAllocationId());
             try {
             try {

+ 16 - 33
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java

@@ -22,9 +22,7 @@ import org.elasticsearch.persistent.AllocatedPersistentTask;
 import org.elasticsearch.persistent.PersistentTaskState;
 import org.elasticsearch.persistent.PersistentTaskState;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
 import org.elasticsearch.persistent.PersistentTasksExecutor;
 import org.elasticsearch.persistent.PersistentTasksExecutor;
-import org.elasticsearch.persistent.PersistentTasksService;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.tasks.TaskId;
-import org.elasticsearch.tasks.TaskManager;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.indexing.IndexerState;
 import org.elasticsearch.xpack.core.indexing.IndexerState;
@@ -152,10 +150,7 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE
     private final RollupJob job;
     private final RollupJob job;
     private final SchedulerEngine schedulerEngine;
     private final SchedulerEngine schedulerEngine;
     private final ThreadPool threadPool;
     private final ThreadPool threadPool;
-    private final Client client;
-    private final IndexerState initialIndexerState;
-    private final Map<String, Object> initialPosition;
-    private RollupIndexer indexer;
+    private final RollupIndexer indexer;
 
 
     RollupJobTask(long id, String type, String action, TaskId parentTask, RollupJob job, RollupJobStatus state,
     RollupJobTask(long id, String type, String action, TaskId parentTask, RollupJob job, RollupJobStatus state,
                   Client client, SchedulerEngine schedulerEngine, ThreadPool threadPool, Map<String, String> headers) {
                   Client client, SchedulerEngine schedulerEngine, ThreadPool threadPool, Map<String, String> headers) {
@@ -163,48 +158,36 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE
         this.job = job;
         this.job = job;
         this.schedulerEngine = schedulerEngine;
         this.schedulerEngine = schedulerEngine;
         this.threadPool = threadPool;
         this.threadPool = threadPool;
-        this.client = client;
-        if (state == null) {
-            this.initialIndexerState = null;
-            this.initialPosition = null;
-        } else {
-            this.initialIndexerState = state.getIndexerState();
-            this.initialPosition = state.getPosition();
-        }
-
-    }
 
 
-    @Override
-    protected void init(PersistentTasksService persistentTasksService, TaskManager taskManager,
-                        String persistentTaskId, long allocationId) {
-        super.init(persistentTasksService, taskManager, persistentTaskId, allocationId);
-
-        // If initial position is not null, we are resuming rather than starting fresh.
-        IndexerState indexerState = IndexerState.STOPPED;
-        if (initialPosition != null) {
-            logger.debug("We have existing state, setting state to [" + initialIndexerState + "] " +
-                "and current position to [" + initialPosition + "] for job [" + job.getConfig().getId() + "]");
-            if (initialIndexerState.equals(IndexerState.INDEXING)) {
+        // If status is not null, we are resuming rather than starting fresh.
+        Map<String, Object> initialPosition = null;
+        IndexerState initialState = IndexerState.STOPPED;
+        if (state != null) {
+            final IndexerState existingState = state.getIndexerState();
+            logger.debug("We have existing state, setting state to [" + existingState + "] " +
+                    "and current position to [" + state.getPosition() + "] for job [" + job.getConfig().getId() + "]");
+            if (existingState.equals(IndexerState.INDEXING)) {
                 /*
                 /*
                  * If we were indexing, we have to reset back to STARTED otherwise the indexer will be "stuck" thinking
                  * If we were indexing, we have to reset back to STARTED otherwise the indexer will be "stuck" thinking
                  * it is indexing but without the actual indexing thread running.
                  * it is indexing but without the actual indexing thread running.
                  */
                  */
-                indexerState = IndexerState.STARTED;
+                initialState = IndexerState.STARTED;
 
 
-            } else if (initialIndexerState.equals(IndexerState.ABORTING) || initialIndexerState.equals(IndexerState.STOPPING)) {
+            } else if (existingState.equals(IndexerState.ABORTING) || existingState.equals(IndexerState.STOPPING)) {
                 // It shouldn't be possible to persist ABORTING, but if for some reason it does,
                 // It shouldn't be possible to persist ABORTING, but if for some reason it does,
                 // play it safe and restore the job as STOPPED.  An admin will have to clean it up,
                 // play it safe and restore the job as STOPPED.  An admin will have to clean it up,
                 // but it won't be running, and won't delete itself either.  Safest option.
                 // but it won't be running, and won't delete itself either.  Safest option.
                 // If we were STOPPING, that means it persisted but was killed before finally stopped... so ok
                 // If we were STOPPING, that means it persisted but was killed before finally stopped... so ok
                 // to restore as STOPPED
                 // to restore as STOPPED
-                indexerState = IndexerState.STOPPED;
+                initialState = IndexerState.STOPPED;
             } else  {
             } else  {
-                indexerState = initialIndexerState;
+                initialState = existingState;
             }
             }
+            initialPosition = state.getPosition();
 
 
         }
         }
-        this.indexer = new ClientRollupPageManager(job, indexerState, initialPosition,
-            new ParentTaskAssigningClient(client, getParentTaskId()));
+        this.indexer = new ClientRollupPageManager(job, initialState, initialPosition,
+                new ParentTaskAssigningClient(client, new TaskId(getPersistentTaskId())));
     }
     }
 
 
     @Override
     @Override

+ 19 - 56
x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java

@@ -17,7 +17,6 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
 import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
 import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.tasks.TaskId;
-import org.elasticsearch.tasks.TaskManager;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -68,10 +67,8 @@ public class RollupJobTaskTests extends ESTestCase {
         Client client = mock(Client.class);
         Client client = mock(Client.class);
         when(client.settings()).thenReturn(Settings.EMPTY);
         when(client.settings()).thenReturn(Settings.EMPTY);
         SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
         SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
-        TaskId taskId = new TaskId("node", 123);
-        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
+        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
             status, client, schedulerEngine, pool, Collections.emptyMap());
             status, client, schedulerEngine, pool, Collections.emptyMap());
-        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
         assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
         assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
         assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
@@ -83,10 +80,8 @@ public class RollupJobTaskTests extends ESTestCase {
         Client client = mock(Client.class);
         Client client = mock(Client.class);
         when(client.settings()).thenReturn(Settings.EMPTY);
         when(client.settings()).thenReturn(Settings.EMPTY);
         SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
         SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
-        TaskId taskId = new TaskId("node", 123);
-        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
+        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
             status, client, schedulerEngine, pool, Collections.emptyMap());
             status, client, schedulerEngine, pool, Collections.emptyMap());
-        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
         assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
         assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
         assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
@@ -98,10 +93,8 @@ public class RollupJobTaskTests extends ESTestCase {
         Client client = mock(Client.class);
         Client client = mock(Client.class);
         when(client.settings()).thenReturn(Settings.EMPTY);
         when(client.settings()).thenReturn(Settings.EMPTY);
         SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
         SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
-        TaskId taskId = new TaskId("node", 123);
-        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
+        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
             status, client, schedulerEngine, pool, Collections.emptyMap());
             status, client, schedulerEngine, pool, Collections.emptyMap());
-        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
         assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
         assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
         assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
@@ -113,10 +106,8 @@ public class RollupJobTaskTests extends ESTestCase {
         Client client = mock(Client.class);
         Client client = mock(Client.class);
         when(client.settings()).thenReturn(Settings.EMPTY);
         when(client.settings()).thenReturn(Settings.EMPTY);
         SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
         SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
-        TaskId taskId = new TaskId("node", 123);
-        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
+        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
             status, client, schedulerEngine, pool, Collections.emptyMap());
             status, client, schedulerEngine, pool, Collections.emptyMap());
-        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
         assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
         assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
         assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
         assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
@@ -128,10 +119,8 @@ public class RollupJobTaskTests extends ESTestCase {
         Client client = mock(Client.class);
         Client client = mock(Client.class);
         when(client.settings()).thenReturn(Settings.EMPTY);
         when(client.settings()).thenReturn(Settings.EMPTY);
         SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
         SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
-        TaskId taskId = new TaskId("node", 123);
-        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
+        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
             status, client, schedulerEngine, pool, Collections.emptyMap());
             status, client, schedulerEngine, pool, Collections.emptyMap());
-        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
         assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
         assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
         assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
         assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
@@ -143,10 +132,8 @@ public class RollupJobTaskTests extends ESTestCase {
         Client client = mock(Client.class);
         Client client = mock(Client.class);
         when(client.settings()).thenReturn(Settings.EMPTY);
         when(client.settings()).thenReturn(Settings.EMPTY);
         SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
         SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
-        TaskId taskId = new TaskId("node", 123);
-        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
+        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
             status, client, schedulerEngine, pool, Collections.emptyMap());
             status, client, schedulerEngine, pool, Collections.emptyMap());
-        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
         assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
         assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
         assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
         assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
@@ -157,10 +144,8 @@ public class RollupJobTaskTests extends ESTestCase {
         Client client = mock(Client.class);
         Client client = mock(Client.class);
         when(client.settings()).thenReturn(Settings.EMPTY);
         when(client.settings()).thenReturn(Settings.EMPTY);
         SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
         SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
-        TaskId taskId = new TaskId("node", 123);
-        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
+        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
             null, client, schedulerEngine, pool, Collections.emptyMap());
             null, client, schedulerEngine, pool, Collections.emptyMap());
-        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertNull(((RollupJobStatus)task.getStatus()).getPosition());
         assertNull(((RollupJobStatus)task.getStatus()).getPosition());
     }
     }
@@ -171,10 +156,8 @@ public class RollupJobTaskTests extends ESTestCase {
         Client client = mock(Client.class);
         Client client = mock(Client.class);
         when(client.settings()).thenReturn(Settings.EMPTY);
         when(client.settings()).thenReturn(Settings.EMPTY);
         SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
         SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
-        TaskId taskId = new TaskId("node", 123);
-        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
+        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
             status, client, schedulerEngine, pool, Collections.emptyMap());
             status, client, schedulerEngine, pool, Collections.emptyMap());
-        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
         assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
         assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
         assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
         assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
@@ -203,9 +186,8 @@ public class RollupJobTaskTests extends ESTestCase {
         SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
         SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
 
 
         AtomicInteger counter = new AtomicInteger(0);
         AtomicInteger counter = new AtomicInteger(0);
-        TaskId taskId = new TaskId("node", 123);
-        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
-             null, client, schedulerEngine, pool, Collections.emptyMap()) {
+        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
+            null, client, schedulerEngine, pool, Collections.emptyMap()) {
             @Override
             @Override
             public void updatePersistentTaskState(PersistentTaskState taskState,
             public void updatePersistentTaskState(PersistentTaskState taskState,
                                                   ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
                                                   ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
@@ -223,7 +205,6 @@ public class RollupJobTaskTests extends ESTestCase {
                 counter.incrementAndGet();
                 counter.incrementAndGet();
             }
             }
         };
         };
-        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertNull(((RollupJobStatus)task.getStatus()).getPosition());
         assertNull(((RollupJobStatus)task.getStatus()).getPosition());
 
 
@@ -282,8 +263,7 @@ public class RollupJobTaskTests extends ESTestCase {
         Client client = mock(Client.class);
         Client client = mock(Client.class);
         when(client.settings()).thenReturn(Settings.EMPTY);
         when(client.settings()).thenReturn(Settings.EMPTY);
         SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
         SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
-        TaskId taskId = new TaskId("node", 123);
-        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
+        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
             status, client, schedulerEngine, pool, Collections.emptyMap()) {
             status, client, schedulerEngine, pool, Collections.emptyMap()) {
             @Override
             @Override
             public void updatePersistentTaskState(PersistentTaskState taskState,
             public void updatePersistentTaskState(PersistentTaskState taskState,
@@ -294,7 +274,6 @@ public class RollupJobTaskTests extends ESTestCase {
                     new PersistentTasksCustomMetaData.Assignment("foo", "foo")));
                     new PersistentTasksCustomMetaData.Assignment("foo", "foo")));
             }
             }
         };
         };
-        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
         assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
         assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
         assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
@@ -322,8 +301,7 @@ public class RollupJobTaskTests extends ESTestCase {
         Client client = mock(Client.class);
         Client client = mock(Client.class);
         when(client.settings()).thenReturn(Settings.EMPTY);
         when(client.settings()).thenReturn(Settings.EMPTY);
         SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
         SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
-        TaskId taskId = new TaskId("node", 123);
-        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
+        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
             status, client, schedulerEngine, pool, Collections.emptyMap()) {
             status, client, schedulerEngine, pool, Collections.emptyMap()) {
             @Override
             @Override
             public void updatePersistentTaskState(PersistentTaskState taskState,
             public void updatePersistentTaskState(PersistentTaskState taskState,
@@ -334,7 +312,6 @@ public class RollupJobTaskTests extends ESTestCase {
                     new PersistentTasksCustomMetaData.Assignment("foo", "foo")));
                     new PersistentTasksCustomMetaData.Assignment("foo", "foo")));
             }
             }
         };
         };
-        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
         assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
         assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
         assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
@@ -365,8 +342,7 @@ public class RollupJobTaskTests extends ESTestCase {
         when(client.settings()).thenReturn(Settings.EMPTY);
         when(client.settings()).thenReturn(Settings.EMPTY);
         when(client.threadPool()).thenReturn(pool);
         when(client.threadPool()).thenReturn(pool);
         SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
         SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
-        TaskId taskId = new TaskId("node", 123);
-        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
+        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
             null, client, schedulerEngine, pool, Collections.emptyMap()) {
             null, client, schedulerEngine, pool, Collections.emptyMap()) {
             @Override
             @Override
             public void updatePersistentTaskState(PersistentTaskState taskState,
             public void updatePersistentTaskState(PersistentTaskState taskState,
@@ -377,7 +353,6 @@ public class RollupJobTaskTests extends ESTestCase {
                     new PersistentTasksCustomMetaData.Assignment("foo", "foo")));
                     new PersistentTasksCustomMetaData.Assignment("foo", "foo")));
             }
             }
         };
         };
-        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertNull(((RollupJobStatus)task.getStatus()).getPosition());
         assertNull(((RollupJobStatus)task.getStatus()).getPosition());
 
 
@@ -435,8 +410,7 @@ public class RollupJobTaskTests extends ESTestCase {
         }).when(client).execute(anyObject(), anyObject(), anyObject());
         }).when(client).execute(anyObject(), anyObject(), anyObject());
 
 
         SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
         SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
-        TaskId taskId = new TaskId("node", 123);
-        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
+        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
             null, client, schedulerEngine, pool, Collections.emptyMap()) {
             null, client, schedulerEngine, pool, Collections.emptyMap()) {
             @Override
             @Override
             public void updatePersistentTaskState(PersistentTaskState taskState,
             public void updatePersistentTaskState(PersistentTaskState taskState,
@@ -453,7 +427,6 @@ public class RollupJobTaskTests extends ESTestCase {
 
 
             }
             }
         };
         };
-        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertNull(((RollupJobStatus)task.getStatus()).getPosition());
         assertNull(((RollupJobStatus)task.getStatus()).getPosition());
 
 
@@ -521,8 +494,7 @@ public class RollupJobTaskTests extends ESTestCase {
         }).when(client).execute(anyObject(), anyObject(), anyObject());
         }).when(client).execute(anyObject(), anyObject(), anyObject());
 
 
         SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
         SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
-        TaskId taskId = new TaskId("node", 123);
-        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
+        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
             null, client, schedulerEngine, pool, Collections.emptyMap()) {
             null, client, schedulerEngine, pool, Collections.emptyMap()) {
             @Override
             @Override
             public void updatePersistentTaskState(PersistentTaskState taskState,
             public void updatePersistentTaskState(PersistentTaskState taskState,
@@ -539,7 +511,6 @@ public class RollupJobTaskTests extends ESTestCase {
 
 
             }
             }
         };
         };
-        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertNull(((RollupJobStatus)task.getStatus()).getPosition());
         assertNull(((RollupJobStatus)task.getStatus()).getPosition());
 
 
@@ -608,8 +579,7 @@ public class RollupJobTaskTests extends ESTestCase {
 
 
         SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
         SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
         RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, null);
         RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, null);
-        TaskId taskId = new TaskId("node", 123);
-        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
+        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
             status, client, schedulerEngine, pool, Collections.emptyMap()) {
             status, client, schedulerEngine, pool, Collections.emptyMap()) {
             @Override
             @Override
             public void updatePersistentTaskState(PersistentTaskState taskState,
             public void updatePersistentTaskState(PersistentTaskState taskState,
@@ -626,7 +596,6 @@ public class RollupJobTaskTests extends ESTestCase {
 
 
             }
             }
         };
         };
-        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertNull(((RollupJobStatus)task.getStatus()).getPosition());
         assertNull(((RollupJobStatus)task.getStatus()).getPosition());
 
 
@@ -661,10 +630,8 @@ public class RollupJobTaskTests extends ESTestCase {
         Client client = mock(Client.class);
         Client client = mock(Client.class);
         when(client.settings()).thenReturn(Settings.EMPTY);
         when(client.settings()).thenReturn(Settings.EMPTY);
         SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
         SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
-        TaskId taskId = new TaskId("node", 123);
-        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
+        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
             status, client, schedulerEngine, pool, Collections.emptyMap());
             status, client, schedulerEngine, pool, Collections.emptyMap());
-        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
 
 
         CountDownLatch latch = new CountDownLatch(1);
         CountDownLatch latch = new CountDownLatch(1);
@@ -691,8 +658,7 @@ public class RollupJobTaskTests extends ESTestCase {
         SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
         SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
 
 
         AtomicInteger counter = new AtomicInteger(0);
         AtomicInteger counter = new AtomicInteger(0);
-        TaskId taskId = new TaskId("node", 123);
-        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
+        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
             null, client, schedulerEngine, pool, Collections.emptyMap()) {
             null, client, schedulerEngine, pool, Collections.emptyMap()) {
             @Override
             @Override
             public void updatePersistentTaskState(PersistentTaskState taskState,
             public void updatePersistentTaskState(PersistentTaskState taskState,
@@ -714,7 +680,6 @@ public class RollupJobTaskTests extends ESTestCase {
 
 
             }
             }
         };
         };
-        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertNull(((RollupJobStatus)task.getStatus()).getPosition());
         assertNull(((RollupJobStatus)task.getStatus()).getPosition());
 
 
@@ -779,15 +744,13 @@ public class RollupJobTaskTests extends ESTestCase {
         // the task would end before stop could be called.  But to help test out all pathways,
         // the task would end before stop could be called.  But to help test out all pathways,
         // just in case, we can override markAsCompleted so it's a no-op and test how stop
         // just in case, we can override markAsCompleted so it's a no-op and test how stop
         // handles the situation
         // handles the situation
-        TaskId taskId = new TaskId("node", 123);
-        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
+        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
             status, client, schedulerEngine, pool, Collections.emptyMap()) {
             status, client, schedulerEngine, pool, Collections.emptyMap()) {
             @Override
             @Override
             public void markAsCompleted() {
             public void markAsCompleted() {
                 latch.countDown();
                 latch.countDown();
             }
             }
         };
         };
-        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
 
 
         task.onCancelled();
         task.onCancelled();