浏览代码

[ML] fixes snapshot upgrader for categorization jobs (#67037)

Categorization jobs require an extra field in the header for the tokenized data.

This commit fixes the header writing for the anomaly job model snapshot upgrader native process.
Benjamin Trent 4 年之前
父节点
当前提交
f6760a1c9e

+ 7 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/JobModelSnapshotUpgrader.java

@@ -189,7 +189,13 @@ public final class JobModelSnapshotUpgrader {
                     fieldIndexes.put(field, index++);
                 }
             }
-            fieldIndexes.put(LengthEncodedWriter.CONTROL_FIELD_NAME, index);
+            // field for categorization tokens
+            if (MachineLearning.CATEGORIZATION_TOKENIZATION_IN_JAVA && job.getAnalysisConfig().getCategorizationFieldName() != null) {
+                fieldIndexes.put(LengthEncodedWriter.PRETOKENISED_TOKEN_FIELD, index++);
+            }
+
+            // control field
+            fieldIndexes.put(LengthEncodedWriter.CONTROL_FIELD_NAME, index++);
             return fieldIndexes;
         }
 

+ 11 - 1
x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlJobSnapshotUpgradeIT.java

@@ -229,8 +229,17 @@ public class MlJobSnapshotUpgradeIT extends AbstractUpgradeTestCase {
     private PutJobResponse buildAndPutJob(String jobId, TimeValue bucketSpan) throws Exception {
         Detector.Builder detector = new Detector.Builder("mean", "value");
         detector.setPartitionFieldName("series");
-        AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector.build()));
+        List<Detector> detectors = new ArrayList<>();
+        detectors.add(detector.build());
+        boolean isCategorization = randomBoolean();
+        if (isCategorization) {
+            detectors.add(new Detector.Builder("count", null).setByFieldName("mlcategory").build());
+        }
+        AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(detectors);
         analysisConfig.setBucketSpan(bucketSpan);
+        if (randomBoolean()) {
+            analysisConfig.setCategorizationFieldName("text");
+        }
         Job.Builder job = new Job.Builder(jobId);
         job.setAnalysisConfig(analysisConfig);
         DataDescription.Builder dataDescription = new DataDescription.Builder();
@@ -247,6 +256,7 @@ public class MlJobSnapshotUpgradeIT extends AbstractUpgradeTestCase {
                 Map<String, Object> record = new HashMap<>();
                 record.put("time", now);
                 record.put("value", timeAndSeriesToValueFunction.apply(i, field));
+                record.put("text", randomFrom("foo has landed 3", "bar has landed 5", "bar has finished 2", "foo has finished 10"));
                 record.put("series", field);
                 data.add(createJsonRecord(record));