|
@@ -9,7 +9,6 @@ package org.elasticsearch.xpack.dataframe.checkpoint;
|
|
|
import org.apache.logging.log4j.LogManager;
|
|
|
import org.apache.logging.log4j.Logger;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
-import org.elasticsearch.action.LatchedActionListener;
|
|
|
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
|
|
|
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
|
|
|
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
|
|
@@ -28,8 +27,6 @@ import java.util.HashSet;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import java.util.TreeMap;
|
|
|
-import java.util.concurrent.CountDownLatch;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
/**
|
|
|
* DataFrameTransform Checkpoint Service
|
|
@@ -41,17 +38,22 @@ import java.util.concurrent.TimeUnit;
|
|
|
*/
|
|
|
public class DataFrameTransformsCheckpointService {
|
|
|
|
|
|
- private class Checkpoints {
|
|
|
+ private static class Checkpoints {
|
|
|
DataFrameTransformCheckpoint currentCheckpoint = DataFrameTransformCheckpoint.EMPTY;
|
|
|
DataFrameTransformCheckpoint inProgressCheckpoint = DataFrameTransformCheckpoint.EMPTY;
|
|
|
DataFrameTransformCheckpoint sourceCheckpoint = DataFrameTransformCheckpoint.EMPTY;
|
|
|
+
|
|
|
+ DataFrameTransformCheckpointingInfo buildInfo() {
|
|
|
+ return new DataFrameTransformCheckpointingInfo(
|
|
|
+ new DataFrameTransformCheckpointStats(currentCheckpoint.getTimestamp(), currentCheckpoint.getTimeUpperBound()),
|
|
|
+ new DataFrameTransformCheckpointStats(inProgressCheckpoint.getTimestamp(), inProgressCheckpoint.getTimeUpperBound()),
|
|
|
+ DataFrameTransformCheckpoint.getBehind(currentCheckpoint, sourceCheckpoint));
|
|
|
+ }
|
|
|
+
|
|
|
}
|
|
|
|
|
|
private static final Logger logger = LogManager.getLogger(DataFrameTransformsCheckpointService.class);
|
|
|
|
|
|
- // timeout for retrieving checkpoint information
|
|
|
- private static final int CHECKPOINT_STATS_TIMEOUT_SECONDS = 5;
|
|
|
-
|
|
|
private final Client client;
|
|
|
private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager;
|
|
|
|
|
@@ -86,40 +88,49 @@ public class DataFrameTransformsCheckpointService {
|
|
|
long timeUpperBound = 0;
|
|
|
|
|
|
// 1st get index to see the indexes the user has access to
|
|
|
- GetIndexRequest getIndexRequest = new GetIndexRequest().indices(transformConfig.getSource().getIndex());
|
|
|
+ GetIndexRequest getIndexRequest = new GetIndexRequest()
|
|
|
+ .indices(transformConfig.getSource().getIndex())
|
|
|
+ .features(new GetIndexRequest.Feature[0]);
|
|
|
|
|
|
ClientHelper.executeWithHeadersAsync(transformConfig.getHeaders(), ClientHelper.DATA_FRAME_ORIGIN, client, GetIndexAction.INSTANCE,
|
|
|
getIndexRequest, ActionListener.wrap(getIndexResponse -> {
|
|
|
Set<String> userIndices = new HashSet<>(Arrays.asList(getIndexResponse.getIndices()));
|
|
|
-
|
|
|
// 2nd get stats request
|
|
|
- ClientHelper.executeAsyncWithOrigin(client, ClientHelper.DATA_FRAME_ORIGIN, IndicesStatsAction.INSTANCE,
|
|
|
- new IndicesStatsRequest().indices(transformConfig.getSource().getIndex()), ActionListener.wrap(response -> {
|
|
|
+ ClientHelper.executeAsyncWithOrigin(client,
|
|
|
+ ClientHelper.DATA_FRAME_ORIGIN,
|
|
|
+ IndicesStatsAction.INSTANCE,
|
|
|
+ new IndicesStatsRequest()
|
|
|
+ .indices(transformConfig.getSource().getIndex())
|
|
|
+ .clear(),
|
|
|
+ ActionListener.wrap(
|
|
|
+ response -> {
|
|
|
if (response.getFailedShards() != 0) {
|
|
|
- throw new CheckpointException("Source has [" + response.getFailedShards() + "] failed shards");
|
|
|
+ listener.onFailure(
|
|
|
+ new CheckpointException("Source has [" + response.getFailedShards() + "] failed shards"));
|
|
|
+ return;
|
|
|
}
|
|
|
-
|
|
|
- Map<String, long[]> checkpointsByIndex = extractIndexCheckPoints(response.getShards(), userIndices);
|
|
|
- DataFrameTransformCheckpoint checkpointDoc = new DataFrameTransformCheckpoint(transformConfig.getId(),
|
|
|
- timestamp, checkpoint, checkpointsByIndex, timeUpperBound);
|
|
|
-
|
|
|
- listener.onResponse(checkpointDoc);
|
|
|
-
|
|
|
- }, IndicesStatsRequestException -> {
|
|
|
- throw new CheckpointException("Failed to retrieve indices stats", IndicesStatsRequestException);
|
|
|
- }));
|
|
|
-
|
|
|
- }, getIndexException -> {
|
|
|
- throw new CheckpointException("Failed to retrieve list of indices", getIndexException);
|
|
|
- }));
|
|
|
+ try {
|
|
|
+ Map<String, long[]> checkpointsByIndex = extractIndexCheckPoints(response.getShards(), userIndices);
|
|
|
+ listener.onResponse(new DataFrameTransformCheckpoint(transformConfig.getId(),
|
|
|
+ timestamp,
|
|
|
+ checkpoint,
|
|
|
+ checkpointsByIndex,
|
|
|
+ timeUpperBound));
|
|
|
+ } catch (CheckpointException checkpointException) {
|
|
|
+ listener.onFailure(checkpointException);
|
|
|
+ }
|
|
|
+ },
|
|
|
+ listener::onFailure
|
|
|
+ ));
|
|
|
+ },
|
|
|
+ listener::onFailure
|
|
|
+ ));
|
|
|
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Get checkpointing stats for a data frame
|
|
|
*
|
|
|
- * Implementation details:
|
|
|
- * - fires up to 3 requests _in parallel_ rather than cascading them
|
|
|
*
|
|
|
* @param transformId The data frame task
|
|
|
* @param currentCheckpoint the current checkpoint
|
|
@@ -132,71 +143,66 @@ public class DataFrameTransformsCheckpointService {
|
|
|
long inProgressCheckpoint,
|
|
|
ActionListener<DataFrameTransformCheckpointingInfo> listener) {
|
|
|
|
|
|
- // process in parallel: current checkpoint, in-progress checkpoint, current state of the source
|
|
|
- CountDownLatch latch = new CountDownLatch(3);
|
|
|
-
|
|
|
- // ensure listener is called exactly once
|
|
|
- final ActionListener<DataFrameTransformCheckpointingInfo> wrappedListener = ActionListener.notifyOnce(listener);
|
|
|
-
|
|
|
- // holder structure for writing the results of the 3 parallel tasks
|
|
|
Checkpoints checkpoints = new Checkpoints();
|
|
|
|
|
|
- // get the current checkpoint
|
|
|
- if (currentCheckpoint != 0) {
|
|
|
- dataFrameTransformsConfigManager.getTransformCheckpoint(transformId, currentCheckpoint,
|
|
|
- new LatchedActionListener<>(ActionListener.wrap(checkpoint -> checkpoints.currentCheckpoint = checkpoint, e -> {
|
|
|
- logger.debug("Failed to retrieve checkpoint [" + currentCheckpoint + "] for data frame []" + transformId, e);
|
|
|
- wrappedListener
|
|
|
- .onFailure(new CheckpointException("Failed to retrieve current checkpoint [" + currentCheckpoint + "]", e));
|
|
|
- }), latch));
|
|
|
- } else {
|
|
|
- latch.countDown();
|
|
|
- }
|
|
|
-
|
|
|
- // get the in-progress checkpoint
|
|
|
- if (inProgressCheckpoint != 0) {
|
|
|
- dataFrameTransformsConfigManager.getTransformCheckpoint(transformId, inProgressCheckpoint,
|
|
|
- new LatchedActionListener<>(ActionListener.wrap(checkpoint -> checkpoints.inProgressCheckpoint = checkpoint, e -> {
|
|
|
- logger.debug("Failed to retrieve in progress checkpoint [" + inProgressCheckpoint + "] for data frame ["
|
|
|
- + transformId + "]", e);
|
|
|
- wrappedListener.onFailure(
|
|
|
- new CheckpointException("Failed to retrieve in progress checkpoint [" + inProgressCheckpoint + "]", e));
|
|
|
- }), latch));
|
|
|
- } else {
|
|
|
- latch.countDown();
|
|
|
- }
|
|
|
-
|
|
|
- // get the current state
|
|
|
- dataFrameTransformsConfigManager.getTransformConfiguration(transformId, ActionListener.wrap(transformConfig -> {
|
|
|
- getCheckpoint(transformConfig,
|
|
|
- new LatchedActionListener<>(ActionListener.wrap(checkpoint -> checkpoints.sourceCheckpoint = checkpoint, e2 -> {
|
|
|
- logger.debug("Failed to retrieve actual checkpoint for data frame [" + transformId + "]", e2);
|
|
|
- wrappedListener.onFailure(new CheckpointException("Failed to retrieve actual checkpoint", e2));
|
|
|
- }), latch));
|
|
|
- }, e -> {
|
|
|
- logger.warn("Failed to retrieve configuration for data frame [" + transformId + "]", e);
|
|
|
- wrappedListener.onFailure(new CheckpointException("Failed to retrieve configuration", e));
|
|
|
- latch.countDown();
|
|
|
- }));
|
|
|
-
|
|
|
- try {
|
|
|
- if (latch.await(CHECKPOINT_STATS_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
|
|
|
- logger.debug("Retrieval of checkpoint information succeeded for data frame [" + transformId + "]");
|
|
|
- wrappedListener.onResponse(new DataFrameTransformCheckpointingInfo(
|
|
|
- new DataFrameTransformCheckpointStats(checkpoints.currentCheckpoint.getTimestamp(),
|
|
|
- checkpoints.currentCheckpoint.getTimeUpperBound()),
|
|
|
- new DataFrameTransformCheckpointStats(checkpoints.inProgressCheckpoint.getTimestamp(),
|
|
|
- checkpoints.inProgressCheckpoint.getTimeUpperBound()),
|
|
|
- DataFrameTransformCheckpoint.getBehind(checkpoints.currentCheckpoint, checkpoints.sourceCheckpoint)));
|
|
|
- } else {
|
|
|
- // timed out
|
|
|
- logger.debug("Retrieval of checkpoint information has timed out for data frame [" + transformId + "]");
|
|
|
- wrappedListener.onFailure(new CheckpointException("Retrieval of checkpoint information has timed out"));
|
|
|
+ // <3> notify the user once we have the current checkpoint
|
|
|
+ ActionListener<DataFrameTransformCheckpoint> currentCheckpointListener = ActionListener.wrap(
|
|
|
+ currentCheckpointObj -> {
|
|
|
+ checkpoints.currentCheckpoint = currentCheckpointObj;
|
|
|
+ listener.onResponse(checkpoints.buildInfo());
|
|
|
+ },
|
|
|
+ e -> {
|
|
|
+ logger.debug("Failed to retrieve current checkpoint [" +
|
|
|
+ currentCheckpoint + "] for data frame [" + transformId + "]", e);
|
|
|
+ listener.onFailure(new CheckpointException("Failure during current checkpoint info retrieval", e));
|
|
|
}
|
|
|
- } catch (InterruptedException e) {
|
|
|
- logger.debug("Failed to retrieve checkpoints for data frame [" + transformId + "]", e);
|
|
|
- wrappedListener.onFailure(new CheckpointException("Failure during checkpoint info retrieval", e));
|
|
|
- }
|
|
|
+ );
|
|
|
+
|
|
|
+ // <2> after the in progress checkpoint, get the current checkpoint
|
|
|
+ ActionListener<DataFrameTransformCheckpoint> inProgressCheckpointListener = ActionListener.wrap(
|
|
|
+ inProgressCheckpointObj -> {
|
|
|
+ checkpoints.inProgressCheckpoint = inProgressCheckpointObj;
|
|
|
+ if (currentCheckpoint != 0) {
|
|
|
+ dataFrameTransformsConfigManager.getTransformCheckpoint(transformId,
|
|
|
+ currentCheckpoint,
|
|
|
+ currentCheckpointListener);
|
|
|
+ } else {
|
|
|
+ currentCheckpointListener.onResponse(DataFrameTransformCheckpoint.EMPTY);
|
|
|
+ }
|
|
|
+ },
|
|
|
+ e -> {
|
|
|
+ logger.debug("Failed to retrieve in progress checkpoint [" +
|
|
|
+ inProgressCheckpoint + "] for data frame [" + transformId + "]", e);
|
|
|
+ listener.onFailure(new CheckpointException("Failure during in progress checkpoint info retrieval", e));
|
|
|
+ }
|
|
|
+ );
|
|
|
+
|
|
|
+ // <1> after the source checkpoint, get the in progress checkpoint
|
|
|
+ ActionListener<DataFrameTransformCheckpoint> sourceCheckpointListener = ActionListener.wrap(
|
|
|
+ sourceCheckpoint -> {
|
|
|
+ checkpoints.sourceCheckpoint = sourceCheckpoint;
|
|
|
+ if (inProgressCheckpoint != 0) {
|
|
|
+ dataFrameTransformsConfigManager.getTransformCheckpoint(transformId,
|
|
|
+ inProgressCheckpoint,
|
|
|
+ inProgressCheckpointListener);
|
|
|
+ } else {
|
|
|
+ inProgressCheckpointListener.onResponse(DataFrameTransformCheckpoint.EMPTY);
|
|
|
+ }
|
|
|
+ },
|
|
|
+ e -> {
|
|
|
+ logger.debug("Failed to retrieve source checkpoint for data frame [" + transformId + "]", e);
|
|
|
+ listener.onFailure(new CheckpointException("Failure during source checkpoint info retrieval", e));
|
|
|
+ }
|
|
|
+ );
|
|
|
+
|
|
|
+ // <0> get the transform and the source, transient checkpoint
|
|
|
+ dataFrameTransformsConfigManager.getTransformConfiguration(transformId, ActionListener.wrap(
|
|
|
+ transformConfig -> getCheckpoint(transformConfig, sourceCheckpointListener),
|
|
|
+ transformError -> {
|
|
|
+ logger.warn("Failed to retrieve configuration for data frame [" + transformId + "]", transformError);
|
|
|
+ listener.onFailure(new CheckpointException("Failed to retrieve configuration", transformError));
|
|
|
+ })
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
static Map<String, long[]> extractIndexCheckPoints(ShardStats[] shards, Set<String> userIndices) {
|