瀏覽代碼

[ML] Addressing bug streaming DatafeedConfig aggs from (<= 6.5.4) -> 6.7.0 (#40610) (#40660)

* Addressing stream failure and adding tests to catch such in the future

* Add aggs to full cluster restart tests

* Test BWC for datafeeds with and without aggs

The wire serialisation is different for null/non-null
aggs, so it's worth testing both cases.
Benjamin Trent 6 年之前
父節點
當前提交
b6ca8b73a9

+ 4 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/AggProvider.java

@@ -73,7 +73,8 @@ class AggProvider implements Writeable, ToXContentObject {
         } else if (in.getVersion().onOrAfter(Version.V_6_6_0)) { // Has the bug, but supports lazy objects
             return new AggProvider(in.readMap(), null, null);
         } else { // only supports eagerly parsed objects
-            return AggProvider.fromParsedAggs(in.readOptionalWriteable(AggregatorFactories.Builder::new));
+            // Upstream, we have read the bool already and know for sure that we have parsed aggs in the stream
+            return AggProvider.fromParsedAggs(new AggregatorFactories.Builder(in));
         }
     }
 
@@ -111,7 +112,8 @@ class AggProvider implements Writeable, ToXContentObject {
                 // actually are aggregations defined
                 throw new ElasticsearchException("Unsupported operation: parsed aggregations are null");
             }
-            out.writeOptionalWriteable(parsedAggs);
+            // Upstream we already verified that this calling object is not null, no need to write a second boolean to the stream
+            parsedAggs.writeTo(out);
         }
     }
 

+ 2 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java

@@ -212,6 +212,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
         }
         // each of these writables are version aware
         this.queryProvider = QueryProvider.fromStream(in);
+        // This reads a boolean from the stream, if true, it sends the stream to the `fromStream` method
         this.aggProvider = in.readOptionalWriteable(AggProvider::fromStream);
 
         if (in.readBoolean()) {
@@ -420,6 +421,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
 
         // Each of these writables are version aware
         queryProvider.writeTo(out); // never null
+        // This writes a boolean to the stream, if true, it sends the stream to the `writeTo` method
         out.writeOptionalWriteable(aggProvider);
 
         if (scriptFields != null) {

+ 12 - 0
x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/MlMigrationFullClusterRestartIT.java

@@ -13,6 +13,10 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
+import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
 import org.elasticsearch.upgrades.AbstractFullClusterRestartTestCase;
 import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
@@ -112,6 +116,7 @@ public class MlMigrationFullClusterRestartIT extends AbstractFullClusterRestartT
 
         DatafeedConfig.Builder dfBuilder = new DatafeedConfig.Builder(OLD_CLUSTER_STARTED_DATAFEED_ID, OLD_CLUSTER_OPEN_JOB_ID);
         dfBuilder.setIndices(Collections.singletonList("airline-data"));
+        addAggregations(dfBuilder);
 
         Request putDatafeed = new Request("PUT", "_xpack/ml/datafeeds/" + OLD_CLUSTER_STARTED_DATAFEED_ID);
         putDatafeed.setJsonEntity(Strings.toString(dfBuilder.build()));
@@ -245,4 +250,11 @@ public class MlMigrationFullClusterRestartIT extends AbstractFullClusterRestartT
                 .filter(id -> id.equals(jobId)).findFirst();
         assertFalse(config.isPresent());
     }
+
+    private void addAggregations(DatafeedConfig.Builder dfBuilder) {
+        TermsAggregationBuilder airline = AggregationBuilders.terms("airline");
+        MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time").subAggregation(airline);
+        dfBuilder.setParsedAggregations(AggregatorFactories.builder().addAggregator(
+                AggregationBuilders.histogram("time").interval(300000).subAggregation(maxTime).field("time")));
+    }
 }

+ 2 - 0
x-pack/qa/rolling-upgrade/build.gradle

