瀏覽代碼

[ML-DataFrame] make checkpointing more robust (#44344)

make checkpointing more robust:

- do not let checkpointing fail if indexes got deleted
- treat missing seqNoStats as just created indices (checkpoint 0)
- loglevel: do not treat failed updated checks as error

fixes #43992
Hendrik Muhs 6 年之前
父節點
當前提交
eb50f47440

+ 5 - 7
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpoint.java

@@ -279,18 +279,16 @@ public class DataFrameTransformCheckpoint implements Writeable, ToXContentObject
             throw new IllegalArgumentException("old checkpoint is newer than new checkpoint");
         }
 
-        // all old indices must be contained in the new ones but not vice versa
-        if (newCheckpoint.indicesCheckpoints.keySet().containsAll(oldCheckpoint.indicesCheckpoints.keySet()) == false) {
-            return -1L;
-        }
-
         // get the sum of of shard checkpoints
         // note: we require shard checkpoints to strictly increase and never decrease
         long oldCheckPointSum = 0;
         long newCheckPointSum = 0;
 
-        for (long[] v : oldCheckpoint.indicesCheckpoints.values()) {
-            oldCheckPointSum += Arrays.stream(v).sum();
+        for (Entry<String, long[]> entry : oldCheckpoint.indicesCheckpoints.entrySet()) {
+            // ignore entries that aren't part of newCheckpoint, e.g. deleted indices
+            if (newCheckpoint.indicesCheckpoints.containsKey(entry.getKey())) {
+                oldCheckPointSum += Arrays.stream(entry.getValue()).sum();
+            }
         }
 
         for (long[] v : newCheckpoint.indicesCheckpoints.values()) {

+ 3 - 2
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointTests.java

@@ -162,9 +162,10 @@ public class DataFrameTransformCheckpointTests extends AbstractSerializingDataFr
         checkpointsByIndexNew.remove(checkpointsByIndexNew.firstKey());
         assertEquals((indices - 1) * shards * 10L, DataFrameTransformCheckpoint.getBehind(checkpointOld, checkpointTransientNew));
 
-        // remove 1st index from new, now old has 1 index more, behind can not be calculated
+        // remove 1st index from new, now old has 1 index more, which should be ignored
         checkpointsByIndexNew.remove(checkpointsByIndexNew.firstKey());
-        assertEquals(-1L, DataFrameTransformCheckpoint.getBehind(checkpointOld, checkpointTransientNew));
+
+        assertEquals((indices - 2) * shards * 10L, DataFrameTransformCheckpoint.getBehind(checkpointOld, checkpointTransientNew));
     }
 
     private static Map<String, long[]> randomCheckpointsByIndex() {

+ 33 - 29
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointService.java

@@ -14,9 +14,8 @@ import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
+import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.Client;
-import org.elasticsearch.common.Strings;
-import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointStats;
@@ -94,7 +93,8 @@ public class DataFrameTransformsCheckpointService {
         // 1st get index to see the indexes the user has access to
         GetIndexRequest getIndexRequest = new GetIndexRequest()
             .indices(transformConfig.getSource().getIndex())
-            .features(new GetIndexRequest.Feature[0]);
+            .features(new GetIndexRequest.Feature[0])
+            .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
 
         ClientHelper.executeWithHeadersAsync(transformConfig.getHeaders(), ClientHelper.DATA_FRAME_ORIGIN, client, GetIndexAction.INSTANCE,
                 getIndexRequest, ActionListener.wrap(getIndexResponse -> {
@@ -105,7 +105,8 @@ public class DataFrameTransformsCheckpointService {
                         IndicesStatsAction.INSTANCE,
                         new IndicesStatsRequest()
                             .indices(transformConfig.getSource().getIndex())
-                            .clear(),
+                            .clear()
+                            .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN),
                         ActionListener.wrap(
                             response -> {
                                 if (response.getFailedShards() != 0) {
@@ -113,21 +114,18 @@ public class DataFrameTransformsCheckpointService {
                                         new CheckpointException("Source has [" + response.getFailedShards() + "] failed shards"));
                                     return;
                                 }
-                                try {
-                                    Map<String, long[]> checkpointsByIndex = extractIndexCheckPoints(response.getShards(), userIndices);
-                                    listener.onResponse(new DataFrameTransformCheckpoint(transformConfig.getId(),
-                                        timestamp,
-                                        checkpoint,
-                                        checkpointsByIndex,
-                                        timeUpperBound));
-                                } catch (CheckpointException checkpointException) {
-                                    listener.onFailure(checkpointException);
-                                }
+
+                                Map<String, long[]> checkpointsByIndex = extractIndexCheckPoints(response.getShards(), userIndices);
+                                listener.onResponse(new DataFrameTransformCheckpoint(transformConfig.getId(),
+                                    timestamp,
+                                    checkpoint,
+                                    checkpointsByIndex,
+                                    timeUpperBound));
                             },
-                            listener::onFailure
+                            e-> listener.onFailure(new CheckpointException("Failed to create checkpoint", e))
                         ));
                 },
-                listener::onFailure
+                e -> listener.onFailure(new CheckpointException("Failed to create checkpoint", e))
             ));
 
     }
@@ -223,38 +221,44 @@ public class DataFrameTransformsCheckpointService {
 
         for (ShardStats shard : shards) {
             String indexName = shard.getShardRouting().getIndexName();
+
             if (userIndices.contains(indexName)) {
-                SeqNoStats seqNoStats = shard.getSeqNoStats();
-                // SeqNoStats could be `null`. This indicates that an `AlreadyClosed` exception was thrown somewhere down the stack
-                // Indicates that the index COULD be closed, or at least that the shard is not fully recovered yet.
-                if (seqNoStats == null) {
-                    logger.warn("failure gathering checkpoint information for index [{}] as seq_no_stats were null. Shard Stats [{}]",
-                        indexName,
-                        Strings.toString(shard));
-                    throw new CheckpointException(
-                        "Unable to gather checkpoint information for index [" + indexName + "]. seq_no_stats are missing.");
-                }
+                // SeqNoStats could be `null`, assume the global checkpoint to be 0 in this case
+                long globalCheckpoint = shard.getSeqNoStats() == null ? 0 : shard.getSeqNoStats().getGlobalCheckpoint();
                 if (checkpointsByIndex.containsKey(indexName)) {
                     // we have already seen this index, just check/add shards
                     TreeMap<Integer, Long> checkpoints = checkpointsByIndex.get(indexName);
                     if (checkpoints.containsKey(shard.getShardRouting().getId())) {
                         // there is already a checkpoint entry for this index/shard combination, check if they match
-                        if (checkpoints.get(shard.getShardRouting().getId()) != shard.getSeqNoStats().getGlobalCheckpoint()) {
+                        if (checkpoints.get(shard.getShardRouting().getId()) != globalCheckpoint) {
                             throw new CheckpointException("Global checkpoints mismatch for index [" + indexName + "] between shards of id ["
                                     + shard.getShardRouting().getId() + "]");
                         }
                     } else {
                         // 1st time we see this shard for this index, add the entry for the shard
-                        checkpoints.put(shard.getShardRouting().getId(), shard.getSeqNoStats().getGlobalCheckpoint());
+                        checkpoints.put(shard.getShardRouting().getId(), globalCheckpoint);
                     }
                 } else {
                     // 1st time we see this index, create an entry for the index and add the shard checkpoint
                     checkpointsByIndex.put(indexName, new TreeMap<>());
-                    checkpointsByIndex.get(indexName).put(shard.getShardRouting().getId(), shard.getSeqNoStats().getGlobalCheckpoint());
+                    checkpointsByIndex.get(indexName).put(shard.getShardRouting().getId(), globalCheckpoint);
                 }
             }
         }
 
+        // checkpoint extraction is done in 2 steps:
+        // 1. GetIndexRequest to retrieve the indices the user has access to
+        // 2. IndicesStatsRequest to retrieve stats about indices
+        // between 1 and 2 indices could get deleted or created
+        if (logger.isDebugEnabled()) {
+            Set<String> userIndicesClone = new HashSet<>(userIndices);
+
+            userIndicesClone.removeAll(checkpointsByIndex.keySet());
+            if (userIndicesClone.isEmpty() == false) {
+                logger.debug("Original set of user indices contained more indexes [{}]", userIndicesClone);
+            }
+        }
+
         // create the final structure
         Map<String, long[]> checkpointsByIndexReduced = new TreeMap<>();
 

+ 12 - 2
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

@@ -764,7 +764,13 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                         }
                     }, e -> {
                         changed.set(false);
-                        logger.error("failure in update check", e);
+                        logger.warn(
+                                "Failed to detect changes for data frame transform [" + transformId + "], skipping update till next check",
+                                e);
+
+                        auditor.warning(transformId,
+                                "Failed to detect changes for data frame transform, skipping update till next check. Exception: "
+                                        + e.getMessage());
                     }), latch));
 
             try {
@@ -773,7 +779,11 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                     return changed.get();
                 }
             } catch (InterruptedException e) {
-                logger.error("Failed to check for update", e);
+                logger.warn("Failed to detect changes for data frame transform [" + transformId + "], skipping update till next check", e);
+
+                auditor.warning(transformId,
+                        "Failed to detect changes for data frame transform, skipping update till next check. Exception: "
+                                + e.getMessage());
             }
 
             return false;

