Browse Source

Refactor AllocatedPersistentTask#init(), move rollup logic out of ctor (#46288)

This makes the AllocatedPersistentTask#init() method protected so that
implementing classes can perform their initialization logic there,
instead of the constructor.  Rollup's task is adjusted to use this
init method.

It also slightly refactors the methods to se a static logger in the 
AllocatedTask instead of passing it in via an argument.  This is 
simpler, logged messages come from the task instead of the 
service, and is easier for tests
Zachary Tong 6 years ago
parent
commit
d999942c6d

+ 4 - 4
server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java

@@ -18,6 +18,7 @@
  */
 package org.elasticsearch.persistent;
 
+import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.action.ActionListener;
@@ -37,13 +38,13 @@ import java.util.function.Predicate;
  */
 public class AllocatedPersistentTask extends CancellableTask {
 
+    private static final Logger logger = LogManager.getLogger(AllocatedPersistentTask.class);
     private final AtomicReference<State> state;
 
     private volatile String persistentTaskId;
     private volatile long allocationId;
     private volatile @Nullable Exception failure;
     private volatile PersistentTasksService persistentTasksService;
-    private volatile Logger logger;
     private volatile TaskManager taskManager;
 
     public AllocatedPersistentTask(long id, String type, String action, String description, TaskId parentTask,
@@ -85,10 +86,9 @@ public class AllocatedPersistentTask extends CancellableTask {
         return persistentTaskId;
     }
 
-    void init(PersistentTasksService persistentTasksService, TaskManager taskManager, Logger logger, String persistentTaskId, long
-            allocationId) {
+    protected void init(PersistentTasksService persistentTasksService, TaskManager taskManager,
+                        String persistentTaskId, long allocationId) {
         this.persistentTasksService = persistentTasksService;
-        this.logger = logger;
         this.taskManager = taskManager;
         this.persistentTaskId = persistentTaskId;
         this.allocationId = allocationId;

+ 1 - 1
server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java

@@ -183,7 +183,7 @@ public class PersistentTasksNodeService implements ClusterStateListener {
 
         boolean processed = false;
         try {
-            task.init(persistentTasksService, taskManager, logger, taskInProgress.getId(), taskInProgress.getAllocationId());
+            task.init(persistentTasksService, taskManager, taskInProgress.getId(), taskInProgress.getAllocationId());
             logger.trace("Persistent task [{}] with id [{}] and allocation id [{}] was created", task.getAction(),
                     task.getPersistentTaskId(), task.getAllocationId());
             try {

+ 33 - 16
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java

@@ -22,7 +22,9 @@ import org.elasticsearch.persistent.AllocatedPersistentTask;
 import org.elasticsearch.persistent.PersistentTaskState;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
 import org.elasticsearch.persistent.PersistentTasksExecutor;
+import org.elasticsearch.persistent.PersistentTasksService;
 import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.tasks.TaskManager;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.indexing.IndexerState;
@@ -150,7 +152,10 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE
     private final RollupJob job;
     private final SchedulerEngine schedulerEngine;
     private final ThreadPool threadPool;
-    private final RollupIndexer indexer;
+    private final Client client;
+    private final IndexerState initialIndexerState;
+    private final Map<String, Object> initialPosition;
+    private RollupIndexer indexer;
 
     RollupJobTask(long id, String type, String action, TaskId parentTask, RollupJob job, RollupJobStatus state,
                   Client client, SchedulerEngine schedulerEngine, ThreadPool threadPool, Map<String, String> headers) {
@@ -158,36 +163,48 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE
         this.job = job;
         this.schedulerEngine = schedulerEngine;
         this.threadPool = threadPool;
+        this.client = client;
+        if (state == null) {
+            this.initialIndexerState = null;
+            this.initialPosition = null;
+        } else {
+            this.initialIndexerState = state.getIndexerState();
+            this.initialPosition = state.getPosition();
+        }
+
+    }
 
-        // If status is not null, we are resuming rather than starting fresh.
-        Map<String, Object> initialPosition = null;
-        IndexerState initialState = IndexerState.STOPPED;
-        if (state != null) {
-            final IndexerState existingState = state.getIndexerState();
-            logger.debug("We have existing state, setting state to [" + existingState + "] " +
-                    "and current position to [" + state.getPosition() + "] for job [" + job.getConfig().getId() + "]");
-            if (existingState.equals(IndexerState.INDEXING)) {
+    @Override
+    protected void init(PersistentTasksService persistentTasksService, TaskManager taskManager,
+                        String persistentTaskId, long allocationId) {
+        super.init(persistentTasksService, taskManager, persistentTaskId, allocationId);
+
+        // If initial position is not null, we are resuming rather than starting fresh.
+        IndexerState indexerState = IndexerState.STOPPED;
+        if (initialPosition != null) {
+            logger.debug("We have existing state, setting state to [" + initialIndexerState + "] " +
+                "and current position to [" + initialPosition + "] for job [" + job.getConfig().getId() + "]");
+            if (initialIndexerState.equals(IndexerState.INDEXING)) {
                 /*
                  * If we were indexing, we have to reset back to STARTED otherwise the indexer will be "stuck" thinking
                  * it is indexing but without the actual indexing thread running.
                  */
-                initialState = IndexerState.STARTED;
+                indexerState = IndexerState.STARTED;
 
-            } else if (existingState.equals(IndexerState.ABORTING) || existingState.equals(IndexerState.STOPPING)) {
+            } else if (initialIndexerState.equals(IndexerState.ABORTING) || initialIndexerState.equals(IndexerState.STOPPING)) {
                 // It shouldn't be possible to persist ABORTING, but if for some reason it does,
                 // play it safe and restore the job as STOPPED.  An admin will have to clean it up,
                 // but it won't be running, and won't delete itself either.  Safest option.
                 // If we were STOPPING, that means it persisted but was killed before finally stopped... so ok
                 // to restore as STOPPED
-                initialState = IndexerState.STOPPED;
+                indexerState = IndexerState.STOPPED;
             } else  {
-                initialState = existingState;
+                indexerState = initialIndexerState;
             }
-            initialPosition = state.getPosition();
 
         }
-        this.indexer = new ClientRollupPageManager(job, initialState, initialPosition,
-                new ParentTaskAssigningClient(client, new TaskId(getPersistentTaskId())));
+        this.indexer = new ClientRollupPageManager(job, indexerState, initialPosition,
+            new ParentTaskAssigningClient(client, getParentTaskId()));
     }
 
     @Override

+ 56 - 19
x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java

@@ -17,6 +17,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
 import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
 import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.tasks.TaskManager;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -67,8 +68,10 @@ public class RollupJobTaskTests extends ESTestCase {
         Client client = mock(Client.class);
         when(client.settings()).thenReturn(Settings.EMPTY);
         SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
-        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
+        TaskId taskId = new TaskId("node", 123);
+        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
             status, client, schedulerEngine, pool, Collections.emptyMap());
+        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
         assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
@@ -80,8 +83,10 @@ public class RollupJobTaskTests extends ESTestCase {
         Client client = mock(Client.class);
         when(client.settings()).thenReturn(Settings.EMPTY);
         SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
-        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
+        TaskId taskId = new TaskId("node", 123);
+        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
             status, client, schedulerEngine, pool, Collections.emptyMap());
+        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
         assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
@@ -93,8 +98,10 @@ public class RollupJobTaskTests extends ESTestCase {
         Client client = mock(Client.class);
         when(client.settings()).thenReturn(Settings.EMPTY);
         SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
-        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
+        TaskId taskId = new TaskId("node", 123);
+        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
             status, client, schedulerEngine, pool, Collections.emptyMap());
+        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
         assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
@@ -106,8 +113,10 @@ public class RollupJobTaskTests extends ESTestCase {
         Client client = mock(Client.class);
         when(client.settings()).thenReturn(Settings.EMPTY);
         SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
-        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
+        TaskId taskId = new TaskId("node", 123);
+        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
             status, client, schedulerEngine, pool, Collections.emptyMap());
+        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
         assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
         assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
@@ -119,8 +128,10 @@ public class RollupJobTaskTests extends ESTestCase {
         Client client = mock(Client.class);
         when(client.settings()).thenReturn(Settings.EMPTY);
         SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
-        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
+        TaskId taskId = new TaskId("node", 123);
+        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
             status, client, schedulerEngine, pool, Collections.emptyMap());
+        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
         assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
         assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
@@ -132,8 +143,10 @@ public class RollupJobTaskTests extends ESTestCase {
         Client client = mock(Client.class);
         when(client.settings()).thenReturn(Settings.EMPTY);
         SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
-        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
+        TaskId taskId = new TaskId("node", 123);
+        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
             status, client, schedulerEngine, pool, Collections.emptyMap());
+        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
         assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
         assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
@@ -144,8 +157,10 @@ public class RollupJobTaskTests extends ESTestCase {
         Client client = mock(Client.class);
         when(client.settings()).thenReturn(Settings.EMPTY);
         SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
-        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
+        TaskId taskId = new TaskId("node", 123);
+        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
             null, client, schedulerEngine, pool, Collections.emptyMap());
+        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertNull(((RollupJobStatus)task.getStatus()).getPosition());
     }
@@ -156,8 +171,10 @@ public class RollupJobTaskTests extends ESTestCase {
         Client client = mock(Client.class);
         when(client.settings()).thenReturn(Settings.EMPTY);
         SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
-        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
+        TaskId taskId = new TaskId("node", 123);
+        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
             status, client, schedulerEngine, pool, Collections.emptyMap());
+        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
         assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
         assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
@@ -186,8 +203,9 @@ public class RollupJobTaskTests extends ESTestCase {
         SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
 
         AtomicInteger counter = new AtomicInteger(0);
-        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
-            null, client, schedulerEngine, pool, Collections.emptyMap()) {
+        TaskId taskId = new TaskId("node", 123);
+        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
+             null, client, schedulerEngine, pool, Collections.emptyMap()) {
             @Override
             public void updatePersistentTaskState(PersistentTaskState taskState,
                                                   ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
@@ -205,6 +223,7 @@ public class RollupJobTaskTests extends ESTestCase {
                 counter.incrementAndGet();
             }
         };
+        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertNull(((RollupJobStatus)task.getStatus()).getPosition());
 
@@ -263,7 +282,8 @@ public class RollupJobTaskTests extends ESTestCase {
         Client client = mock(Client.class);
         when(client.settings()).thenReturn(Settings.EMPTY);
         SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
-        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
+        TaskId taskId = new TaskId("node", 123);
+        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
             status, client, schedulerEngine, pool, Collections.emptyMap()) {
             @Override
             public void updatePersistentTaskState(PersistentTaskState taskState,
@@ -274,6 +294,7 @@ public class RollupJobTaskTests extends ESTestCase {
                     new PersistentTasksCustomMetaData.Assignment("foo", "foo")));
             }
         };
+        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
         assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
@@ -301,7 +322,8 @@ public class RollupJobTaskTests extends ESTestCase {
         Client client = mock(Client.class);
         when(client.settings()).thenReturn(Settings.EMPTY);
         SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
-        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
+        TaskId taskId = new TaskId("node", 123);
+        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
             status, client, schedulerEngine, pool, Collections.emptyMap()) {
             @Override
             public void updatePersistentTaskState(PersistentTaskState taskState,
@@ -312,6 +334,7 @@ public class RollupJobTaskTests extends ESTestCase {
                     new PersistentTasksCustomMetaData.Assignment("foo", "foo")));
             }
         };
+        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
         assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
@@ -342,7 +365,8 @@ public class RollupJobTaskTests extends ESTestCase {
         when(client.settings()).thenReturn(Settings.EMPTY);
         when(client.threadPool()).thenReturn(pool);
         SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
-        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
+        TaskId taskId = new TaskId("node", 123);
+        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
             null, client, schedulerEngine, pool, Collections.emptyMap()) {
             @Override
             public void updatePersistentTaskState(PersistentTaskState taskState,
@@ -353,6 +377,7 @@ public class RollupJobTaskTests extends ESTestCase {
                     new PersistentTasksCustomMetaData.Assignment("foo", "foo")));
             }
         };
+        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertNull(((RollupJobStatus)task.getStatus()).getPosition());
 
@@ -410,7 +435,8 @@ public class RollupJobTaskTests extends ESTestCase {
         }).when(client).execute(anyObject(), anyObject(), anyObject());
 
         SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
-        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
+        TaskId taskId = new TaskId("node", 123);
+        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
             null, client, schedulerEngine, pool, Collections.emptyMap()) {
             @Override
             public void updatePersistentTaskState(PersistentTaskState taskState,
@@ -427,6 +453,7 @@ public class RollupJobTaskTests extends ESTestCase {
 
             }
         };
+        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertNull(((RollupJobStatus)task.getStatus()).getPosition());
 
@@ -494,7 +521,8 @@ public class RollupJobTaskTests extends ESTestCase {
         }).when(client).execute(anyObject(), anyObject(), anyObject());
 
         SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
-        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
+        TaskId taskId = new TaskId("node", 123);
+        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
             null, client, schedulerEngine, pool, Collections.emptyMap()) {
             @Override
             public void updatePersistentTaskState(PersistentTaskState taskState,
@@ -511,6 +539,7 @@ public class RollupJobTaskTests extends ESTestCase {
 
             }
         };
+        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertNull(((RollupJobStatus)task.getStatus()).getPosition());
 
@@ -579,7 +608,8 @@ public class RollupJobTaskTests extends ESTestCase {
 
         SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
         RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, null);
-        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
+        TaskId taskId = new TaskId("node", 123);
+        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
             status, client, schedulerEngine, pool, Collections.emptyMap()) {
             @Override
             public void updatePersistentTaskState(PersistentTaskState taskState,
@@ -596,6 +626,7 @@ public class RollupJobTaskTests extends ESTestCase {
 
             }
         };
+        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertNull(((RollupJobStatus)task.getStatus()).getPosition());
 
@@ -630,8 +661,10 @@ public class RollupJobTaskTests extends ESTestCase {
         Client client = mock(Client.class);
         when(client.settings()).thenReturn(Settings.EMPTY);
         SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
-        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
+        TaskId taskId = new TaskId("node", 123);
+        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
             status, client, schedulerEngine, pool, Collections.emptyMap());
+        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
 
         CountDownLatch latch = new CountDownLatch(1);
@@ -658,7 +691,8 @@ public class RollupJobTaskTests extends ESTestCase {
         SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);
 
         AtomicInteger counter = new AtomicInteger(0);
-        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
+        TaskId taskId = new TaskId("node", 123);
+        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
             null, client, schedulerEngine, pool, Collections.emptyMap()) {
             @Override
             public void updatePersistentTaskState(PersistentTaskState taskState,
@@ -680,6 +714,7 @@ public class RollupJobTaskTests extends ESTestCase {
 
             }
         };
+        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
         assertNull(((RollupJobStatus)task.getStatus()).getPosition());
 
@@ -744,13 +779,15 @@ public class RollupJobTaskTests extends ESTestCase {
         // the task would end before stop could be called.  But to help test out all pathways,
         // just in case, we can override markAsCompleted so it's a no-op and test how stop
         // handles the situation
-        RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
+        TaskId taskId = new TaskId("node", 123);
+        RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
             status, client, schedulerEngine, pool, Collections.emptyMap()) {
             @Override
             public void markAsCompleted() {
                 latch.countDown();
             }
         };
+        task.init(null, mock(TaskManager.class), taskId.toString(), 123);
         assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
 
         task.onCancelled();