|
|
@@ -11,6 +11,7 @@ import org.elasticsearch.action.search.ShardSearchFailure;
|
|
|
import org.elasticsearch.client.Client;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
|
|
+import org.elasticsearch.persistent.PersistentTaskState;
|
|
|
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
|
|
import org.elasticsearch.search.aggregations.Aggregations;
|
|
|
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
|
|
|
@@ -64,7 +65,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|
|
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
|
|
|
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
|
|
status, client, schedulerEngine, pool, Collections.emptyMap());
|
|
|
- assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
|
|
|
+ assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
|
|
|
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
|
|
|
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
|
|
|
}
|
|
|
@@ -77,7 +78,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|
|
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
|
|
|
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
|
|
status, client, schedulerEngine, pool, Collections.emptyMap());
|
|
|
- assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
|
|
|
+ assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
|
|
|
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
|
|
|
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
|
|
|
}
|
|
|
@@ -90,7 +91,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|
|
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
|
|
|
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
|
|
status, client, schedulerEngine, pool, Collections.emptyMap());
|
|
|
- assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
|
|
|
+ assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
|
|
|
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
|
|
|
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
|
|
|
}
|
|
|
@@ -103,7 +104,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|
|
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
|
|
|
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
|
|
status, client, schedulerEngine, pool, Collections.emptyMap());
|
|
|
- assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED));
|
|
|
+ assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
|
|
|
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
|
|
|
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
|
|
|
}
|
|
|
@@ -116,7 +117,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|
|
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
|
|
|
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
|
|
status, client, schedulerEngine, pool, Collections.emptyMap());
|
|
|
- assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED));
|
|
|
+ assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
|
|
|
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
|
|
|
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
|
|
|
}
|
|
|
@@ -128,7 +129,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|
|
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
|
|
|
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
|
|
null, client, schedulerEngine, pool, Collections.emptyMap());
|
|
|
- assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
|
|
|
+ assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
|
|
|
assertNull(((RollupJobStatus)task.getStatus()).getPosition());
|
|
|
}
|
|
|
|
|
|
@@ -140,7 +141,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|
|
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
|
|
|
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
|
|
status, client, schedulerEngine, pool, Collections.emptyMap());
|
|
|
- assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED));
|
|
|
+ assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
|
|
|
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
|
|
|
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
|
|
|
|
|
|
@@ -172,13 +173,14 @@ public class RollupJobTaskTests extends ESTestCase {
|
|
|
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
|
|
null, client, schedulerEngine, pool, Collections.emptyMap()) {
|
|
|
@Override
|
|
|
- public void updatePersistentStatus(Status status, ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
|
|
- assertThat(status, instanceOf(RollupJobStatus.class));
|
|
|
+ public void updatePersistentTaskState(PersistentTaskState taskState,
|
|
|
+ ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
|
|
+ assertThat(taskState, instanceOf(RollupJobStatus.class));
|
|
|
int c = counter.get();
|
|
|
if (c == 0) {
|
|
|
- assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STARTED));
|
|
|
+ assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED));
|
|
|
} else if (c == 1) {
|
|
|
- assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STOPPED));
|
|
|
+ assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED));
|
|
|
} else {
|
|
|
fail("Should not have updated persistent statuse > 2 times");
|
|
|
}
|
|
|
@@ -187,7 +189,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|
|
counter.incrementAndGet();
|
|
|
}
|
|
|
};
|
|
|
- assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
|
|
|
+ assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
|
|
|
assertNull(((RollupJobStatus)task.getStatus()).getPosition());
|
|
|
|
|
|
CountDownLatch latch = new CountDownLatch(1);
|
|
|
@@ -195,7 +197,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|
|
@Override
|
|
|
public void onResponse(StartRollupJobAction.Response response) {
|
|
|
assertTrue(response.isStarted());
|
|
|
- assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED));
|
|
|
+ assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
|
|
|
latch.countDown();
|
|
|
}
|
|
|
|
|
|
@@ -207,7 +209,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|
|
latch.await(3, TimeUnit.SECONDS);
|
|
|
|
|
|
task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123));
|
|
|
- assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.INDEXING));
|
|
|
+ assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING));
|
|
|
assertThat(task.getStats().getNumInvocations(), equalTo(1L));
|
|
|
|
|
|
task.stop(new ActionListener<StopRollupJobAction.Response>() {
|
|
|
@@ -248,14 +250,15 @@ public class RollupJobTaskTests extends ESTestCase {
|
|
|
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
|
|
status, client, schedulerEngine, pool, Collections.emptyMap()) {
|
|
|
@Override
|
|
|
- public void updatePersistentStatus(Status status, ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
|
|
- assertThat(status, instanceOf(RollupJobStatus.class));
|
|
|
- assertThat(((RollupJobStatus)status).getState(), equalTo(IndexerState.STARTED));
|
|
|
+ public void updatePersistentTaskState(PersistentTaskState taskState,
|
|
|
+ ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
|
|
+ assertThat(taskState, instanceOf(RollupJobStatus.class));
|
|
|
+ assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED));
|
|
|
listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1,
|
|
|
new PersistentTasksCustomMetaData.Assignment("foo", "foo")));
|
|
|
}
|
|
|
};
|
|
|
- assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
|
|
|
+ assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
|
|
|
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
|
|
|
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
|
|
|
|
|
|
@@ -264,7 +267,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|
|
@Override
|
|
|
public void onResponse(StartRollupJobAction.Response response) {
|
|
|
assertTrue(response.isStarted());
|
|
|
- assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED));
|
|
|
+ assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
|
|
|
latch.countDown();
|
|
|
}
|
|
|
|
|
|
@@ -285,14 +288,15 @@ public class RollupJobTaskTests extends ESTestCase {
|
|
|
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
|
|
status, client, schedulerEngine, pool, Collections.emptyMap()) {
|
|
|
@Override
|
|
|
- public void updatePersistentStatus(Status status, ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
|
|
- assertThat(status, instanceOf(RollupJobStatus.class));
|
|
|
- assertThat(((RollupJobStatus)status).getState(), equalTo(IndexerState.STARTED));
|
|
|
+ public void updatePersistentTaskState(PersistentTaskState taskState,
|
|
|
+ ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
|
|
+ assertThat(taskState, instanceOf(RollupJobStatus.class));
|
|
|
+ assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED));
|
|
|
listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1,
|
|
|
new PersistentTasksCustomMetaData.Assignment("foo", "foo")));
|
|
|
}
|
|
|
};
|
|
|
- assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
|
|
|
+ assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
|
|
|
assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1));
|
|
|
assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo"));
|
|
|
|
|
|
@@ -301,7 +305,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|
|
@Override
|
|
|
public void onResponse(StartRollupJobAction.Response response) {
|
|
|
assertTrue(response.isStarted());
|
|
|
- assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED));
|
|
|
+ assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
|
|
|
latch.countDown();
|
|
|
}
|
|
|
|
|
|
@@ -313,7 +317,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|
|
latch.await(3, TimeUnit.SECONDS);
|
|
|
|
|
|
task.triggered(new SchedulerEngine.Event("unrelated", 123, 123));
|
|
|
- assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED)); // Should still be started, not INDEXING
|
|
|
+ assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
|
|
|
}
|
|
|
|
|
|
public void testTrigger() throws InterruptedException {
|
|
|
@@ -325,14 +329,15 @@ public class RollupJobTaskTests extends ESTestCase {
|
|
|
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
|
|
null, client, schedulerEngine, pool, Collections.emptyMap()) {
|
|
|
@Override
|
|
|
- public void updatePersistentStatus(Status status, ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
|
|
- assertThat(status, instanceOf(RollupJobStatus.class));
|
|
|
- assertThat(((RollupJobStatus)status).getState(), equalTo(IndexerState.STARTED));
|
|
|
+ public void updatePersistentTaskState(PersistentTaskState taskState,
|
|
|
+ ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
|
|
+ assertThat(taskState, instanceOf(RollupJobStatus.class));
|
|
|
+ assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED));
|
|
|
listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1,
|
|
|
new PersistentTasksCustomMetaData.Assignment("foo", "foo")));
|
|
|
}
|
|
|
};
|
|
|
- assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
|
|
|
+ assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
|
|
|
assertNull(((RollupJobStatus)task.getStatus()).getPosition());
|
|
|
|
|
|
CountDownLatch latch = new CountDownLatch(1);
|
|
|
@@ -340,7 +345,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|
|
@Override
|
|
|
public void onResponse(StartRollupJobAction.Response response) {
|
|
|
assertTrue(response.isStarted());
|
|
|
- assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED));
|
|
|
+ assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
|
|
|
latch.countDown();
|
|
|
}
|
|
|
|
|
|
@@ -352,7 +357,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|
|
latch.await(3, TimeUnit.SECONDS);
|
|
|
|
|
|
task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123));
|
|
|
- assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.INDEXING));
|
|
|
+ assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING));
|
|
|
assertThat(task.getStats().getNumInvocations(), equalTo(1L));
|
|
|
}
|
|
|
|
|
|
@@ -392,11 +397,12 @@ public class RollupJobTaskTests extends ESTestCase {
|
|
|
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
|
|
null, client, schedulerEngine, pool, Collections.emptyMap()) {
|
|
|
@Override
|
|
|
- public void updatePersistentStatus(Status status, ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
|
|
+ public void updatePersistentTaskState(PersistentTaskState taskState,
|
|
|
+ ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
|
|
Integer counterValue = counter.getAndIncrement();
|
|
|
if (counterValue == 0) {
|
|
|
- assertThat(status, instanceOf(RollupJobStatus.class));
|
|
|
- assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STARTED));
|
|
|
+ assertThat(taskState, instanceOf(RollupJobStatus.class));
|
|
|
+ assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED));
|
|
|
listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1,
|
|
|
new PersistentTasksCustomMetaData.Assignment("foo", "foo")));
|
|
|
} else if (counterValue == 1) {
|
|
|
@@ -405,14 +411,14 @@ public class RollupJobTaskTests extends ESTestCase {
|
|
|
|
|
|
}
|
|
|
};
|
|
|
- assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
|
|
|
+ assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
|
|
|
assertNull(((RollupJobStatus)task.getStatus()).getPosition());
|
|
|
|
|
|
task.start(new ActionListener<StartRollupJobAction.Response>() {
|
|
|
@Override
|
|
|
public void onResponse(StartRollupJobAction.Response response) {
|
|
|
assertTrue(response.isStarted());
|
|
|
- assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED));
|
|
|
+ assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
|
|
|
started.set(true);
|
|
|
}
|
|
|
|
|
|
@@ -424,7 +430,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|
|
ESTestCase.awaitBusy(started::get);
|
|
|
|
|
|
task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123));
|
|
|
- assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.INDEXING)); // Should still be started, not INDEXING
|
|
|
+ assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING));
|
|
|
assertThat(task.getStats().getNumInvocations(), equalTo(1L));
|
|
|
// Allow search response to return now
|
|
|
latch.countDown();
|
|
|
@@ -475,11 +481,12 @@ public class RollupJobTaskTests extends ESTestCase {
|
|
|
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
|
|
null, client, schedulerEngine, pool, Collections.emptyMap()) {
|
|
|
@Override
|
|
|
- public void updatePersistentStatus(Status status, ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
|
|
+ public void updatePersistentTaskState(PersistentTaskState taskState,
|
|
|
+ ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
|
|
Integer counterValue = counter.getAndIncrement();
|
|
|
if (counterValue == 0) {
|
|
|
- assertThat(status, instanceOf(RollupJobStatus.class));
|
|
|
- assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STARTED));
|
|
|
+ assertThat(taskState, instanceOf(RollupJobStatus.class));
|
|
|
+ assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED));
|
|
|
listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1,
|
|
|
new PersistentTasksCustomMetaData.Assignment("foo", "foo")));
|
|
|
} else if (counterValue == 1) {
|
|
|
@@ -488,14 +495,14 @@ public class RollupJobTaskTests extends ESTestCase {
|
|
|
|
|
|
}
|
|
|
};
|
|
|
- assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
|
|
|
+ assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
|
|
|
assertNull(((RollupJobStatus)task.getStatus()).getPosition());
|
|
|
|
|
|
task.start(new ActionListener<StartRollupJobAction.Response>() {
|
|
|
@Override
|
|
|
public void onResponse(StartRollupJobAction.Response response) {
|
|
|
assertTrue(response.isStarted());
|
|
|
- assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED));
|
|
|
+ assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
|
|
|
started.set(true);
|
|
|
}
|
|
|
|
|
|
@@ -507,7 +514,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|
|
ESTestCase.awaitBusy(started::get);
|
|
|
|
|
|
task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123));
|
|
|
- assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.INDEXING)); // Should still be started, not INDEXING
|
|
|
+ assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING));
|
|
|
assertThat(task.getStats().getNumInvocations(), equalTo(1L));
|
|
|
// Allow search response to return now
|
|
|
latch.countDown();
|
|
|
@@ -524,7 +531,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|
|
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
|
|
|
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
|
|
status, client, schedulerEngine, pool, Collections.emptyMap());
|
|
|
- assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
|
|
|
+ assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
|
|
|
|
|
|
CountDownLatch latch = new CountDownLatch(1);
|
|
|
task.stop(new ActionListener<StopRollupJobAction.Response>() {
|
|
|
@@ -553,15 +560,16 @@ public class RollupJobTaskTests extends ESTestCase {
|
|
|
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
|
|
null, client, schedulerEngine, pool, Collections.emptyMap()) {
|
|
|
@Override
|
|
|
- public void updatePersistentStatus(Status status, ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
|
|
- assertThat(status, instanceOf(RollupJobStatus.class));
|
|
|
+ public void updatePersistentTaskState(PersistentTaskState taskState,
|
|
|
+ ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
|
|
|
+ assertThat(taskState, instanceOf(RollupJobStatus.class));
|
|
|
int c = counter.get();
|
|
|
if (c == 0) {
|
|
|
- assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STARTED));
|
|
|
+ assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED));
|
|
|
} else if (c == 1) {
|
|
|
- assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STOPPED));
|
|
|
+ assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED));
|
|
|
} else if (c == 2) {
|
|
|
- assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STOPPED));
|
|
|
+ assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED));
|
|
|
} else {
|
|
|
fail("Should not have updated persistent statuse > 3 times");
|
|
|
}
|
|
|
@@ -571,7 +579,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|
|
|
|
|
}
|
|
|
};
|
|
|
- assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
|
|
|
+ assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
|
|
|
assertNull(((RollupJobStatus)task.getStatus()).getPosition());
|
|
|
|
|
|
CountDownLatch latch = new CountDownLatch(1);
|
|
|
@@ -579,7 +587,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|
|
@Override
|
|
|
public void onResponse(StartRollupJobAction.Response response) {
|
|
|
assertTrue(response.isStarted());
|
|
|
- assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED));
|
|
|
+ assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
|
|
|
latch.countDown();
|
|
|
}
|
|
|
|
|
|
@@ -591,7 +599,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|
|
latch.await(3, TimeUnit.SECONDS);
|
|
|
|
|
|
task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123));
|
|
|
- assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.INDEXING));
|
|
|
+ assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING));
|
|
|
assertThat(task.getStats().getNumInvocations(), equalTo(1L));
|
|
|
|
|
|
task.stop(new ActionListener<StopRollupJobAction.Response>() {
|
|
|
@@ -642,7 +650,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
|
|
latch.countDown();
|
|
|
}
|
|
|
};
|
|
|
- assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED));
|
|
|
+ assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
|
|
|
|
|
|
task.onCancelled();
|
|
|
task.stop(new ActionListener<StopRollupJobAction.Response>() {
|