Browse Source

Update enrich execution to only set index false on fields that support it (#98038)

This PR fixes a bug in the enrich policy runner's mapping generation
logic so that it no longer sets the index parameter on mapping field
types that do not support it.

Fixes #98019
James Baiera 2 years ago
parent
commit
e3ec8f1d84

+ 6 - 0
docs/changelog/98038.yaml

@@ -0,0 +1,6 @@
+pr: 98038
+summary: Update enrich execution to only set index false on fields that support it
+area: Ingest Node
+type: bug
+issues:
+ - 98019

+ 3 - 0
server/src/main/java/org/elasticsearch/index/mapper/MapperService.java

@@ -564,4 +564,7 @@ public class MapperService extends AbstractIndexComponent implements Closeable {
         return documentMapper().mapping().getRoot().dynamicTemplates();
     }
 
+    /**
+     * Exposes the {@link MapperRegistry} referenced by this service.
+     *
+     * @return the value of {@code mapperRegistry}
+     */
+    public MapperRegistry getMapperRegistry() {
+        return this.mapperRegistry;
+    }
 }

+ 1 - 0
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java

@@ -209,6 +209,7 @@ public class EnrichPlugin extends Plugin implements SystemIndexPlugin, IngestPlu
         EnrichPolicyExecutor enrichPolicyExecutor = new EnrichPolicyExecutor(
             settings,
             clusterService,
+            indicesService,
             client,
             threadPool,
             expressionResolver,

+ 5 - 0
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java

@@ -22,6 +22,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
@@ -41,6 +42,7 @@ public class EnrichPolicyExecutor {
     public static final String TASK_ACTION = "policy_execution";
 
     private final ClusterService clusterService;
+    private final IndicesService indicesService;
     private final Client client;
     private final ThreadPool threadPool;
     private final IndexNameExpressionResolver indexNameExpressionResolver;
@@ -54,6 +56,7 @@ public class EnrichPolicyExecutor {
     public EnrichPolicyExecutor(
         Settings settings,
         ClusterService clusterService,
+        IndicesService indicesService,
         Client client,
         ThreadPool threadPool,
         IndexNameExpressionResolver indexNameExpressionResolver,
@@ -61,6 +64,7 @@ public class EnrichPolicyExecutor {
         LongSupplier nowSupplier
     ) {
         this.clusterService = clusterService;
+        this.indicesService = indicesService;
         this.client = client;
         this.threadPool = threadPool;
         this.indexNameExpressionResolver = indexNameExpressionResolver;
@@ -215,6 +219,7 @@ public class EnrichPolicyExecutor {
             task,
             listener,
             clusterService,
+            indicesService,
             client,
             indexNameExpressionResolver,
             enrichIndexName,

+ 33 - 1
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java

@@ -33,19 +33,24 @@ import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.client.internal.FilterClient;
 import org.elasticsearch.client.internal.OriginSettingClient;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.MappingMetadata;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.Maps;
 import org.elasticsearch.common.util.iterable.Iterables;
+import org.elasticsearch.index.IndexVersion;
+import org.elasticsearch.index.mapper.Mapper;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.reindex.BulkByScrollResponse;
 import org.elasticsearch.index.reindex.ReindexRequest;
 import org.elasticsearch.index.reindex.ScrollableHitSource;
+import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.tasks.TaskCancelledException;
 import org.elasticsearch.xcontent.XContentBuilder;
@@ -84,6 +89,7 @@ public class EnrichPolicyRunner implements Runnable {
     private final ExecuteEnrichPolicyTask task;
     private final ActionListener<ExecuteEnrichPolicyStatus> listener;
     private final ClusterService clusterService;
+    private final IndicesService indicesService;
     private final Client client;
     private final IndexNameExpressionResolver indexNameExpressionResolver;
     private final String enrichIndexName;
@@ -96,6 +102,7 @@ public class EnrichPolicyRunner implements Runnable {
         ExecuteEnrichPolicyTask task,
         ActionListener<ExecuteEnrichPolicyStatus> listener,
         ClusterService clusterService,
+        IndicesService indicesService,
         Client client,
         IndexNameExpressionResolver indexNameExpressionResolver,
         String enrichIndexName,
@@ -107,6 +114,7 @@ public class EnrichPolicyRunner implements Runnable {
         this.task = Objects.requireNonNull(task);
         this.listener = Objects.requireNonNull(listener);
         this.clusterService = Objects.requireNonNull(clusterService);
+        this.indicesService = indicesService;
         this.client = wrapClient(client, policyName, task, clusterService);
         this.indexNameExpressionResolver = Objects.requireNonNull(indexNameExpressionResolver);
         this.enrichIndexName = enrichIndexName;
@@ -342,6 +350,7 @@ public class EnrichPolicyRunner implements Runnable {
     private XContentBuilder createEnrichMapping(List<Map<String, Object>> sourceMappings) {
         Map<String, Map<String, Object>> fieldMappings = new HashMap<>();
         Map<String, Object> mappingForMatchField = mappingForMatchField(policy, sourceMappings);
+        MapperService mapperService = createMapperServiceForValidation(indicesService, enrichIndexName);
         for (String enrichField : policy.getEnrichFields()) {
             if (enrichField.equals(policy.getMatchField())) {
                 mappingForMatchField = new HashMap<>(mappingForMatchField);
@@ -354,7 +363,9 @@ public class EnrichPolicyRunner implements Runnable {
                     if (typeAndFormat.format != null) {
                         mapping.put("format", typeAndFormat.format);
                     }
-                    mapping.put("index", false); // disable index
+                    if (isIndexableField(mapperService, enrichField, typeAndFormat.type, mapping)) {
+                        mapping.put("index", false);
+                    }
                     fieldMappings.put(enrichField, mapping);
                 }
             }
@@ -397,6 +408,27 @@ public class EnrichPolicyRunner implements Runnable {
         }
     }
 
+    /**
+     * Asks {@code indicesService} for a validation-only {@link MapperService} built from synthetic index metadata:
+     * the given index name, the current index version, a random UUID, one shard and zero replicas.
+     *
+     * @param indicesService service whose {@code createIndexMapperServiceForValidation} is invoked
+     * @param index name used for the synthetic index metadata
+     * @return the mapper service returned by {@code indicesService}
+     * @throws UncheckedIOException wrapping any {@link IOException} thrown while the mapper service is created
+     */
+    private static MapperService createMapperServiceForValidation(IndicesService indicesService, String index) {
+        final Settings.Builder settingsBuilder = Settings.builder();
+        settingsBuilder.put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current());
+        settingsBuilder.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID());
+        final Settings syntheticSettings = settingsBuilder.build();
+        final IndexMetadata syntheticMetadata = IndexMetadata.builder(index)
+            .settings(syntheticSettings)
+            .numberOfShards(1)
+            .numberOfReplicas(0)
+            .build();
+        try {
+            return indicesService.createIndexMapperServiceForValidation(syntheticMetadata);
+        } catch (IOException ioException) {
+            throw new UncheckedIOException(ioException);
+        }
+    }
+
+    private static boolean isIndexableField(MapperService mapperService, String field, String type, Map<String, Object> properties) {
+        properties = new HashMap<>(properties);
+        properties.put("index", false);
+        Mapper.TypeParser parser = mapperService.getMapperRegistry().getMapperParser(type, IndexVersion.current());
+        parser.parse(field, properties, mapperService.parserContext());
+        return properties.containsKey("index") == false;
+    }
+
     private void prepareAndCreateEnrichIndex(List<Map<String, Object>> mappings) {
         Settings enrichIndexSettings = Settings.builder()
             .put("index.number_of_shards", 1)

+ 4 - 0
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java

@@ -77,6 +77,7 @@ public class EnrichPolicyExecutorTests extends ESTestCase {
         final EnrichPolicyExecutor testExecutor = new EnrichPolicyExecutor(
             Settings.EMPTY,
             null,
+            null,
             client,
             testThreadPool,
             TestIndexNameExpressionResolver.newInstance(testThreadPool.getThreadContext()),
@@ -132,6 +133,7 @@ public class EnrichPolicyExecutorTests extends ESTestCase {
         final EnrichPolicyExecutor testExecutor = new EnrichPolicyExecutor(
             testSettings,
             null,
+            null,
             client,
             testThreadPool,
             TestIndexNameExpressionResolver.newInstance(testThreadPool.getThreadContext()),
@@ -266,6 +268,7 @@ public class EnrichPolicyExecutorTests extends ESTestCase {
         final EnrichPolicyExecutor testExecutor = new EnrichPolicyExecutor(
             Settings.EMPTY,
             null,
+            null,
             client,
             testThreadPool,
             TestIndexNameExpressionResolver.newInstance(testThreadPool.getThreadContext()),
@@ -389,6 +392,7 @@ public class EnrichPolicyExecutorTests extends ESTestCase {
             Settings.EMPTY,
             clusterService,
             null,
+            null,
             testThreadPool,
             TestIndexNameExpressionResolver.newInstance(testThreadPool.getThreadContext()),
             new EnrichPolicyLocks(),

+ 97 - 0
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java

@@ -45,6 +45,7 @@ import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.engine.Segment;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.ingest.common.IngestCommonPlugin;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.reindex.ReindexPlugin;
@@ -1871,6 +1872,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
             task,
             wrappedListener,
             clusterService,
+            getInstanceFromNode(IndicesService.class),
             client(),
             resolver,
             createdEnrichIndex,
@@ -2157,6 +2159,100 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
         assertThat(hit1, equalTo(Map.of("user", "u2", "date", "2023-05")));
     }
 
+    /**
+     * Runs a match-type enrich policy whose only enrich field is {@code name}, where the source index maps
+     * {@code name.first} and {@code name.last} as keywords, and checks that the resulting enrich index maps
+     * {@code name} as an object without an {@code index} parameter.
+     */
+    public void testEnrichObjectField() {
+        // Source index with dotted keyword fields under "name"; one document is indexed and made visible immediately.
+        // NOTE(review): the explicit mapping declares "id" while the document and the policy use "user" -
+        // presumably "user" is mapped dynamically; confirm this is intended.
+        createIndex("source-1", Settings.EMPTY, "_doc", "id", "type=keyword", "name.first", "type=keyword", "name.last", "type=keyword");
+        client().prepareIndex("source-1")
+            .setSource("user", "u1", "name.first", "F1", "name.last", "L1")
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+            .get();
+        // Match on "user", enrich with the parent field "name".
+        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("source-1"), "user", List.of("name"));
+        String policyName = "test1";
+        final long createTime = randomNonNegativeLong();
+        String createdEnrichIndex = ".enrich-test1-" + createTime;
+        PlainActionFuture<ExecuteEnrichPolicyStatus> future = new PlainActionFuture<>();
+        EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, future, createdEnrichIndex);
+        enrichPolicyRunner.run();
+        // Wait for the policy execution to report its outcome through the future.
+        future.actionGet();
+
+        // Validate Index definition
+        GetIndexResponse enrichIndex = getGetIndexResponseAndCheck(createdEnrichIndex);
+        Map<String, Object> mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap();
+        // The expected "name" mapping carries only its type: no "index" entry.
+        assertEnrichMapping(mapping, """
+            {
+              "user": {
+                "type": "keyword",
+                "doc_values": false
+              },
+              "name": {
+                "type": "object"
+              }
+            }
+            """);
+        // Searches ".enrich-test1" rather than the timestamped index name - presumably an alias created by the
+        // policy run; verify against the runner.
+        SearchResponse searchResponse = client().search(new SearchRequest(".enrich-test1")).actionGet();
+        ElasticsearchAssertions.assertHitCount(searchResponse, 1L);
+        Map<String, Object> hit0 = searchResponse.getHits().getAt(0).getSourceAsMap();
+        assertThat(hit0, equalTo(Map.of("user", "u1", "name.first", "F1", "name.last", "L1")));
+    }
+
+    /**
+     * Runs a match-type enrich policy over a source index that contains a {@code nested} field and an
+     * {@code integer} field, and checks that the resulting enrich index sets {@code "index": false} on the
+     * integer field but not on the nested field.
+     */
+    public void testEnrichNestedField() throws Exception {
+        final String sourceIndex = "source-index";
+        // Source mapping: "user" (keyword), "nesting" (nested, with a keyword sub-field "key"), "field2" (integer).
+        XContentBuilder mappingBuilder = JsonXContent.contentBuilder();
+        mappingBuilder.startObject()
+            .startObject(MapperService.SINGLE_MAPPING_NAME)
+            .startObject("properties")
+            .startObject("user")
+            .field("type", "keyword")
+            .endObject()
+            .startObject("nesting")
+            .field("type", "nested")
+            .startObject("properties")
+            .startObject("key")
+            .field("type", "keyword")
+            .endObject()
+            .endObject()
+            .endObject()
+            .startObject("field2")
+            .field("type", "integer")
+            .endObject()
+            .endObject()
+            .endObject()
+            .endObject();
+        CreateIndexResponse createResponse = indicesAdmin().create(new CreateIndexRequest(sourceIndex).mapping(mappingBuilder)).actionGet();
+        assertTrue(createResponse.isAcknowledged());
+
+        // Match on "user"; enrich with both the nested field and the integer field. No documents are indexed:
+        // only the generated mapping is checked below.
+        String policyName = "test1";
+        List<String> enrichFields = List.of("nesting", "field2");
+        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "user", enrichFields);
+
+        final long createTime = randomNonNegativeLong();
+        String createdEnrichIndex = ".enrich-test1-" + createTime;
+        PlainActionFuture<ExecuteEnrichPolicyStatus> future = new PlainActionFuture<>();
+        EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, future, createdEnrichIndex);
+
+        logger.info("Starting policy run");
+        enrichPolicyRunner.run();
+        // Wait for the policy execution to report its outcome through the future.
+        future.actionGet();
+
+        // Validate Index definition
+        GetIndexResponse enrichIndex = getGetIndexResponseAndCheck(createdEnrichIndex);
+        Map<String, Object> mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap();
+        // Expected: "field2" has "index": false, while "nesting" carries only its type.
+        assertEnrichMapping(mapping, """
+            {
+              "user": {
+                "type": "keyword",
+                "doc_values": false
+              },
+              "field2": {
+                "type": "integer",
+                "index": false
+              },
+              "nesting": {
+                "type": "nested"
+              }
+            }
+            """);
+    }
+
     private EnrichPolicyRunner createPolicyRunner(
         String policyName,
         EnrichPolicy policy,
@@ -2220,6 +2316,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
             task,
             wrappedListener,
             clusterService,
+            getInstanceFromNode(IndicesService.class),
             client,
             resolver,
             targetIndex,