Browse Source

[ML] Delete obsolete snapshot stats after upgrade (#121661) (#122396)

Ensure that the old snapshot model_size_stats document is removed after the snapshot upgrade.
Valeriy Khakhutskyy 8 tháng trước cách đây
mục cha
commit
ace8cd75df

+ 97 - 28
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/JobModelSnapshotUpgrader.java

@@ -12,19 +12,27 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.common.CheckedSupplier;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
-import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.core.IOUtils;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
 import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
+import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
+import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.core.ml.job.snapshot.upgrade.SnapshotUpgradeState;
 import org.elasticsearch.xpack.core.ml.job.snapshot.upgrade.SnapshotUpgradeTaskState;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
@@ -44,9 +52,7 @@ import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
@@ -153,6 +159,55 @@ public final class JobModelSnapshotUpgrader {
         executor.execute();
     }
 
+    private void removeDuplicateModelSnapshotDoc(Consumer<Exception> runAfter) {
+        String snapshotDocId = jobId + "_model_snapshot_" + snapshotId;
+        client.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexPattern())
+            .setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.idsQuery().addIds(snapshotDocId)))
+            .setSize(2)
+            .addSort(ModelSnapshot.MIN_VERSION.getPreferredName(), org.elasticsearch.search.sort.SortOrder.ASC)
+            .execute(ActionListener.wrap(searchResponse -> {
+                if (searchResponse.getHits().getTotalHits().value > 1) {
+                    deleteOlderSnapshotDoc(searchResponse, runAfter);
+                } else {
+                    onFinish.accept(null);
+                }
+            }, e -> {
+                logger.warn(() -> format("[%s] [%s] error during search for model snapshot documents", jobId, snapshotId), e);
+                onFinish.accept(null);
+            }));
+    }
+
+    private void deleteOlderSnapshotDoc(SearchResponse searchResponse, Consumer<Exception> runAfter) {
+        SearchHit firstHit = searchResponse.getHits().getAt(0);
+        logger.debug(() -> format("[%s] deleting duplicate model snapshot doc [%s]", jobId, firstHit.getId()));
+        client.prepareDelete()
+            .setIndex(firstHit.getIndex())
+            .setId(firstHit.getId())
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+            .execute(ActionListener.runAfter(ActionListener.wrap(deleteResponse -> {
+                if ((deleteResponse.getResult() == DocWriteResponse.Result.DELETED) == false) {
+                    logger.warn(
+                        () -> format(
+                            "[%s] [%s] failed to delete old snapshot [%s] result document, document not found",
+                            jobId,
+                            snapshotId,
+                            ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName()
+                        )
+                    );
+                }
+            }, e -> {
+                logger.warn(
+                    () -> format(
+                        "[%s] [%s] failed to delete old snapshot [%s] result document",
+                        jobId,
+                        snapshotId,
+                        ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName()
+                    ),
+                    e
+                );
+            }), () -> runAfter.accept(null)));
+    }
+
     void setTaskToFailed(String reason, ActionListener<PersistentTask<?>> listener) {
         SnapshotUpgradeTaskState taskState = new SnapshotUpgradeTaskState(SnapshotUpgradeState.FAILED, task.getAllocationId(), reason);
         task.updatePersistentTaskState(taskState, ActionListener.wrap(listener::onResponse, f -> {
@@ -259,7 +314,7 @@ public final class JobModelSnapshotUpgrader {
                 logger.error(() -> format("[%s] [%s] failed to write old state", jobId, snapshotId), e);
                 setTaskToFailed(
                     "Failed to write old state due to: " + e.getMessage(),
-                    ActionListener.wrap(t -> shutdown(e), f -> shutdown(e))
+                    ActionListener.running(() -> shutdownWithFailure(e))
                 );
                 return;
             }
@@ -273,7 +328,7 @@ public final class JobModelSnapshotUpgrader {
                     logger.error(() -> format("[%s] [%s] failed to flush after writing old state", jobId, snapshotId), e);
                     nextStep = () -> setTaskToFailed(
                         "Failed to flush after writing old state due to: " + e.getMessage(),
-                        ActionListener.wrap(t -> shutdown(e), f -> shutdown(e))
+                        ActionListener.running(() -> shutdownWithFailure(e))
                     );
                 } else {
                     logger.debug(
@@ -295,7 +350,7 @@ public final class JobModelSnapshotUpgrader {
                 new SnapshotUpgradeTaskState(SnapshotUpgradeState.SAVING_NEW_STATE, task.getAllocationId(), ""),
                 ActionListener.wrap(readingNewState -> {
                     if (continueRunning.get() == false) {
-                        shutdown(null);
+                        shutdownWithFailure(null);
                         return;
                     }
                     submitOperation(() -> {
@@ -310,12 +365,12 @@ public final class JobModelSnapshotUpgrader {
                         // Execute callback in the UTILITY thread pool, as the current thread in the callback will be one in the
                         // autodetectWorkerExecutor. Trying to run the callback in that executor will cause a dead lock as that
                         // executor has a single processing queue.
-                        (aVoid, e) -> threadPool.executor(UTILITY_THREAD_POOL_NAME).execute(() -> shutdown(e))
+                        (aVoid, e) -> threadPool.executor(UTILITY_THREAD_POOL_NAME).execute(() -> handlePersistingState(e))
                     );
                     logger.debug("[{}] [{}] asked for state to be persisted", jobId, snapshotId);
                 }, f -> {
                     logger.error(() -> format("[%s] [%s] failed to update snapshot upgrader task to started", jobId, snapshotId), f);
-                    shutdown(
+                    shutdownWithFailure(
                         new ElasticsearchStatusException(
                             "Failed to start snapshot upgrade [{}] for job [{}]",
                             RestStatus.INTERNAL_SERVER_ERROR,
@@ -378,17 +433,45 @@ public final class JobModelSnapshotUpgrader {
             }
         }
 
-        void shutdown(Exception e) {
+        private void handlePersistingState(@Nullable Exception exception) {
+            assert Thread.currentThread().getName().contains(UTILITY_THREAD_POOL_NAME);
+
+            if (exception != null) {
+                shutdownWithFailure(exception);
+            } else {
+                stopProcess((aVoid, e) -> {
+                    threadPool.executor(UTILITY_THREAD_POOL_NAME).execute(() -> {
+                        autodetectWorkerExecutor.shutdownNow();
+                        // If there are two snapshot documents in the results indices with the same snapshot id,
+                        // remove the old one. This can happen when the result index has been rolled over and
+                        // the write alias is pointing to the new index.
+                        removeDuplicateModelSnapshotDoc(onFinish);
+                    });
+
+                });
+            }
+        }
+
+        void shutdownWithFailure(Exception e) {
+            stopProcess((aVoid, ignored) -> {
+                threadPool.executor(UTILITY_THREAD_POOL_NAME).execute(() -> {
+                    onFinish.accept(e);
+                    autodetectWorkerExecutor.shutdownNow();
+                });
+            });
+        }
+
+        private void stopProcess(BiConsumer<Class<Void>, Exception> runNext) {
             logger.debug("[{}] [{}] shutdown initiated", jobId, snapshotId);
             // No point in sending an action to the executor if the process has died
             if (process.isProcessAlive() == false) {
                 logger.debug("[{}] [{}] process is dead, no need to shutdown", jobId, snapshotId);
-                onFinish.accept(e);
-                autodetectWorkerExecutor.shutdownNow();
                 stateStreamer.cancel();
+                runNext.accept(null, null);
                 return;
             }
-            Future<?> future = autodetectWorkerExecutor.submit(() -> {
+
+            submitOperation(() -> {
                 try {
                     logger.debug("[{}] [{}] shutdown is now occurring", jobId, snapshotId);
                     if (process.isReady()) {
@@ -401,24 +484,10 @@ public final class JobModelSnapshotUpgrader {
                     processor.awaitCompletion();
                 } catch (IOException | TimeoutException exc) {
                     logger.warn(() -> format("[%s] [%s] failed to shutdown process", jobId, snapshotId), exc);
-                } finally {
-                    onFinish.accept(e);
                 }
                 logger.debug("[{}] [{}] connection for upgrade has been closed, process is shutdown", jobId, snapshotId);
-            });
-            try {
-                future.get();
-                autodetectWorkerExecutor.shutdownNow();
-            } catch (InterruptedException interrupt) {
-                Thread.currentThread().interrupt();
-            } catch (ExecutionException executionException) {
-                if (processor.isProcessKilled()) {
-                    // In this case the original exception is spurious and highly misleading
-                    throw ExceptionsHelper.conflictStatusException("close snapshot upgrade interrupted by kill request");
-                } else {
-                    throw FutureUtils.rethrowExecutionException(executionException);
-                }
-            }
+                return Void.TYPE;
+            }, runNext);
         }
     }
 }

+ 8 - 2
x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlJobSnapshotUpgradeIT.java

@@ -65,7 +65,6 @@ public class MlJobSnapshotUpgradeIT extends AbstractUpgradeTestCase {
      * The purpose of this test is to ensure that when a job is open through a rolling upgrade we upgrade the results
      * index mappings when it is assigned to an upgraded node even if no other ML endpoint is called after the upgrade
      */
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/98560")
     public void testSnapshotUpgrader() throws Exception {
         Request adjustLoggingLevels = new Request("PUT", "/_cluster/settings");
         adjustLoggingLevels.setJsonEntity("""
@@ -98,6 +97,13 @@ public class MlJobSnapshotUpgradeIT extends AbstractUpgradeTestCase {
 
     @SuppressWarnings("unchecked")
     private void testSnapshotUpgradeFailsOnMixedCluster() throws Exception {
+        // TODO the mixed cluster assertions sometimes fail because the code that
+        // detects the mixed cluster relies on the transport versions being different.
+        // This assumption does not hold immediately after a version bump and new
+        // branch being cut as the new branch will have the same transport version
+        // See https://github.com/elastic/elasticsearch/issues/98560
+
+        assumeTrue("The mixed cluster is not always detected correctly, see https://github.com/elastic/elasticsearch/issues/98560", false);
         Map<String, Object> jobs = entityAsMap(getJob(JOB_ID));
 
         String currentSnapshot = ((List<String>) XContentMapValues.extractValue("jobs.model_snapshot_id", jobs)).get(0);
@@ -154,7 +160,7 @@ public class MlJobSnapshotUpgradeIT extends AbstractUpgradeTestCase {
 
         List<Map<String, Object>> upgradedSnapshot = (List<Map<String, Object>>) entityAsMap(getModelSnapshots(JOB_ID, snapshotToUpgradeId))
             .get("model_snapshots");
-        assertThat(upgradedSnapshot, hasSize(1));
+        assertThat(upgradedSnapshot.toString(), upgradedSnapshot, hasSize(1));
         assertThat(upgradedSnapshot.get(0).get("latest_record_time_stamp"), equalTo(snapshotToUpgrade.get("latest_record_time_stamp")));
 
         // Does the snapshot still work?