@@ -232,6 +232,8 @@ for (Version version : bwcVersions.wireCompatible) {
                 'mixed_cluster/10_basic/Start scroll in mixed cluster on upgraded node that we will continue after upgrade',
                 'mixed_cluster/30_ml_jobs_crud/Create a job in the mixed cluster and write some data',
                 'mixed_cluster/40_ml_datafeed_crud/Put job and datafeed in mixed cluster',
+                'mixed_cluster/40_ml_datafeed_crud/Put job and datafeed without aggs in mixed cluster',
+                'mixed_cluster/40_ml_datafeed_crud/Put job and datafeed with aggs in mixed cluster'
         ].join(',')
         finalizedBy "${baseName}#oldClusterTestCluster#node1.stop"
     }

+ 103 - 10
x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/40_ml_datafeed_crud.yml

@@ -1,28 +1,94 @@
 ---
-"Test old cluster datafeed":
+"Test old cluster datafeed without aggs":
   - do:
       ml.get_datafeeds:
-        datafeed_id: old-cluster-datafeed
-  - match: { datafeeds.0.datafeed_id: "old-cluster-datafeed"}
+        datafeed_id: old-cluster-datafeed-without-aggs
+  - match: { datafeeds.0.datafeed_id: "old-cluster-datafeed-without-aggs"}
   - length: { datafeeds.0.indices: 1 }
   - gte: { datafeeds.0.scroll_size: 2000 }
+  - match: { datafeeds.0.script_fields.double_responsetime.script.lang: painless }
+  - is_false: datafeeds.0.aggregations
 
   - do:
       ml.get_datafeed_stats:
-        datafeed_id: old-cluster-datafeed
+        datafeed_id: old-cluster-datafeed-without-aggs
   - match: { datafeeds.0.state: "stopped"}
   - is_false: datafeeds.0.node
 
 ---
-"Put job and datafeed in mixed cluster":
+"Test old cluster datafeed with aggs":
+  - do:
+      ml.get_datafeeds:
+        datafeed_id: old-cluster-datafeed-with-aggs
+  - match: { datafeeds.0.datafeed_id: "old-cluster-datafeed-with-aggs"}
+  - length: { datafeeds.0.indices: 1 }
+  - gte: { datafeeds.0.scroll_size: 2000 }
+  - is_false: datafeeds.0.script_fields
+  - match: { datafeeds.0.aggregations.buckets.date_histogram.field: time }
+  - match: { datafeeds.0.aggregations.buckets.aggregations.time.max.field: time }
+
+  - do:
+      ml.get_datafeed_stats:
+        datafeed_id: old-cluster-datafeed-with-aggs
+  - match: { datafeeds.0.state: "stopped"}
+  - is_false: datafeeds.0.node
+
+---
+"Put job and datafeed without aggs in mixed cluster":
+  - do:
+      ml.put_job:
+        job_id: mixed-cluster-datafeed-job-without-aggs
+        body:  >
+          {
+            "description":"Cluster upgrade",
+            "analysis_config" : {
+                "bucket_span": "60s",
+                "detectors" :[{"function":"count"}]
+            },
+            "analysis_limits" : {
+                "model_memory_limit": "50mb"
+            },
+            "data_description" : {
+                "format":"xcontent",
+                "time_field":"time"
+            }
+          }
+  - do:
+      ml.put_datafeed:
+        datafeed_id: mixed-cluster-datafeed-without-aggs
+        body:  >
+          {
+            "job_id":"mixed-cluster-datafeed-job-without-aggs",
+            "indices":["airline-data"],
+            "scroll_size": 2000,
+            "script_fields": {
+              "double_responsetime": {
+                "script": {
+                  "lang": "painless",
+                  "source": "doc['responsetime'].value * 2"
+                }
+              }
+            }
+          }
+
+  - do:
+      ml.get_datafeed_stats:
+        datafeed_id: mixed-cluster-datafeed-without-aggs
+  - match: { datafeeds.0.state: stopped}
+  - is_false: datafeeds.0.node
+
+---
+"Put job and datafeed with aggs in mixed cluster":
+
   - do:
       ml.put_job:
-        job_id: mixed-cluster-datafeed-job
+        job_id: mixed-cluster-datafeed-job-with-aggs
         body:  >
           {
             "description":"Cluster upgrade",
             "analysis_config" : {
                 "bucket_span": "60s",
+                "summary_count_field_name": "doc_count",
                 "detectors" :[{"function":"count"}]
             },
             "analysis_limits" : {
@@ -36,16 +102,43 @@
 
   - do:
       ml.put_datafeed:
-        datafeed_id: mixed-cluster-datafeed
+        datafeed_id: mixed-cluster-datafeed-with-aggs
         body:  >
           {
-            "job_id":"mixed-cluster-datafeed-job",
+            "job_id":"mixed-cluster-datafeed-job-with-aggs",
             "indices":["airline-data"],
-            "scroll_size": 2000
+            "scroll_size": 2000,
+            "aggregations": {
+              "buckets": {
+                "date_histogram": {
+                  "field": "time",
+                  "interval": "30s",
+                  "time_zone": "UTC"
+                },
+                "aggregations": {
+                  "time": {
+                    "max": {"field": "time"}
+                  },
+                  "airline": {
+                    "terms": {
+                      "field": "airline",
+                      "size": 100
+                    },
+                    "aggregations": {
+                      "responsetime": {
+                        "avg": {
+                          "field": "responsetime"
+                        }
+                      }
+                    }
+                  }
+                }
+              }
+            }
           }
 
   - do:
       ml.get_datafeed_stats:
-        datafeed_id: mixed-cluster-datafeed
+        datafeed_id: mixed-cluster-datafeed-with-aggs
   - match: { datafeeds.0.state: stopped}
   - is_false: datafeeds.0.node

+ 83 - 7
x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/40_ml_datafeed_crud.yml

@@ -1,8 +1,9 @@
 ---
-"Put job and datafeed in old cluster":
+"Put job and datafeed without aggs in old cluster":
+
   - do:
       ml.put_job:
-        job_id: old-cluster-datafeed-job
+        job_id: old-cluster-datafeed-job-without-aggs
         body:  >
           {
             "description":"Cluster upgrade",
@@ -18,20 +19,95 @@
                 "time_field":"time"
             }
           }
-  - match: { job_id: old-cluster-datafeed-job }
+  - match: { job_id: old-cluster-datafeed-job-without-aggs }
 
   - do:
       ml.put_datafeed:
-        datafeed_id: old-cluster-datafeed
+        datafeed_id: old-cluster-datafeed-without-aggs
         body:  >
           {
-            "job_id":"old-cluster-datafeed-job",
+            "job_id":"old-cluster-datafeed-job-without-aggs",
             "indices":["airline-data"],
-            "scroll_size": 2000
+            "scroll_size": 2000,
+            "script_fields": {
+              "double_responsetime": {
+                "script": {
+                  "lang": "painless",
+                  "source": "doc['responsetime'].value * 2"
+                }
+              }
+            }
+          }
+
+  - do:
+      ml.get_datafeed_stats:
+        datafeed_id: old-cluster-datafeed-without-aggs
+  - match: { datafeeds.0.state: stopped}
+  - is_false: datafeeds.0.node
+
+---
+"Put job and datafeed with aggs in old cluster":
+
+  - do:
+      ml.put_job:
+        job_id: old-cluster-datafeed-job-with-aggs
+        body:  >
+          {
+            "description":"Cluster upgrade",
+            "analysis_config" : {
+                "bucket_span": "60s",
+                "summary_count_field_name": "doc_count",
+                "detectors" :[{"function":"count"}]
+            },
+            "analysis_limits" : {
+                "model_memory_limit": "50mb"
+            },
+            "data_description" : {
+                "format":"xcontent",
+                "time_field":"time"
+            }
+          }
+  - match: { job_id: old-cluster-datafeed-job-with-aggs }
+
+  - do:
+      ml.put_datafeed:
+        datafeed_id: old-cluster-datafeed-with-aggs
+        body:  >
+          {
+            "job_id":"old-cluster-datafeed-job-with-aggs",
+            "indices":["airline-data"],
+            "scroll_size": 2000,
+            "aggregations": {
+              "buckets": {
+                "date_histogram": {
+                  "field": "time",
+                  "interval": "30s",
+                  "time_zone": "UTC"
+                },
+                "aggregations": {
+                  "time": {
+                    "max": {"field": "time"}
+                  },
+                  "airline": {
+                    "terms": {
+                      "field": "airline",
+                      "size": 100
+                    },
+                    "aggregations": {
+                      "responsetime": {
+                        "avg": {
+                          "field": "responsetime"
+                        }
+                      }
+                    }
+                  }
+                }
+              }
+            }
           }
 
   - do:
       ml.get_datafeed_stats:
-        datafeed_id: old-cluster-datafeed
+        datafeed_id: old-cluster-datafeed-with-aggs
   - match: { datafeeds.0.state: stopped}
   - is_false: datafeeds.0.node

+ 44 - 8
x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/40_ml_datafeed_crud.yml

@@ -5,7 +5,6 @@ setup:
         wait_for_nodes: 3
         # wait for long enough that we give delayed unassigned shards to stop being delayed
         timeout: 70s
-
   - do:
       indices.create:
         index: airline-data
@@ -14,32 +13,69 @@ setup:
             properties:
               time:
                 type: date
+---
+"Test old and mixed cluster datafeeds without aggs":
+  - do:
+      ml.get_datafeeds:
+        datafeed_id: old-cluster-datafeed-without-aggs
+  - match: { datafeeds.0.datafeed_id: "old-cluster-datafeed-without-aggs"}
+  - length: { datafeeds.0.indices: 1 }
+  - gte: { datafeeds.0.scroll_size: 2000 }
+  - match: { datafeeds.0.script_fields.double_responsetime.script.lang: painless }
+  - is_false: datafeeds.0.aggregations
+
+  - do:
+      ml.get_datafeed_stats:
+        datafeed_id: old-cluster-datafeed-without-aggs
+  - match: { datafeeds.0.state: "stopped"}
+  - is_false: datafeeds.0.node
+
+  - do:
+      ml.get_datafeeds:
+        datafeed_id: mixed-cluster-datafeed-without-aggs
+  - match: { datafeeds.0.datafeed_id: "mixed-cluster-datafeed-without-aggs"}
+  - length: { datafeeds.0.indices: 1 }
+  - gte: { datafeeds.0.scroll_size: 2000 }
+  - match: { datafeeds.0.script_fields.double_responsetime.script.lang: painless }
+  - is_false: datafeeds.0.aggregations
+
+  - do:
+      ml.get_datafeed_stats:
+        datafeed_id: mixed-cluster-datafeed-without-aggs
+  - match: { datafeeds.0.state: "stopped"}
+  - is_false: datafeeds.0.node
 
 ---
-"Test old and mixed cluster datafeeds":
+"Test old and mixed cluster datafeeds with aggs":
   - do:
       ml.get_datafeeds:
-        datafeed_id: old-cluster-datafeed
-  - match: { datafeeds.0.datafeed_id: "old-cluster-datafeed"}
+        datafeed_id: old-cluster-datafeed-with-aggs
+  - match: { datafeeds.0.datafeed_id: "old-cluster-datafeed-with-aggs"}
   - length: { datafeeds.0.indices: 1 }
   - gte: { datafeeds.0.scroll_size: 2000 }
+  - is_false: datafeeds.0.script_fields
+  - match: { datafeeds.0.aggregations.buckets.date_histogram.field: time }
+  - match: { datafeeds.0.aggregations.buckets.aggregations.time.max.field: time }
 
   - do:
       ml.get_datafeed_stats:
-        datafeed_id: old-cluster-datafeed
+        datafeed_id: old-cluster-datafeed-with-aggs
   - match: { datafeeds.0.state: "stopped"}
   - is_false: datafeeds.0.node
 
   - do:
       ml.get_datafeeds:
-        datafeed_id: mixed-cluster-datafeed
-  - match: { datafeeds.0.datafeed_id: "mixed-cluster-datafeed"}
+        datafeed_id: mixed-cluster-datafeed-with-aggs
+  - match: { datafeeds.0.datafeed_id: "mixed-cluster-datafeed-with-aggs"}
   - length: { datafeeds.0.indices: 1 }
   - gte: { datafeeds.0.scroll_size: 2000 }
+  - is_false: datafeeds.0.script_fields
+  - match: { datafeeds.0.aggregations.buckets.date_histogram.field: time }
+  - match: { datafeeds.0.aggregations.buckets.aggregations.time.max.field: time }
 
   - do:
       ml.get_datafeed_stats:
-        datafeed_id: mixed-cluster-datafeed
+        datafeed_id: mixed-cluster-datafeed-with-aggs
   - match: { datafeeds.0.state: "stopped"}
   - is_false: datafeeds.0.node