+ 33 - 6
x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointServiceTests.java

@@ -52,7 +52,24 @@ public class DataFrameTransformsCheckpointServiceTests extends ESTestCase {
         Map<String, long[]> expectedCheckpoints = new HashMap<>();
         Set<String> indices = randomUserIndices();
 
-        ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, false, false);
+        ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, false, false, false);
+
+        Map<String, long[]> checkpoints = DataFrameTransformsCheckpointService.extractIndexCheckPoints(shardStatsArray, indices);
+
+        assertEquals(expectedCheckpoints.size(), checkpoints.size());
+        assertEquals(expectedCheckpoints.keySet(), checkpoints.keySet());
+
+        // low-level compare
+        for (Entry<String, long[]> entry : expectedCheckpoints.entrySet()) {
+            assertTrue(Arrays.equals(entry.getValue(), checkpoints.get(entry.getKey())));
+        }
+    }
+
+    public void testExtractIndexCheckpointsMissingSeqNoStats() {
+        Map<String, long[]> expectedCheckpoints = new HashMap<>();
+        Set<String> indices = randomUserIndices();
+
+        ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, false, false, true);
 
         Map<String, long[]> checkpoints = DataFrameTransformsCheckpointService.extractIndexCheckPoints(shardStatsArray, indices);
 
@@ -69,7 +86,7 @@ public class DataFrameTransformsCheckpointServiceTests extends ESTestCase {
         Map<String, long[]> expectedCheckpoints = new HashMap<>();
         Set<String> indices = randomUserIndices();
 
-        ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, true, false);
+        ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, true, false, false);
 
         Map<String, long[]> checkpoints = DataFrameTransformsCheckpointService.extractIndexCheckPoints(shardStatsArray, indices);
 
