Kaynağa Gözat

[ML Data Frame] Start the data frame transform directly rather than via the scheduler (#42067)

Trigger the indexer start directly so that the indexer is put in the INDEXING state immediately
David Kyle 6 yıl önce
ebeveyn
işleme
cc988ce335

+ 3 - 1
client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java

@@ -72,6 +72,7 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.oneOf;
 
 public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
 
@@ -264,7 +265,8 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
         GetDataFrameTransformStatsResponse statsResponse = execute(new GetDataFrameTransformStatsRequest(id),
                 client::getDataFrameTransformStats, client::getDataFrameTransformStatsAsync);
         assertThat(statsResponse.getTransformsStateAndStats(), hasSize(1));
-        assertEquals(IndexerState.STARTED, statsResponse.getTransformsStateAndStats().get(0).getTransformState().getIndexerState());
+        IndexerState indexerState = statsResponse.getTransformsStateAndStats().get(0).getTransformState().getIndexerState();
+        assertThat(indexerState, is(oneOf(IndexerState.STARTED, IndexerState.INDEXING)));
 
         StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, Boolean.TRUE, null);
         StopDataFrameTransformResponse stopResponse =

+ 6 - 18
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java

@@ -106,8 +106,6 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
     protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTransform params, PersistentTaskState state) {
         final String transformId = params.getId();
         final DataFrameTransformTask buildTask = (DataFrameTransformTask) task;
-        final SchedulerEngine.Job schedulerJob = new SchedulerEngine.Job(DataFrameTransformTask.SCHEDULE_NAME + "_" + transformId,
-            next());
         final DataFrameTransformState transformState = (DataFrameTransformState) state;
 
         final DataFrameTransformTask.ClientDataFrameIndexerBuilder indexerBuilder =
@@ -137,7 +135,7 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
             stats -> {
                 indexerBuilder.setInitialStats(stats);
                 buildTask.initializeIndexer(indexerBuilder);
-                scheduleAndStartTask(buildTask, schedulerJob, startTaskListener);
+                startTask(buildTask, startTaskListener);
             },
             error -> {
                 if (error instanceof ResourceNotFoundException == false) {
@@ -145,7 +143,7 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
                 }
                 indexerBuilder.setInitialStats(new DataFrameIndexerTransformStats(transformId));
                 buildTask.initializeIndexer(indexerBuilder);
-                scheduleAndStartTask(buildTask, schedulerJob, startTaskListener);
+                startTask(buildTask, startTaskListener);
             }
         );
 
@@ -218,30 +216,20 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
         }
     }
 
