
[ML] Do not start data frame analytics when too many docs are analyzed (#62547)

The data frame structure in C++ has a limit of 2^32 documents. This commit
adds a check that the number of documents involved in the analysis is
less than that and fails to start otherwise. That saves the cost of
reindexing when it is unnecessary.
Dimitris Athanasiou 5 years ago
Parent
Commit
d1e963e426
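
A rough illustration of the guard introduced below (a minimal standalone Java sketch, not the actual Elasticsearch code; the class and parameter names are hypothetical, and the expression mirrors the one added in TransportStartDataFrameAnalyticsAction):

    // Sketch of the new row-count guard, assuming `rows` is the source document count
    // from the data summary and `trainingPercent` is what
    // DataFrameAnalysis#getTrainingPercent returns (100.0 by default, per the new
    // default method added in this commit).
    final class RowCountGuard {

        // The C++ data frame structure cannot address more than 2^32 documents.
        private static final double MAX_DOCS = Math.pow(2, 32);

        static void validateRowCount(long rows, double trainingPercent) {
            if (Math.floor(trainingPercent * rows) >= MAX_DOCS) {
                throw new IllegalArgumentException("Unable to start because too many documents "
                    + "(more than 2^32) are included in the analysis. Consider downsampling.");
            }
        }

        public static void main(String[] args) {
            // 100.0 * 1_000_000 is well below 2^32, so this passes without throwing.
            validateRowCount(1_000_000L, 100.0);
        }
    }

In the real code path this check runs during start-up validation, right after the check that the source index has any rows at all, as shown in the last diff hunk below.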

+ 1 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/Classification.java

@@ -231,6 +231,7 @@ public class Classification implements DataFrameAnalysis {
         return numTopClasses;
     }
 
+    @Override
     public double getTrainingPercent() {
         return trainingPercent;
     }

+ 6 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/DataFrameAnalysis.java

@@ -83,6 +83,12 @@ public interface DataFrameAnalysis extends ToXContentObject, NamedWriteable {
      */
     boolean supportsInference();
 
+    /**
+     * @return the percentage of data to use for training
+     */
+    default double getTrainingPercent() {
+        return 100.0;
+    }
     /**
      * Summarizes information about the fields that is necessary for analysis to generate
      * the parameters needed for the process configuration.

+ 1 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/Regression.java

@@ -178,6 +178,7 @@ public class Regression implements DataFrameAnalysis {
         return predictionFieldName;
     }
 
+    @Override
     public double getTrainingPercent() {
         return trainingPercent;
     }

+ 5 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java

@@ -330,7 +330,7 @@ public class TransportStartDataFrameAnalyticsAction
 
     private void validateSourceIndexHasAnalyzableData(StartContext startContext, ActionListener<StartContext> listener) {
         ActionListener<Void> validateAtLeastOneAnalyzedFieldListener = ActionListener.wrap(
-            aVoid -> validateSourceIndexHasRows(startContext, listener),
+            aVoid -> validateSourceIndexRowsCount(startContext, listener),
             listener::onFailure
         );
 
@@ -359,7 +359,7 @@ public class TransportStartDataFrameAnalyticsAction
         }
     }
 
-    private void validateSourceIndexHasRows(StartContext startContext, ActionListener<StartContext> listener) {
+    private void validateSourceIndexRowsCount(StartContext startContext, ActionListener<StartContext> listener) {
         DataFrameDataExtractorFactory extractorFactory = DataFrameDataExtractorFactory.createForSourceIndices(client,
             "validate_source_index_has_rows-" + startContext.config.getId(),
             startContext.config,
@@ -377,6 +377,9 @@ public class TransportStartDataFrameAnalyticsAction
                             startContext.config.getId(),
                             Strings.arrayToCommaDelimitedString(startContext.config.getSource().getIndex())
                         ));
+                    } else if (Math.floor(startContext.config.getAnalysis().getTrainingPercent() * dataSummary.rows)  >= Math.pow(2, 32)) {
+                        listener.onFailure(ExceptionsHelper.badRequestException("Unable to start because too many documents " +
+                            "(more than 2^32) are included in the analysis. Consider downsampling."));
                     } else {
                         listener.onResponse(startContext);
                     }