|
@@ -211,8 +211,10 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Attempt to start the indexer. If the state is anything other than STOPPED, this will fail.
|
|
|
- * Otherwise, the persistent task's status will be updated to reflect the change.
|
|
|
+ * Attempt to start the indexer.
|
|
|
+ * - If the indexer is started/indexing, returns OK
|
|
|
+ * - If the indexer is stopped, starts task, updates persistent task's status, returns ok
|
|
|
+ * - Anything else returns error
|
|
|
*
|
|
|
* Note that while the job is started, the indexer will not necessarily run immediately. That
|
|
|
* will only occur when the scheduler triggers it based on the cron
|
|
@@ -221,8 +223,14 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE
|
|
|
*/
|
|
|
public synchronized void start(ActionListener<StartRollupJobAction.Response> listener) {
|
|
|
final IndexerState prevState = indexer.getState();
|
|
|
- if (prevState != IndexerState.STOPPED) {
|
|
|
- // fails if the task is not STOPPED
|
|
|
+
|
|
|
+ if (prevState == IndexerState.STARTED || prevState == IndexerState.INDEXING) {
|
|
|
+ // We're already running so just return acknowledgement
|
|
|
+ logger.debug("Indexer already running (State: [" + prevState + "]), acknowledging start without change.");
|
|
|
+ listener.onResponse(new StartRollupJobAction.Response(true));
|
|
|
+ return;
|
|
|
+ } else if (prevState != IndexerState.STOPPED) {
|
|
|
+ // if we're not already started/indexing, we must be STOPPED to get started
|
|
|
listener.onFailure(new ElasticsearchException("Cannot start task for Rollup Job [" + job.getConfig().getId() + "] because"
|
|
|
+ " state was [" + prevState + "]"));
|
|
|
return;
|
|
@@ -231,11 +239,10 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE
|
|
|
final IndexerState newState = indexer.start();
|
|
|
if (newState != IndexerState.STARTED) {
|
|
|
listener.onFailure(new ElasticsearchException("Cannot start task for Rollup Job [" + job.getConfig().getId() + "] because"
|
|
|
- + " state was [" + newState + "]"));
|
|
|
+ + " new state was [" + newState + "]"));
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
final RollupJobStatus state = new RollupJobStatus(IndexerState.STARTED, indexer.getPosition());
|
|
|
logger.debug("Updating state for rollup job [" + job.getConfig().getId() + "] to [" + state.getIndexerState() + "][" +
|
|
|
state.getPosition() + "]");
|