1
0
Эх сурвалжийг харах

[ML] Monitor reindex response in DF analytics (#60911)

Examines the reindex response in order to report potential
problems that occurred during the reindexing phase of
data frame analytics jobs.
Dimitris Athanasiou 5 жил өмнө
parent
commit
2ed1359c72

+ 34 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java

@@ -19,6 +19,7 @@ import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
 import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
 import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
 import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
+import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.support.ContextPreservingActionListener;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.ParentTaskAssigningClient;
@@ -214,6 +215,7 @@ public class DataFrameAnalyticsManager {
         // Reindexing is complete; start analytics
         ActionListener<BulkByScrollResponse> reindexCompletedListener = ActionListener.wrap(
             reindexResponse -> {
+
                 // If the reindex task is canceled, this listener is called.
                 // Consequently, we should not signal reindex completion.
                 if (task.isStopping()) {
@@ -222,7 +224,18 @@ public class DataFrameAnalyticsManager {
                     task.markAsCompleted();
                     return;
                 }
+
                 task.setReindexingTaskId(null);
+
+                Exception reindexError = getReindexError(task.getParams().getId(), reindexResponse);
+                if (reindexError != null) {
+                    task.markAsFailed(reindexError);
+                    return;
+                }
+
+                LOGGER.debug("[{}] Reindex completed; created [{}]; retries [{}]", task.getParams().getId(),
+                    reindexResponse.getCreated(), reindexResponse.getBulkRetries());
+
                 auditor.info(
                     config.getId(),
                     Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING, config.getDest().getIndex(),
@@ -251,6 +264,7 @@ public class DataFrameAnalyticsManager {
                 reindexRequest.setDestIndex(config.getDest().getIndex());
                 reindexRequest.setScript(new Script("ctx._source." + DestinationIndex.ID_COPY + " = ctx._id"));
                 reindexRequest.setParentTask(task.getParentTaskId());
+                reindexRequest.getSearchRequest().allowPartialSearchResults(false);
 
                 final ThreadContext threadContext = parentTaskClient.threadPool().getThreadContext();
                 final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
@@ -295,6 +309,26 @@ public class DataFrameAnalyticsManager {
                 new GetIndexRequest().indices(config.getDest().getIndex()), destIndexListener);
     }
 
+    /**
+     * Examines the given reindex response and converts the first problem found into an exception.
+     *
+     * <p>The checks run in this order: bulk failures, cancellation reason, timeout. Only the first
+     * check that matches is reported; each match is logged at error level, prefixed with the job id,
+     * before the exception is returned.
+     *
+     * <p>NOTE(review): only bulk failures are inspected here; confirm whether search-side failures
+     * of the reindex need separate handling.
+     *
+     * @param jobId the job id used as the prefix of the log messages
+     * @param reindexResponse the reindex response to examine
+     * @return an exception created via {@code ExceptionsHelper.serverError} describing the problem,
+     *         or {@code null} when none of the checks matched
+     */
+    private static Exception getReindexError(String jobId, BulkByScrollResponse reindexResponse) {
+        // Bulk failures take precedence over the other checks.
+        if (reindexResponse.getBulkFailures().isEmpty() == false) {
+            LOGGER.error("[{}] reindexing encountered {} failures", jobId,
+                reindexResponse.getBulkFailures().size());
+            // Every failure is logged individually; the returned exception carries only the count.
+            for (BulkItemResponse.Failure failure : reindexResponse.getBulkFailures()) {
+                LOGGER.error("[{}] reindexing failure: {}", jobId, failure);
+            }
+            return ExceptionsHelper.serverError("reindexing encountered " + reindexResponse.getBulkFailures().size() + " failures");
+        }
+        // A non-null cancellation reason is reported as an error, with the reason in the message.
+        if (reindexResponse.getReasonCancelled() != null) {
+            LOGGER.error("[{}] reindex task got cancelled with reason [{}]", jobId, reindexResponse.getReasonCancelled());
+            return ExceptionsHelper.serverError("reindex task got cancelled with reason [" + reindexResponse.getReasonCancelled() + "]");
+        }
+        // A timed-out response is reported together with the time the reindex took.
+        if (reindexResponse.isTimedOut()) {
+            LOGGER.error("[{}] reindex task timed out after [{}]", jobId, reindexResponse.getTook().getStringRep());
+            return ExceptionsHelper.serverError("reindex task timed out after [" + reindexResponse.getTook().getStringRep() + "]");
+        }
+        // None of the checks matched: there is no error to report.
+        return null;
+    }
+
     private static boolean isTaskCancelledException(Exception error) {
         return ExceptionsHelper.unwrapCause(error) instanceof TaskCancelledException
             || ExceptionsHelper.unwrapCause(error.getCause()) instanceof TaskCancelledException;

+ 1 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java

@@ -282,6 +282,7 @@ public class DataFrameDataExtractor {
         }
 
         return new SearchRequestBuilder(client, SearchAction.INSTANCE)
+            .setAllowPartialSearchResults(false)
             .setIndices(context.indices)
             .setSize(0)
             .setQuery(summaryQuery)