|  | @@ -6,10 +6,14 @@
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  package org.elasticsearch.xpack.dataframe.action;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +import org.apache.log4j.LogManager;
 | 
	
		
			
				|  |  | +import org.apache.log4j.Logger;
 | 
	
		
			
				|  |  |  import org.elasticsearch.ElasticsearchException;
 | 
	
		
			
				|  |  |  import org.elasticsearch.ElasticsearchStatusException;
 | 
	
		
			
				|  |  |  import org.elasticsearch.action.ActionListener;
 | 
	
		
			
				|  |  | +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
 | 
	
		
			
				|  |  |  import org.elasticsearch.action.support.ActionFilters;
 | 
	
		
			
				|  |  | +import org.elasticsearch.action.support.IndicesOptions;
 | 
	
		
			
				|  |  |  import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 | 
	
		
			
				|  |  |  import org.elasticsearch.client.Client;
 | 
	
		
			
				|  |  |  import org.elasticsearch.cluster.ClusterState;
 | 
	
	
		
			
				|  | @@ -35,32 +39,40 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
 | 
	
		
			
				|  |  |  import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
 | 
	
		
			
				|  |  |  import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
 | 
	
		
			
				|  |  |  import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
 | 
	
		
			
				|  |  | +import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
 | 
	
		
			
				|  |  |  import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
 | 
	
		
			
				|  |  | +import org.elasticsearch.xpack.dataframe.persistence.DataframeIndex;
 | 
	
		
			
				|  |  | +import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  import java.util.Collection;
 | 
	
		
			
				|  |  | +import java.util.Map;
 | 
	
		
			
				|  |  |  import java.util.function.Consumer;
 | 
	
		
			
				|  |  |  import java.util.function.Predicate;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  public class TransportStartDataFrameTransformAction extends
 | 
	
		
			
				|  |  |      TransportMasterNodeAction<StartDataFrameTransformAction.Request, StartDataFrameTransformAction.Response> {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    private static final Logger logger = LogManager.getLogger(TransportStartDataFrameTransformAction.class);
 | 
	
		
			
				|  |  |      private final XPackLicenseState licenseState;
 | 
	
		
			
				|  |  |      private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager;
 | 
	
		
			
				|  |  |      private final PersistentTasksService persistentTasksService;
 | 
	
		
			
				|  |  |      private final Client client;
 | 
	
		
			
				|  |  | +    private final DataFrameAuditor auditor;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      @Inject
 | 
	
		
			
				|  |  |      public TransportStartDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters,
 | 
	
		
			
				|  |  |                                                    ClusterService clusterService, XPackLicenseState licenseState,
 | 
	
		
			
				|  |  |                                                    ThreadPool threadPool, IndexNameExpressionResolver indexNameExpressionResolver,
 | 
	
		
			
				|  |  |                                                    DataFrameTransformsConfigManager dataFrameTransformsConfigManager,
 | 
	
		
			
				|  |  | -                                                  PersistentTasksService persistentTasksService, Client client) {
 | 
	
		
			
				|  |  | +                                                  PersistentTasksService persistentTasksService, Client client,
 | 
	
		
			
				|  |  | +                                                  DataFrameAuditor auditor) {
 | 
	
		
			
				|  |  |          super(StartDataFrameTransformAction.NAME, transportService, clusterService, threadPool, actionFilters,
 | 
	
		
			
				|  |  |                  StartDataFrameTransformAction.Request::new, indexNameExpressionResolver);
 | 
	
		
			
				|  |  |          this.licenseState = licenseState;
 | 
	
		
			
				|  |  |          this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager;
 | 
	
		
			
				|  |  |          this.persistentTasksService = persistentTasksService;
 | 
	
		
			
				|  |  |          this.client = client;
 | 
	
		
			
				|  |  | +        this.auditor = auditor;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      @Override
 | 
	
	
		
			
				|  | @@ -83,7 +95,7 @@ public class TransportStartDataFrameTransformAction extends
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |          final DataFrameTransform transformTask = createDataFrameTransform(request.getId(), threadPool);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        // <3> Set the allocated task's state to STARTED
 | 
	
		
			
				|  |  | +        // <4> Set the allocated task's state to STARTED
 | 
	
		
			
				|  |  |          ActionListener<PersistentTasksCustomMetaData.PersistentTask<DataFrameTransform>> persistentTaskActionListener = ActionListener.wrap(
 | 
	
		
			
				|  |  |              task -> {
 | 
	
		
			
				|  |  |                  waitForDataFrameTaskAllocated(task.getId(),
 | 
	
	
		
			
				|  | @@ -102,16 +114,9 @@ public class TransportStartDataFrameTransformAction extends
 | 
	
		
			
				|  |  |              listener::onFailure
 | 
	
		
			
				|  |  |          );
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        // <2> Create the task in cluster state so that it will start executing on the node
 | 
	
		
			
				|  |  | -        ActionListener<DataFrameTransformConfig> getTransformListener = ActionListener.wrap(
 | 
	
		
			
				|  |  | -            config -> {
 | 
	
		
			
				|  |  | -                if (config.isValid() == false) {
 | 
	
		
			
				|  |  | -                    listener.onFailure(new ElasticsearchStatusException(
 | 
	
		
			
				|  |  | -                        DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_CONFIG_INVALID, request.getId()),
 | 
	
		
			
				|  |  | -                        RestStatus.BAD_REQUEST
 | 
	
		
			
				|  |  | -                    ));
 | 
	
		
			
				|  |  | -                    return;
 | 
	
		
			
				|  |  | -                }
 | 
	
		
			
				|  |  | +        // <3> Create the task in cluster state so that it will start executing on the node
 | 
	
		
			
				|  |  | +        ActionListener<Void> createOrGetIndexListener = ActionListener.wrap(
 | 
	
		
			
				|  |  | +            unused -> {
 | 
	
		
			
				|  |  |                  PersistentTasksCustomMetaData.PersistentTask<DataFrameTransform> existingTask =
 | 
	
		
			
				|  |  |                      getExistingTask(transformTask.getId(), state);
 | 
	
		
			
				|  |  |                  if (existingTask == null) {
 | 
	
	
		
			
				|  | @@ -123,14 +128,14 @@ public class TransportStartDataFrameTransformAction extends
 | 
	
		
			
				|  |  |                      DataFrameTransformState transformState = (DataFrameTransformState)existingTask.getState();
 | 
	
		
			
				|  |  |                      if(transformState.getTaskState() == DataFrameTransformTaskState.FAILED && request.isForce() == false) {
 | 
	
		
			
				|  |  |                          listener.onFailure(new ElasticsearchStatusException(
 | 
	
		
			
				|  |  | -                            "Unable to start data frame transform [" + config.getId() +
 | 
	
		
			
				|  |  | +                            "Unable to start data frame transform [" + request.getId() +
 | 
	
		
			
				|  |  |                                  "] as it is in a failed state with failure: [" + transformState.getReason() +
 | 
	
		
			
				|  |  |                              "]. Use force start to restart data frame transform once error is resolved.",
 | 
	
		
			
				|  |  |                              RestStatus.CONFLICT));
 | 
	
		
			
				|  |  |                      } else if (transformState.getTaskState() != DataFrameTransformTaskState.STOPPED &&
 | 
	
		
			
				|  |  |                                 transformState.getTaskState() != DataFrameTransformTaskState.FAILED) {
 | 
	
		
			
				|  |  |                          listener.onFailure(new ElasticsearchStatusException(
 | 
	
		
			
				|  |  | -                            "Unable to start data frame transform [" + config.getId() +
 | 
	
		
			
				|  |  | +                            "Unable to start data frame transform [" + request.getId() +
 | 
	
		
			
				|  |  |                                  "] as it is in state [" + transformState.getTaskState()  + "]", RestStatus.CONFLICT));
 | 
	
		
			
				|  |  |                      } else {
 | 
	
		
			
				|  |  |                          persistentTaskActionListener.onResponse(existingTask);
 | 
	
	
		
			
				|  | @@ -140,10 +145,80 @@ public class TransportStartDataFrameTransformAction extends
 | 
	
		
			
				|  |  |              listener::onFailure
 | 
	
		
			
				|  |  |          );
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +        // <2> If the destination index exists, start the task, otherwise deduce our mappings for the destination index and create it
 | 
	
		
			
				|  |  | +        ActionListener<DataFrameTransformConfig> getTransformListener = ActionListener.wrap(
 | 
	
		
			
				|  |  | +            config -> {
 | 
	
		
			
				|  |  | +                if (config.isValid() == false) {
 | 
	
		
			
				|  |  | +                    listener.onFailure(new ElasticsearchStatusException(
 | 
	
		
			
				|  |  | +                        DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_CONFIG_INVALID, request.getId()),
 | 
	
		
			
				|  |  | +                        RestStatus.BAD_REQUEST
 | 
	
		
			
				|  |  | +                    ));
 | 
	
		
			
				|  |  | +                    return;
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +                final String destinationIndex = config.getDestination().getIndex();
 | 
	
		
			
				|  |  | +                String[] dest = indexNameExpressionResolver.concreteIndexNames(state,
 | 
	
		
			
				|  |  | +                    IndicesOptions.lenientExpandOpen(),
 | 
	
		
			
				|  |  | +                    destinationIndex);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                if(dest.length == 0) {
 | 
	
		
			
				|  |  | +                    auditor.info(request.getId(),
 | 
	
		
			
				|  |  | +                        "Could not find destination index [" +  destinationIndex + "]." +
 | 
	
		
			
				|  |  | +                            " Creating index with deduced mappings.");
 | 
	
		
			
				|  |  | +                    createDestinationIndex(config, createOrGetIndexListener);
 | 
	
		
			
				|  |  | +                } else {
 | 
	
		
			
				|  |  | +                    auditor.info(request.getId(), "Destination index [" + destinationIndex + "] already exists.");
 | 
	
		
			
				|  |  | +                    ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(),
 | 
	
		
			
				|  |  | +                        ClientHelper.DATA_FRAME_ORIGIN,
 | 
	
		
			
				|  |  | +                        client.admin()
 | 
	
		
			
				|  |  | +                            .indices()
 | 
	
		
			
				|  |  | +                            .prepareStats(dest)
 | 
	
		
			
				|  |  | +                            .clear()
 | 
	
		
			
				|  |  | +                            .setDocs(true)
 | 
	
		
			
				|  |  | +                            .request(),
 | 
	
		
			
				|  |  | +                        ActionListener.<IndicesStatsResponse>wrap(
 | 
	
		
			
				|  |  | +                            r -> {
 | 
	
		
			
				|  |  | +                                long docTotal = r.getTotal().docs.getCount();
 | 
	
		
			
				|  |  | +                                if (docTotal > 0L) {
 | 
	
		
			
				|  |  | +                                    auditor.warning(request.getId(), "Non-empty destination index [" + destinationIndex + "]. " +
 | 
	
		
			
				|  |  | +                                        "Contains [" + docTotal + "] total documents.");
 | 
	
		
			
				|  |  | +                                }
 | 
	
		
			
				|  |  | +                                createOrGetIndexListener.onResponse(null);
 | 
	
		
			
				|  |  | +                            },
 | 
	
		
			
				|  |  | +                            e -> {
 | 
	
		
			
				|  |  | +                                String msg = "Unable to determine destination index stats, error: " + e.getMessage();
 | 
	
		
			
				|  |  | +                                logger.error(msg, e);
 | 
	
		
			
				|  |  | +                                auditor.warning(request.getId(), msg);
 | 
	
		
			
				|  |  | +                                createOrGetIndexListener.onResponse(null);
 | 
	
		
			
				|  |  | +                            }),
 | 
	
		
			
				|  |  | +                        client.admin().indices()::stats);
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +            },
 | 
	
		
			
				|  |  | +            listener::onFailure
 | 
	
		
			
				|  |  | +        );
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |          // <1> Get the config to verify it exists and is valid
 | 
	
		
			
				|  |  |          dataFrameTransformsConfigManager.getTransformConfiguration(request.getId(), getTransformListener);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    private void createDestinationIndex(final DataFrameTransformConfig config, final ActionListener<Void> listener) {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        final Pivot pivot = new Pivot(config.getSource().getIndex(),
 | 
	
		
			
				|  |  | +            config.getSource().getQueryConfig().getQuery(),
 | 
	
		
			
				|  |  | +            config.getPivotConfig());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        ActionListener<Map<String, String>> deduceMappingsListener = ActionListener.wrap(
 | 
	
		
			
				|  |  | +            mappings -> DataframeIndex.createDestinationIndex(client,
 | 
	
		
			
				|  |  | +                config,
 | 
	
		
			
				|  |  | +                mappings,
 | 
	
		
			
				|  |  | +                ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure)),
 | 
	
		
			
				|  |  | +            deduceTargetMappingsException -> listener.onFailure(
 | 
	
		
			
				|  |  | +                new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_DEDUCE_DEST_MAPPINGS,
 | 
	
		
			
				|  |  | +                    deduceTargetMappingsException))
 | 
	
		
			
				|  |  | +        );
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        pivot.deduceMappings(client, deduceMappingsListener);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      @Override
 | 
	
		
			
				|  |  |      protected ClusterBlockException checkBlock(StartDataFrameTransformAction.Request request, ClusterState state) {
 | 
	
		
			
				|  |  |          return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
 |