|
@@ -10,6 +10,7 @@
|
|
|
package org.elasticsearch.action.admin.indices.resolve;
|
|
|
|
|
|
import org.elasticsearch.TransportVersion;
|
|
|
+import org.elasticsearch.TransportVersions;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
import org.elasticsearch.action.ActionRequestValidationException;
|
|
|
import org.elasticsearch.action.ActionResponse;
|
|
@@ -56,6 +57,7 @@ import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Collections;
|
|
|
import java.util.Comparator;
|
|
|
+import java.util.EnumSet;
|
|
|
import java.util.List;
|
|
|
import java.util.Locale;
|
|
|
import java.util.Map;
|
|
@@ -63,6 +65,8 @@ import java.util.Objects;
|
|
|
import java.util.Set;
|
|
|
import java.util.SortedMap;
|
|
|
import java.util.TreeMap;
|
|
|
+import java.util.function.Predicate;
|
|
|
+import java.util.stream.Collectors;
|
|
|
import java.util.stream.Stream;
|
|
|
|
|
|
import static org.elasticsearch.action.search.TransportSearchHelper.checkCCSVersionCompatibility;
|
|
@@ -85,6 +89,7 @@ public class ResolveIndexAction extends ActionType<ResolveIndexAction.Response>
|
|
|
|
|
|
private String[] names;
|
|
|
private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS;
|
|
|
+ private EnumSet<IndexMode> indexModes = EnumSet.noneOf(IndexMode.class);
|
|
|
|
|
|
public Request(String[] names) {
|
|
|
this.names = names;
|
|
@@ -95,6 +100,14 @@ public class ResolveIndexAction extends ActionType<ResolveIndexAction.Response>
|
|
|
this.indicesOptions = indicesOptions;
|
|
|
}
|
|
|
|
|
|
+ public Request(String[] names, IndicesOptions indicesOptions, @Nullable EnumSet<IndexMode> indexModes) {
|
|
|
+ this.names = names;
|
|
|
+ this.indicesOptions = indicesOptions;
|
|
|
+ if (indexModes != null) {
|
|
|
+ this.indexModes = indexModes;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public ActionRequestValidationException validate() {
|
|
|
return null;
|
|
@@ -104,6 +117,11 @@ public class ResolveIndexAction extends ActionType<ResolveIndexAction.Response>
|
|
|
super(in);
|
|
|
this.names = in.readStringArray();
|
|
|
this.indicesOptions = IndicesOptions.readIndicesOptions(in);
|
|
|
+ if (in.getTransportVersion().onOrAfter(TransportVersions.RESOLVE_INDEX_MODE_FILTER)) {
|
|
|
+ this.indexModes = in.readEnumSet(IndexMode.class);
|
|
|
+ } else {
|
|
|
+ this.indexModes = EnumSet.noneOf(IndexMode.class);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -111,6 +129,9 @@ public class ResolveIndexAction extends ActionType<ResolveIndexAction.Response>
|
|
|
super.writeTo(out);
|
|
|
out.writeStringArray(names);
|
|
|
indicesOptions.writeIndicesOptions(out);
|
|
|
+ if (out.getTransportVersion().onOrAfter(TransportVersions.RESOLVE_INDEX_MODE_FILTER)) {
|
|
|
+ out.writeEnumSet(indexModes);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -118,12 +139,12 @@ public class ResolveIndexAction extends ActionType<ResolveIndexAction.Response>
|
|
|
if (this == o) return true;
|
|
|
if (o == null || getClass() != o.getClass()) return false;
|
|
|
Request request = (Request) o;
|
|
|
- return Arrays.equals(names, request.names);
|
|
|
+ return Arrays.equals(names, request.names) && indexModes.equals(request.indexModes);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public int hashCode() {
|
|
|
- return Arrays.hashCode(names);
|
|
|
+ return Objects.hash(Arrays.hashCode(names), indexModes);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -276,6 +297,19 @@ public class ResolveIndexAction extends ActionType<ResolveIndexAction.Response>
|
|
|
result = 31 * result + Arrays.hashCode(attributes);
|
|
|
return result;
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String toString() {
|
|
|
+ return String.format(
|
|
|
+ Locale.ROOT,
|
|
|
+ "ResolvedIndex{name=%s, aliases=%s, attributes=%s, dataStream=%s, mode=%s}",
|
|
|
+ getName(),
|
|
|
+ Arrays.toString(aliases),
|
|
|
+ Arrays.toString(attributes),
|
|
|
+ dataStream,
|
|
|
+ mode
|
|
|
+ );
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public static class ResolvedAlias extends ResolvedIndexAbstraction implements Writeable, ToXContentObject {
|
|
@@ -333,6 +367,11 @@ public class ResolveIndexAction extends ActionType<ResolveIndexAction.Response>
|
|
|
result = 31 * result + Arrays.hashCode(indices);
|
|
|
return result;
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String toString() {
|
|
|
+ return String.format(Locale.ROOT, "ResolvedAlias{name=%s, indices=%s}", getName(), Arrays.toString(indices));
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public static class ResolvedDataStream extends ResolvedIndexAbstraction implements Writeable, ToXContentObject {
|
|
@@ -400,6 +439,17 @@ public class ResolveIndexAction extends ActionType<ResolveIndexAction.Response>
|
|
|
result = 31 * result + Arrays.hashCode(backingIndices);
|
|
|
return result;
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String toString() {
|
|
|
+ return String.format(
|
|
|
+ Locale.ROOT,
|
|
|
+ "ResolvedDataStream{name=%s, backingIndices=%s, timestampField=%s}",
|
|
|
+ getName(),
|
|
|
+ Arrays.toString(backingIndices),
|
|
|
+ timestampField
|
|
|
+ );
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public static class Response extends ActionResponse implements ToXContentObject {
|
|
@@ -505,7 +555,7 @@ public class ResolveIndexAction extends ActionType<ResolveIndexAction.Response>
|
|
|
List<ResolvedIndex> indices = new ArrayList<>();
|
|
|
List<ResolvedAlias> aliases = new ArrayList<>();
|
|
|
List<ResolvedDataStream> dataStreams = new ArrayList<>();
|
|
|
- resolveIndices(localIndices, projectState, indexNameExpressionResolver, indices, aliases, dataStreams);
|
|
|
+ resolveIndices(localIndices, projectState, indexNameExpressionResolver, indices, aliases, dataStreams, request.indexModes);
|
|
|
|
|
|
if (remoteClusterIndices.size() > 0) {
|
|
|
final int remoteRequests = remoteClusterIndices.size();
|
|
@@ -513,7 +563,7 @@ public class ResolveIndexAction extends ActionType<ResolveIndexAction.Response>
|
|
|
final SortedMap<String, Response> remoteResponses = Collections.synchronizedSortedMap(new TreeMap<>());
|
|
|
final Runnable terminalHandler = () -> {
|
|
|
if (completionCounter.countDown()) {
|
|
|
- mergeResults(remoteResponses, indices, aliases, dataStreams);
|
|
|
+ mergeResults(remoteResponses, indices, aliases, dataStreams, request.indexModes);
|
|
|
listener.onResponse(new Response(indices, aliases, dataStreams));
|
|
|
}
|
|
|
};
|
|
@@ -554,12 +604,35 @@ public class ResolveIndexAction extends ActionType<ResolveIndexAction.Response>
|
|
|
IndexNameExpressionResolver resolver,
|
|
|
List<ResolvedIndex> indices,
|
|
|
List<ResolvedAlias> aliases,
|
|
|
- List<ResolvedDataStream> dataStreams
|
|
|
+ List<ResolvedDataStream> dataStreams,
|
|
|
+ Set<IndexMode> indexModes
|
|
|
) {
|
|
|
if (localIndices == null) {
|
|
|
return;
|
|
|
}
|
|
|
- resolveIndices(localIndices.indices(), localIndices.indicesOptions(), projectState, resolver, indices, aliases, dataStreams);
|
|
|
+ resolveIndices(
|
|
|
+ localIndices.indices(),
|
|
|
+ localIndices.indicesOptions(),
|
|
|
+ projectState,
|
|
|
+ resolver,
|
|
|
+ indices,
|
|
|
+ aliases,
|
|
|
+ dataStreams,
|
|
|
+ indexModes
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ // Shortcut for tests that don't need index mode filtering
|
|
|
+ static void resolveIndices(
|
|
|
+ String[] names,
|
|
|
+ IndicesOptions indicesOptions,
|
|
|
+ ProjectState projectState,
|
|
|
+ IndexNameExpressionResolver resolver,
|
|
|
+ List<ResolvedIndex> indices,
|
|
|
+ List<ResolvedAlias> aliases,
|
|
|
+ List<ResolvedDataStream> dataStreams
|
|
|
+ ) {
|
|
|
+ resolveIndices(names, indicesOptions, projectState, resolver, indices, aliases, dataStreams, Collections.emptySet());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -580,7 +653,8 @@ public class ResolveIndexAction extends ActionType<ResolveIndexAction.Response>
|
|
|
IndexNameExpressionResolver resolver,
|
|
|
List<ResolvedIndex> indices,
|
|
|
List<ResolvedAlias> aliases,
|
|
|
- List<ResolvedDataStream> dataStreams
|
|
|
+ List<ResolvedDataStream> dataStreams,
|
|
|
+ Set<IndexMode> indexModes
|
|
|
) {
|
|
|
// redundant check to ensure that we don't resolve the list of empty names to "all" in this context
|
|
|
if (names.length == 0) {
|
|
@@ -603,47 +677,104 @@ public class ResolveIndexAction extends ActionType<ResolveIndexAction.Response>
|
|
|
names
|
|
|
);
|
|
|
for (ResolvedExpression s : resolvedIndexAbstractions) {
|
|
|
- enrichIndexAbstraction(projectState, s, indices, aliases, dataStreams);
|
|
|
+ enrichIndexAbstraction(projectState, s, indices, aliases, dataStreams, indexModes);
|
|
|
}
|
|
|
indices.sort(Comparator.comparing(ResolvedIndexAbstraction::getName));
|
|
|
aliases.sort(Comparator.comparing(ResolvedIndexAbstraction::getName));
|
|
|
dataStreams.sort(Comparator.comparing(ResolvedIndexAbstraction::getName));
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Merge the results from remote clusters into the local results lists.
|
|
|
+ * This will also do index mode filtering (if requested), as the remote cluster might be too old to do it itself.
|
|
|
+ */
|
|
|
private static void mergeResults(
|
|
|
Map<String, Response> remoteResponses,
|
|
|
List<ResolvedIndex> indices,
|
|
|
List<ResolvedAlias> aliases,
|
|
|
- List<ResolvedDataStream> dataStreams
|
|
|
+ List<ResolvedDataStream> dataStreams,
|
|
|
+ Set<IndexMode> indexModes
|
|
|
) {
|
|
|
for (Map.Entry<String, Response> responseEntry : remoteResponses.entrySet()) {
|
|
|
String clusterAlias = responseEntry.getKey();
|
|
|
Response response = responseEntry.getValue();
|
|
|
for (ResolvedIndex index : response.indices) {
|
|
|
+ // We want to filter by mode here because the linked cluster might be too old to be able to filter
|
|
|
+ if (indexModes.isEmpty() == false && indexModes.contains(index.getMode()) == false) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
indices.add(index.copy(RemoteClusterAware.buildRemoteIndexName(clusterAlias, index.getName())));
|
|
|
}
|
|
|
+ Set<String> indexNames = indices.stream().map(ResolvedIndexAbstraction::getName).collect(Collectors.toSet());
|
|
|
for (ResolvedAlias alias : response.aliases) {
|
|
|
- aliases.add(alias.copy(RemoteClusterAware.buildRemoteIndexName(clusterAlias, alias.getName())));
|
|
|
+ if (indexModes.isEmpty() == false) {
|
|
|
+ // We filter out indices that are not included in the main index list after index mode filtering
|
|
|
+ String[] filteredIndices = Arrays.stream(alias.getIndices())
|
|
|
+ .filter(idxName -> indexNames.contains(RemoteClusterAware.buildRemoteIndexName(clusterAlias, idxName)))
|
|
|
+ .toArray(String[]::new);
|
|
|
+ if (filteredIndices.length == 0) {
|
|
|
+ // If this alias points to no indices after filtering, we skip it
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ alias = new ResolvedAlias(RemoteClusterAware.buildRemoteIndexName(clusterAlias, alias.getName()), filteredIndices);
|
|
|
+ } else {
|
|
|
+ alias = alias.copy(RemoteClusterAware.buildRemoteIndexName(clusterAlias, alias.getName()));
|
|
|
+ }
|
|
|
+ aliases.add(alias);
|
|
|
}
|
|
|
for (ResolvedDataStream dataStream : response.dataStreams) {
|
|
|
- dataStreams.add(dataStream.copy(RemoteClusterAware.buildRemoteIndexName(clusterAlias, dataStream.getName())));
|
|
|
+ if (indexModes.isEmpty() == false) {
|
|
|
+ // We filter out indices that are not included in the main index list after index mode filtering
|
|
|
+ String[] filteredBackingIndices = Arrays.stream(dataStream.getBackingIndices())
|
|
|
+ .filter(idxName -> indexNames.contains(RemoteClusterAware.buildRemoteIndexName(clusterAlias, idxName)))
|
|
|
+ .toArray(String[]::new);
|
|
|
+ if (filteredBackingIndices.length == 0) {
|
|
|
+ // If this data stream points to no backing indices after filtering, we skip it
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ dataStream = new ResolvedDataStream(
|
|
|
+ RemoteClusterAware.buildRemoteIndexName(clusterAlias, dataStream.getName()),
|
|
|
+ filteredBackingIndices,
|
|
|
+ dataStream.getTimestampField()
|
|
|
+ );
|
|
|
+ } else {
|
|
|
+ dataStream = dataStream.copy(RemoteClusterAware.buildRemoteIndexName(clusterAlias, dataStream.getName()));
|
|
|
+ }
|
|
|
+ dataStreams.add(dataStream);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private static Predicate<Index> indexModeFilter(ProjectState projectState, Set<IndexMode> indexModes) {
|
|
|
+ if (indexModes.isEmpty()) {
|
|
|
+ return index -> true;
|
|
|
+ }
|
|
|
+ return index -> {
|
|
|
+ IndexMetadata indexMetadata = projectState.metadata().index(index);
|
|
|
+ IndexMode mode = indexMetadata.getIndexMode() == null ? IndexMode.STANDARD : indexMetadata.getIndexMode();
|
|
|
+ return indexModes.contains(mode);
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
private static void enrichIndexAbstraction(
|
|
|
ProjectState projectState,
|
|
|
ResolvedExpression resolvedExpression,
|
|
|
List<ResolvedIndex> indices,
|
|
|
List<ResolvedAlias> aliases,
|
|
|
- List<ResolvedDataStream> dataStreams
|
|
|
+ List<ResolvedDataStream> dataStreams,
|
|
|
+ Set<IndexMode> indexModes
|
|
|
) {
|
|
|
SortedMap<String, IndexAbstraction> indicesLookup = projectState.metadata().getIndicesLookup();
|
|
|
IndexAbstraction ia = indicesLookup.get(resolvedExpression.resource());
|
|
|
+ var filterPredicate = indexModeFilter(projectState, indexModes);
|
|
|
if (ia != null) {
|
|
|
switch (ia.getType()) {
|
|
|
case CONCRETE_INDEX -> {
|
|
|
+ if (filterPredicate.test(ia.getWriteIndex()) == false) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
IndexMetadata writeIndex = projectState.metadata().index(ia.getWriteIndex());
|
|
|
+ IndexMode mode = writeIndex.getIndexMode() == null ? IndexMode.STANDARD : writeIndex.getIndexMode();
|
|
|
String[] aliasNames = writeIndex.getAliases().keySet().stream().sorted().toArray(String[]::new);
|
|
|
List<Attribute> attributes = new ArrayList<>();
|
|
|
attributes.add(writeIndex.getState() == IndexMetadata.State.OPEN ? Attribute.OPEN : Attribute.CLOSED);
|
|
@@ -664,13 +795,17 @@ public class ResolveIndexAction extends ActionType<ResolveIndexAction.Response>
|
|
|
aliasNames,
|
|
|
attributes.stream().map(Enum::name).map(e -> e.toLowerCase(Locale.ROOT)).toArray(String[]::new),
|
|
|
ia.getParentDataStream() == null ? null : ia.getParentDataStream().getName(),
|
|
|
- writeIndex.getIndexMode() == null ? IndexMode.STANDARD : writeIndex.getIndexMode()
|
|
|
+ mode
|
|
|
)
|
|
|
);
|
|
|
}
|
|
|
case ALIAS -> {
|
|
|
- String[] indexNames = getAliasIndexStream(resolvedExpression, ia, projectState.metadata()).map(Index::getName)
|
|
|
+ String[] indexNames = getAliasIndexStream(resolvedExpression, ia, projectState.metadata()).filter(filterPredicate)
|
|
|
+ .map(Index::getName)
|
|
|
.toArray(String[]::new);
|
|
|
+ if (indexModes.isEmpty() == false && indexNames.length == 0) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
Arrays.sort(indexNames);
|
|
|
aliases.add(new ResolvedAlias(ia.getName(), indexNames));
|
|
|
}
|
|
@@ -682,7 +817,10 @@ public class ResolveIndexAction extends ActionType<ResolveIndexAction.Response>
|
|
|
case DATA -> dataStream.getDataComponent().getIndices().stream();
|
|
|
case FAILURES -> dataStream.getFailureIndices().stream();
|
|
|
};
|
|
|
- String[] backingIndices = dataStreamIndices.map(Index::getName).toArray(String[]::new);
|
|
|
+ String[] backingIndices = dataStreamIndices.filter(filterPredicate).map(Index::getName).toArray(String[]::new);
|
|
|
+ if (indexModes.isEmpty() == false && backingIndices.length == 0) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
dataStreams.add(new ResolvedDataStream(dataStream.getName(), backingIndices, DataStream.TIMESTAMP_FIELD_NAME));
|
|
|
}
|
|
|
default -> throw new IllegalStateException("unknown index abstraction type: " + ia.getType());
|