|
@@ -13,8 +13,17 @@ import org.apache.lucene.util.automaton.Automaton;
|
|
|
import org.apache.lucene.util.automaton.CharacterRunAutomaton;
|
|
|
import org.apache.lucene.util.automaton.MinimizationOperations;
|
|
|
import org.apache.lucene.util.automaton.Operations;
|
|
|
+import org.elasticsearch.action.ActionListener;
|
|
|
+import org.elasticsearch.action.admin.cluster.snapshots.features.ResetFeatureStateResponse.ResetFeatureStateStatus;
|
|
|
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
|
|
|
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
|
|
|
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
|
|
+import org.elasticsearch.client.Client;
|
|
|
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
|
|
+import org.elasticsearch.cluster.service.ClusterService;
|
|
|
import org.elasticsearch.common.Nullable;
|
|
|
+import org.elasticsearch.common.Strings;
|
|
|
+import org.elasticsearch.common.TriConsumer;
|
|
|
import org.elasticsearch.common.collect.Tuple;
|
|
|
import org.elasticsearch.index.Index;
|
|
|
import org.elasticsearch.snapshots.SnapshotsService;
|
|
@@ -29,6 +38,7 @@ import java.util.Map.Entry;
|
|
|
import java.util.Optional;
|
|
|
import java.util.function.Predicate;
|
|
|
import java.util.stream.Collectors;
|
|
|
+import java.util.stream.Stream;
|
|
|
|
|
|
import static java.util.stream.Collectors.toUnmodifiableList;
|
|
|
import static org.elasticsearch.tasks.TaskResultsService.TASKS_DESCRIPTOR;
|
|
@@ -41,13 +51,18 @@ import static org.elasticsearch.tasks.TaskResultsService.TASKS_FEATURE_NAME;
|
|
|
*/
|
|
|
public class SystemIndices {
|
|
|
private static final Map<String, Feature> SERVER_SYSTEM_INDEX_DESCRIPTORS = Map.of(
|
|
|
- TASKS_FEATURE_NAME, new Feature("Manages task results", List.of(TASKS_DESCRIPTOR))
|
|
|
+ TASKS_FEATURE_NAME, new Feature(TASKS_FEATURE_NAME, "Manages task results", List.of(TASKS_DESCRIPTOR))
|
|
|
);
|
|
|
|
|
|
private final CharacterRunAutomaton runAutomaton;
|
|
|
private final Map<String, Feature> featureDescriptors;
|
|
|
private final Map<String, CharacterRunAutomaton> productToSystemIndicesMatcher;
|
|
|
|
|
|
+ /**
|
|
|
+ * Initialize the SystemIndices object
|
|
|
+ * @param pluginAndModulesDescriptors A map of this node's feature names to
|
|
|
+ * feature objects.
|
|
|
+ */
|
|
|
public SystemIndices(Map<String, Feature> pluginAndModulesDescriptors) {
|
|
|
featureDescriptors = buildSystemIndexDescriptorMap(pluginAndModulesDescriptors);
|
|
|
checkForOverlappingPatterns(featureDescriptors);
|
|
@@ -236,6 +251,11 @@ public class SystemIndices {
|
|
|
.collect(Collectors.toList());
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Check that a feature name is not reserved
|
|
|
+ * @param name Name of feature
|
|
|
+ * @param plugin Name of plugin providing the feature
|
|
|
+ */
|
|
|
public static void validateFeatureName(String name, String plugin) {
|
|
|
if (SnapshotsService.NO_FEATURE_STATES_VALUE.equalsIgnoreCase(name)) {
|
|
|
throw new IllegalArgumentException("feature name cannot be reserved name [\"" + SnapshotsService.NO_FEATURE_STATES_VALUE +
|
|
@@ -243,19 +263,44 @@ public class SystemIndices {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Class holding a description of a stateful feature.
|
|
|
+ */
|
|
|
public static class Feature {
|
|
|
private final String description;
|
|
|
private final Collection<SystemIndexDescriptor> indexDescriptors;
|
|
|
private final Collection<String> associatedIndexPatterns;
|
|
|
-
|
|
|
- public Feature(String description, Collection<SystemIndexDescriptor> indexDescriptors, Collection<String> associatedIndexPatterns) {
|
|
|
+ private final TriConsumer<ClusterService, Client, ActionListener<ResetFeatureStateStatus>> cleanUpFunction;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Construct a Feature with a custom cleanup function
|
|
|
+ * @param description Description of the feature
|
|
|
+ * @param indexDescriptors Patterns describing system indices for this feature
|
|
|
+ * @param associatedIndexPatterns Patterns describing associated indices
|
|
|
+ * @param cleanUpFunction A function that will clean up the feature's state
|
|
|
+ */
|
|
|
+ public Feature(
|
|
|
+ String description,
|
|
|
+ Collection<SystemIndexDescriptor> indexDescriptors,
|
|
|
+ Collection<String> associatedIndexPatterns,
|
|
|
+ TriConsumer<ClusterService, Client, ActionListener<ResetFeatureStateStatus>> cleanUpFunction) {
|
|
|
this.description = description;
|
|
|
this.indexDescriptors = indexDescriptors;
|
|
|
this.associatedIndexPatterns = associatedIndexPatterns;
|
|
|
+ this.cleanUpFunction = cleanUpFunction;
|
|
|
}
|
|
|
|
|
|
- public Feature(String description, Collection<SystemIndexDescriptor> indexDescriptors) {
|
|
|
- this(description, indexDescriptors, Collections.emptyList());
|
|
|
+ /**
|
|
|
+ * Construct a Feature using the default clean-up function
|
|
|
+ * @param name Name of the feature, used in logging
|
|
|
+ * @param description Description of the feature
|
|
|
+ * @param indexDescriptors Patterns describing system indices for this feature
|
|
|
+ */
|
|
|
+ public Feature(String name, String description, Collection<SystemIndexDescriptor> indexDescriptors) {
|
|
|
+ this(description, indexDescriptors, Collections.emptyList(),
|
|
|
+ (clusterService, client, listener) ->
|
|
|
+ cleanUpFeature(indexDescriptors, Collections.emptyList(), name, clusterService, client, listener)
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
public String getDescription() {
|
|
@@ -269,5 +314,53 @@ public class SystemIndices {
|
|
|
public Collection<String> getAssociatedIndexPatterns() {
|
|
|
return associatedIndexPatterns;
|
|
|
}
|
|
|
+
|
|
|
+ public TriConsumer<ClusterService, Client, ActionListener<ResetFeatureStateStatus>> getCleanUpFunction() {
|
|
|
+ return cleanUpFunction;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Clean up the state of a feature
|
|
|
+ * @param indexDescriptors List of descriptors of a feature's system indices
|
|
|
+ * @param associatedIndexPatterns List of patterns of a feature's associated indices
|
|
|
+ * @param name Name of the feature, used in logging
|
|
|
+ * @param clusterService A clusterService, for retrieving cluster metadata
|
|
|
+ * @param client A client, for issuing delete requests
|
|
|
+ * @param listener A listener to return success or failure of cleanup
|
|
|
+ */
|
|
|
+ public static void cleanUpFeature(
|
|
|
+ Collection<SystemIndexDescriptor> indexDescriptors,
|
|
|
+ Collection<String> associatedIndexPatterns,
|
|
|
+ String name,
|
|
|
+ ClusterService clusterService,
|
|
|
+ Client client,
|
|
|
+ ActionListener<ResetFeatureStateStatus> listener) {
|
|
|
+ Stream<String> systemIndices = indexDescriptors.stream()
|
|
|
+ .map(sid -> sid.getMatchingIndices(clusterService.state().getMetadata()))
|
|
|
+ .flatMap(List::stream);
|
|
|
+
|
|
|
+ List<String> allIndices = Stream.concat(systemIndices, associatedIndexPatterns.stream())
|
|
|
+ .collect(Collectors.toList());
|
|
|
+
|
|
|
+ if (allIndices.isEmpty()) {
|
|
|
+ // if no actual indices match the pattern, we can stop here
|
|
|
+ listener.onResponse(new ResetFeatureStateStatus(name, "SUCCESS"));
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest();
|
|
|
+ deleteIndexRequest.indices(allIndices.toArray(Strings.EMPTY_ARRAY));
|
|
|
+ client.execute(DeleteIndexAction.INSTANCE, deleteIndexRequest, new ActionListener<>() {
|
|
|
+ @Override
|
|
|
+ public void onResponse(AcknowledgedResponse acknowledgedResponse) {
|
|
|
+ listener.onResponse(new ResetFeatureStateStatus(name, "SUCCESS"));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onFailure(Exception e) {
|
|
|
+ listener.onResponse(new ResetFeatureStateStatus(name, "FAILURE: " + e.getMessage()));
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
}
|
|
|
}
|