|  | @@ -26,6 +26,7 @@ import org.elasticsearch.common.component.Lifecycle.State;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.scheduler.SchedulerEngine;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.scheduler.TimeValueSchedule;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.settings.Settings;
 | 
	
		
			
				|  |  | +import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 | 
	
		
			
				|  |  |  import org.elasticsearch.core.FixForMultiProject;
 | 
	
		
			
				|  |  |  import org.elasticsearch.core.Nullable;
 | 
	
		
			
				|  |  |  import org.elasticsearch.core.SuppressForbidden;
 | 
	
	
		
			
				|  | @@ -60,6 +61,8 @@ import java.util.Collection;
 | 
	
		
			
				|  |  |  import java.util.List;
 | 
	
		
			
				|  |  |  import java.util.Map;
 | 
	
		
			
				|  |  |  import java.util.Set;
 | 
	
		
			
				|  |  | +import java.util.concurrent.ExecutorService;
 | 
	
		
			
				|  |  | +import java.util.concurrent.atomic.AtomicReference;
 | 
	
		
			
				|  |  |  import java.util.function.LongSupplier;
 | 
	
		
			
				|  |  |  import java.util.stream.Collectors;
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -90,7 +93,12 @@ public class IndexLifecycleService
 | 
	
		
			
				|  |  |      private final IndexLifecycleRunner lifecycleRunner;
 | 
	
		
			
				|  |  |      private final Settings settings;
 | 
	
		
			
				|  |  |      private final ClusterService clusterService;
 | 
	
		
			
				|  |  | +    private final ThreadPool threadPool;
 | 
	
		
			
				|  |  |      private final LongSupplier nowSupplier;
 | 
	
		
			
				|  |  | +    private final ExecutorService managementExecutor;
 | 
	
		
			
				|  |  | +    /** A reference to the last seen cluster state. If it's not null, we're currently processing a cluster state. */
 | 
	
		
			
				|  |  | +    private final AtomicReference<ClusterState> lastSeenState = new AtomicReference<>();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      private SchedulerEngine.Job scheduledJob;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      @SuppressWarnings("this-escape")
 | 
	
	
		
			
				|  | @@ -108,12 +116,14 @@ public class IndexLifecycleService
 | 
	
		
			
				|  |  |          super();
 | 
	
		
			
				|  |  |          this.settings = settings;
 | 
	
		
			
				|  |  |          this.clusterService = clusterService;
 | 
	
		
			
				|  |  | +        this.threadPool = threadPool;
 | 
	
		
			
				|  |  |          this.clock = clock;
 | 
	
		
			
				|  |  |          this.nowSupplier = nowSupplier;
 | 
	
		
			
				|  |  |          this.scheduledJob = null;
 | 
	
		
			
				|  |  |          this.policyRegistry = new PolicyStepsRegistry(xContentRegistry, client, licenseState);
 | 
	
		
			
				|  |  |          this.lifecycleRunner = new IndexLifecycleRunner(policyRegistry, ilmHistoryStore, clusterService, threadPool, nowSupplier);
 | 
	
		
			
				|  |  |          this.pollInterval = LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.get(settings);
 | 
	
		
			
				|  |  | +        this.managementExecutor = threadPool.executor(ThreadPool.Names.MANAGEMENT);
 | 
	
		
			
				|  |  |          clusterService.addStateApplier(this);
 | 
	
		
			
				|  |  |          clusterService.addListener(this);
 | 
	
		
			
				|  |  |          clusterService.getClusterSettings()
 | 
	
	
		
			
				|  | @@ -332,17 +342,76 @@ public class IndexLifecycleService
 | 
	
		
			
				|  |  |              // ClusterChangedEvent.indicesDeleted uses an equality check to skip computation if necessary.
 | 
	
		
			
				|  |  |              final List<Index> indicesDeleted = event.indicesDeleted();
 | 
	
		
			
				|  |  |              if (indicesDeleted.isEmpty() == false) {
 | 
	
		
			
				|  |  | -                clusterService.getClusterApplierService().threadPool().executor(ThreadPool.Names.MANAGEMENT).execute(() -> {
 | 
	
		
			
				|  |  | +                managementExecutor.execute(() -> {
 | 
	
		
			
				|  |  |                      for (Index index : indicesDeleted) {
 | 
	
		
			
				|  |  |                          policyRegistry.delete(index);
 | 
	
		
			
				|  |  |                      }
 | 
	
		
			
				|  |  |                  });
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -            triggerPolicies(event.state(), true);
 | 
	
		
			
				|  |  | +            // Only start processing the new cluster state if we're not already processing one.
 | 
	
		
			
				|  |  | +            // Note that we might override the last seen state with a new one, even if the previous one hasn't been processed yet.
 | 
	
		
			
				|  |  | +            // This means that when ILM's cluster state processing takes longer than the overall cluster state application or when
 | 
	
		
			
				|  |  | +            // the forked thread is waiting in the thread pool queue (e.g. when the master node is swamped), we might skip some
 | 
	
		
			
				|  |  | +            // cluster state updates. Since ILM does not depend on "deltas" in cluster states, we can skip some cluster states just fine.
 | 
	
		
			
				|  |  | +            if (lastSeenState.getAndSet(event.state()) == null) {
 | 
	
		
			
				|  |  | +                processClusterState();
 | 
	
		
			
				|  |  | +            } else {
 | 
	
		
			
				|  |  | +                logger.trace("ILM state processor still running, not starting new thread");
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    /**
 | 
	
		
			
				|  |  | +     * Instead of processing cluster state updates on the cluster state applier thread, we fork to a different thread where
 | 
	
		
			
				|  |  | +     * ILM's runtime of processing the cluster state update does not affect the speed at which the cluster can apply new cluster states.
 | 
	
		
			
				|  |  | +     * That does not mean we don't need to optimize ILM's cluster state processing, as the overall amount of processing is generally
 | 
	
		
			
				|  |  | +     * unaffected by this fork approach (unless we skip some cluster states), but it does mean we're saving a significant amount
 | 
	
		
			
				|  |  | +     * of processing on the critical cluster state applier thread.
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  | +    private void processClusterState() {
 | 
	
		
			
				|  |  | +        managementExecutor.execute(new AbstractRunnable() {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            private final SetOnce<ClusterState> currentState = new SetOnce<>();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            @Override
 | 
	
		
			
				|  |  | +            protected void doRun() throws Exception {
 | 
	
		
			
				|  |  | +                final ClusterState currentState = lastSeenState.get();
 | 
	
		
			
				|  |  | +                // This should never be null, but we're checking anyway to be sure.
 | 
	
		
			
				|  |  | +                if (currentState == null) {
 | 
	
		
			
				|  |  | +                    assert false : "Expected current state to non-null when processing cluster state in ILM";
 | 
	
		
			
				|  |  | +                    return;
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +                this.currentState.set(currentState);
 | 
	
		
			
				|  |  | +                triggerPolicies(currentState, true);
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            @Override
 | 
	
		
			
				|  |  | +            public void onFailure(Exception e) {
 | 
	
		
			
				|  |  | +                logger.warn("ILM failed to process cluster state", e);
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            @Override
 | 
	
		
			
				|  |  | +            public void onAfter() {
 | 
	
		
			
				|  |  | +                // If the last seen state is unchanged, we set it to null to indicate that processing has finished and we return.
 | 
	
		
			
				|  |  | +                if (lastSeenState.compareAndSet(currentState.get(), null)) {
 | 
	
		
			
				|  |  | +                    return;
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +                // If the last seen cluster state changed while this thread was running, it means a new cluster state came in and we need to
 | 
	
		
			
				|  |  | +                // process it. We do that by kicking off a new thread, which will pick up the new cluster state when the thread gets
 | 
	
		
			
				|  |  | +                // executed.
 | 
	
		
			
				|  |  | +                processClusterState();
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            @Override
 | 
	
		
			
				|  |  | +            public boolean isForceExecution() {
 | 
	
		
			
				|  |  | +                // Without force execution, we risk ILM state processing being postponed arbitrarily long, which in turn could cause
 | 
	
		
			
				|  |  | +                // thundering herd issues if there's significant time between ILM runs.
 | 
	
		
			
				|  |  | +                return true;
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        });
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      @Override
 | 
	
		
			
				|  |  |      public void applyClusterState(ClusterChangedEvent event) {
 | 
	
		
			
				|  |  |          // only act if we are master, otherwise keep idle until elected
 |