@@ -23,6 +23,17 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+/**
+ * Calculates the estimated model memory requirement of an anomaly detection job
+ * from its analysis config and estimates of the cardinality of the various fields
+ * referenced in it.
+ *
+ * Answers are capped at <code>Long.MAX_VALUE</code> bytes, to avoid returning
+ * values with bigger units that cannot trivially be converted back to bytes.
+ * (In reality if the memory estimate is greater than <code>Long.MAX_VALUE</code>
+ * bytes then the job will be impossible to run successfully, so this is not a
+ * major limitation.)
+ */
 public class TransportEstimateModelMemoryAction
     extends HandledTransportAction<EstimateModelMemoryAction.Request, EstimateModelMemoryAction.Response> {
 
@@ -47,23 +58,24 @@ public class TransportEstimateModelMemoryAction
         Map<String, Long> overallCardinality = request.getOverallCardinality();
         Map<String, Long> maxBucketCardinality = request.getMaxBucketCardinality();
 
-        long answer = BASIC_REQUIREMENT.getBytes()
-            + calculateDetectorsRequirementBytes(analysisConfig, overallCardinality)
-            + calculateInfluencerRequirementBytes(analysisConfig, maxBucketCardinality)
-            + calculateCategorizationRequirementBytes(analysisConfig);
+        long answer = BASIC_REQUIREMENT.getBytes();
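+        // Accumulate using the capped helper so the estimate saturates at Long.MAX_VALUE rather than overflowing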
+        answer = addNonNegativeLongsWithMaxValueCap(answer, calculateDetectorsRequirementBytes(analysisConfig, overallCardinality));
+        answer = addNonNegativeLongsWithMaxValueCap(answer, calculateInfluencerRequirementBytes(analysisConfig, maxBucketCardinality));
+        answer = addNonNegativeLongsWithMaxValueCap(answer, calculateCategorizationRequirementBytes(analysisConfig));
 
         listener.onResponse(new EstimateModelMemoryAction.Response(roundUpToNextMb(answer)));
     }
 
     static long calculateDetectorsRequirementBytes(AnalysisConfig analysisConfig, Map<String, Long> overallCardinality) {
         return analysisConfig.getDetectors().stream().map(detector -> calculateDetectorRequirementBytes(detector, overallCardinality))
-            .reduce(0L, Long::sum);
+            .reduce(0L, TransportEstimateModelMemoryAction::addNonNegativeLongsWithMaxValueCap);
     }
 
     static long calculateDetectorRequirementBytes(Detector detector, Map<String, Long> overallCardinality) {
 
         long answer = 0;
 
+        // These values for detectors assume splitting is via a partition field
         switch (detector.getFunction()) {
             case COUNT:
             case LOW_COUNT:
@@ -71,7 +83,7 @@ public class TransportEstimateModelMemoryAction
             case NON_ZERO_COUNT:
             case LOW_NON_ZERO_COUNT:
             case HIGH_NON_ZERO_COUNT:
-                answer = 1; // TODO add realistic number
+                answer = new ByteSizeValue(32, ByteSizeUnit.KB).getBytes();
                 break;
             case DISTINCT_COUNT:
             case LOW_DISTINCT_COUNT:
@@ -88,7 +100,8 @@ public class TransportEstimateModelMemoryAction
                 answer = 1; // TODO add realistic number
                 break;
             case METRIC:
-                answer = 1; // TODO add realistic number
+                // metric analyses mean, min and max simultaneously, and uses about 2.5 times the memory of one of these
+                answer = new ByteSizeValue(160, ByteSizeUnit.KB).getBytes();
                 break;
             case MEAN:
             case LOW_MEAN:
@@ -104,18 +117,14 @@ public class TransportEstimateModelMemoryAction
             case NON_NULL_SUM:
             case LOW_NON_NULL_SUM:
             case HIGH_NON_NULL_SUM:
-                // 64 comes from https://github.com/elastic/kibana/issues/18722
-                answer = new ByteSizeValue(64, ByteSizeUnit.KB).getBytes();
-                break;
             case MEDIAN:
             case LOW_MEDIAN:
             case HIGH_MEDIAN:
-                answer = 1; // TODO add realistic number
-                break;
             case VARP:
             case LOW_VARP:
             case HIGH_VARP:
-                answer = 1; // TODO add realistic number
+                // 64 comes from https://github.com/elastic/kibana/issues/18722
+                answer = new ByteSizeValue(64, ByteSizeUnit.KB).getBytes();
                 break;
             case TIME_OF_DAY:
             case TIME_OF_WEEK:
@@ -130,19 +139,26 @@ public class TransportEstimateModelMemoryAction
 
         String byFieldName = detector.getByFieldName();
         if (byFieldName != null) {
-            answer *= cardinalityEstimate(Detector.BY_FIELD_NAME_FIELD.getPreferredName(), byFieldName, overallCardinality, true);
+            long cardinalityEstimate =
+                cardinalityEstimate(Detector.BY_FIELD_NAME_FIELD.getPreferredName(), byFieldName, overallCardinality, true);
+            // The memory cost of a by field is about two thirds that of a partition field
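+            // Adding 2 before the integer division by 3 makes the two-thirds estimate round up rather than down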
+            long multiplier = addNonNegativeLongsWithMaxValueCap(cardinalityEstimate, 2) / 3 * 2;
+            answer = multiplyNonNegativeLongsWithMaxValueCap(answer, multiplier);
         }
 
         String overFieldName = detector.getOverFieldName();
         if (overFieldName != null) {
-            cardinalityEstimate(Detector.OVER_FIELD_NAME_FIELD.getPreferredName(), overFieldName, overallCardinality, true);
-            // TODO - how should "over" field cardinality affect estimate?
+            long cardinalityEstimate =
+                cardinalityEstimate(Detector.OVER_FIELD_NAME_FIELD.getPreferredName(), overFieldName, overallCardinality, true);
+            // Over fields don't multiply the whole estimate; they just add a small amount (estimated at 512 bytes) per value
+            answer = addNonNegativeLongsWithMaxValueCap(answer, multiplyNonNegativeLongsWithMaxValueCap(cardinalityEstimate, 512));
         }
 
         String partitionFieldName = detector.getPartitionFieldName();
         if (partitionFieldName != null) {
-            answer *=
+            long multiplier =
                 cardinalityEstimate(Detector.PARTITION_FIELD_NAME_FIELD.getPreferredName(), partitionFieldName, overallCardinality, true);
+            answer = multiplyNonNegativeLongsWithMaxValueCap(answer, multiplier);
         }
 
         return answer;
@@ -156,10 +172,10 @@ public class TransportEstimateModelMemoryAction
             pureInfluencers.removeAll(detector.extractAnalysisFields());
         }
 
-        return pureInfluencers.stream()
-            .map(influencer -> cardinalityEstimate(AnalysisConfig.INFLUENCERS.getPreferredName(), influencer, maxBucketCardinality, false)
-                * BYTES_PER_INFLUENCER_VALUE)
-            .reduce(0L, Long::sum);
+        long totalInfluencerCardinality = pureInfluencers.stream()
+            .map(influencer -> cardinalityEstimate(AnalysisConfig.INFLUENCERS.getPreferredName(), influencer, maxBucketCardinality, false))
+            .reduce(0L, TransportEstimateModelMemoryAction::addNonNegativeLongsWithMaxValueCap);
+        return multiplyNonNegativeLongsWithMaxValueCap(BYTES_PER_INFLUENCER_VALUE, totalInfluencerCardinality);
     }
 
     static long calculateCategorizationRequirementBytes(AnalysisConfig analysisConfig) {
@@ -187,7 +203,25 @@ public class TransportEstimateModelMemoryAction
     }
 
     static ByteSizeValue roundUpToNextMb(long bytes) {
-        assert bytes >= 0;
-        return new ByteSizeValue((BYTES_IN_MB - 1 + bytes) / BYTES_IN_MB, ByteSizeUnit.MB);
+        assert bytes >= 0 : "negative bytes " + bytes;
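+        // The capped addition stops a value close to Long.MAX_VALUE from overflowing before the rounding division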
+        return new ByteSizeValue(addNonNegativeLongsWithMaxValueCap(bytes, BYTES_IN_MB - 1) / BYTES_IN_MB, ByteSizeUnit.MB);
+    }
+
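+    // Adds two non-negative longs, saturating at Long.MAX_VALUE instead of overflowing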
+    private static long addNonNegativeLongsWithMaxValueCap(long a, long b) {
+        assert a >= 0;
+        assert b >= 0;
+        if (Long.MAX_VALUE - a - b < 0) {
+            return Long.MAX_VALUE;
+        }
+        return a + b;
+    }
+
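+    // Multiplies two non-negative longs, saturating at Long.MAX_VALUE instead of overflowing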
+    private static long multiplyNonNegativeLongsWithMaxValueCap(long a, long b) {
+        assert a >= 0;
+        assert b >= 0;
+        if (a != 0 && Long.MAX_VALUE / a < b) { // checking a != 0 avoids division by zero; a zero factor cannot overflow
+            return Long.MAX_VALUE;
+        }
+        return a * b;
     }
 }