|
@@ -16,9 +16,11 @@ import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
|
|
|
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
|
|
|
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
|
|
|
import org.elasticsearch.action.admin.indices.stats.ShardStats;
|
|
|
+import org.elasticsearch.action.support.GroupedActionListener;
|
|
|
import org.elasticsearch.action.support.IndicesOptions;
|
|
|
import org.elasticsearch.client.Client;
|
|
|
import org.elasticsearch.common.util.set.Sets;
|
|
|
+import org.elasticsearch.transport.RemoteClusterService;
|
|
|
import org.elasticsearch.xpack.core.ClientHelper;
|
|
|
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
|
|
|
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo;
|
|
@@ -26,16 +28,20 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingI
|
|
|
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
|
|
|
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
|
|
|
import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
|
|
|
+import org.elasticsearch.xpack.transform.checkpoint.RemoteClusterResolver.ResolvedIndices;
|
|
|
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
|
|
|
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
|
|
|
|
|
|
import java.time.Instant;
|
|
|
import java.util.Arrays;
|
|
|
+import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
import java.util.HashSet;
|
|
|
+import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import java.util.TreeMap;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
public class DefaultCheckpointProvider implements CheckpointProvider {
|
|
|
|
|
@@ -45,17 +51,20 @@ public class DefaultCheckpointProvider implements CheckpointProvider {
|
|
|
private static final Logger logger = LogManager.getLogger(DefaultCheckpointProvider.class);
|
|
|
|
|
|
protected final Client client;
|
|
|
+ protected final RemoteClusterResolver remoteClusterResolver;
|
|
|
protected final TransformConfigManager transformConfigManager;
|
|
|
protected final TransformAuditor transformAuditor;
|
|
|
protected final TransformConfig transformConfig;
|
|
|
|
|
|
public DefaultCheckpointProvider(
|
|
|
final Client client,
|
|
|
+ final RemoteClusterResolver remoteClusterResolver,
|
|
|
final TransformConfigManager transformConfigManager,
|
|
|
final TransformAuditor transformAuditor,
|
|
|
final TransformConfig transformConfig
|
|
|
) {
|
|
|
this.client = client;
|
|
|
+ this.remoteClusterResolver = remoteClusterResolver;
|
|
|
this.transformConfigManager = transformConfigManager;
|
|
|
this.transformAuditor = transformAuditor;
|
|
|
this.transformConfig = transformConfig;
|
|
@@ -84,13 +93,61 @@ public class DefaultCheckpointProvider implements CheckpointProvider {
|
|
|
}
|
|
|
|
|
|
protected void getIndexCheckpoints(ActionListener<Map<String, long[]>> listener) {
|
|
|
+ try {
|
|
|
+ ResolvedIndices resolvedIndexes = remoteClusterResolver.resolve(transformConfig.getSource().getIndex());
|
|
|
+ ActionListener<Map<String, long[]>> groupedListener = listener;
|
|
|
+
|
|
|
+ if (resolvedIndexes.numClusters() > 1) {
|
|
|
+ ActionListener<Collection<Map<String, long[]>>> mergeMapsListener = ActionListener.wrap(indexCheckpoints -> {
|
|
|
+ listener.onResponse(
|
|
|
+ indexCheckpoints.stream()
|
|
|
+ .flatMap(m -> m.entrySet().stream())
|
|
|
+ .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()))
|
|
|
+ );
|
|
|
+ }, listener::onFailure);
|
|
|
+
|
|
|
+ groupedListener = new GroupedActionListener<>(mergeMapsListener, resolvedIndexes.numClusters());
|
|
|
+ }
|
|
|
+
|
|
|
+ if (resolvedIndexes.getLocalIndices().isEmpty() == false) {
|
|
|
+ getCheckpointsFromOneCluster(
|
|
|
+ client,
|
|
|
+ transformConfig.getHeaders(),
|
|
|
+ resolvedIndexes.getLocalIndices().toArray(new String[0]),
|
|
|
+ RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY,
|
|
|
+ groupedListener
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ for (Map.Entry<String, List<String>> remoteIndex : resolvedIndexes.getRemoteIndicesPerClusterAlias().entrySet()) {
|
|
|
+ Client remoteClient = client.getRemoteClusterClient(remoteIndex.getKey());
|
|
|
+ getCheckpointsFromOneCluster(
|
|
|
+ remoteClient,
|
|
|
+ transformConfig.getHeaders(),
|
|
|
+ remoteIndex.getValue().toArray(new String[0]),
|
|
|
+ remoteIndex.getKey() + RemoteClusterService.REMOTE_CLUSTER_INDEX_SEPARATOR,
|
|
|
+ groupedListener
|
|
|
+ );
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ listener.onFailure(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static void getCheckpointsFromOneCluster(
|
|
|
+ Client client,
|
|
|
+ Map<String, String> headers,
|
|
|
+ String[] indices,
|
|
|
+ String prefix,
|
|
|
+ ActionListener<Map<String, long[]>> listener
|
|
|
+ ) {
|
|
|
// 1st get index to see the indexes the user has access to
|
|
|
- GetIndexRequest getIndexRequest = new GetIndexRequest().indices(transformConfig.getSource().getIndex())
|
|
|
+ GetIndexRequest getIndexRequest = new GetIndexRequest().indices(indices)
|
|
|
.features(new GetIndexRequest.Feature[0])
|
|
|
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
|
|
|
|
|
|
ClientHelper.executeWithHeadersAsync(
|
|
|
- transformConfig.getHeaders(),
|
|
|
+ headers,
|
|
|
ClientHelper.TRANSFORM_ORIGIN,
|
|
|
client,
|
|
|
GetIndexAction.INSTANCE,
|
|
@@ -104,23 +161,20 @@ public class DefaultCheckpointProvider implements CheckpointProvider {
|
|
|
client,
|
|
|
ClientHelper.TRANSFORM_ORIGIN,
|
|
|
IndicesStatsAction.INSTANCE,
|
|
|
- new IndicesStatsRequest().indices(transformConfig.getSource().getIndex())
|
|
|
- .clear()
|
|
|
- .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN),
|
|
|
+ new IndicesStatsRequest().indices(indices).clear().indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN),
|
|
|
ActionListener.wrap(response -> {
|
|
|
if (response.getFailedShards() != 0) {
|
|
|
listener.onFailure(new CheckpointException("Source has [" + response.getFailedShards() + "] failed shards"));
|
|
|
return;
|
|
|
}
|
|
|
-
|
|
|
- listener.onResponse(extractIndexCheckPoints(response.getShards(), userIndices));
|
|
|
+ listener.onResponse(extractIndexCheckPoints(response.getShards(), userIndices, prefix));
|
|
|
}, e -> listener.onFailure(new CheckpointException("Failed to create checkpoint", e)))
|
|
|
);
|
|
|
}, e -> listener.onFailure(new CheckpointException("Failed to create checkpoint", e)))
|
|
|
);
|
|
|
}
|
|
|
|
|
|
- static Map<String, long[]> extractIndexCheckPoints(ShardStats[] shards, Set<String> userIndices) {
|
|
|
+ static Map<String, long[]> extractIndexCheckPoints(ShardStats[] shards, Set<String> userIndices, String prefix) {
|
|
|
Map<String, TreeMap<Integer, Long>> checkpointsByIndex = new TreeMap<>();
|
|
|
|
|
|
for (ShardStats shard : shards) {
|
|
@@ -129,9 +183,10 @@ public class DefaultCheckpointProvider implements CheckpointProvider {
|
|
|
if (userIndices.contains(indexName)) {
|
|
|
// SeqNoStats could be `null`, assume the global checkpoint to be -1 in this case
|
|
|
long globalCheckpoint = shard.getSeqNoStats() == null ? -1L : shard.getSeqNoStats().getGlobalCheckpoint();
|
|
|
- if (checkpointsByIndex.containsKey(indexName)) {
|
|
|
+ String fullIndexName = prefix + indexName;
|
|
|
+ if (checkpointsByIndex.containsKey(fullIndexName)) {
|
|
|
// we have already seen this index, just check/add shards
|
|
|
- TreeMap<Integer, Long> checkpoints = checkpointsByIndex.get(indexName);
|
|
|
+ TreeMap<Integer, Long> checkpoints = checkpointsByIndex.get(fullIndexName);
|
|
|
// 1st time we see this shard for this index, add the entry for the shard
|
|
|
// or there is already a checkpoint entry for this index/shard combination
|
|
|
// but with a higher global checkpoint. This is by design(not a problem) and
|
|
@@ -142,8 +197,8 @@ public class DefaultCheckpointProvider implements CheckpointProvider {
|
|
|
}
|
|
|
} else {
|
|
|
// 1st time we see this index, create an entry for the index and add the shard checkpoint
|
|
|
- checkpointsByIndex.put(indexName, new TreeMap<>());
|
|
|
- checkpointsByIndex.get(indexName).put(shard.getShardRouting().getId(), globalCheckpoint);
|
|
|
+ checkpointsByIndex.put(fullIndexName, new TreeMap<>());
|
|
|
+ checkpointsByIndex.get(fullIndexName).put(shard.getShardRouting().getId(), globalCheckpoint);
|
|
|
}
|
|
|
}
|
|
|
}
|