-    private void scheduleAndStartTask(DataFrameTransformTask buildTask,
-                                      SchedulerEngine.Job schedulerJob,
-                                      ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
-        // Note that while the task is added to the scheduler here, the internal state will prevent
-        // it from doing any work until the task is "started" via the StartTransform api
-        schedulerEngine.register(buildTask);
-        schedulerEngine.add(schedulerJob);
-        logger.info("Data frame transform [{}] created.", buildTask.getTransformId());
+    private void startTask(DataFrameTransformTask buildTask,
+                           ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
         // If we are stopped, and it is an initial run, this means we have never been started,
         // attempt to start the task
         if (buildTask.getState().getTaskState().equals(DataFrameTransformTaskState.STOPPED) && buildTask.isInitialRun()) {
+            logger.info("Data frame transform [{}] created.", buildTask.getTransformId());
             buildTask.start(listener);
+
         } else {
             logger.debug("No need to start task. Its current state is: {}", buildTask.getState().getIndexerState());
             listener.onResponse(new StartDataFrameTransformTaskAction.Response(true));
         }
     }
 
-    static SchedulerEngine.Schedule next() {
-        return (startTime, now) -> {
-            return now + 1000; // to be fixed, hardcode something
-        };
-    }
-
     @Override
     protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId,
             PersistentTasksCustomMetaData.PersistentTask<DataFrameTransform> persistentTask, Map<String, String> headers) {

+ 27 - 8
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

@@ -208,6 +208,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         persistStateToClusterState(state, ActionListener.wrap(
             task -> {
                 auditor.info(transform.getId(), "Updated state to [" + state.getTaskState() + "]");
+                long now = System.currentTimeMillis();
+                // kick off the indexer
+                triggered(new Event(schedulerJobName(), now, now));
+                registerWithSchedulerJob();
                 listener.onResponse(new StartDataFrameTransformTaskAction.Response(true));
             },
             exc -> {
@@ -238,7 +242,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
             return;
         }
         //  for now no rerun, so only trigger if checkpoint == 0
-        if (currentCheckpoint.get() == 0 && event.getJobName().equals(SCHEDULE_NAME + "_" + transform.getId())) {
+        if (currentCheckpoint.get() == 0 && event.getJobName().equals(schedulerJobName())) {
             logger.debug("Data frame indexer [{}] schedule has triggered, state: [{}]", event.getJobName(), getIndexer().getState());
             getIndexer().maybeTriggerAsyncJob(System.currentTimeMillis());
         }
@@ -249,13 +253,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
      * This tries to remove the job from the scheduler and completes the persistent task
      */
     synchronized void shutdown() {
-        try {
-            schedulerEngine.remove(SCHEDULE_NAME + "_" + transform.getId());
-            schedulerEngine.unregister(this);
-        } catch (Exception e) {
-            markAsFailed(e);
-            return;
-        }
+        deregisterSchedulerJob();
         markAsCompleted();
     }
 
@@ -311,6 +309,27 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         }
     }
 
+    private void registerWithSchedulerJob() {
+        schedulerEngine.register(this);
+        final SchedulerEngine.Job schedulerJob = new SchedulerEngine.Job(schedulerJobName(), next());
+        schedulerEngine.add(schedulerJob);
+    }
+
+    private void deregisterSchedulerJob() {
+        schedulerEngine.remove(schedulerJobName());
+        schedulerEngine.unregister(this);
+    }
+
+    private String schedulerJobName() {
+        return DataFrameTransformTask.SCHEDULE_NAME + "_" + getTransformId();
+    }
+
+    private SchedulerEngine.Schedule next() {
+        return (startTime, now) -> {
+            return now + 1000; // to be fixed, hardcode something
+        };
+    }
+
     synchronized void initializeIndexer(ClientDataFrameIndexerBuilder indexerBuilder) {
         indexer.set(indexerBuilder.build(this));
     }

+ 4 - 4
x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml

@@ -100,7 +100,7 @@ teardown:
         transform_id: "airline-transform-start-stop"
   - match: { count: 1 }
   - match: { transforms.0.id: "airline-transform-start-stop" }
-  - match: { transforms.0.state.indexer_state: "started" }
+  - match: { transforms.0.state.indexer_state: "/started|indexing/" }
   - match: { transforms.0.state.task_state: "started" }
 
   - do:
@@ -127,7 +127,7 @@ teardown:
         transform_id: "airline-transform-start-stop"
   - match: { count: 1 }
   - match: { transforms.0.id: "airline-transform-start-stop" }
-  - match: { transforms.0.state.indexer_state: "started" }
+  - match: { transforms.0.state.indexer_state: "/started|indexing/" }
   - match: { transforms.0.state.task_state: "started" }
 
 ---
@@ -168,7 +168,7 @@ teardown:
         transform_id: "airline-transform-start-stop"
   - match: { count: 1 }
   - match: { transforms.0.id: "airline-transform-start-stop" }
-  - match: { transforms.0.state.indexer_state: "started" }
+  - match: { transforms.0.state.indexer_state: "/started|indexing/" }
   - match: { transforms.0.state.task_state: "started" }
 
   - do:
@@ -194,7 +194,7 @@ teardown:
         transform_id: "airline-transform-start-later"
   - match: { count: 1 }
   - match: { transforms.0.id: "airline-transform-start-later" }
-  - match: { transforms.0.state.indexer_state: "started" }
+  - match: { transforms.0.state.indexer_state: "/started|indexing/" }
   - match: { transforms.0.state.task_state: "started" }
 
   - do:

+ 3 - 3
x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml

@@ -47,13 +47,13 @@ teardown:
         transform_id: "airline-transform-stats"
   - match: { count: 1 }
   - match: { transforms.0.id: "airline-transform-stats" }
-  - match: { transforms.0.state.indexer_state: "started" }
+  - match: { transforms.0.state.indexer_state: "/started|indexing/" }
   - match: { transforms.0.state.task_state: "started" }
   - match: { transforms.0.state.checkpoint: 0 }
   - match: { transforms.0.stats.pages_processed: 0 }
   - match: { transforms.0.stats.documents_processed: 0 }
   - match: { transforms.0.stats.documents_indexed: 0 }
-  - match: { transforms.0.stats.trigger_count: 0 }
+  - match: { transforms.0.stats.trigger_count: 1 }
   - match: { transforms.0.stats.index_time_in_ms: 0 }
   - match: { transforms.0.stats.index_total: 0 }
   - match: { transforms.0.stats.index_failures: 0 }
@@ -172,7 +172,7 @@ teardown:
         transform_id: "_all"
   - match: { count: 2 }
   - match: { transforms.0.id: "airline-transform-stats" }
-  - match: { transforms.0.state.indexer_state: "started" }
+  - match: { transforms.0.state.indexer_state: "/started|indexing/" }
   - match: { transforms.1.id: "airline-transform-stats-dos" }
   - match: { transforms.1.state.indexer_state: "stopped" }