Kaynağa Gözat

Fix Rollup job creation to work with templates (#43943)

The PutJob API accidentally used an "expert" API of CreateIndexRequest.
That API is semi-lenient to syntax; a type could be omitted and the
request would work as expected.  But if a type was omitted it would
not merge with templates correctly, leading to index creation that
only has the template and not the requested mappings in the request.

This commit refactors the PutJob API to:

- Include the type name
- Use a less "expert" API in an attempt to future proof against errors
- Uses an XContentBuilder instead of string replacing, removes json template
Zachary Tong 6 yıl önce
ebeveyn
işleme
ed3da66e8e

+ 0 - 26
x-pack/plugin/core/src/main/resources/rollup-dynamic-template.json

@@ -1,26 +0,0 @@
-{
-  "_meta":{
-    "_rollup": {
-      "ROLLUP_METADATA_PLACEHOLDER":"ROLLUP_METADATA_PLACEHOLDER"
-    },
-    "rollup-version": "${rollup.dynamic_template.version}"
-  },
-  "dynamic_templates": [
-    {
-      "strings": {
-        "match_mapping_type": "string",
-        "mapping": {
-          "type": "keyword"
-        }
-      }
-    },
-    {
-      "date_histograms": {
-        "path_match": "*.date_histogram.timestamp",
-        "mapping": {
-          "type": "date"
-        }
-      }
-    }
-  ]
-}

+ 0 - 11
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java

@@ -6,7 +6,6 @@
 package org.elasticsearch.xpack.rollup;
 
 import org.apache.lucene.util.SetOnce;
-import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.client.Client;
@@ -46,7 +45,6 @@ import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction;
 import org.elasticsearch.xpack.core.rollup.action.StartRollupJobAction;
 import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction;
 import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
-import org.elasticsearch.xpack.core.template.TemplateUtils;
 import org.elasticsearch.xpack.rollup.action.TransportDeleteRollupJobAction;
 import org.elasticsearch.xpack.rollup.action.TransportGetRollupCapsAction;
 import org.elasticsearch.xpack.rollup.action.TransportGetRollupIndexCapsAction;
@@ -73,7 +71,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.function.Supplier;
-import java.util.regex.Pattern;
 
 import static java.util.Collections.emptyList;
 
@@ -89,20 +86,12 @@ public class Rollup extends Plugin implements ActionPlugin, PersistentTaskPlugin
     public static final String TASK_THREAD_POOL_NAME = RollupField.NAME + "_indexing";
     public static final String SCHEDULE_THREAD_POOL_NAME = RollupField.NAME + "_scheduler";
 
-    public static final String MAPPING_METADATA_PLACEHOLDER = "\"ROLLUP_METADATA_PLACEHOLDER\":\"ROLLUP_METADATA_PLACEHOLDER\"";
     public static final String ROLLUP_TEMPLATE_VERSION_FIELD = "rollup-version";
-    public static final String ROLLUP_TEMPLATE_VERSION_PATTERN =
-            Pattern.quote("${rollup.dynamic_template.version}");
-
-    private static final String ROLLUP_TEMPLATE_NAME = "/rollup-dynamic-template.json";
-    public static final String DYNAMIC_MAPPING_TEMPLATE = TemplateUtils.loadTemplate(ROLLUP_TEMPLATE_NAME,
-            Version.CURRENT.toString(), Rollup.ROLLUP_TEMPLATE_VERSION_PATTERN);
 
     // list of headers that will be stored when a job is created
     public static final Set<String> HEADER_FILTERS =
             new HashSet<>(Arrays.asList("es-security-runas-user", "_xpack_security_authentication"));
 
-
     private final SetOnce<SchedulerEngine> schedulerEngine = new SetOnce<>();
     private final Settings settings;
     private final boolean enabled;

+ 44 - 7
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java

@@ -10,6 +10,7 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ResourceAlreadyExistsException;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
@@ -37,6 +38,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.logging.DeprecationLogger;
 import org.elasticsearch.common.time.DateUtils;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.license.LicenseUtils;
 import org.elasticsearch.license.XPackLicenseState;
@@ -105,7 +107,7 @@ public class TransportPutRollupJobAction extends TransportMasterNodeAction<PutRo
             .fields(request.getConfig().getAllFields().toArray(new String[0]));
         fieldCapsRequest.setParentTask(clusterService.localNode().getId(), task.getId());
 
-        client.fieldCaps(fieldCapsRequest, new ActionListener<FieldCapabilitiesResponse>() {
+        client.fieldCaps(fieldCapsRequest, new ActionListener<>() {
             @Override
             public void onResponse(FieldCapabilitiesResponse fieldCapabilitiesResponse) {
                 ActionRequestValidationException validationException = request.validateMappings(fieldCapabilitiesResponse.get());
@@ -145,13 +147,14 @@ public class TransportPutRollupJobAction extends TransportMasterNodeAction<PutRo
     static void createIndex(RollupJob job, ActionListener<AcknowledgedResponse> listener,
                             PersistentTasksService persistentTasksService, Client client, Logger logger) {
 
-        String jobMetadata = "\"" + job.getConfig().getId() + "\":" + job.getConfig().toJSONString();
-
-        String mapping = Rollup.DYNAMIC_MAPPING_TEMPLATE
-                .replace(Rollup.MAPPING_METADATA_PLACEHOLDER, jobMetadata);
-
         CreateIndexRequest request = new CreateIndexRequest(job.getConfig().getRollupIndex());
-        request.mapping(RollupField.TYPE_NAME, mapping, XContentType.JSON);
+        try {
+            XContentBuilder mapping = createMappings(job.getConfig());
+            request.source(mapping);
+        } catch (IOException e) {
+            listener.onFailure(e);
+            return;
+        }
 
         client.execute(CreateIndexAction.INSTANCE, request,
                 ActionListener.wrap(createIndexResponse -> startPersistentTask(job, listener, persistentTasksService), e -> {
@@ -166,6 +169,40 @@ public class TransportPutRollupJobAction extends TransportMasterNodeAction<PutRo
                 }));
     }
 
+    private static XContentBuilder createMappings(RollupJobConfig config) throws IOException {
+        return XContentBuilder.builder(XContentType.JSON.xContent())
+            .startObject()
+                .startObject("mappings")
+                    .startObject("_doc")
+                        .startObject("_meta")
+                            .field(Rollup.ROLLUP_TEMPLATE_VERSION_FIELD, Version.CURRENT.toString())
+                            .startObject("_rollup")
+                                .field(config.getId(), config)
+                            .endObject()
+                        .endObject()
+                        .startArray("dynamic_templates")
+                            .startObject()
+                                .startObject("strings")
+                                    .field("match_mapping_type", "string")
+                                    .startObject("mapping")
+                                        .field("type", "keyword")
+                                    .endObject()
+                                .endObject()
+                            .endObject()
+                            .startObject()
+                                .startObject("date_histograms")
+                                    .field("path_match", "*.date_histogram.timestamp")
+                                    .startObject("mapping")
+                                        .field("type", "date")
+                                    .endObject()
+                                .endObject()
+                            .endObject()
+                        .endArray()
+                    .endObject()
+                .endObject()
+            .endObject();
+    }
+
     @SuppressWarnings("unchecked")
     static void updateMapping(RollupJob job, ActionListener<AcknowledgedResponse> listener,
                               PersistentTasksService persistentTasksService, Client client, Logger logger) {

+ 2 - 2
x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java

@@ -132,8 +132,8 @@ public class PutJobStateMachineTests extends ESTestCase {
             String mapping = requestCaptor.getValue().mappings().get("_doc");
 
             // Make sure the version is present, and we have our date template (the most important aspects)
-            assertThat(mapping, containsString("\"rollup-version\": \"" + Version.CURRENT.toString() + "\""));
-            assertThat(mapping, containsString("\"path_match\": \"*.date_histogram.timestamp\""));
+            assertThat(mapping, containsString("\"rollup-version\":\"" + Version.CURRENT.toString() + "\""));
+            assertThat(mapping, containsString("\"path_match\":\"*.date_histogram.timestamp\""));
 
             listenerCaptor.getValue().onFailure(new ResourceAlreadyExistsException(job.getConfig().getRollupIndex()));
             latch.countDown();

+ 83 - 0
x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/put_job.yml

@@ -256,4 +256,87 @@ setup:
             ]
           }
 
+---
+"Test put job with templates":
+
+  - do:
+      indices.put_template:
+        name: test
+        body:
+          index_patterns: foo_*
+          mappings:
+            properties:
+              field:
+                type: keyword
+
+  - do:
+      headers:
+        Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
+      rollup.put_job:
+        id: foo
+        body:  >
+          {
+            "index_pattern": "foo",
+            "rollup_index": "foo_rollup",
+            "cron": "*/30 * * * * ?",
+            "page_size" :10,
+            "groups" : {
+              "date_histogram": {
+                "field": "the_field",
+                "calendar_interval": "1h"
+              }
+            },
+            "metrics": [
+              {
+                "field": "value_field",
+                "metrics": ["min", "max", "sum"]
+              }
+            ]
+          }
+  - is_true: acknowledged
+
+  - do:
+      indices.get_mapping:
+        index: foo_rollup
+
+  - set: {foo_rollup.mappings._meta.rollup-version: version}
+
+  - match:
+      foo_rollup:
+        mappings:
+          _meta:
+            _rollup:
+              foo:
+                id: "foo"
+                index_pattern: "foo"
+                rollup_index: "foo_rollup"
+                cron: "*/30 * * * * ?"
+                page_size: 10
+                groups :
+                  date_histogram:
+                    calendar_interval: "1h"
+                    field: "the_field"
+                    time_zone: "UTC"
+                metrics:
+                  - field: "value_field"
+                    metrics:
+                     - "min"
+                     - "max"
+                     - "sum"
+                timeout: "20s"
+            rollup-version: $version
+          dynamic_templates:
+            - strings:
+                match_mapping_type: "string"
+                mapping:
+                  type: "keyword"
+            - date_histograms:
+                path_match: "*.date_histogram.timestamp"
+                mapping:
+                  type: "date"
+          properties:
+            field:
+              type: "keyword"
+
+