|
@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.dataframe.process;
|
|
|
import org.apache.logging.log4j.LogManager;
|
|
|
import org.apache.logging.log4j.Logger;
|
|
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
|
|
+import org.elasticsearch.action.ActionListener;
|
|
|
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
|
|
|
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
|
|
|
import org.elasticsearch.client.Client;
|
|
@@ -15,9 +16,11 @@ import org.elasticsearch.common.Nullable;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
|
import org.elasticsearch.xpack.core.ClientHelper;
|
|
|
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
|
|
|
+import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
|
|
|
+import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
|
|
|
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
|
|
|
import org.elasticsearch.xpack.ml.MachineLearning;
|
|
|
-import org.elasticsearch.xpack.ml.action.TransportStartDataFrameAnalyticsAction;
|
|
|
+import org.elasticsearch.xpack.ml.action.TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask;
|
|
|
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractor;
|
|
|
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory;
|
|
|
|
|
@@ -46,7 +49,7 @@ public class AnalyticsProcessManager {
|
|
|
this.processFactory = Objects.requireNonNull(analyticsProcessFactory);
|
|
|
}
|
|
|
|
|
|
- public void runJob(TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config,
|
|
|
+ public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config,
|
|
|
DataFrameDataExtractorFactory dataExtractorFactory, Consumer<Exception> finishHandler) {
|
|
|
threadPool.generic().execute(() -> {
|
|
|
if (task.isStopping()) {
|
|
@@ -61,10 +64,10 @@ public class AnalyticsProcessManager {
|
|
|
+ "] Could not create process as one already exists"));
|
|
|
return;
|
|
|
}
|
|
|
- if (processContext.startProcess(dataExtractorFactory, config)) {
|
|
|
+ if (processContext.startProcess(dataExtractorFactory, config, task)) {
|
|
|
ExecutorService executorService = threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME);
|
|
|
- executorService.execute(() -> processContext.resultProcessor.process(processContext.process));
|
|
|
- executorService.execute(() -> processData(task.getAllocationId(), config, processContext.dataExtractor,
|
|
|
+ executorService.execute(() -> processResults(processContext));
|
|
|
+ executorService.execute(() -> processData(task, config, processContext.dataExtractor,
|
|
|
processContext.process, processContext.resultProcessor, finishHandler));
|
|
|
} else {
|
|
|
finishHandler.accept(null);
|
|
@@ -72,8 +75,17 @@ public class AnalyticsProcessManager {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- private void processData(long taskAllocationId, DataFrameAnalyticsConfig config, DataFrameDataExtractor dataExtractor,
|
|
|
+ private void processResults(ProcessContext processContext) {
|
|
|
+ try {
|
|
|
+ processContext.resultProcessor.process(processContext.process);
|
|
|
+ } catch (Exception e) {
|
|
|
+ processContext.setFailureReason(e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void processData(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, DataFrameDataExtractor dataExtractor,
|
|
|
AnalyticsProcess process, AnalyticsResultProcessor resultProcessor, Consumer<Exception> finishHandler) {
|
|
|
+
|
|
|
try {
|
|
|
writeHeaderRecord(dataExtractor, process);
|
|
|
writeDataRows(dataExtractor, process);
|
|
@@ -82,26 +94,28 @@ public class AnalyticsProcessManager {
|
|
|
|
|
|
LOGGER.info("[{}] Waiting for result processor to complete", config.getId());
|
|
|
resultProcessor.awaitForCompletion();
|
|
|
+ processContextByAllocation.get(task.getAllocationId()).setFailureReason(resultProcessor.getFailure());
|
|
|
+
|
|
|
refreshDest(config);
|
|
|
LOGGER.info("[{}] Result processor has completed", config.getId());
|
|
|
- } catch (IOException e) {
|
|
|
- LOGGER.error(new ParameterizedMessage("[{}] Error writing data to the process", config.getId()), e);
|
|
|
- // TODO Handle this failure by setting the task state to FAILED
|
|
|
+ } catch (Exception e) {
|
|
|
+ String errorMsg = new ParameterizedMessage("[{}] Error while processing data", config.getId()).getFormattedMessage();
|
|
|
+ processContextByAllocation.get(task.getAllocationId()).setFailureReason(errorMsg);
|
|
|
} finally {
|
|
|
- LOGGER.info("[{}] Closing process", config.getId());
|
|
|
- try {
|
|
|
- process.close();
|
|
|
- LOGGER.info("[{}] Closed process", config.getId());
|
|
|
+ closeProcess(task);
|
|
|
|
|
|
+ ProcessContext processContext = processContextByAllocation.remove(task.getAllocationId());
|
|
|
+ LOGGER.debug("Removed process context for task [{}]; [{}] processes still running", config.getId(),
|
|
|
+ processContextByAllocation.size());
|
|
|
+
|
|
|
+ if (processContext.getFailureReason() == null) {
|
|
|
// This results in marking the persistent task as complete
|
|
|
+ LOGGER.info("[{}] Marking task completed", config.getId());
|
|
|
finishHandler.accept(null);
|
|
|
- } catch (IOException e) {
|
|
|
- LOGGER.error("[{}] Error closing data frame analyzer process", config.getId());
|
|
|
- finishHandler.accept(e);
|
|
|
+ } else {
|
|
|
+ LOGGER.error("[{}] Marking task failed; {}", config.getId(), processContext.getFailureReason());
|
|
|
+ updateTaskState(task, DataFrameAnalyticsState.FAILED, processContext.getFailureReason());
|
|
|
}
|
|
|
- processContextByAllocation.remove(taskAllocationId);
|
|
|
- LOGGER.debug("Removed process context for task [{}]; [{}] processes still running", config.getId(),
|
|
|
- processContextByAllocation.size());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -142,15 +156,34 @@ public class AnalyticsProcessManager {
|
|
|
process.writeRecord(headerRecord);
|
|
|
}
|
|
|
|
|
|
- private AnalyticsProcess createProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig) {
|
|
|
+ private AnalyticsProcess createProcess(DataFrameAnalyticsTask task, AnalyticsProcessConfig analyticsProcessConfig) {
|
|
|
ExecutorService executorService = threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME);
|
|
|
- AnalyticsProcess process = processFactory.createAnalyticsProcess(jobId, analyticsProcessConfig, executorService);
|
|
|
+ AnalyticsProcess process = processFactory.createAnalyticsProcess(task.getParams().getId(), analyticsProcessConfig,
|
|
|
+ executorService, onProcessCrash(task));
|
|
|
if (process.isProcessAlive() == false) {
|
|
|
throw ExceptionsHelper.serverError("Failed to start data frame analytics process");
|
|
|
}
|
|
|
return process;
|
|
|
}
|
|
|
|
|
|
+ private Consumer<String> onProcessCrash(DataFrameAnalyticsTask task) {
|
|
|
+ return reason -> {
|
|
|
+ ProcessContext processContext = processContextByAllocation.get(task.getAllocationId());
|
|
|
+ if (processContext != null) {
|
|
|
+ processContext.setFailureReason(reason);
|
|
|
+ processContext.stop();
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
+ private void updateTaskState(DataFrameAnalyticsTask task, DataFrameAnalyticsState state, @Nullable String reason) {
|
|
|
+ DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(state, task.getAllocationId(), reason);
|
|
|
+ task.updatePersistentTaskState(newTaskState, ActionListener.wrap(
|
|
|
+ updatedTask -> LOGGER.info("[{}] Successfully update task state to [{}]", task.getParams().getId(), state),
|
|
|
+ e -> LOGGER.error(new ParameterizedMessage("[{}] Could not update task state to [{}]", task.getParams().getId(), state), e)
|
|
|
+ ));
|
|
|
+ }
|
|
|
+
|
|
|
@Nullable
|
|
|
public Integer getProgressPercent(long allocationId) {
|
|
|
ProcessContext processContext = processContextByAllocation.get(allocationId);
|
|
@@ -162,13 +195,29 @@ public class AnalyticsProcessManager {
|
|
|
() -> client.execute(RefreshAction.INSTANCE, new RefreshRequest(config.getDest().getIndex())).actionGet());
|
|
|
}
|
|
|
|
|
|
- public void stop(TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask task) {
|
|
|
+ private void closeProcess(DataFrameAnalyticsTask task) {
|
|
|
+ String configId = task.getParams().getId();
|
|
|
+ LOGGER.info("[{}] Closing process", configId);
|
|
|
+
|
|
|
+ ProcessContext processContext = processContextByAllocation.get(task.getAllocationId());
|
|
|
+ try {
|
|
|
+ processContext.process.close();
|
|
|
+ LOGGER.info("[{}] Closed process", configId);
|
|
|
+ } catch (IOException e) {
|
|
|
+ String errorMsg = new ParameterizedMessage("[{}] Error closing data frame analyzer process [{}]"
|
|
|
+ , configId, e.getMessage()).getFormattedMessage();
|
|
|
+ processContext.setFailureReason(errorMsg);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void stop(DataFrameAnalyticsTask task) {
|
|
|
ProcessContext processContext = processContextByAllocation.get(task.getAllocationId());
|
|
|
if (processContext != null) {
|
|
|
LOGGER.debug("[{}] Stopping process", task.getParams().getId() );
|
|
|
processContext.stop();
|
|
|
} else {
|
|
|
LOGGER.debug("[{}] No process context to stop", task.getParams().getId() );
|
|
|
+ task.markAsCompleted();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -180,6 +229,7 @@ public class AnalyticsProcessManager {
|
|
|
private volatile AnalyticsResultProcessor resultProcessor;
|
|
|
private final AtomicInteger progressPercent = new AtomicInteger(0);
|
|
|
private volatile boolean processKilled;
|
|
|
+ private volatile String failureReason;
|
|
|
|
|
|
ProcessContext(String id) {
|
|
|
this.id = Objects.requireNonNull(id);
|
|
@@ -197,6 +247,17 @@ public class AnalyticsProcessManager {
|
|
|
this.progressPercent.set(progressPercent);
|
|
|
}
|
|
|
|
|
|
+ private synchronized void setFailureReason(String failureReason) {
|
|
|
+ // Only set the new reason if there isn't one already as we want to keep the first reason
|
|
|
+ if (failureReason != null) {
|
|
|
+ this.failureReason = failureReason;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private String getFailureReason() {
|
|
|
+ return failureReason;
|
|
|
+ }
|
|
|
+
|
|
|
public synchronized void stop() {
|
|
|
LOGGER.debug("[{}] Stopping process", id);
|
|
|
processKilled = true;
|
|
@@ -215,14 +276,15 @@ public class AnalyticsProcessManager {
|
|
|
/**
|
|
|
* @return {@code true} if the process was started or {@code false} if it was not because it was stopped in the meantime
|
|
|
*/
|
|
|
- private synchronized boolean startProcess(DataFrameDataExtractorFactory dataExtractorFactory, DataFrameAnalyticsConfig config) {
|
|
|
+ private synchronized boolean startProcess(DataFrameDataExtractorFactory dataExtractorFactory, DataFrameAnalyticsConfig config,
|
|
|
+ DataFrameAnalyticsTask task) {
|
|
|
if (processKilled) {
|
|
|
// The job was stopped before we started the process so no need to start it
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
dataExtractor = dataExtractorFactory.newExtractor(false);
|
|
|
- process = createProcess(config.getId(), createProcessConfig(config, dataExtractor));
|
|
|
+ process = createProcess(task, createProcessConfig(config, dataExtractor));
|
|
|
DataFrameRowsJoiner dataFrameRowsJoiner = new DataFrameRowsJoiner(config.getId(), client,
|
|
|
dataExtractorFactory.newExtractor(true));
|
|
|
resultProcessor = new AnalyticsResultProcessor(id, dataFrameRowsJoiner, this::isProcessKilled, this::setProgressPercent);
|