Browse Source

[ML] Delete dest index and reindex if incompatible (#62960)

The data frame analytics results format changed in version `7.10.0`.
If existing jobs that had not completed are restarted, the
destination index may already have been created. That index's
mappings are not suitable for the new results format.

This commit checks the version of the destination index and deletes
it when the version is outdated. The job will then continue by
recreating the destination index and reindexing.
Dimitris Athanasiou 5 years ago
parent
commit
0361758f30

+ 24 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java

@@ -25,7 +25,9 @@ import org.elasticsearch.client.Client;
 import org.elasticsearch.client.ParentTaskAssigningClient;
 import org.elasticsearch.client.node.NodeClient;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.metadata.MappingMetadata;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
@@ -90,6 +92,28 @@ public class DataFrameAnalyticsManager {
         // With config in hand, determine action to take
         ActionListener<DataFrameAnalyticsConfig> configListener = ActionListener.wrap(
             config -> {
+                // Check if existing destination index is incompatible.
+                // If it is, we delete it and start from reindexing.
+                IndexMetadata destIndex = clusterState.getMetadata().index(config.getDest().getIndex());
+                if (destIndex != null) {
+                    MappingMetadata destIndexMapping = clusterState.getMetadata().index(config.getDest().getIndex()).mapping();
+                    DestinationIndex.Metadata metadata = DestinationIndex.readMetadata(config.getId(), destIndexMapping);
+                    if (metadata.hasMetadata() && (metadata.isCompatible() == false)) {
+                        LOGGER.info("[{}] Destination index was created in version [{}] but minimum supported version is [{}]. " +
+                            "Deleting index and starting from scratch.", config.getId(), metadata.getVersion(),
+                            DestinationIndex.MIN_COMPATIBLE_VERSION);
+                        task.getStatsHolder().resetProgressTracker(config.getAnalysis().getProgressPhases(),
+                            config.getAnalysis().supportsInference());
+                        DataFrameAnalyticsTaskState reindexingState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.REINDEXING,
+                            task.getAllocationId(), "destination index was out of date");
+                        task.updatePersistentTaskState(reindexingState, ActionListener.wrap(
+                            updatedTask -> executeJobInMiddleOfReindexing(task, config),
+                            task::setFailed
+                        ));
+                        return;
+                    }
+                }
+
                 task.getStatsHolder().adjustProgressTracker(config.getAnalysis().getProgressPhases(),
                     config.getAnalysis().supportsInference());
 

+ 92 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DestinationIndex.java

@@ -5,6 +5,9 @@
  */
 package org.elasticsearch.xpack.ml.dataframe;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
@@ -44,6 +47,8 @@ import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
  */
 public final class DestinationIndex {
 
+    private static final Logger logger = LogManager.getLogger(DestinationIndex.class);
+
     public static final String INCREMENTAL_ID = "ml__incremental_id";
 
     /**
@@ -61,6 +66,8 @@ public final class DestinationIndex {
     private static final String PROPERTIES = "properties";
     private static final String META = "_meta";
 
+    private static final String DFA_CREATOR = "data-frame-analytics";
+
     /**
      * We only preserve the most important settings.
      * If the user needs other settings on the destination index they
@@ -68,6 +75,13 @@ public final class DestinationIndex {
      */
     private static final String[] PRESERVED_SETTINGS = new String[] {"index.number_of_shards", "index.number_of_replicas"};
 
+    /**
+     * This is the minimum compatible version of the destination index we can currently work with.
+     * If the results mappings change in a way existing destination indices will fail to index
+     * the results, this should be bumped accordingly.
+     */
+    public static final Version MIN_COMPATIBLE_VERSION = Version.V_7_10_0;
+
     private DestinationIndex() {}
 
     /**
@@ -125,7 +139,7 @@ public final class DestinationIndex {
         checkResultsFieldIsNotPresentInProperties(config, properties);
         properties.putAll(createAdditionalMappings(config, Collections.unmodifiableMap(properties)));
         Map<String, Object> metadata = getOrPutDefault(mappingsAsMap, META, HashMap::new);
-        metadata.putAll(createMetadata(config.getId(), clock));
+        metadata.putAll(createMetadata(config.getId(), clock, Version.CURRENT));
         return new CreateIndexRequest(destinationIndex, settings).mapping(mappingsAsMap);
     }
 
@@ -164,11 +178,12 @@ public final class DestinationIndex {
         return properties;
     }
 
-    private static Map<String, Object> createMetadata(String analyticsId, Clock clock) {
+    // Visible for testing. Builds the _meta entries that are merged into the destination index mappings.
+    static Map<String, Object> createMetadata(String analyticsId, Clock clock, Version version) {
         Map<String, Object> metadata = new HashMap<>();
         metadata.put(CREATION_DATE_MILLIS, clock.millis());
-        metadata.put(CREATED_BY, "data-frame-analytics");
-        metadata.put(VERSION, Map.of(CREATED, Version.CURRENT));
+        metadata.put(CREATED_BY, DFA_CREATOR); // readMetadata compares against this same constant
+        metadata.put(VERSION, Map.of(CREATED, version.toString())); // kept as a String; getVersion parses it back
         metadata.put(ANALYTICS, analyticsId);
         return metadata;
     }
@@ -220,4 +235,77 @@ public final class DestinationIndex {
                 DataFrameAnalyticsDest.RESULTS_FIELD.getPreferredName());
         }
     }
+
+    /**
+     * Reads the data frame analytics metadata stored under {@code _meta} in the mappings of a destination index.
+     *
+     * @param jobId the analytics job id; only used when logging a version that cannot be read
+     * @param mappingMetadata the mappings of the destination index; may be {@code null} when the index has no mappings
+     * @return a {@link Metadata} whose {@code hasMetadata()} is {@code false} when there are no mappings, no
+     *         {@code _meta} object, or the {@code _meta} section was not created by data frame analytics;
+     *         otherwise a {@link Metadata} carrying the version the index was created in
+     */
+    @SuppressWarnings("unchecked")
+    public static Metadata readMetadata(String jobId, MappingMetadata mappingMetadata) {
+        // An index without any mappings has no mapping metadata; treat it as carrying no analytics metadata
+        // instead of failing with a NullPointerException.
+        if (mappingMetadata == null) {
+            return new NoMetadata();
+        }
+        Object meta = mappingMetadata.getSourceAsMap().get(META);
+        // Guard the cast: anything other than an object under _meta cannot have been written by us.
+        if ((meta instanceof Map) == false) {
+            return new NoMetadata();
+        }
+        Map<String, Object> metaAsMap = (Map<String, Object>) meta;
+        if (DFA_CREATOR.equals(metaAsMap.get(CREATED_BY)) == false) {
+            return new NoMetadata();
+        }
+        return new DestMetadata(getVersion(jobId, metaAsMap));
+    }
+
+    /**
+     * Extracts the version the destination index was created in from its {@code _meta} section.
+     *
+     * @param jobId the analytics job id, included in the error log
+     * @param meta the {@code _meta} section of the destination index mappings
+     * @return the parsed version, or {@code null} when any step of the lookup or parsing throws
+     */
+    @SuppressWarnings("unchecked")
+    private static Version getVersion(String jobId, Map<String, Object> meta) {
+        try {
+            Map<String, Object> versionSection = (Map<String, Object>) meta.get(VERSION);
+            return Version.fromString((String) versionSection.get(CREATED));
+        } catch (Exception e) {
+            // Deliberately broad: a missing section, an unexpected type and a parse failure all end up here.
+            logger.error(new ParameterizedMessage("[{}] Could not retrieve destination index version", jobId), e);
+            return null;
+        }
+    }
+
+    public interface Metadata {
+        /** Whether the mappings held a {@code _meta} section that was created by data frame analytics. */
+        boolean hasMetadata();
+        /** Whether the creation version is on or after MIN_COMPATIBLE_VERSION; the no-metadata implementation throws. */
+        boolean isCompatible();
+        /** The creation version as text ("unknown" if it could not be read); the no-metadata implementation throws. */
+        String getVersion();
+    }
+
+    private static class NoMetadata implements Metadata {
+        // Returned by readMetadata when _meta is absent or was not created by data frame analytics.
+        @Override
+        public boolean hasMetadata() {
+            return false;
+        }
+        // There is no version to compare, so callers are expected to check hasMetadata() first.
+        @Override
+        public boolean isCompatible() {
+            throw new UnsupportedOperationException();
+        }
+        // Same as isCompatible(): no version is available to report.
+        @Override
+        public String getVersion() {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    /**
+     * Metadata of a destination index whose {@code _meta} section was created by data frame analytics.
+     * The version is {@code null} when it could not be read from the mappings.
+     */
+    private static class DestMetadata implements Metadata {
+
+        private final Version version;
+
+        private DestMetadata(Version version) {
+            this.version = version;
+        }
+
+        @Override
+        public boolean hasMetadata() {
+            return true;
+        }
+
+        @Override
+        public boolean isCompatible() {
+            // A version that could not be read is treated as incompatible.
+            return version != null && version.onOrAfter(MIN_COMPATIBLE_VERSION);
+        }
+
+        @Override
+        public String getVersion() {
+            if (version == null) {
+                return "unknown";
+            }
+            return version.toString();
+        }
+    }
 }

+ 5 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsHolder.java

@@ -40,6 +40,11 @@ public class StatsHolder {
         progressTracker.updateReindexingProgress(reindexingProgressPercent < 100 ? 1 : reindexingProgressPercent);
     }
 
+    public void resetProgressTracker(List<String> analysisPhases, boolean hasInferencePhase) {
+        progressTracker = ProgressTracker.fromZeroes(analysisPhases, hasInferencePhase);
+        progressTracker.updateReindexingProgress(1);
+    }
+
     public ProgressTracker getProgressTracker() {
         return progressTracker;
     }

+ 79 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DestinationIndexTests.java

@@ -48,6 +48,8 @@ import java.time.Clock;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
 import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue;
@@ -336,6 +338,83 @@ public class DestinationIndexTests extends ESTestCase {
         verifyZeroInteractions(client);
     }
 
+    /** Mappings with no {@code _meta} section at all must be reported as having no metadata. */
+    public void testReadMetadata_GivenNoMeta() {
+        Map<String, Object> emptyMappings = new HashMap<>();
+        MappingMetadata mappingMetadata = mock(MappingMetadata.class);
+        when(mappingMetadata.getSourceAsMap()).thenReturn(emptyMappings);
+
+        DestinationIndex.Metadata result = DestinationIndex.readMetadata("test_id", mappingMetadata);
+
+        assertThat(result.hasMetadata(), is(false));
+        // Without metadata there is no version, so both version-based accessors refuse to answer.
+        expectThrows(UnsupportedOperationException.class, result::isCompatible);
+        expectThrows(UnsupportedOperationException.class, result::getVersion);
+    }
+
+    /** A {@code _meta} section that carries no creator entry must be reported as having no metadata. */
+    public void testReadMetadata_GivenMetaWithoutCreatedTag() {
+        Map<String, Object> mappingsWithEmptyMeta = new HashMap<>();
+        mappingsWithEmptyMeta.put("_meta", Collections.emptyMap());
+        MappingMetadata mappingMetadata = mock(MappingMetadata.class);
+        when(mappingMetadata.getSourceAsMap()).thenReturn(mappingsWithEmptyMeta);
+
+        DestinationIndex.Metadata result = DestinationIndex.readMetadata("test_id", mappingMetadata);
+
+        assertThat(result.hasMetadata(), is(false));
+        expectThrows(UnsupportedOperationException.class, result::isCompatible);
+        expectThrows(UnsupportedOperationException.class, result::getVersion);
+    }
+
+    /** A {@code _meta} section written by some other creator must be reported as having no metadata. */
+    public void testReadMetadata_GivenMetaNotCreatedByAnalytics() {
+        Map<String, Object> mappings = new HashMap<>();
+        // The creator tag must be present with a foreign value; otherwise this only repeats the
+        // missing-tag case. NOTE(review): assumes DestinationIndex.CREATED_BY is "created_by" - confirm.
+        mappings.put("_meta", Collections.singletonMap("created_by", "other"));
+        MappingMetadata mappingMetadata = mock(MappingMetadata.class);
+        when(mappingMetadata.getSourceAsMap()).thenReturn(mappings);
+
+        DestinationIndex.Metadata metadata = DestinationIndex.readMetadata("test_id", mappingMetadata);
+
+        assertThat(metadata.hasMetadata(), is(false));
+        expectThrows(UnsupportedOperationException.class, () -> metadata.isCompatible());
+        expectThrows(UnsupportedOperationException.class, () -> metadata.getVersion());
+    }
+
+    /** Metadata created with the current version must be readable and compatible. */
+    public void testReadMetadata_GivenCurrentVersion() {
+        Map<String, Object> meta = DestinationIndex.createMetadata("test_id", Clock.systemUTC(), Version.CURRENT);
+        Map<String, Object> mappingsSource = new HashMap<>();
+        mappingsSource.put("_meta", meta);
+        MappingMetadata mappingMetadata = mock(MappingMetadata.class);
+        when(mappingMetadata.getSourceAsMap()).thenReturn(mappingsSource);
+
+        DestinationIndex.Metadata result = DestinationIndex.readMetadata("test_id", mappingMetadata);
+
+        assertThat(result.hasMetadata(), is(true));
+        assertThat(result.isCompatible(), is(true));
+        assertThat(result.getVersion(), equalTo(Version.CURRENT.toString()));
+    }
+
+    /** The minimum compatible version itself is the boundary and must still count as compatible. */
+    public void testReadMetadata_GivenMinCompatibleVersion() {
+        Version boundaryVersion = DestinationIndex.MIN_COMPATIBLE_VERSION;
+        Map<String, Object> mappingsSource = new HashMap<>();
+        mappingsSource.put("_meta", DestinationIndex.createMetadata("test_id", Clock.systemUTC(), boundaryVersion));
+        MappingMetadata mappingMetadata = mock(MappingMetadata.class);
+        when(mappingMetadata.getSourceAsMap()).thenReturn(mappingsSource);
+
+        DestinationIndex.Metadata result = DestinationIndex.readMetadata("test_id", mappingMetadata);
+
+        assertThat(result.hasMetadata(), is(true));
+        assertThat(result.isCompatible(), is(true));
+        assertThat(result.getVersion(), equalTo(boundaryVersion.toString()));
+    }
+
+    /** Metadata created before the minimum compatible version must be readable but incompatible. */
+    public void testReadMetadata_GivenIncompatibleVersion() {
+        Version outdatedVersion = Version.V_7_9_3;
+        Map<String, Object> mappingsSource = new HashMap<>();
+        mappingsSource.put("_meta", DestinationIndex.createMetadata("test_id", Clock.systemUTC(), outdatedVersion));
+        MappingMetadata mappingMetadata = mock(MappingMetadata.class);
+        when(mappingMetadata.getSourceAsMap()).thenReturn(mappingsSource);
+
+        DestinationIndex.Metadata result = DestinationIndex.readMetadata("test_id", mappingMetadata);
+
+        assertThat(result.hasMetadata(), is(true));
+        assertThat(result.isCompatible(), is(false));
+        assertThat(result.getVersion(), equalTo(outdatedVersion.toString()));
+    }
+
     private static <Response> Answer<Response> callListenerOnResponse(Response response) {
         return invocationOnMock -> {
             @SuppressWarnings("unchecked")

+ 26 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsHolderTests.java

@@ -96,4 +96,30 @@ public class StatsHolderTests extends ESTestCase {
         assertThat(phaseProgresses.get(3).getProgressPercent(), equalTo(0));
         assertThat(phaseProgresses.get(4).getProgressPercent(), equalTo(0));
     }
+
+    /** Resetting must discard all previously reported progress and leave reindexing at 1. */
+    public void testResetProgressTracker() {
+        // Start from a holder that already reports progress in every phase.
+        List<PhaseProgress> initialPhases = Collections.unmodifiableList(
+            Arrays.asList(
+                new PhaseProgress("reindexing", 100),
+                new PhaseProgress("loading_data", 20),
+                new PhaseProgress("a", 30),
+                new PhaseProgress("b", 40),
+                new PhaseProgress("writing_results", 50)
+            )
+        );
+        StatsHolder statsHolder = new StatsHolder(initialPhases);
+
+        statsHolder.resetProgressTracker(Arrays.asList("a", "b"), false);
+
+        List<PhaseProgress> reported = statsHolder.getProgressTracker().report();
+
+        assertThat(reported.size(), equalTo(5));
+        assertThat(reported.stream().map(PhaseProgress::getPhase).collect(Collectors.toList()),
+            contains("reindexing", "loading_data", "a", "b", "writing_results"));
+        assertThat(reported.get(0).getProgressPercent(), equalTo(1));
+        // Every phase after reindexing is back at zero.
+        for (int phaseIndex = 1; phaseIndex < 5; phaseIndex++) {
+            assertThat(reported.get(phaseIndex).getProgressPercent(), equalTo(0));
+        }
+    }
 }