|  | @@ -9,19 +9,19 @@ package org.elasticsearch.xpack.downsample;
 | 
	
		
			
				|  |  |  import org.apache.logging.log4j.LogManager;
 | 
	
		
			
				|  |  |  import org.apache.logging.log4j.Logger;
 | 
	
		
			
				|  |  |  import org.elasticsearch.ElasticsearchException;
 | 
	
		
			
				|  |  | +import org.elasticsearch.ResourceAlreadyExistsException;
 | 
	
		
			
				|  |  |  import org.elasticsearch.action.ActionListener;
 | 
	
		
			
				|  |  |  import org.elasticsearch.action.ActionRequestValidationException;
 | 
	
		
			
				|  |  |  import org.elasticsearch.action.admin.cluster.stats.MappingVisitor;
 | 
	
		
			
				|  |  |  import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
 | 
	
		
			
				|  |  | -import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 | 
	
		
			
				|  |  |  import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
 | 
	
		
			
				|  |  | -import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
 | 
	
		
			
				|  |  |  import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
 | 
	
		
			
				|  |  |  import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
 | 
	
		
			
				|  |  |  import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
 | 
	
		
			
				|  |  |  import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
 | 
	
		
			
				|  |  |  import org.elasticsearch.action.downsample.DownsampleConfig;
 | 
	
		
			
				|  |  |  import org.elasticsearch.action.support.ActionFilters;
 | 
	
		
			
				|  |  | +import org.elasticsearch.action.support.ActiveShardCount;
 | 
	
		
			
				|  |  |  import org.elasticsearch.action.support.master.AcknowledgedResponse;
 | 
	
		
			
				|  |  |  import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
 | 
	
		
			
				|  |  |  import org.elasticsearch.client.internal.Client;
 | 
	
	
		
			
				|  | @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.SimpleBatchedExecutor;
 | 
	
		
			
				|  |  |  import org.elasticsearch.cluster.block.ClusterBlockException;
 | 
	
		
			
				|  |  |  import org.elasticsearch.cluster.block.ClusterBlockLevel;
 | 
	
		
			
				|  |  |  import org.elasticsearch.cluster.metadata.IndexMetadata;
 | 
	
		
			
				|  |  | +import org.elasticsearch.cluster.metadata.IndexMetadata.DownsampleTaskStatus;
 | 
	
		
			
				|  |  |  import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 | 
	
		
			
				|  |  |  import org.elasticsearch.cluster.metadata.Metadata;
 | 
	
		
			
				|  |  |  import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
 | 
	
	
		
			
				|  | @@ -40,6 +41,7 @@ import org.elasticsearch.cluster.routing.allocation.allocator.AllocationActionLi
 | 
	
		
			
				|  |  |  import org.elasticsearch.cluster.service.ClusterService;
 | 
	
		
			
				|  |  |  import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.Priority;
 | 
	
		
			
				|  |  | +import org.elasticsearch.common.Strings;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.bytes.BytesReference;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.compress.CompressedXContent;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.inject.Inject;
 | 
	
	
		
			
				|  | @@ -48,6 +50,7 @@ import org.elasticsearch.common.settings.Setting;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.settings.Settings;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.util.concurrent.ThreadContext;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.xcontent.XContentHelper;
 | 
	
		
			
				|  |  | +import org.elasticsearch.core.TimeValue;
 | 
	
		
			
				|  |  |  import org.elasticsearch.core.Tuple;
 | 
	
		
			
				|  |  |  import org.elasticsearch.index.Index;
 | 
	
		
			
				|  |  |  import org.elasticsearch.index.IndexMode;
 | 
	
	
		
			
				|  | @@ -57,7 +60,11 @@ import org.elasticsearch.index.mapper.DateFieldMapper;
 | 
	
		
			
				|  |  |  import org.elasticsearch.index.mapper.MappedFieldType;
 | 
	
		
			
				|  |  |  import org.elasticsearch.index.mapper.MapperService;
 | 
	
		
			
				|  |  |  import org.elasticsearch.index.mapper.TimeSeriesParams;
 | 
	
		
			
				|  |  | +import org.elasticsearch.index.shard.ShardId;
 | 
	
		
			
				|  |  |  import org.elasticsearch.indices.IndicesService;
 | 
	
		
			
				|  |  | +import org.elasticsearch.persistent.PersistentTaskParams;
 | 
	
		
			
				|  |  | +import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 | 
	
		
			
				|  |  | +import org.elasticsearch.persistent.PersistentTasksService;
 | 
	
		
			
				|  |  |  import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
 | 
	
		
			
				|  |  |  import org.elasticsearch.tasks.Task;
 | 
	
		
			
				|  |  |  import org.elasticsearch.tasks.TaskId;
 | 
	
	
		
			
				|  | @@ -69,25 +76,30 @@ import org.elasticsearch.xcontent.XContentType;
 | 
	
		
			
				|  |  |  import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper;
 | 
	
		
			
				|  |  |  import org.elasticsearch.xpack.core.ClientHelper;
 | 
	
		
			
				|  |  |  import org.elasticsearch.xpack.core.downsample.DownsampleAction;
 | 
	
		
			
				|  |  | -import org.elasticsearch.xpack.core.downsample.DownsampleIndexerAction;
 | 
	
		
			
				|  |  |  import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
 | 
	
		
			
				|  |  | +import org.elasticsearch.xpack.core.rollup.action.RollupShardPersistentTaskState;
 | 
	
		
			
				|  |  | +import org.elasticsearch.xpack.core.rollup.action.RollupShardTask;
 | 
	
		
			
				|  |  |  import org.elasticsearch.xpack.core.security.authz.AuthorizationServiceField;
 | 
	
		
			
				|  |  |  import org.elasticsearch.xpack.core.security.authz.accesscontrol.IndicesAccessControl;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  import java.io.IOException;
 | 
	
		
			
				|  |  | +import java.time.Instant;
 | 
	
		
			
				|  |  |  import java.time.OffsetDateTime;
 | 
	
		
			
				|  |  |  import java.time.format.DateTimeFormatter;
 | 
	
		
			
				|  |  |  import java.util.ArrayList;
 | 
	
		
			
				|  |  |  import java.util.List;
 | 
	
		
			
				|  |  |  import java.util.Map;
 | 
	
		
			
				|  |  |  import java.util.Set;
 | 
	
		
			
				|  |  | +import java.util.concurrent.atomic.AtomicInteger;
 | 
	
		
			
				|  |  | +import java.util.function.Predicate;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  import static org.elasticsearch.index.mapper.TimeSeriesParams.TIME_SERIES_METRIC_PARAM;
 | 
	
		
			
				|  |  | +import static org.elasticsearch.xpack.core.ilm.DownsampleAction.DOWNSAMPLED_INDEX_PREFIX;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  /**
 | 
	
		
			
				|  |  | - * The master rollup action that coordinates
 | 
	
		
			
				|  |  | - *  -  creating the rollup index
 | 
	
		
			
				|  |  | - *  -  calling {@link TransportDownsampleIndexerAction} to index rollup documents
 | 
	
		
			
				|  |  | + * The master downsample action that coordinates
 | 
	
		
			
				|  |  | + *  -  creating the downsample index
 | 
	
		
			
				|  |  | + *  -  instantiating {@link RollupShardIndexer}s to index downsample documents
 | 
	
		
			
				|  |  |   *  -  cleaning up state
 | 
	
		
			
				|  |  |   */
 | 
	
		
			
				|  |  |  public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAction<DownsampleAction.Request> {
 | 
	
	
		
			
				|  | @@ -97,10 +109,11 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
 | 
	
		
			
				|  |  |      private final Client client;
 | 
	
		
			
				|  |  |      private final IndicesService indicesService;
 | 
	
		
			
				|  |  |      private final ClusterService clusterService;
 | 
	
		
			
				|  |  | -    private final MasterServiceTaskQueue<RollupClusterStateUpdateTask> taskQueue;
 | 
	
		
			
				|  |  | +    private final MasterServiceTaskQueue<DownsampleClusterStateUpdateTask> taskQueue;
 | 
	
		
			
				|  |  |      private final MetadataCreateIndexService metadataCreateIndexService;
 | 
	
		
			
				|  |  |      private final IndexScopedSettings indexScopedSettings;
 | 
	
		
			
				|  |  |      private final ThreadContext threadContext;
 | 
	
		
			
				|  |  | +    private final PersistentTasksService persistentTasksService;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      private static final Set<String> FORBIDDEN_SETTINGS = Set.of(
 | 
	
		
			
				|  |  |          IndexSettings.DEFAULT_PIPELINE.getKey(),
 | 
	
	
		
			
				|  | @@ -114,15 +127,16 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
 | 
	
		
			
				|  |  |      /**
 | 
	
		
			
				|  |  |       * This is the cluster state task executor for cluster state update actions.
 | 
	
		
			
				|  |  |       */
 | 
	
		
			
				|  |  | -    private static final SimpleBatchedExecutor<RollupClusterStateUpdateTask, Void> STATE_UPDATE_TASK_EXECUTOR =
 | 
	
		
			
				|  |  | +    private static final SimpleBatchedExecutor<DownsampleClusterStateUpdateTask, Void> STATE_UPDATE_TASK_EXECUTOR =
 | 
	
		
			
				|  |  |          new SimpleBatchedExecutor<>() {
 | 
	
		
			
				|  |  |              @Override
 | 
	
		
			
				|  |  | -            public Tuple<ClusterState, Void> executeTask(RollupClusterStateUpdateTask task, ClusterState clusterState) throws Exception {
 | 
	
		
			
				|  |  | +            public Tuple<ClusterState, Void> executeTask(DownsampleClusterStateUpdateTask task, ClusterState clusterState)
 | 
	
		
			
				|  |  | +                throws Exception {
 | 
	
		
			
				|  |  |                  return Tuple.tuple(task.execute(clusterState), null);
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              @Override
 | 
	
		
			
				|  |  | -            public void taskSucceeded(RollupClusterStateUpdateTask task, Void unused) {
 | 
	
		
			
				|  |  | +            public void taskSucceeded(DownsampleClusterStateUpdateTask task, Void unused) {
 | 
	
		
			
				|  |  |                  task.listener.onResponse(AcknowledgedResponse.TRUE);
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  |          };
 | 
	
	
		
			
				|  | @@ -137,7 +151,8 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
 | 
	
		
			
				|  |  |          MetadataCreateIndexService metadataCreateIndexService,
 | 
	
		
			
				|  |  |          ActionFilters actionFilters,
 | 
	
		
			
				|  |  |          IndexNameExpressionResolver indexNameExpressionResolver,
 | 
	
		
			
				|  |  | -        IndexScopedSettings indexScopedSettings
 | 
	
		
			
				|  |  | +        IndexScopedSettings indexScopedSettings,
 | 
	
		
			
				|  |  | +        PersistentTasksService persistentTasksService
 | 
	
		
			
				|  |  |      ) {
 | 
	
		
			
				|  |  |          super(
 | 
	
		
			
				|  |  |              DownsampleAction.NAME,
 | 
	
	
		
			
				|  | @@ -156,6 +171,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
 | 
	
		
			
				|  |  |          this.indexScopedSettings = indexScopedSettings;
 | 
	
		
			
				|  |  |          this.threadContext = threadPool.getThreadContext();
 | 
	
		
			
				|  |  |          this.taskQueue = clusterService.createTaskQueue("rollup", Priority.URGENT, STATE_UPDATE_TASK_EXECUTOR);
 | 
	
		
			
				|  |  | +        this.persistentTasksService = persistentTasksService;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      @Override
 | 
	
	
		
			
				|  | @@ -216,20 +232,36 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
 | 
	
		
			
				|  |  |              return;
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        final String rollupIndexName = request.getTargetIndex();
 | 
	
		
			
				|  |  | -        // Assert rollup index does not exist
 | 
	
		
			
				|  |  | -        MetadataCreateIndexService.validateIndexName(rollupIndexName, state);
 | 
	
		
			
				|  |  | +        // Shortcircuit if target index has been downsampled:
 | 
	
		
			
				|  |  | +        final String downsampleIndexName = request.getTargetIndex();
 | 
	
		
			
				|  |  | +        IndexMetadata downsampleIndex = state.getMetadata().index(downsampleIndexName);
 | 
	
		
			
				|  |  | +        if (downsampleIndex != null) {
 | 
	
		
			
				|  |  | +            var downsampleStatus = IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(downsampleIndex.getSettings());
 | 
	
		
			
				|  |  | +            if (downsampleStatus == DownsampleTaskStatus.UNKNOWN) {
 | 
	
		
			
				|  |  | +                // This isn't a downsample index, so fail:
 | 
	
		
			
				|  |  | +                listener.onFailure(new ResourceAlreadyExistsException(downsampleIndex.getIndex()));
 | 
	
		
			
				|  |  | +                return;
 | 
	
		
			
				|  |  | +            } else if (downsampleStatus == DownsampleTaskStatus.SUCCESS) {
 | 
	
		
			
				|  |  | +                listener.onResponse(AcknowledgedResponse.TRUE);
 | 
	
		
			
				|  |  | +                return;
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        try {
 | 
	
		
			
				|  |  | +            MetadataCreateIndexService.validateIndexName(downsampleIndexName, state);
 | 
	
		
			
				|  |  | +        } catch (ResourceAlreadyExistsException e) {
 | 
	
		
			
				|  |  | +            // ignore index already exists
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        // Rollup will perform the following tasks:
 | 
	
		
			
				|  |  | +        // Downsample will perform the following tasks:
 | 
	
		
			
				|  |  |          // 1. Extract source index mappings
 | 
	
		
			
				|  |  | -        // 2. Extract rollup config from index mappings
 | 
	
		
			
				|  |  | -        // 3. Create the rollup index
 | 
	
		
			
				|  |  | -        // 4. Run rollup indexer
 | 
	
		
			
				|  |  | -        // 5. Make rollup index read-only and set replicas
 | 
	
		
			
				|  |  | -        // 6. Refresh rollup index
 | 
	
		
			
				|  |  | -        // 7. Mark rollup index as "completed successfully"
 | 
	
		
			
				|  |  | -        // 8. Force-merge the rollup index to a single segment
 | 
	
		
			
				|  |  | -        // At any point if there is an issue, delete the rollup index
 | 
	
		
			
				|  |  | +        // 2. Extract downsample config from index mappings
 | 
	
		
			
				|  |  | +        // 3. Create the downsample index
 | 
	
		
			
				|  |  | +        // 4. Run downsample indexer
 | 
	
		
			
				|  |  | +        // 5. Make downsample index read-only and set replicas
 | 
	
		
			
				|  |  | +        // 6. Refresh downsample index
 | 
	
		
			
				|  |  | +        // 7. Mark downsample index as "completed successfully"
 | 
	
		
			
				|  |  | +        // 8. Force-merge the downsample index to a single segment
 | 
	
		
			
				|  |  | +        // At any point if there is an issue, delete the downsample index
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          // 1. Extract source index mappings
 | 
	
		
			
				|  |  |          final TaskId parentTask = new TaskId(clusterService.localNode().getId(), task.getId());
 | 
	
	
		
			
				|  | @@ -244,7 +276,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
 | 
	
		
			
				|  |  |                  .map(mappingMetadata -> mappingMetadata.getValue().sourceAsMap())
 | 
	
		
			
				|  |  |                  .orElseThrow(() -> new IllegalArgumentException("No mapping found for rollup source index [" + sourceIndexName + "]"));
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -            // 2. Extract rollup config from index mappings
 | 
	
		
			
				|  |  | +            // 2. Extract downsample config from index mappings
 | 
	
		
			
				|  |  |              final MapperService mapperService = indicesService.createIndexMapperServiceForValidation(sourceIndexMetadata);
 | 
	
		
			
				|  |  |              final CompressedXContent sourceIndexCompressedXContent = new CompressedXContent(sourceIndexMappings);
 | 
	
		
			
				|  |  |              mapperService.merge(MapperService.SINGLE_MAPPING_NAME, sourceIndexCompressedXContent, MapperService.MergeReason.INDEX_TEMPLATE);
 | 
	
	
		
			
				|  | @@ -280,166 +312,205 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              final String mapping;
 | 
	
		
			
				|  |  |              try {
 | 
	
		
			
				|  |  | -                mapping = createRollupIndexMapping(helper, request.getDownsampleConfig(), mapperService, sourceIndexMappings);
 | 
	
		
			
				|  |  | +                mapping = createDownsampleIndexMapping(helper, request.getDownsampleConfig(), mapperService, sourceIndexMappings);
 | 
	
		
			
				|  |  |              } catch (IOException e) {
 | 
	
		
			
				|  |  |                  listener.onFailure(e);
 | 
	
		
			
				|  |  |                  return;
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  | -            // 3. Create rollup index
 | 
	
		
			
				|  |  | -            createRollupIndex(rollupIndexName, sourceIndexMetadata, mapping, request, ActionListener.wrap(createIndexResp -> {
 | 
	
		
			
				|  |  | +            // 3. Create downsample index
 | 
	
		
			
				|  |  | +            createDownsampleIndex(downsampleIndexName, sourceIndexMetadata, mapping, request, ActionListener.wrap(createIndexResp -> {
 | 
	
		
			
				|  |  |                  if (createIndexResp.isAcknowledged()) {
 | 
	
		
			
				|  |  | -                    // 3. Rollup index created. Run rollup indexer
 | 
	
		
			
				|  |  | -                    DownsampleIndexerAction.Request rollupIndexerRequest = new DownsampleIndexerAction.Request(
 | 
	
		
			
				|  |  | +                    performShardDownsampling(
 | 
	
		
			
				|  |  |                          request,
 | 
	
		
			
				|  |  | -                        OffsetDateTime.parse(
 | 
	
		
			
				|  |  | -                            sourceIndexMetadata.getSettings().get(IndexSettings.TIME_SERIES_START_TIME.getKey()),
 | 
	
		
			
				|  |  | -                            DateTimeFormatter.ISO_DATE_TIME
 | 
	
		
			
				|  |  | -                        ).toInstant().toEpochMilli(),
 | 
	
		
			
				|  |  | -                        OffsetDateTime.parse(
 | 
	
		
			
				|  |  | -                            sourceIndexMetadata.getSettings().get(IndexSettings.TIME_SERIES_END_TIME.getKey()),
 | 
	
		
			
				|  |  | -                            DateTimeFormatter.ISO_DATE_TIME
 | 
	
		
			
				|  |  | -                        ).toInstant().toEpochMilli(),
 | 
	
		
			
				|  |  | -                        dimensionFields.toArray(new String[0]),
 | 
	
		
			
				|  |  | -                        metricFields.toArray(new String[0]),
 | 
	
		
			
				|  |  | -                        labelFields.toArray(new String[0])
 | 
	
		
			
				|  |  | +                        listener,
 | 
	
		
			
				|  |  | +                        sourceIndexMetadata,
 | 
	
		
			
				|  |  | +                        downsampleIndexName,
 | 
	
		
			
				|  |  | +                        parentTask,
 | 
	
		
			
				|  |  | +                        metricFields,
 | 
	
		
			
				|  |  | +                        labelFields
 | 
	
		
			
				|  |  |                      );
 | 
	
		
			
				|  |  | -                    rollupIndexerRequest.setParentTask(parentTask);
 | 
	
		
			
				|  |  | -                    client.execute(DownsampleIndexerAction.INSTANCE, rollupIndexerRequest, ActionListener.wrap(indexerResp -> {
 | 
	
		
			
				|  |  | -                        if (indexerResp.isCreated()) {
 | 
	
		
			
				|  |  | -                            // 4. Make rollup index read-only and set the correct number of replicas
 | 
	
		
			
				|  |  | -                            final Settings.Builder settings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true);
 | 
	
		
			
				|  |  | -                            // Number of replicas had been previously set to 0 to speed up index population
 | 
	
		
			
				|  |  | -                            if (sourceIndexMetadata.getNumberOfReplicas() > 0) {
 | 
	
		
			
				|  |  | -                                settings.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, sourceIndexMetadata.getNumberOfReplicas());
 | 
	
		
			
				|  |  | -                            }
 | 
	
		
			
				|  |  | -                            // Setting index.hidden has been initially set to true. We revert this to the value of the source index
 | 
	
		
			
				|  |  | -                            if (sourceIndexMetadata.isHidden() == false) {
 | 
	
		
			
				|  |  | -                                if (sourceIndexMetadata.getSettings().keySet().contains(IndexMetadata.SETTING_INDEX_HIDDEN)) {
 | 
	
		
			
				|  |  | -                                    settings.put(IndexMetadata.SETTING_INDEX_HIDDEN, false);
 | 
	
		
			
				|  |  | -                                } else {
 | 
	
		
			
				|  |  | -                                    settings.putNull(IndexMetadata.SETTING_INDEX_HIDDEN);
 | 
	
		
			
				|  |  | -                                }
 | 
	
		
			
				|  |  | -                            }
 | 
	
		
			
				|  |  | -                            UpdateSettingsRequest updateSettingsReq = new UpdateSettingsRequest(settings.build(), rollupIndexName);
 | 
	
		
			
				|  |  | -                            updateSettingsReq.setParentTask(parentTask);
 | 
	
		
			
				|  |  | -                            client.admin().indices().updateSettings(updateSettingsReq, ActionListener.wrap(updateSettingsResponse -> {
 | 
	
		
			
				|  |  | -                                if (updateSettingsResponse.isAcknowledged()) {
 | 
	
		
			
				|  |  | -                                    // 5. Refresh rollup index
 | 
	
		
			
				|  |  | -                                    refreshIndex(rollupIndexName, parentTask, ActionListener.wrap(refreshIndexResponse -> {
 | 
	
		
			
				|  |  | -                                        if (refreshIndexResponse.getFailedShards() == 0) {
 | 
	
		
			
				|  |  | -                                            // 6. Mark rollup index as "completed successfully"
 | 
	
		
			
				|  |  | -                                            updateRollupMetadata(rollupIndexName, request, ActionListener.wrap(resp -> {
 | 
	
		
			
				|  |  | -                                                if (resp.isAcknowledged()) {
 | 
	
		
			
				|  |  | -                                                    // 7. Force-merge the rollup index to a single segment
 | 
	
		
			
				|  |  | -                                                    forceMergeIndex(
 | 
	
		
			
				|  |  | -                                                        rollupIndexName,
 | 
	
		
			
				|  |  | -                                                        parentTask,
 | 
	
		
			
				|  |  | -                                                        ActionListener.wrap(
 | 
	
		
			
				|  |  | -                                                            mergeIndexResp -> listener.onResponse(AcknowledgedResponse.TRUE),
 | 
	
		
			
				|  |  | -                                                            e -> {
 | 
	
		
			
				|  |  | -                                                                /*
 | 
	
		
			
				|  |  | -                                                                 * At this point rollup has been created successfully even if
 | 
	
		
			
				|  |  | -                                                                 * force-merge fails. So, we should not fail the rollup operation.
 | 
	
		
			
				|  |  | -                                                                 */
 | 
	
		
			
				|  |  | -                                                                logger.error(
 | 
	
		
			
				|  |  | -                                                                    "Failed to force-merge rollup index [" + rollupIndexName + "]",
 | 
	
		
			
				|  |  | -                                                                    e
 | 
	
		
			
				|  |  | -                                                                );
 | 
	
		
			
				|  |  | -                                                                listener.onResponse(AcknowledgedResponse.TRUE);
 | 
	
		
			
				|  |  | -                                                            }
 | 
	
		
			
				|  |  | -                                                        )
 | 
	
		
			
				|  |  | -                                                    );
 | 
	
		
			
				|  |  | -                                                } else {
 | 
	
		
			
				|  |  | -                                                    deleteRollupIndex(
 | 
	
		
			
				|  |  | -                                                        sourceIndexName,
 | 
	
		
			
				|  |  | -                                                        rollupIndexName,
 | 
	
		
			
				|  |  | -                                                        parentTask,
 | 
	
		
			
				|  |  | -                                                        listener,
 | 
	
		
			
				|  |  | -                                                        new ElasticsearchException(
 | 
	
		
			
				|  |  | -                                                            "Failed to publish new cluster state with rollup metadata"
 | 
	
		
			
				|  |  | -                                                        )
 | 
	
		
			
				|  |  | -                                                    );
 | 
	
		
			
				|  |  | -                                                }
 | 
	
		
			
				|  |  | -                                            },
 | 
	
		
			
				|  |  | -                                                e -> deleteRollupIndex(
 | 
	
		
			
				|  |  | -                                                    sourceIndexName,
 | 
	
		
			
				|  |  | -                                                    rollupIndexName,
 | 
	
		
			
				|  |  | -                                                    parentTask,
 | 
	
		
			
				|  |  | -                                                    listener,
 | 
	
		
			
				|  |  | -                                                    new ElasticsearchException(
 | 
	
		
			
				|  |  | -                                                        "Failed to publish new cluster state with rollup metadata",
 | 
	
		
			
				|  |  | -                                                        e
 | 
	
		
			
				|  |  | -                                                    )
 | 
	
		
			
				|  |  | -                                                )
 | 
	
		
			
				|  |  | -                                            ));
 | 
	
		
			
				|  |  | -                                        } else {
 | 
	
		
			
				|  |  | -                                            deleteRollupIndex(
 | 
	
		
			
				|  |  | -                                                sourceIndexName,
 | 
	
		
			
				|  |  | -                                                rollupIndexName,
 | 
	
		
			
				|  |  | -                                                parentTask,
 | 
	
		
			
				|  |  | -                                                listener,
 | 
	
		
			
				|  |  | -                                                new ElasticsearchException("Failed to refresh rollup index [" + rollupIndexName + "]")
 | 
	
		
			
				|  |  | -                                            );
 | 
	
		
			
				|  |  | -                                        }
 | 
	
		
			
				|  |  | -                                    },
 | 
	
		
			
				|  |  | -                                        e -> deleteRollupIndex(
 | 
	
		
			
				|  |  | -                                            sourceIndexName,
 | 
	
		
			
				|  |  | -                                            rollupIndexName,
 | 
	
		
			
				|  |  | -                                            parentTask,
 | 
	
		
			
				|  |  | -                                            listener,
 | 
	
		
			
				|  |  | -                                            new ElasticsearchException("Failed to refresh rollup index [" + rollupIndexName + "]", e)
 | 
	
		
			
				|  |  | -                                        )
 | 
	
		
			
				|  |  | -                                    ));
 | 
	
		
			
				|  |  | -                                } else {
 | 
	
		
			
				|  |  | -                                    deleteRollupIndex(
 | 
	
		
			
				|  |  | -                                        sourceIndexName,
 | 
	
		
			
				|  |  | -                                        rollupIndexName,
 | 
	
		
			
				|  |  | -                                        parentTask,
 | 
	
		
			
				|  |  | -                                        listener,
 | 
	
		
			
				|  |  | -                                        new ElasticsearchException("Unable to update settings of rollup index [" + rollupIndexName + "]")
 | 
	
		
			
				|  |  | -                                    );
 | 
	
		
			
				|  |  | -                                }
 | 
	
		
			
				|  |  | -                            },
 | 
	
		
			
				|  |  | -                                e -> deleteRollupIndex(
 | 
	
		
			
				|  |  | -                                    sourceIndexName,
 | 
	
		
			
				|  |  | -                                    rollupIndexName,
 | 
	
		
			
				|  |  | -                                    parentTask,
 | 
	
		
			
				|  |  | -                                    listener,
 | 
	
		
			
				|  |  | -                                    new ElasticsearchException("Unable to update settings of rollup index [" + rollupIndexName + "]", e)
 | 
	
		
			
				|  |  | -                                )
 | 
	
		
			
				|  |  | -                            ));
 | 
	
		
			
				|  |  | -                        } else {
 | 
	
		
			
				|  |  | -                            deleteRollupIndex(
 | 
	
		
			
				|  |  | -                                sourceIndexName,
 | 
	
		
			
				|  |  | -                                rollupIndexName,
 | 
	
		
			
				|  |  | -                                parentTask,
 | 
	
		
			
				|  |  | -                                listener,
 | 
	
		
			
				|  |  | -                                new ElasticsearchException("Unable to index into rollup index [" + rollupIndexName + "]")
 | 
	
		
			
				|  |  | -                            );
 | 
	
		
			
				|  |  | -                        }
 | 
	
		
			
				|  |  | -                    }, e -> deleteRollupIndex(sourceIndexName, rollupIndexName, parentTask, listener, e)));
 | 
	
		
			
				|  |  |                  } else {
 | 
	
		
			
				|  |  | -                    listener.onFailure(new ElasticsearchException("Failed to create rollup index [" + rollupIndexName + "]"));
 | 
	
		
			
				|  |  | +                    listener.onFailure(new ElasticsearchException("Failed to create rollup index [" + downsampleIndexName + "]"));
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  | -            }, listener::onFailure));
 | 
	
		
			
				|  |  | +            }, e -> {
 | 
	
		
			
				|  |  | +                if (e instanceof ResourceAlreadyExistsException) {
 | 
	
		
			
				|  |  | +                    performShardDownsampling(
 | 
	
		
			
				|  |  | +                        request,
 | 
	
		
			
				|  |  | +                        listener,
 | 
	
		
			
				|  |  | +                        sourceIndexMetadata,
 | 
	
		
			
				|  |  | +                        downsampleIndexName,
 | 
	
		
			
				|  |  | +                        parentTask,
 | 
	
		
			
				|  |  | +                        metricFields,
 | 
	
		
			
				|  |  | +                        labelFields
 | 
	
		
			
				|  |  | +                    );
 | 
	
		
			
				|  |  | +                } else {
 | 
	
		
			
				|  |  | +                    listener.onFailure(e);
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +            }));
 | 
	
		
			
				|  |  |          }, listener::onFailure));
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    // 3. downsample index created or already exist (in case of retry). Run downsample indexer persistent task on each shard.
 | 
	
		
			
				|  |  | +    private void performShardDownsampling(
 | 
	
		
			
				|  |  | +        DownsampleAction.Request request,
 | 
	
		
			
				|  |  | +        ActionListener<AcknowledgedResponse> listener,
 | 
	
		
			
				|  |  | +        IndexMetadata sourceIndexMetadata,
 | 
	
		
			
				|  |  | +        String downsampleIndexName,
 | 
	
		
			
				|  |  | +        TaskId parentTask,
 | 
	
		
			
				|  |  | +        List<String> metricFields,
 | 
	
		
			
				|  |  | +        List<String> labelFields
 | 
	
		
			
				|  |  | +    ) {
 | 
	
		
			
				|  |  | +        final int numberOfShards = sourceIndexMetadata.getNumberOfShards();
 | 
	
		
			
				|  |  | +        final Index sourceIndex = sourceIndexMetadata.getIndex();
 | 
	
		
			
				|  |  | +        // NOTE: before we set the number of replicas to 0, as a result here we are
 | 
	
		
			
				|  |  | +        // only dealing with primary shards.
 | 
	
		
			
				|  |  | +        final AtomicInteger countDown = new AtomicInteger(numberOfShards);
 | 
	
		
			
				|  |  | +        for (int shardNum = 0; shardNum < numberOfShards; shardNum++) {
 | 
	
		
			
				|  |  | +            final ShardId shardId = new ShardId(sourceIndex, shardNum);
 | 
	
		
			
				|  |  | +            final String persistentTaskId = createPersistentTaskId(
 | 
	
		
			
				|  |  | +                downsampleIndexName,
 | 
	
		
			
				|  |  | +                shardId,
 | 
	
		
			
				|  |  | +                request.getDownsampleConfig().getInterval()
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  | +            final DownsampleShardTaskParams params = createPersistentTaskParams(
 | 
	
		
			
				|  |  | +                request.getDownsampleConfig(),
 | 
	
		
			
				|  |  | +                sourceIndexMetadata,
 | 
	
		
			
				|  |  | +                downsampleIndexName,
 | 
	
		
			
				|  |  | +                metricFields,
 | 
	
		
			
				|  |  | +                labelFields,
 | 
	
		
			
				|  |  | +                shardId
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  | +            Predicate<PersistentTasksCustomMetadata.PersistentTask<?>> predicate = runningTask -> {
 | 
	
		
			
				|  |  | +                if (runningTask == null) {
 | 
	
		
			
				|  |  | +                    // NOTE: don't need to wait if the persistent task completed and was removed
 | 
	
		
			
				|  |  | +                    return true;
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +                RollupShardPersistentTaskState runningPersistentTaskState = (RollupShardPersistentTaskState) runningTask.getState();
 | 
	
		
			
				|  |  | +                return runningPersistentTaskState != null && runningPersistentTaskState.done();
 | 
	
		
			
				|  |  | +            };
 | 
	
		
			
				|  |  | +            var taskListener = new PersistentTasksService.WaitForPersistentTaskListener<>() {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                @Override
 | 
	
		
			
				|  |  | +                public void onResponse(PersistentTasksCustomMetadata.PersistentTask<PersistentTaskParams> persistentTask) {
 | 
	
		
			
				|  |  | +                    logger.info("Downsampling task [" + persistentTaskId + " completed for shard " + params.shardId());
 | 
	
		
			
				|  |  | +                    if (countDown.decrementAndGet() == 0) {
 | 
	
		
			
				|  |  | +                        logger.info("All downsampling tasks completed [" + numberOfShards + "]");
 | 
	
		
			
				|  |  | +                        updateTargetIndexSettingStep(request, listener, sourceIndexMetadata, downsampleIndexName, parentTask);
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                @Override
 | 
	
		
			
				|  |  | +                public void onFailure(Exception e) {
 | 
	
		
			
				|  |  | +                    logger.error("error while waiting for downsampling persistent task", e);
 | 
	
		
			
				|  |  | +                    listener.onFailure(e);
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +            };
 | 
	
		
			
				|  |  | +            persistentTasksService.sendStartRequest(
 | 
	
		
			
				|  |  | +                persistentTaskId,
 | 
	
		
			
				|  |  | +                RollupShardTask.TASK_NAME,
 | 
	
		
			
				|  |  | +                params,
 | 
	
		
			
				|  |  | +                ActionListener.wrap(
 | 
	
		
			
				|  |  | +                    startedTask -> persistentTasksService.waitForPersistentTaskCondition(
 | 
	
		
			
				|  |  | +                        startedTask.getId(),
 | 
	
		
			
				|  |  | +                        predicate,
 | 
	
		
			
				|  |  | +                        request.getWaitTimeout(),
 | 
	
		
			
				|  |  | +                        taskListener
 | 
	
		
			
				|  |  | +                    ),
 | 
	
		
			
				|  |  | +                    e -> {
 | 
	
		
			
				|  |  | +                        if (e instanceof ResourceAlreadyExistsException) {
 | 
	
		
			
				|  |  | +                            logger.info("Task [" + persistentTaskId + "] already exists. Waiting.");
 | 
	
		
			
				|  |  | +                            persistentTasksService.waitForPersistentTaskCondition(
 | 
	
		
			
				|  |  | +                                persistentTaskId,
 | 
	
		
			
				|  |  | +                                predicate,
 | 
	
		
			
				|  |  | +                                request.getWaitTimeout(),
 | 
	
		
			
				|  |  | +                                taskListener
 | 
	
		
			
				|  |  | +                            );
 | 
	
		
			
				|  |  | +                        } else {
 | 
	
		
			
				|  |  | +                            listener.onFailure(new ElasticsearchException("Task [" + persistentTaskId + "] failed starting", e));
 | 
	
		
			
				|  |  | +                        }
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  | +                )
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    // 4. Make downsample index read-only and set the correct number of replicas
 | 
	
		
			
				|  |  | +    private void updateTargetIndexSettingStep(
 | 
	
		
			
				|  |  | +        final DownsampleAction.Request request,
 | 
	
		
			
				|  |  | +        final ActionListener<AcknowledgedResponse> listener,
 | 
	
		
			
				|  |  | +        final IndexMetadata sourceIndexMetadata,
 | 
	
		
			
				|  |  | +        final String downsampleIndexName,
 | 
	
		
			
				|  |  | +        final TaskId parentTask
 | 
	
		
			
				|  |  | +    ) {
 | 
	
		
			
				|  |  | +        // 4. Make downsample index read-only and set the correct number of replicas
 | 
	
		
			
				|  |  | +        final Settings.Builder settings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true);
 | 
	
		
			
				|  |  | +        // Number of replicas had been previously set to 0 to speed up index population
 | 
	
		
			
				|  |  | +        if (sourceIndexMetadata.getNumberOfReplicas() > 0) {
 | 
	
		
			
				|  |  | +            settings.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, sourceIndexMetadata.getNumberOfReplicas());
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        // Setting index.hidden has been initially set to true. We revert this to the value of the
 | 
	
		
			
				|  |  | +        // source index
 | 
	
		
			
				|  |  | +        if (sourceIndexMetadata.isHidden() == false) {
 | 
	
		
			
				|  |  | +            if (sourceIndexMetadata.getSettings().keySet().contains(IndexMetadata.SETTING_INDEX_HIDDEN)) {
 | 
	
		
			
				|  |  | +                settings.put(IndexMetadata.SETTING_INDEX_HIDDEN, false);
 | 
	
		
			
				|  |  | +            } else {
 | 
	
		
			
				|  |  | +                settings.putNull(IndexMetadata.SETTING_INDEX_HIDDEN);
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        UpdateSettingsRequest updateSettingsReq = new UpdateSettingsRequest(settings.build(), downsampleIndexName);
 | 
	
		
			
				|  |  | +        updateSettingsReq.setParentTask(parentTask);
 | 
	
		
			
				|  |  | +        client.admin()
 | 
	
		
			
				|  |  | +            .indices()
 | 
	
		
			
				|  |  | +            .updateSettings(
 | 
	
		
			
				|  |  | +                updateSettingsReq,
 | 
	
		
			
				|  |  | +                new UpdateDownsampleIndexSettingsActionListener(listener, parentTask, downsampleIndexName, request.getWaitTimeout())
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    private static DownsampleShardTaskParams createPersistentTaskParams(
 | 
	
		
			
				|  |  | +        final DownsampleConfig downsampleConfig,
 | 
	
		
			
				|  |  | +        final IndexMetadata sourceIndexMetadata,
 | 
	
		
			
				|  |  | +        final String targetIndexName,
 | 
	
		
			
				|  |  | +        final List<String> metricFields,
 | 
	
		
			
				|  |  | +        final List<String> labelFields,
 | 
	
		
			
				|  |  | +        final ShardId shardId
 | 
	
		
			
				|  |  | +    ) {
 | 
	
		
			
				|  |  | +        return new DownsampleShardTaskParams(
 | 
	
		
			
				|  |  | +            downsampleConfig,
 | 
	
		
			
				|  |  | +            targetIndexName,
 | 
	
		
			
				|  |  | +            parseTimestamp(sourceIndexMetadata, IndexSettings.TIME_SERIES_START_TIME),
 | 
	
		
			
				|  |  | +            parseTimestamp(sourceIndexMetadata, IndexSettings.TIME_SERIES_END_TIME),
 | 
	
		
			
				|  |  | +            shardId,
 | 
	
		
			
				|  |  | +            metricFields.toArray(new String[0]),
 | 
	
		
			
				|  |  | +            labelFields.toArray(new String[0])
 | 
	
		
			
				|  |  | +        );
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    private static long parseTimestamp(final IndexMetadata sourceIndexMetadata, final Setting<Instant> timestampSetting) {
 | 
	
		
			
				|  |  | +        return OffsetDateTime.parse(sourceIndexMetadata.getSettings().get(timestampSetting.getKey()), DateTimeFormatter.ISO_DATE_TIME)
 | 
	
		
			
				|  |  | +            .toInstant()
 | 
	
		
			
				|  |  | +            .toEpochMilli();
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    private static String createPersistentTaskId(final String targetIndex, final ShardId shardId, final DateHistogramInterval interval) {
 | 
	
		
			
				|  |  | +        return DOWNSAMPLED_INDEX_PREFIX + targetIndex + "-" + shardId.id() + "-" + interval;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      @Override
 | 
	
		
			
				|  |  |      protected ClusterBlockException checkBlock(DownsampleAction.Request request, ClusterState state) {
 | 
	
		
			
				|  |  |          return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      /**
 | 
	
		
			
				|  |  | -     * This method creates the mapping for the rollup index, based on the
 | 
	
		
			
				|  |  | +     * This method creates the mapping for the downsample index, based on the
 | 
	
		
			
				|  |  |       * mapping (dimensions and metrics) from the source index, as well as the
 | 
	
		
			
				|  |  | -     * rollup configuration.
 | 
	
		
			
				|  |  | +     * downsample configuration.
 | 
	
		
			
				|  |  |       *
 | 
	
		
			
				|  |  | -     * @param config the rollup configuration
 | 
	
		
			
				|  |  | +     * @param config the downsample configuration
 | 
	
		
			
				|  |  |       * @param sourceIndexMappings a map with the source index mapping
 | 
	
		
			
				|  |  | -     * @return the mapping of the rollup index
 | 
	
		
			
				|  |  | +     * @return the mapping of the downsample index
 | 
	
		
			
				|  |  |       */
 | 
	
		
			
				|  |  | -    public static String createRollupIndexMapping(
 | 
	
		
			
				|  |  | +    public static String createDownsampleIndexMapping(
 | 
	
		
			
				|  |  |          final TimeseriesFieldTypeHelper helper,
 | 
	
		
			
				|  |  |          final DownsampleConfig config,
 | 
	
		
			
				|  |  |          final MapperService mapperService,
 | 
	
	
		
			
				|  | @@ -457,10 +528,10 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
 | 
	
		
			
				|  |  |          builder.endObject(); // match initial startObject
 | 
	
		
			
				|  |  |          builder.endObject(); // match startObject("properties")
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        final CompressedXContent rollupDiffXContent = CompressedXContent.fromJSON(
 | 
	
		
			
				|  |  | +        final CompressedXContent mappingDiffXContent = CompressedXContent.fromJSON(
 | 
	
		
			
				|  |  |              XContentHelper.convertToJson(BytesReference.bytes(builder), false, XContentType.JSON)
 | 
	
		
			
				|  |  |          );
 | 
	
		
			
				|  |  | -        return mapperService.merge(MapperService.SINGLE_MAPPING_NAME, rollupDiffXContent, MapperService.MergeReason.INDEX_TEMPLATE)
 | 
	
		
			
				|  |  | +        return mapperService.merge(MapperService.SINGLE_MAPPING_NAME, mappingDiffXContent, MapperService.MergeReason.INDEX_TEMPLATE)
 | 
	
		
			
				|  |  |              .mappingSource()
 | 
	
		
			
				|  |  |              .uncompressed()
 | 
	
		
			
				|  |  |              .utf8ToString();
 | 
	
	
		
			
				|  | @@ -572,23 +643,23 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      /**
 | 
	
		
			
				|  |  | -     * Copy index settings from the source index to the rollup index. Settings that
 | 
	
		
			
				|  |  | -     * have already been set in the rollup index will not be overridden.
 | 
	
		
			
				|  |  | +     * Copy index settings from the source index to the downsample index. Settings that
 | 
	
		
			
				|  |  | +     * have already been set in the downsample index will not be overridden.
 | 
	
		
			
				|  |  |       */
 | 
	
		
			
				|  |  |      static IndexMetadata.Builder copyIndexMetadata(
 | 
	
		
			
				|  |  |          final IndexMetadata sourceIndexMetadata,
 | 
	
		
			
				|  |  | -        final IndexMetadata rollupIndexMetadata,
 | 
	
		
			
				|  |  | +        final IndexMetadata downsampleIndexMetadata,
 | 
	
		
			
				|  |  |          final IndexScopedSettings indexScopedSettings
 | 
	
		
			
				|  |  |      ) {
 | 
	
		
			
				|  |  |          // Copy index settings from the source index, but do not override the settings
 | 
	
		
			
				|  |  | -        // that already have been set in the rollup index
 | 
	
		
			
				|  |  | -        final Settings.Builder targetSettings = Settings.builder().put(rollupIndexMetadata.getSettings());
 | 
	
		
			
				|  |  | +        // that already have been set in the downsample index
 | 
	
		
			
				|  |  | +        final Settings.Builder targetSettings = Settings.builder().put(downsampleIndexMetadata.getSettings());
 | 
	
		
			
				|  |  |          for (final String key : sourceIndexMetadata.getSettings().keySet()) {
 | 
	
		
			
				|  |  |              final Setting<?> setting = indexScopedSettings.get(key);
 | 
	
		
			
				|  |  |              if (setting == null) {
 | 
	
		
			
				|  |  |                  assert indexScopedSettings.isPrivateSetting(key) : "expected [" + key + "] to be private but it was not";
 | 
	
		
			
				|  |  |              } else if (setting.getProperties().contains(Setting.Property.NotCopyableOnResize)) {
 | 
	
		
			
				|  |  | -                // we leverage the NotCopyableOnResize setting property for rollup, because
 | 
	
		
			
				|  |  | +                // we leverage the NotCopyableOnResize setting property for downsample, because
 | 
	
		
			
				|  |  |                  // the same rules with resize apply
 | 
	
		
			
				|  |  |                  continue;
 | 
	
		
			
				|  |  |              }
 | 
	
	
		
			
				|  | @@ -600,7 +671,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
 | 
	
		
			
				|  |  |              if (OVERRIDE_SETTINGS.contains(key)) {
 | 
	
		
			
				|  |  |                  targetSettings.put(key, sourceIndexMetadata.getSettings().get(key));
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  | -            // Do not override settings that have already been set in the rollup index.
 | 
	
		
			
				|  |  | +            // Do not override settings that have already been set in the downsample index.
 | 
	
		
			
				|  |  |              if (targetSettings.keys().contains(key)) {
 | 
	
		
			
				|  |  |                  continue;
 | 
	
		
			
				|  |  |              }
 | 
	
	
		
			
				|  | @@ -609,8 +680,8 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          /*
 | 
	
		
			
				|  |  | -         * Add the source index name and UUID to the rollup index metadata.
 | 
	
		
			
				|  |  | -         * If the source index is a rollup index, we will add the name and UUID
 | 
	
		
			
				|  |  | +         * Add the source index name and UUID to the downsample index metadata.
 | 
	
		
			
				|  |  | +         * If the source index is a downsample index, we will add the name and UUID
 | 
	
		
			
				|  |  |           * of the first index that we initially rolled up.
 | 
	
		
			
				|  |  |           */
 | 
	
		
			
				|  |  |          if (IndexMetadata.INDEX_DOWNSAMPLE_SOURCE_UUID.exists(sourceIndexMetadata.getSettings()) == false
 | 
	
	
		
			
				|  | @@ -620,7 +691,7 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
 | 
	
		
			
				|  |  |                  .put(IndexMetadata.INDEX_DOWNSAMPLE_SOURCE_UUID.getKey(), sourceIndex.getUUID());
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        return IndexMetadata.builder(rollupIndexMetadata).settings(targetSettings);
 | 
	
		
			
				|  |  | +        return IndexMetadata.builder(downsampleIndexMetadata).settings(targetSettings);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      /**
 | 
	
	
		
			
				|  | @@ -639,28 +710,28 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
 | 
	
		
			
				|  |  |              .endArray();
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    private void createRollupIndex(
 | 
	
		
			
				|  |  | -        String rollupIndexName,
 | 
	
		
			
				|  |  | +    private void createDownsampleIndex(
 | 
	
		
			
				|  |  | +        String downsampleIndexName,
 | 
	
		
			
				|  |  |          IndexMetadata sourceIndexMetadata,
 | 
	
		
			
				|  |  |          String mapping,
 | 
	
		
			
				|  |  |          DownsampleAction.Request request,
 | 
	
		
			
				|  |  |          ActionListener<AcknowledgedResponse> listener
 | 
	
		
			
				|  |  |      ) {
 | 
	
		
			
				|  |  |          /*
 | 
	
		
			
				|  |  | -         * When creating the rollup index, we copy the index.number_of_shards from source index,
 | 
	
		
			
				|  |  | +         * When creating the downsample index, we copy the index.number_of_shards from source index,
 | 
	
		
			
				|  |  |           * and we set the index.number_of_replicas to 0, to avoid replicating the index being built.
 | 
	
		
			
				|  |  |           * Also, we set the index.refresh_interval to -1.
 | 
	
		
			
				|  |  |           * We will set the correct number of replicas and refresh the index later.
 | 
	
		
			
				|  |  |           *
 | 
	
		
			
				|  |  | -         * We should note that there is a risk of losing a node during the rollup process. In this
 | 
	
		
			
				|  |  | -         * case rollup will fail.
 | 
	
		
			
				|  |  | +         * We should note that there is a risk of losing a node during the downsample process. In this
 | 
	
		
			
				|  |  | +         * case downsample will fail.
 | 
	
		
			
				|  |  |           */
 | 
	
		
			
				|  |  |          Settings.Builder builder = Settings.builder()
 | 
	
		
			
				|  |  |              .put(IndexMetadata.SETTING_INDEX_HIDDEN, true)
 | 
	
		
			
				|  |  |              .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, sourceIndexMetadata.getNumberOfShards())
 | 
	
		
			
				|  |  |              .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
 | 
	
		
			
				|  |  |              .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "-1")
 | 
	
		
			
				|  |  | -            .put(IndexMetadata.INDEX_DOWNSAMPLE_STATUS.getKey(), IndexMetadata.DownsampleTaskStatus.STARTED);
 | 
	
		
			
				|  |  | +            .put(IndexMetadata.INDEX_DOWNSAMPLE_STATUS.getKey(), DownsampleTaskStatus.STARTED);
 | 
	
		
			
				|  |  |          if (sourceIndexMetadata.getSettings().hasValue(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey())) {
 | 
	
		
			
				|  |  |              builder.put(
 | 
	
		
			
				|  |  |                  MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(),
 | 
	
	
		
			
				|  | @@ -669,110 +740,190 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          CreateIndexClusterStateUpdateRequest createIndexClusterStateUpdateRequest = new CreateIndexClusterStateUpdateRequest(
 | 
	
		
			
				|  |  | -            "rollup",
 | 
	
		
			
				|  |  | -            rollupIndexName,
 | 
	
		
			
				|  |  | -            rollupIndexName
 | 
	
		
			
				|  |  | -        ).settings(builder.build()).mappings(mapping);
 | 
	
		
			
				|  |  | +            "downsample",
 | 
	
		
			
				|  |  | +            downsampleIndexName,
 | 
	
		
			
				|  |  | +            downsampleIndexName
 | 
	
		
			
				|  |  | +        ).settings(builder.build()).mappings(mapping).waitForActiveShards(ActiveShardCount.ONE);
 | 
	
		
			
				|  |  |          var delegate = new AllocationActionListener<>(listener, threadPool.getThreadContext());
 | 
	
		
			
				|  |  | -        taskQueue.submitTask("create-rollup-index [" + rollupIndexName + "]", new RollupClusterStateUpdateTask(listener) {
 | 
	
		
			
				|  |  | +        taskQueue.submitTask("create-downsample-index [" + downsampleIndexName + "]", new DownsampleClusterStateUpdateTask(listener) {
 | 
	
		
			
				|  |  |              @Override
 | 
	
		
			
				|  |  |              public ClusterState execute(ClusterState currentState) throws Exception {
 | 
	
		
			
				|  |  |                  return metadataCreateIndexService.applyCreateIndexRequest(
 | 
	
		
			
				|  |  |                      currentState,
 | 
	
		
			
				|  |  |                      createIndexClusterStateUpdateRequest,
 | 
	
		
			
				|  |  |                      true,
 | 
	
		
			
				|  |  | -                    // Copy index metadata from source index to rollup index
 | 
	
		
			
				|  |  | -                    (builder, rollupIndexMetadata) -> builder.put(
 | 
	
		
			
				|  |  | -                        copyIndexMetadata(sourceIndexMetadata, rollupIndexMetadata, indexScopedSettings)
 | 
	
		
			
				|  |  | -                    ),
 | 
	
		
			
				|  |  | +                    // Copy index metadata from source index to downsample index
 | 
	
		
			
				|  |  | +                    (builder, indexMetadata) -> builder.put(copyIndexMetadata(sourceIndexMetadata, indexMetadata, indexScopedSettings)),
 | 
	
		
			
				|  |  |                      delegate.reroute()
 | 
	
		
			
				|  |  |                  );
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  |          }, request.masterNodeTimeout());
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    private void updateRollupMetadata(
 | 
	
		
			
				|  |  | -        String rollupIndexName,
 | 
	
		
			
				|  |  | -        DownsampleAction.Request request,
 | 
	
		
			
				|  |  | -        ActionListener<AcknowledgedResponse> listener
 | 
	
		
			
				|  |  | -    ) {
 | 
	
		
			
				|  |  | -        // 6. Mark rollup index as "completed successfully" ("index.rollup.status": "success")
 | 
	
		
			
				|  |  | -        taskQueue.submitTask("update-rollup-metadata [" + rollupIndexName + "]", new RollupClusterStateUpdateTask(listener) {
 | 
	
		
			
				|  |  | +    /**
 | 
	
		
			
				|  |  | +     * A specialized cluster state update task that always takes a listener handling an
 | 
	
		
			
				|  |  | +     * AcknowledgedResponse, as all template actions have simple acknowledged yes/no responses.
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  | +    abstract static class DownsampleClusterStateUpdateTask implements ClusterStateTaskListener {
 | 
	
		
			
				|  |  | +        final ActionListener<AcknowledgedResponse> listener;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -            @Override
 | 
	
		
			
				|  |  | -            public ClusterState execute(ClusterState currentState) {
 | 
	
		
			
				|  |  | -                Metadata metadata = currentState.metadata();
 | 
	
		
			
				|  |  | -                Metadata.Builder metadataBuilder = Metadata.builder(metadata);
 | 
	
		
			
				|  |  | -                Index rollupIndex = metadata.index(rollupIndexName).getIndex();
 | 
	
		
			
				|  |  | -                IndexMetadata rollupIndexMetadata = metadata.index(rollupIndex);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -                metadataBuilder.updateSettings(
 | 
	
		
			
				|  |  | -                    Settings.builder()
 | 
	
		
			
				|  |  | -                        .put(rollupIndexMetadata.getSettings())
 | 
	
		
			
				|  |  | -                        .put(IndexMetadata.INDEX_DOWNSAMPLE_STATUS.getKey(), IndexMetadata.DownsampleTaskStatus.SUCCESS)
 | 
	
		
			
				|  |  | -                        .build(),
 | 
	
		
			
				|  |  | -                    rollupIndexName
 | 
	
		
			
				|  |  | -                );
 | 
	
		
			
				|  |  | -                return ClusterState.builder(currentState).metadata(metadataBuilder.build()).build();
 | 
	
		
			
				|  |  | -            }
 | 
	
		
			
				|  |  | -        }, request.masterNodeTimeout());
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | +        DownsampleClusterStateUpdateTask(ActionListener<AcknowledgedResponse> listener) {
 | 
	
		
			
				|  |  | +            this.listener = listener;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    private void refreshIndex(String index, TaskId parentTask, ActionListener<RefreshResponse> listener) {
 | 
	
		
			
				|  |  | -        RefreshRequest request = new RefreshRequest(index);
 | 
	
		
			
				|  |  | -        request.setParentTask(parentTask);
 | 
	
		
			
				|  |  | -        client.admin().indices().refresh(request, listener);
 | 
	
		
			
				|  |  | +        public abstract ClusterState execute(ClusterState currentState) throws Exception;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        @Override
 | 
	
		
			
				|  |  | +        public void onFailure(Exception e) {
 | 
	
		
			
				|  |  | +            listener.onFailure(e);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    private void forceMergeIndex(String index, TaskId parentTask, ActionListener<ForceMergeResponse> listener) {
 | 
	
		
			
				|  |  | -        ForceMergeRequest request = new ForceMergeRequest(index);
 | 
	
		
			
				|  |  | -        request.maxNumSegments(1);
 | 
	
		
			
				|  |  | -        request.setParentTask(parentTask);
 | 
	
		
			
				|  |  | -        client.admin().indices().forceMerge(request, listener);
 | 
	
		
			
				|  |  | +    /**
 | 
	
		
			
				|  |  | +     * Refreshes the downsample target index
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  | +    class UpdateDownsampleIndexSettingsActionListener implements ActionListener<AcknowledgedResponse> {
 | 
	
		
			
				|  |  | +        final ActionListener<AcknowledgedResponse> listener;
 | 
	
		
			
				|  |  | +        final TaskId parentTask;
 | 
	
		
			
				|  |  | +        final String downsampleIndexName;
 | 
	
		
			
				|  |  | +        final TimeValue timeout;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        UpdateDownsampleIndexSettingsActionListener(
 | 
	
		
			
				|  |  | +            final ActionListener<AcknowledgedResponse> listener,
 | 
	
		
			
				|  |  | +            final TaskId parentTask,
 | 
	
		
			
				|  |  | +            final String downsampleIndexName,
 | 
	
		
			
				|  |  | +            final TimeValue timeout
 | 
	
		
			
				|  |  | +        ) {
 | 
	
		
			
				|  |  | +            this.listener = listener;
 | 
	
		
			
				|  |  | +            this.parentTask = parentTask;
 | 
	
		
			
				|  |  | +            this.downsampleIndexName = downsampleIndexName;
 | 
	
		
			
				|  |  | +            this.timeout = timeout;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        @Override
 | 
	
		
			
				|  |  | +        public void onResponse(final AcknowledgedResponse response) {
 | 
	
		
			
				|  |  | +            final RefreshRequest request = new RefreshRequest(downsampleIndexName);
 | 
	
		
			
				|  |  | +            request.setParentTask(parentTask);
 | 
	
		
			
				|  |  | +            client.admin()
 | 
	
		
			
				|  |  | +                .indices()
 | 
	
		
			
				|  |  | +                .refresh(request, new RefreshDownsampleIndexActionListener(listener, parentTask, downsampleIndexName, timeout));
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        @Override
 | 
	
		
			
				|  |  | +        public void onFailure(Exception e) {
 | 
	
		
			
				|  |  | +            listener.onFailure(e);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    private void deleteRollupIndex(
 | 
	
		
			
				|  |  | -        String sourceIndex,
 | 
	
		
			
				|  |  | -        String rollupIndex,
 | 
	
		
			
				|  |  | -        TaskId parentTask,
 | 
	
		
			
				|  |  | -        ActionListener<AcknowledgedResponse> listener,
 | 
	
		
			
				|  |  | -        Exception e
 | 
	
		
			
				|  |  | -    ) {
 | 
	
		
			
				|  |  | -        DeleteIndexRequest request = new DeleteIndexRequest(rollupIndex);
 | 
	
		
			
				|  |  | -        request.setParentTask(parentTask);
 | 
	
		
			
				|  |  | -        client.admin().indices().delete(request, new ActionListener<>() {
 | 
	
		
			
				|  |  | -            @Override
 | 
	
		
			
				|  |  | -            public void onResponse(AcknowledgedResponse acknowledgedResponse) {
 | 
	
		
			
				|  |  | -                if (e == null && acknowledgedResponse.isAcknowledged()) {
 | 
	
		
			
				|  |  | -                    listener.onResponse(acknowledgedResponse);
 | 
	
		
			
				|  |  | -                } else {
 | 
	
		
			
				|  |  | -                    listener.onFailure(new ElasticsearchException("Unable to rollup index [" + sourceIndex + "]", e));
 | 
	
		
			
				|  |  | -                }
 | 
	
		
			
				|  |  | -            }
 | 
	
		
			
				|  |  | +    /**
 | 
	
		
			
				|  |  | +     * Updates the downsample target index metadata (task status)
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  | +    class RefreshDownsampleIndexActionListener implements ActionListener<RefreshResponse> {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        private final ActionListener<AcknowledgedResponse> actionListener;
 | 
	
		
			
				|  |  | +        private final TaskId parentTask;
 | 
	
		
			
				|  |  | +        private final String downsampleIndexName;
 | 
	
		
			
				|  |  | +        private final TimeValue timeout;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        RefreshDownsampleIndexActionListener(
 | 
	
		
			
				|  |  | +            final ActionListener<AcknowledgedResponse> actionListener,
 | 
	
		
			
				|  |  | +            TaskId parentTask,
 | 
	
		
			
				|  |  | +            final String downsampleIndexName,
 | 
	
		
			
				|  |  | +            final TimeValue timeout
 | 
	
		
			
				|  |  | +        ) {
 | 
	
		
			
				|  |  | +            this.actionListener = actionListener;
 | 
	
		
			
				|  |  | +            this.parentTask = parentTask;
 | 
	
		
			
				|  |  | +            this.downsampleIndexName = downsampleIndexName;
 | 
	
		
			
				|  |  | +            this.timeout = timeout;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -            @Override
 | 
	
		
			
				|  |  | -            public void onFailure(Exception deleteException) {
 | 
	
		
			
				|  |  | -                listener.onFailure(new ElasticsearchException("Unable to delete rollup index [" + rollupIndex + "]", e));
 | 
	
		
			
				|  |  | +        @Override
 | 
	
		
			
				|  |  | +        public void onResponse(final RefreshResponse response) {
 | 
	
		
			
				|  |  | +            if (response.getFailedShards() != 0) {
 | 
	
		
			
				|  |  | +                logger.info("Post refresh failed [{}],{}", downsampleIndexName, Strings.toString(response));
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  | -        });
 | 
	
		
			
				|  |  | +            // Mark downsample index as "completed successfully" ("index.downsample.status": "success")
 | 
	
		
			
				|  |  | +            taskQueue.submitTask(
 | 
	
		
			
				|  |  | +                "update-downsample-metadata [" + downsampleIndexName + "]",
 | 
	
		
			
				|  |  | +                new DownsampleClusterStateUpdateTask(new ForceMergeActionListener(parentTask, downsampleIndexName, actionListener)) {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                    @Override
 | 
	
		
			
				|  |  | +                    public ClusterState execute(ClusterState currentState) {
 | 
	
		
			
				|  |  | +                        final Metadata metadata = currentState.metadata();
 | 
	
		
			
				|  |  | +                        final IndexMetadata downsampleIndex = metadata.index(metadata.index(downsampleIndexName).getIndex());
 | 
	
		
			
				|  |  | +                        if (IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(downsampleIndex.getSettings()) == DownsampleTaskStatus.SUCCESS) {
 | 
	
		
			
				|  |  | +                            return currentState;
 | 
	
		
			
				|  |  | +                        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                        final Metadata.Builder metadataBuilder = Metadata.builder(metadata);
 | 
	
		
			
				|  |  | +                        metadataBuilder.updateSettings(
 | 
	
		
			
				|  |  | +                            Settings.builder()
 | 
	
		
			
				|  |  | +                                .put(downsampleIndex.getSettings())
 | 
	
		
			
				|  |  | +                                .put(IndexMetadata.INDEX_DOWNSAMPLE_STATUS.getKey(), DownsampleTaskStatus.SUCCESS)
 | 
	
		
			
				|  |  | +                                .build(),
 | 
	
		
			
				|  |  | +                            downsampleIndexName
 | 
	
		
			
				|  |  | +                        );
 | 
	
		
			
				|  |  | +                        return ClusterState.builder(currentState).metadata(metadataBuilder.build()).build();
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  | +                },
 | 
	
		
			
				|  |  | +                timeout
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        @Override
 | 
	
		
			
				|  |  | +        public void onFailure(Exception e) {
 | 
	
		
			
				|  |  | +            actionListener.onFailure(e);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      /**
 | 
	
		
			
				|  |  | -     * A specialized cluster state update task that always takes a listener handling an
 | 
	
		
			
				|  |  | -     * AcknowledgedResponse, as all template actions have simple acknowledged yes/no responses.
 | 
	
		
			
				|  |  | +     * Triggers a force merge operation on the downsample target index
 | 
	
		
			
				|  |  |       */
 | 
	
		
			
				|  |  | -    private abstract static class RollupClusterStateUpdateTask implements ClusterStateTaskListener {
 | 
	
		
			
				|  |  | -        final ActionListener<AcknowledgedResponse> listener;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -        RollupClusterStateUpdateTask(ActionListener<AcknowledgedResponse> listener) {
 | 
	
		
			
				|  |  | -            this.listener = listener;
 | 
	
		
			
				|  |  | +    class ForceMergeActionListener implements ActionListener<AcknowledgedResponse> {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        final ActionListener<AcknowledgedResponse> actionListener;
 | 
	
		
			
				|  |  | +        private final TaskId parentTask;
 | 
	
		
			
				|  |  | +        private final String downsampleIndexName;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        ForceMergeActionListener(
 | 
	
		
			
				|  |  | +            final TaskId parentTask,
 | 
	
		
			
				|  |  | +            final String downsampleIndexName,
 | 
	
		
			
				|  |  | +            final ActionListener<AcknowledgedResponse> onFailure
 | 
	
		
			
				|  |  | +        ) {
 | 
	
		
			
				|  |  | +            this.parentTask = parentTask;
 | 
	
		
			
				|  |  | +            this.downsampleIndexName = downsampleIndexName;
 | 
	
		
			
				|  |  | +            this.actionListener = onFailure;
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        public abstract ClusterState execute(ClusterState currentState) throws Exception;
 | 
	
		
			
				|  |  | +        @Override
 | 
	
		
			
				|  |  | +        public void onResponse(final AcknowledgedResponse response) {
 | 
	
		
			
				|  |  | +            /*
 | 
	
		
			
				|  |  | +             * At this point downsample index has been created
 | 
	
		
			
				|  |  | +             * successfully even force merge fails.
 | 
	
		
			
				|  |  | +             * So, we should not fail the downsmaple operation
 | 
	
		
			
				|  |  | +             */
 | 
	
		
			
				|  |  | +            ForceMergeRequest request = new ForceMergeRequest(downsampleIndexName);
 | 
	
		
			
				|  |  | +            request.maxNumSegments(1);
 | 
	
		
			
				|  |  | +            request.setParentTask(parentTask);
 | 
	
		
			
				|  |  | +            client.admin()
 | 
	
		
			
				|  |  | +                .indices()
 | 
	
		
			
				|  |  | +                .forceMerge(request, ActionListener.wrap(mergeIndexResp -> actionListener.onResponse(AcknowledgedResponse.TRUE), t -> {
 | 
	
		
			
				|  |  | +                    /*
 | 
	
		
			
				|  |  | +                     * At this point downsampel index has been created
 | 
	
		
			
				|  |  | +                     * successfully even force merge fails.
 | 
	
		
			
				|  |  | +                     * So, we should not fail the downsample operation
 | 
	
		
			
				|  |  | +                     */
 | 
	
		
			
				|  |  | +                    logger.error("Failed to force-merge " + "downsample index [" + downsampleIndexName + "]", t);
 | 
	
		
			
				|  |  | +                    actionListener.onResponse(AcknowledgedResponse.TRUE);
 | 
	
		
			
				|  |  | +                }));
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          @Override
 | 
	
		
			
				|  |  |          public void onFailure(Exception e) {
 | 
	
		
			
				|  |  | -            listener.onFailure(e);
 | 
	
		
			
				|  |  | +            this.actionListener.onFailure(e);
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  }
 |