|
@@ -34,6 +34,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
|
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
|
import org.elasticsearch.common.Nullable;
|
|
|
+import org.elasticsearch.common.Strings;
|
|
|
import org.elasticsearch.common.inject.Inject;
|
|
|
import org.elasticsearch.common.io.stream.StreamInput;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
@@ -226,10 +227,41 @@ public class TransportStartDataFrameAnalyticsAction
|
|
|
}
|
|
|
|
|
|
private void getConfigAndValidate(String id, ActionListener<DataFrameAnalyticsConfig> finalListener) {
|
|
|
+
|
|
|
+ // Step 5. Validate that there are analyzable data in the source index
|
|
|
+ ActionListener<DataFrameAnalyticsConfig> validateMappingsMergeListener = ActionListener.wrap(
|
|
|
+ config -> DataFrameDataExtractorFactory.createForSourceIndices(client,
|
|
|
+ "validate_source_index_has_rows-" + id,
|
|
|
+ config,
|
|
|
+ ActionListener.wrap(
|
|
|
+ dataFrameDataExtractorFactory ->
|
|
|
+ dataFrameDataExtractorFactory
|
|
|
+ .newExtractor(false)
|
|
|
+ .collectDataSummaryAsync(ActionListener.wrap(
|
|
|
+ dataSummary -> {
|
|
|
+ if (dataSummary.rows == 0) {
|
|
|
+ finalListener.onFailure(new ElasticsearchStatusException(
|
|
|
+ "Unable to start {} as there are no analyzable data in source indices [{}].",
|
|
|
+ RestStatus.BAD_REQUEST,
|
|
|
+ id,
|
|
|
+ Strings.arrayToCommaDelimitedString(config.getSource().getIndex())
|
|
|
+ ));
|
|
|
+ } else {
|
|
|
+ finalListener.onResponse(config);
|
|
|
+ }
|
|
|
+ },
|
|
|
+ finalListener::onFailure
|
|
|
+ )),
|
|
|
+ finalListener::onFailure
|
|
|
+ ))
|
|
|
+ ,
|
|
|
+ finalListener::onFailure
|
|
|
+ );
|
|
|
+
|
|
|
// Step 4. Validate mappings can be merged
|
|
|
ActionListener<DataFrameAnalyticsConfig> toValidateMappingsListener = ActionListener.wrap(
|
|
|
config -> MappingsMerger.mergeMappings(client, config.getHeaders(), config.getSource().getIndex(), ActionListener.wrap(
|
|
|
- mappings -> finalListener.onResponse(config), finalListener::onFailure)),
|
|
|
+ mappings -> validateMappingsMergeListener.onResponse(config), finalListener::onFailure)),
|
|
|
finalListener::onFailure
|
|
|
);
|
|
|
|