@@ -86,7 +103,7 @@ public class DataFrameTransformsCheckpointServiceTests extends ESTestCase {
         Map<String, long[]> expectedCheckpoints = new HashMap<>();
         Set<String> indices = randomUserIndices();
 
-        ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, randomBoolean(), true);
+        ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, randomBoolean(), true, false);
 
         // fail
         CheckpointException e = expectThrows(CheckpointException.class,
@@ -120,10 +137,11 @@ public class DataFrameTransformsCheckpointServiceTests extends ESTestCase {
      * @param userIndices set of indices that are visible
      * @param skipPrimaries whether some shards do not have a primary shard at random
      * @param inconsistentGlobalCheckpoints whether to introduce inconsistent global checkpoints
+     * @param missingSeqNoStats whether some indices miss SeqNoStats
      * @return array of ShardStats
      */
     private static ShardStats[] createRandomShardStats(Map<String, long[]> expectedCheckpoints, Set<String> userIndices,
-            boolean skipPrimaries, boolean inconsistentGlobalCheckpoints) {
+            boolean skipPrimaries, boolean inconsistentGlobalCheckpoints, boolean missingSeqNoStats) {
 
         // always create the full list
         List<Index> indices = new ArrayList<>();
@@ -131,6 +149,8 @@ public class DataFrameTransformsCheckpointServiceTests extends ESTestCase {
         indices.add(new Index("index-2", UUIDs.randomBase64UUID(random())));
         indices.add(new Index("index-3", UUIDs.randomBase64UUID(random())));
 
+        String missingSeqNoStatsIndex = randomFrom(userIndices);
+
         List<ShardStats> shardStats = new ArrayList<>();
         for (final Index index : indices) {
             int numShards = randomIntBetween(1, 5);
@@ -160,8 +180,15 @@ public class DataFrameTransformsCheckpointServiceTests extends ESTestCase {
                 long globalCheckpoint = randomBoolean() ? localCheckpoint : randomLongBetween(0L, 100000000L);
                 long maxSeqNo = Math.max(localCheckpoint, globalCheckpoint);
 
-                final SeqNoStats validSeqNoStats = new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
-                checkpoints.add(globalCheckpoint);
+                SeqNoStats validSeqNoStats = null;
+
+                // add broken seqNoStats if requested
+                if (missingSeqNoStats && index.getName().equals(missingSeqNoStatsIndex)) {
+                    checkpoints.add(0L);
+                } else {
+                    validSeqNoStats = new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
+                    checkpoints.add(globalCheckpoint);
+                }
 
                 for (int replica = 0;  replica < numShardCopies; replica++) {
                     ShardId shardId = new ShardId(index, shardIndex);