|
@@ -18,6 +18,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
|
|
import org.elasticsearch.core.Nullable;
|
|
|
import org.elasticsearch.core.TimeValue;
|
|
|
+import org.elasticsearch.core.Tuple;
|
|
|
import org.elasticsearch.index.Index;
|
|
|
import org.elasticsearch.license.XPackLicenseState;
|
|
|
import org.elasticsearch.xcontent.DeprecationHandler;
|
|
@@ -48,20 +49,27 @@ import java.util.Optional;
|
|
|
import java.util.Set;
|
|
|
import java.util.SortedMap;
|
|
|
import java.util.TreeMap;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
public class PolicyStepsRegistry {
|
|
|
private static final Logger logger = LogManager.getLogger(PolicyStepsRegistry.class);
|
|
|
|
|
|
+ private final NamedXContentRegistry xContentRegistry;
|
|
|
private final Client client;
|
|
|
private final XPackLicenseState licenseState;
|
|
|
+
|
|
|
// keeps track of existing policies in the cluster state
|
|
|
private final SortedMap<String, LifecyclePolicyMetadata> lifecyclePolicyMap;
|
|
|
// keeps track of what the first step in a policy is, the key is policy name
|
|
|
private final Map<String, Step> firstStepMap;
|
|
|
// keeps track of a mapping from policy/step-name to respective Step, the key is policy name
|
|
|
private final Map<String, Map<Step.StepKey, Step>> stepMap;
|
|
|
- private final NamedXContentRegistry xContentRegistry;
|
|
|
+
|
|
|
+ // tracks an index->step cache, where the indexmetadata is also tracked for cache invalidation/eviction purposes.
|
|
|
+ // for a given index, the step can be cached as long as the indexmetadata (and the policy!) hasn't changed. since
|
|
|
+ // policies change infrequently, the entire cache is cleared on policy change.
|
|
|
+ private final Map<Index, Tuple<IndexMetadata, Step>> cachedSteps = new ConcurrentHashMap<>();
|
|
|
|
|
|
public PolicyStepsRegistry(NamedXContentRegistry xContentRegistry, Client client, XPackLicenseState licenseState) {
|
|
|
this(new TreeMap<>(), new HashMap<>(), new HashMap<>(), xContentRegistry, client, licenseState);
|
|
@@ -98,6 +106,9 @@ public class PolicyStepsRegistry {
|
|
|
public void update(IndexLifecycleMetadata meta) {
|
|
|
assert meta != null : "IndexLifecycleMetadata cannot be null when updating the policy steps registry";
|
|
|
|
|
|
+ // since the policies (may have) changed, the whole steps cache needs to be thrown out
|
|
|
+ cachedSteps.clear();
|
|
|
+
|
|
|
DiffableUtils.MapDiff<String, LifecyclePolicyMetadata, Map<String, LifecyclePolicyMetadata>> mapDiff = DiffableUtils.diff(
|
|
|
lifecyclePolicyMap,
|
|
|
meta.getPolicyMetadatas(),
|
|
@@ -154,6 +165,36 @@ public class PolicyStepsRegistry {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Remove the entry for an index from the index->step cache.
|
|
|
+ *
|
|
|
+ * We clear the map entirely when the master of the cluster changes, and when any
|
|
|
+ * policy changes, but in a long-lived cluster that doesn't happen to experience
|
|
|
+ * either of those events (and where indices are removed regularly) we still want
|
|
|
+ * the cache to trim deleted indices.
|
|
|
+ *
|
|
|
+ * n.b. even with this, there's still a pretty small chance that a given index
|
|
|
+ * could leak, if we're right in the middle of populating the cache for that
|
|
|
+ * index (in getStep) when we process the delete here, then we'll end up with an
|
|
|
+ * entry that doesn't get deleted until the master changes or a policy changes
|
|
|
+ * -- it's harmless enough
|
|
|
+ */
|
|
|
+ public void delete(Index deleted) {
|
|
|
+ cachedSteps.remove(deleted);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Clear internal maps that were populated by update (and others).
|
|
|
+ */
|
|
|
+ public void clear() {
|
|
|
+ // this is potentially large, so it's important to clear it
|
|
|
+ cachedSteps.clear();
|
|
|
+ // these are relatively small, but there's no harm in clearing them
|
|
|
+ lifecyclePolicyMap.clear();
|
|
|
+ firstStepMap.clear();
|
|
|
+ stepMap.clear();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Return all ordered steps for the current policy for the index. Does not
|
|
|
* resolve steps using the phase caching, but only for the currently existing policy.
|
|
@@ -266,6 +307,14 @@ public class PolicyStepsRegistry {
|
|
|
|
|
|
@Nullable
|
|
|
public Step getStep(final IndexMetadata indexMetadata, final Step.StepKey stepKey) {
|
|
|
+ final Tuple<IndexMetadata, Step> cachedStep = cachedSteps.get(indexMetadata.getIndex());
|
|
|
+ // n.b. we're using instance equality here for the IndexMetadata rather than object equality because it's fast,
|
|
|
+ // this means that we're erring on the side of cache misses (if the IndexMetadata changed in any way, it'll be
|
|
|
+ // a new instance, so we'll miss-and-repopulate the cache for the index in question)
|
|
|
+ if (cachedStep != null && cachedStep.v1() == indexMetadata && cachedStep.v2().getKey().equals(stepKey)) {
|
|
|
+ return cachedStep.v2();
|
|
|
+ }
|
|
|
+
|
|
|
if (ErrorStep.NAME.equals(stepKey.getName())) {
|
|
|
return new ErrorStep(new Step.StepKey(stepKey.getPhase(), stepKey.getAction(), ErrorStep.NAME));
|
|
|
}
|
|
@@ -304,7 +353,9 @@ public class PolicyStepsRegistry {
|
|
|
+ phaseSteps;
|
|
|
|
|
|
// Return the step that matches the given stepKey or else null if we couldn't find it
|
|
|
- return phaseSteps.stream().filter(step -> step.getKey().equals(stepKey)).findFirst().orElse(null);
|
|
|
+ final Step s = phaseSteps.stream().filter(step -> step.getKey().equals(stepKey)).findFirst().orElse(null);
|
|
|
+ cachedSteps.put(indexMetadata.getIndex(), Tuple.tuple(indexMetadata, s));
|
|
|
+ return s;
|
|
|
}
|
|
|
|
|
|
/**
|