|
@@ -11,14 +11,10 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
|
|
|
import org.elasticsearch.Version;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
import org.elasticsearch.action.LatchedActionListener;
|
|
|
-import org.elasticsearch.action.support.WriteRequest;
|
|
|
import org.elasticsearch.common.Nullable;
|
|
|
-import org.elasticsearch.common.xcontent.ToXContent;
|
|
|
-import org.elasticsearch.common.xcontent.ToXContentObject;
|
|
|
import org.elasticsearch.common.xcontent.XContentHelper;
|
|
|
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
|
|
import org.elasticsearch.license.License;
|
|
|
-import org.elasticsearch.xpack.core.ml.MlStatsIndex;
|
|
|
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
|
|
|
import org.elasticsearch.xpack.core.ml.dataframe.analyses.Classification;
|
|
|
import org.elasticsearch.xpack.core.ml.dataframe.analyses.Regression;
|
|
@@ -31,18 +27,16 @@ import org.elasticsearch.xpack.core.ml.inference.TrainedModelDefinition;
|
|
|
import org.elasticsearch.xpack.core.ml.inference.TrainedModelInput;
|
|
|
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
|
|
|
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
|
|
|
-import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
|
|
|
import org.elasticsearch.xpack.core.security.user.XPackUser;
|
|
|
import org.elasticsearch.xpack.ml.dataframe.process.results.AnalyticsResult;
|
|
|
import org.elasticsearch.xpack.ml.dataframe.process.results.RowResults;
|
|
|
import org.elasticsearch.xpack.ml.dataframe.stats.StatsHolder;
|
|
|
+import org.elasticsearch.xpack.ml.dataframe.stats.StatsPersister;
|
|
|
import org.elasticsearch.xpack.ml.extractor.ExtractedField;
|
|
|
import org.elasticsearch.xpack.ml.extractor.MultiField;
|
|
|
import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelProvider;
|
|
|
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
|
|
|
-import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;
|
|
|
|
|
|
-import java.io.IOException;
|
|
|
import java.time.Instant;
|
|
|
import java.util.Collections;
|
|
|
import java.util.Iterator;
|
|
@@ -51,7 +45,6 @@ import java.util.Map;
|
|
|
import java.util.Objects;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
-import java.util.function.Function;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
import static java.util.stream.Collectors.toList;
|
|
@@ -77,7 +70,7 @@ public class AnalyticsResultProcessor {
|
|
|
private final StatsHolder statsHolder;
|
|
|
private final TrainedModelProvider trainedModelProvider;
|
|
|
private final DataFrameAnalyticsAuditor auditor;
|
|
|
- private final ResultsPersisterService resultsPersisterService;
|
|
|
+ private final StatsPersister statsPersister;
|
|
|
private final List<ExtractedField> fieldNames;
|
|
|
private final CountDownLatch completionLatch = new CountDownLatch(1);
|
|
|
private volatile String failure;
|
|
@@ -85,14 +78,13 @@ public class AnalyticsResultProcessor {
|
|
|
|
|
|
public AnalyticsResultProcessor(DataFrameAnalyticsConfig analytics, DataFrameRowsJoiner dataFrameRowsJoiner,
|
|
|
StatsHolder statsHolder, TrainedModelProvider trainedModelProvider,
|
|
|
- DataFrameAnalyticsAuditor auditor, ResultsPersisterService resultsPersisterService,
|
|
|
- List<ExtractedField> fieldNames) {
|
|
|
+ DataFrameAnalyticsAuditor auditor, StatsPersister statsPersister, List<ExtractedField> fieldNames) {
|
|
|
this.analytics = Objects.requireNonNull(analytics);
|
|
|
this.dataFrameRowsJoiner = Objects.requireNonNull(dataFrameRowsJoiner);
|
|
|
this.statsHolder = Objects.requireNonNull(statsHolder);
|
|
|
this.trainedModelProvider = Objects.requireNonNull(trainedModelProvider);
|
|
|
this.auditor = Objects.requireNonNull(auditor);
|
|
|
- this.resultsPersisterService = Objects.requireNonNull(resultsPersisterService);
|
|
|
+ this.statsPersister = Objects.requireNonNull(statsPersister);
|
|
|
this.fieldNames = Collections.unmodifiableList(Objects.requireNonNull(fieldNames));
|
|
|
}
|
|
|
|
|
@@ -112,6 +104,7 @@ public class AnalyticsResultProcessor {
|
|
|
|
|
|
public void cancel() {
|
|
|
dataFrameRowsJoiner.cancel();
|
|
|
+ statsPersister.cancel();
|
|
|
isCancelled = true;
|
|
|
}
|
|
|
|
|
@@ -177,22 +170,22 @@ public class AnalyticsResultProcessor {
|
|
|
MemoryUsage memoryUsage = result.getMemoryUsage();
|
|
|
if (memoryUsage != null) {
|
|
|
statsHolder.setMemoryUsage(memoryUsage);
|
|
|
- indexStatsResult(memoryUsage, memoryUsage::documentId);
|
|
|
+ statsPersister.persistWithRetry(memoryUsage, memoryUsage::documentId);
|
|
|
}
|
|
|
OutlierDetectionStats outlierDetectionStats = result.getOutlierDetectionStats();
|
|
|
if (outlierDetectionStats != null) {
|
|
|
statsHolder.setAnalysisStats(outlierDetectionStats);
|
|
|
- indexStatsResult(outlierDetectionStats, outlierDetectionStats::documentId);
|
|
|
+ statsPersister.persistWithRetry(outlierDetectionStats, outlierDetectionStats::documentId);
|
|
|
}
|
|
|
ClassificationStats classificationStats = result.getClassificationStats();
|
|
|
if (classificationStats != null) {
|
|
|
statsHolder.setAnalysisStats(classificationStats);
|
|
|
- indexStatsResult(classificationStats, classificationStats::documentId);
|
|
|
+ statsPersister.persistWithRetry(classificationStats, classificationStats::documentId);
|
|
|
}
|
|
|
RegressionStats regressionStats = result.getRegressionStats();
|
|
|
if (regressionStats != null) {
|
|
|
statsHolder.setAnalysisStats(regressionStats);
|
|
|
- indexStatsResult(regressionStats, regressionStats::documentId);
|
|
|
+ statsPersister.persistWithRetry(regressionStats, regressionStats::documentId);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -275,23 +268,4 @@ public class AnalyticsResultProcessor {
|
|
|
failure = "error processing results; " + e.getMessage();
|
|
|
auditor.error(analytics.getId(), "Error processing results; " + e.getMessage());
|
|
|
}
|
|
|
-
|
|
|
- private void indexStatsResult(ToXContentObject result, Function<String, String> docIdSupplier) {
|
|
|
- try {
|
|
|
- resultsPersisterService.indexWithRetry(analytics.getId(),
|
|
|
- MlStatsIndex.writeAlias(),
|
|
|
- result,
|
|
|
- new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")),
|
|
|
- WriteRequest.RefreshPolicy.IMMEDIATE,
|
|
|
- docIdSupplier.apply(analytics.getId()),
|
|
|
- () -> isCancelled == false,
|
|
|
- errorMsg -> auditor.error(analytics.getId(),
|
|
|
- "failed to persist result with id [" + docIdSupplier.apply(analytics.getId()) + "]; " + errorMsg)
|
|
|
- );
|
|
|
- } catch (IOException ioe) {
|
|
|
- LOGGER.error(() -> new ParameterizedMessage("[{}] Failed serializing stats result", analytics.getId()), ioe);
|
|
|
- } catch (Exception e) {
|
|
|
- LOGGER.error(() -> new ParameterizedMessage("[{}] Failed indexing stats result", analytics.getId()), e);
|
|
|
- }
|
|
|
- }
|
|
|
}
|