Browse Source

Change exact match processor to match processor. (#46041)

Besides a rename, this changes allows to processor to attach multiple
enrich docs to the document being ingested.

Also in order to control the maximum number of enrich docs to be
included in the document being ingested, the `max_matches` setting
is added to the enrich processor.

Relates #32789
Martijn van Groningen 6 years ago
parent
commit
43ede36286
19 changed files with 185 additions and 131 deletions
  1. 4 4
      client/rest-high-level/src/test/java/org/elasticsearch/client/EnrichIT.java
  2. 2 2
      client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/EnrichDocumentationIT.java
  3. 16 14
      docs/reference/ingest/ingest-node.asciidoc
  4. 1 0
      docs/reference/ingest/processors/enrich.asciidoc
  5. 2 2
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java
  6. 5 4
      x-pack/plugin/enrich/qa/common/src/main/java/org/elasticsearch/test/enrich/CommonEnrichRestTestCase.java
  7. 9 9
      x-pack/plugin/enrich/qa/rest/src/test/resources/rest-api-spec/test/enrich/10_basic.yml
  8. 2 2
      x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java
  9. 8 4
      x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java
  10. 39 30
      x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/MatchProcessor.java
  11. 27 22
      x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java
  12. 4 3
      x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java
  13. 2 2
      x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java
  14. 2 2
      x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyMaintenanceServiceTests.java
  15. 10 10
      x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java
  16. 3 3
      x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java
  17. 33 5
      x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactoryTests.java
  18. 1 1
      x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichRestartIT.java
  19. 15 12
      x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/MatchProcessorTests.java

+ 4 - 4
client/rest-high-level/src/test/java/org/elasticsearch/client/EnrichIT.java

@@ -38,7 +38,7 @@ public class EnrichIT extends ESRestHighLevelClientTestCase {
     public void testCRUD() throws Exception {
         final EnrichClient enrichClient = highLevelClient().enrich();
         PutPolicyRequest putPolicyRequest =
-            new PutPolicyRequest("my-policy", "exact_match", List.of("my-index"), "enrich_key", List.of("enrich_value"));
+            new PutPolicyRequest("my-policy", "match", List.of("my-index"), "enrich_key", List.of("enrich_value"));
         AcknowledgedResponse putPolicyResponse = execute(putPolicyRequest, enrichClient::putPolicy, enrichClient::putPolicyAsync);
         assertThat(putPolicyResponse.isAcknowledged(), is(true));
 
@@ -50,9 +50,9 @@ public class EnrichIT extends ESRestHighLevelClientTestCase {
         List<?> responsePolicies = (List<?>) responseBody.get("policies");
         assertThat(responsePolicies.size(), equalTo(1));
         Map<?, ?> responsePolicy = (Map<?, ?>) responsePolicies.get(0);
-        assertThat(XContentMapValues.extractValue("exact_match.indices", responsePolicy), equalTo(putPolicyRequest.getIndices()));
-        assertThat(XContentMapValues.extractValue("exact_match.match_field", responsePolicy), equalTo(putPolicyRequest.getMatchField()));
-        assertThat(XContentMapValues.extractValue("exact_match.enrich_fields", responsePolicy),
+        assertThat(XContentMapValues.extractValue("match.indices", responsePolicy), equalTo(putPolicyRequest.getIndices()));
+        assertThat(XContentMapValues.extractValue("match.match_field", responsePolicy), equalTo(putPolicyRequest.getMatchField()));
+        assertThat(XContentMapValues.extractValue("match.enrich_fields", responsePolicy),
             equalTo(putPolicyRequest.getEnrichFields()));
 
         DeletePolicyRequest deletePolicyRequest = new DeletePolicyRequest("my-policy");

+ 2 - 2
client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/EnrichDocumentationIT.java

@@ -49,7 +49,7 @@ public class EnrichDocumentationIT extends ESRestHighLevelClientTestCase {
         RestHighLevelClient client = highLevelClient();
         // tag::enrich-put-policy-request
         PutPolicyRequest putPolicyRequest = new PutPolicyRequest(
-            "users-policy", "exact_match", List.of("users"),
+            "users-policy", "match", List.of("users"),
             "email", List.of("address", "zip", "city", "state"));
         // end::enrich-put-policy-request
 
@@ -95,7 +95,7 @@ public class EnrichDocumentationIT extends ESRestHighLevelClientTestCase {
         {
             // Add a policy, so that it can be deleted:
             PutPolicyRequest putPolicyRequest = new PutPolicyRequest(
-                "users-policy", "exact_match", List.of("users"),
+                "users-policy", "match", List.of("users"),
                 "email", List.of("address", "zip", "city", "state"));
             client.enrich().putPolicy(putPolicyRequest, RequestOptions.DEFAULT);
         }

+ 16 - 14
docs/reference/ingest/ingest-node.asciidoc

@@ -811,7 +811,7 @@ The policy type of the policy determines what kind of enrichment an `enrich` pro
 
 The following policy types are currently supported:
 
-* `exact_match` - Can lookup exactly one document and use its content to enrich the document being ingested.
+* `match` - Can lookup documents by running a term query and use the retrieved content to enrich the document being ingested.
 
 [[enrich-processor-getting-started]]
 === Getting started
@@ -843,7 +843,7 @@ Create an enrich policy:
 --------------------------------------------------
 PUT /_enrich/policy/users-policy
 {
-    "exact_match": {
+    "match": {
         "indices": "users",
         "match_field": "email",
         "enrich_fields": ["first_name", "last_name", "address", "city", "zip", "state"]
@@ -923,15 +923,17 @@ Which returns:
   "_seq_no": 55,
   "_primary_term": 1,
   "_source": {
-    "user": {
-      "email": "mardy.brown@email.me",
-      "first_name": "Mardy",
-      "last_name": "Brown",
-      "zip": 70116,
-      "address": "6649 N Blue Gum St",
-      "city": "New Orleans",
-      "state": "LA"
-    },
+    "user": [
+      {
+          "email": "mardy.brown@email.me",
+          "first_name": "Mardy",
+          "last_name": "Brown",
+          "zip": 70116,
+          "address": "6649 N Blue Gum St",
+          "city": "New Orleans",
+          "state": "LA"
+      }
+    ],
     "email": "mardy.brown@email.me"
   }
 }
@@ -976,7 +978,7 @@ Request:
 --------------------------------------------------
 PUT /_enrich/policy/my-policy
 {
-    "exact_match": {
+    "match": {
         "indices": "users",
         "match_field": "email",
         "enrich_fields": ["first_name", "last_name", "address", "city", "zip", "state"]
@@ -1016,7 +1018,7 @@ Response:
 {
     "policies": [
         {
-            "exact_match": {
+            "match": {
                 "name" : "my-policy",
                 "indices" : ["users"],
                 "match_field" : "email",
@@ -1053,7 +1055,7 @@ Response:
 {
     "policies": [
         {
-            "exact_match": {
+            "match": {
                 "name" : "my-policy",
                 "indices" : ["users"],
                 "match_field" : "email",

+ 1 - 0
docs/reference/ingest/processors/enrich.asciidoc

@@ -17,5 +17,6 @@ check out the <<enrich-processor-getting-started,getting started>> to get famili
 | `target_field`     | yes       | -                    | The field that will be used for the enrichment data.
 | `ignore_missing`   | no        | false                | If `true` and `field` does not exist, the processor quietly exits without modifying the document
 | `override`         | no        | true                 | If processor will update fields with pre-existing non-null-valued field. When set to `false`, such fields will not be touched.
+| `max_matches`      | no        | 1                    | The maximum number of matched documents to include under the configured target field. In order to avoid documents getting too large, the maximum allowed value is 128.
 include::common-options.asciidoc[]
 |======

+ 2 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java

@@ -33,8 +33,8 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment {
 
     public static final String ENRICH_INDEX_NAME_BASE = ".enrich-";
 
-    public static final String EXACT_MATCH_TYPE = "exact_match";
-    public static final String[] SUPPORTED_POLICY_TYPES = new String[]{EXACT_MATCH_TYPE};
+    public static final String MATCH_TYPE = "match";
+    public static final String[] SUPPORTED_POLICY_TYPES = new String[]{MATCH_TYPE};
 
     private static final ParseField QUERY = new ParseField("query");
     private static final ParseField INDICES = new ParseField("indices");

+ 5 - 4
x-pack/plugin/enrich/qa/common/src/main/java/org/elasticsearch/test/enrich/CommonEnrichRestTestCase.java

@@ -28,13 +28,13 @@ import static org.hamcrest.Matchers.equalTo;
 public abstract class CommonEnrichRestTestCase extends ESRestTestCase {
 
     @After
-    private void deletePolicies() throws Exception {
+    public void deletePolicies() throws Exception {
         Map<String, Object> responseMap = toMap(adminClient().performRequest(new Request("GET", "/_enrich/policy")));
         @SuppressWarnings("unchecked")
         List<Map<?,?>> policies = (List<Map<?,?>>) responseMap.get("policies");
 
         for (Map<?, ?> entry: policies) {
-            client().performRequest(new Request("DELETE", "/_enrich/policy/" + XContentMapValues.extractValue("exact_match.name", entry)));
+            client().performRequest(new Request("DELETE", "/_enrich/policy/" + XContentMapValues.extractValue("match.name", entry)));
         }
     }
 
@@ -71,7 +71,8 @@ public abstract class CommonEnrichRestTestCase extends ESRestTestCase {
         // Check if document has been enriched
         Request getRequest = new Request("GET", "/my-index/_doc/1");
         Map<String, Object> response = toMap(client().performRequest(getRequest));
-        Map<?, ?> _source = (Map<?, ?>) ((Map<?, ?>) response.get("_source")).get("entry");
+        List<?> entries = (List<?>) ((Map<?, ?>) response.get("_source")).get("entry");
+        Map<?, ?> _source = (Map<?, ?>) entries.get(0);
         assertThat(_source.size(), equalTo(4));
         assertThat(_source.get("host"), equalTo("elastic.co"));
         assertThat(_source.get("tld"), equalTo("co"));
@@ -132,7 +133,7 @@ public abstract class CommonEnrichRestTestCase extends ESRestTestCase {
     }
 
     public static String generatePolicySource(String index) throws IOException {
-        XContentBuilder source = jsonBuilder().startObject().startObject("exact_match");
+        XContentBuilder source = jsonBuilder().startObject().startObject("match");
         {
             source.field("indices", index);
             if (randomBoolean()) {

+ 9 - 9
x-pack/plugin/enrich/qa/rest/src/test/resources/rest-api-spec/test/enrich/10_basic.yml

@@ -5,7 +5,7 @@
       enrich.put_policy:
         name: policy-crud
         body:
-          exact_match:
+          match:
             indices: ["bar*"]
             match_field: baz
             enrich_fields: ["a", "b"]
@@ -20,18 +20,18 @@
       enrich.get_policy:
         name: policy-crud
   - length: { policies: 1 }
-  - match: { policies.0.exact_match.name: policy-crud }
-  - match: { policies.0.exact_match.indices: ["bar*"] }
-  - match: { policies.0.exact_match.match_field: baz }
-  - match: { policies.0.exact_match.enrich_fields: ["a", "b"] }
+  - match: { policies.0.match.name: policy-crud }
+  - match: { policies.0.match.indices: ["bar*"] }
+  - match: { policies.0.match.match_field: baz }
+  - match: { policies.0.match.enrich_fields: ["a", "b"] }
 
   - do:
       enrich.get_policy: {}
   - length: { policies: 1 }
-  - match: { policies.0.exact_match.name: policy-crud }
-  - match: { policies.0.exact_match.indices: ["bar*"] }
-  - match: { policies.0.exact_match.match_field: baz }
-  - match: { policies.0.exact_match.enrich_fields: ["a", "b"] }
+  - match: { policies.0.match.name: policy-crud }
+  - match: { policies.0.match.indices: ["bar*"] }
+  - match: { policies.0.match.match_field: baz }
+  - match: { policies.0.match.enrich_fields: ["a", "b"] }
 
   - do:
       enrich.delete_policy:

+ 2 - 2
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java

@@ -192,9 +192,9 @@ public class EnrichPolicyRunner implements Runnable {
     }
 
     private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) {
-        // Currently the only supported policy type is EnrichPolicy.EXACT_MATCH_TYPE, which is a keyword type
+        // Currently the only supported policy type is EnrichPolicy.MATCH_TYPE, which is a keyword type
         String keyType;
-        if (EnrichPolicy.EXACT_MATCH_TYPE.equals(policy.getType())) {
+        if (EnrichPolicy.MATCH_TYPE.equals(policy.getType())) {
             keyType = "keyword";
         } else {
             throw new ElasticsearchException("Unrecognized enrich policy type [{}]", policy.getType());

+ 8 - 4
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java

@@ -49,12 +49,16 @@ final class EnrichProcessorFactory implements Processor.Factory, Consumer<Cluste
 
         boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false);
         boolean overrideEnabled = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "override", true);
-        String targetField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "target_field");;
+        String targetField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "target_field");
+        int maxMatches = ConfigurationUtils.readIntProperty(TYPE, tag, config, "max_matches", 1);
+        if (maxMatches < 1 || maxMatches > 128) {
+            throw ConfigurationUtils.newConfigurationException(TYPE, tag, "max_matches", "should be between 1 and 1024");
+        }
 
         switch (policyType) {
-            case EnrichPolicy.EXACT_MATCH_TYPE:
-                return new ExactMatchProcessor(tag, client, policyName, field, targetField, matchField,
-                    ignoreMissing, overrideEnabled);
+            case EnrichPolicy.MATCH_TYPE:
+                return new MatchProcessor(tag, client, policyName, field, targetField, matchField,
+                    ignoreMissing, overrideEnabled, maxMatches);
             default:
                 throw new IllegalArgumentException("unsupported policy type [" + policyType + "]");
         }

+ 39 - 30
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExactMatchProcessor.java → x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/MatchProcessor.java

@@ -18,12 +18,12 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
 import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.function.BiConsumer;
 
-public final class ExactMatchProcessor extends AbstractEnrichProcessor {
-
-    static final String ENRICH_KEY_FIELD_NAME = "enrich_key_field";
+public final class MatchProcessor extends AbstractEnrichProcessor {
 
     private final BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner;
     private final String field;
@@ -31,34 +31,39 @@ public final class ExactMatchProcessor extends AbstractEnrichProcessor {
     private final String matchField;
     private final boolean ignoreMissing;
     private final boolean overrideEnabled;
-
-    ExactMatchProcessor(String tag,
-                        Client client,
-                        String policyName,
-                        String field,
-                        String targetField,
-                        String matchField,
-                        boolean ignoreMissing,
-                        boolean overrideEnabled) {
+    private final int maxMatches;
+
+    MatchProcessor(String tag,
+                   Client client,
+                   String policyName,
+                   String field,
+                   String targetField,
+                   String matchField,
+                   boolean ignoreMissing,
+                   boolean overrideEnabled,
+                   int maxMatches) {
         this(
             tag,
             createSearchRunner(client),
             policyName,
             field,
             targetField,
-            matchField, ignoreMissing,
-            overrideEnabled
+            matchField,
+            ignoreMissing,
+            overrideEnabled,
+            maxMatches
         );
     }
 
-    ExactMatchProcessor(String tag,
-                        BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
-                        String policyName,
-                        String field,
-                        String targetField,
-                        String matchField,
-                        boolean ignoreMissing,
-                        boolean overrideEnabled) {
+    MatchProcessor(String tag,
+                   BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
+                   String policyName,
+                   String field,
+                   String targetField,
+                   String matchField,
+                   boolean ignoreMissing,
+                   boolean overrideEnabled,
+                   int maxMatches) {
         super(tag, policyName);
         this.searchRunner = searchRunner;
         this.field = field;
@@ -66,6 +71,7 @@ public final class ExactMatchProcessor extends AbstractEnrichProcessor {
         this.matchField = matchField;
         this.ignoreMissing = ignoreMissing;
         this.overrideEnabled = overrideEnabled;
+        this.maxMatches = maxMatches;
     }
 
     @Override
@@ -82,7 +88,7 @@ public final class ExactMatchProcessor extends AbstractEnrichProcessor {
             ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(termQuery);
             SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
             searchBuilder.from(0);
-            searchBuilder.size(1);
+            searchBuilder.size(maxMatches);
             searchBuilder.trackScores(false);
             searchBuilder.fetchSource(true);
             searchBuilder.query(constantScore);
@@ -105,16 +111,15 @@ public final class ExactMatchProcessor extends AbstractEnrichProcessor {
                 if (searchHits.length < 1) {
                     handler.accept(ingestDocument, null);
                     return;
-                } else if (searchHits.length > 1) {
-                    handler.accept(null, new IllegalStateException("more than one doc id matching for [" + matchField + "]"));
-                    return;
                 }
 
-                // If a document is returned, add its fields to the document
-                Map<String, Object> enrichDocument = searchHits[0].getSourceAsMap();
-                assert enrichDocument != null : "enrich document for id [" + field + "] was empty despite non-zero search hits length";
                 if (overrideEnabled || ingestDocument.hasField(targetField) == false) {
-                    ingestDocument.setFieldValue(targetField, enrichDocument);
+                    List<Map<String, Object>> enrichDocuments = new ArrayList<>(searchHits.length);
+                    for (SearchHit searchHit : searchHits) {
+                        Map<String, Object> enrichDocument = searchHit.getSourceAsMap();
+                        enrichDocuments.add(enrichDocument);
+                    }
+                    ingestDocument.setFieldValue(targetField, enrichDocuments);
                 }
                 handler.accept(ingestDocument, null);
             });
@@ -153,6 +158,10 @@ public final class ExactMatchProcessor extends AbstractEnrichProcessor {
         return overrideEnabled;
     }
 
+    int getMaxMatches() {
+        return maxMatches;
+    }
+
     private static BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> createSearchRunner(Client client) {
         return (req, handler) -> {
             client.execute(EnrichCoordinatorProxyAction.INSTANCE, req, ActionListener.wrap(

+ 27 - 22
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java

@@ -44,18 +44,19 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
 
     public void testIngestDataWithEnrichProcessor() {
         int numDocs = 32;
-        List<String> keys = createSourceIndex(numDocs);
+        int maxMatches = randomIntBetween(2, 8);
+        List<String> keys = createSourceIndex(numDocs, maxMatches);
 
         String policyName = "my-policy";
         EnrichPolicy enrichPolicy =
-            new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of(SOURCE_INDEX_NAME), MATCH_FIELD, List.of(DECORATE_FIELDS));
+            new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(SOURCE_INDEX_NAME), MATCH_FIELD, List.of(DECORATE_FIELDS));
         PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
         client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
         client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet();
 
         String pipelineName = "my-pipeline";
         String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + policyName +
-            "\", \"field\": \"" + MATCH_FIELD + "\", \"target_field\": \"user\"}}]}";
+            "\", \"field\": \"" + MATCH_FIELD + "\", \"target_field\": \"users\", \"max_matches\": " + maxMatches + "}}]}";
         PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON);
         client().admin().cluster().putPipeline(putPipelineRequest).actionGet();
 
@@ -74,17 +75,21 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
             assertThat(itemResponse.getId(), equalTo(Integer.toString(expectedId++)));
         }
 
-        for (int i = 0; i < numDocs; i++) {
-            GetResponse getResponse = client().get(new GetRequest("my-index", Integer.toString(i))).actionGet();
+        for (int doc = 0; doc < numDocs; doc++) {
+            GetResponse getResponse = client().get(new GetRequest("my-index", Integer.toString(doc))).actionGet();
             Map<String, Object> source = getResponse.getSourceAsMap();
-            Map<?, ?> userEntry = (Map<?, ?>) source.get("user");
-            assertThat(userEntry, notNullValue());
-            assertThat(userEntry.size(), equalTo(DECORATE_FIELDS.length + 1));
-            for (int j = 0; j < 3; j++) {
-                String field = DECORATE_FIELDS[j];
-                assertThat(userEntry.get(field), equalTo(keys.get(i) + j));
+            List<?> userEntries = (List<?>) source.get("users");
+            assertThat(userEntries, notNullValue());
+            assertThat(userEntries.size(), equalTo(maxMatches));
+            for (int i = 0; i < maxMatches; i++) {
+                Map<?, ?> userEntry = (Map<?, ?>) userEntries.get(i);
+                assertThat(userEntry.size(), equalTo(DECORATE_FIELDS.length + 1));
+                for (int j = 0; j < 3; j++) {
+                    String field = DECORATE_FIELDS[j];
+                    assertThat(userEntry.get(field), equalTo(keys.get(doc) + j));
+                }
+                assertThat(keys.contains(userEntry.get(MATCH_FIELD)), is(true));
             }
-            assertThat(keys.contains(userEntry.get(MATCH_FIELD)), is(true));
         }
     }
 
@@ -99,7 +104,7 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
             client().admin().indices().refresh(new RefreshRequest("source-" + i)).actionGet();
 
             EnrichPolicy enrichPolicy =
-                new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of("source-" + i), "key", List.of("value"));
+                new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("source-" + i), "key", List.of("value"));
             PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
             client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
             client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet();
@@ -126,24 +131,24 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
             GetResponse getResponse = client().get(new GetRequest("my-index", Integer.toString(i))).actionGet();
             Map<String, Object> source = getResponse.getSourceAsMap();
             assertThat(source.size(), equalTo(2));
-            assertThat(source.get("target"), equalTo(Map.of("key", "key", "value", "val" + i)));
+            assertThat(source.get("target"), equalTo(List.of(Map.of("key", "key", "value", "val" + i))));
         }
     }
 
-    private List<String> createSourceIndex(int numDocs) {
+    private List<String> createSourceIndex(int numKeys, int numDocsPerKey) {
         Set<String> keys = new HashSet<>();
-        for (int i = 0; i < numDocs; i++) {
+        for (int id = 0; id < numKeys; id++) {
             String key;
             do {
                 key = randomAlphaOfLength(16);
             } while (keys.add(key) == false);
 
-            IndexRequest indexRequest = new IndexRequest(SOURCE_INDEX_NAME);
-            indexRequest.create(true);
-            indexRequest.id(key);
-            indexRequest.source(Map.of(MATCH_FIELD, key, DECORATE_FIELDS[0], key + "0",
-                DECORATE_FIELDS[1], key + "1", DECORATE_FIELDS[2], key + "2"));
-            client().index(indexRequest).actionGet();
+            for (int doc = 0; doc < numDocsPerKey; doc++) {
+                IndexRequest indexRequest = new IndexRequest(SOURCE_INDEX_NAME);
+                indexRequest.source(Map.of(MATCH_FIELD, key, DECORATE_FIELDS[0], key + "0",
+                    DECORATE_FIELDS[1], key + "1", DECORATE_FIELDS[2], key + "2"));
+                client().index(indexRequest).actionGet();
+            }
         }
         client().admin().indices().refresh(new RefreshRequest(SOURCE_INDEX_NAME)).actionGet();
         return List.copyOf(keys);

+ 4 - 3
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java

@@ -62,7 +62,7 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
         for (int i = 0; i < numPolicies; i++) {
             String policyName = POLICY_NAME + i;
             EnrichPolicy enrichPolicy =
-                new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of(SOURCE_INDEX_NAME), MATCH_FIELD, List.of(DECORATE_FIELDS));
+                new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(SOURCE_INDEX_NAME), MATCH_FIELD, List.of(DECORATE_FIELDS));
             PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
             client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
             client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet();
@@ -134,7 +134,8 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
         for (int i = 0; i < numDocs; i++) {
             GetResponse getResponse = client().get(new GetRequest("my-index", Integer.toString(i))).actionGet();
             Map<String, Object> source = getResponse.getSourceAsMap();
-            Map<?, ?> userEntry = (Map<?, ?>) source.get("user");
+            List<?> entries = (List<?>) source.get("user");
+            Map<?, ?> userEntry = (Map<?, ?>) entries.get(0);
             assertThat(userEntry.size(), equalTo(DECORATE_FIELDS.length + 1));
             assertThat(keys.contains(userEntry.get(MATCH_FIELD)), is(true));
             for (String field : DECORATE_FIELDS) {
@@ -164,7 +165,7 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
 
     private static void createAndExecutePolicy() {
         EnrichPolicy enrichPolicy =
-            new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of(SOURCE_INDEX_NAME), MATCH_FIELD, List.of(DECORATE_FIELDS));
+            new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(SOURCE_INDEX_NAME), MATCH_FIELD, List.of(DECORATE_FIELDS));
         PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(POLICY_NAME, enrichPolicy);
         client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
         client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(POLICY_NAME)).actionGet();

+ 2 - 2
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java

@@ -100,7 +100,7 @@ public class EnrichPolicyExecutorTests extends ESTestCase {
 
     public void testNonConcurrentPolicyExecution() throws InterruptedException {
         String testPolicyName = "test_policy";
-        EnrichPolicy testPolicy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of("some_index"), "keyfield",
+        EnrichPolicy testPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("some_index"), "keyfield",
             List.of("valuefield"));
         final EnrichPolicyTestExecutor testExecutor = new EnrichPolicyTestExecutor(Settings.EMPTY, null, null, testThreadPool,
             new IndexNameExpressionResolver(), ESTestCase::randomNonNegativeLong);
@@ -140,7 +140,7 @@ public class EnrichPolicyExecutorTests extends ESTestCase {
     public void testMaximumPolicyExecutionLimit() throws InterruptedException {
         String testPolicyBaseName = "test_policy_";
         Settings testSettings = Settings.builder().put(EnrichPlugin.ENRICH_MAX_CONCURRENT_POLICY_EXECUTIONS.getKey(), 2).build();
-        EnrichPolicy testPolicy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of("some_index"), "keyfield",
+        EnrichPolicy testPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("some_index"), "keyfield",
             List.of("valuefield"));
         final EnrichPolicyTestExecutor testExecutor = new EnrichPolicyTestExecutor(testSettings, null, null, testThreadPool,
             new IndexNameExpressionResolver(), ESTestCase::randomNonNegativeLong);

+ 2 - 2
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyMaintenanceServiceTests.java

@@ -32,7 +32,7 @@ import org.elasticsearch.test.ESSingleNodeTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
 
-import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.EXACT_MATCH_TYPE;
+import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.MATCH_TYPE;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 
@@ -113,7 +113,7 @@ public class EnrichPolicyMaintenanceServiceTests extends ESSingleNodeTestCase {
         for (int i = 0; i < randomIntBetween(1, 3); i++) {
             enrichKeys.add(randomAlphaOfLength(10));
         }
-        return new EnrichPolicy(EXACT_MATCH_TYPE, null, List.of(randomAlphaOfLength(10)), randomAlphaOfLength(10), enrichKeys);
+        return new EnrichPolicy(MATCH_TYPE, null, List.of(randomAlphaOfLength(10)), randomAlphaOfLength(10), enrichKeys);
     }
 
     private void addPolicy(String policyName, EnrichPolicy policy) throws InterruptedException {

+ 10 - 10
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java

@@ -91,7 +91,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
         assertThat(sourceDocMap.get("field5"), is(equalTo("value5")));
 
         List<String> enrichFields = List.of("field2", "field5");
-        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of(sourceIndex), "field1", enrichFields);
+        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "field1", enrichFields);
         String policyName = "test1";
 
         final long createTime = randomNonNegativeLong();
@@ -187,7 +187,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
 
         String sourceIndexPattern = baseSourceName + "*";
         List<String> enrichFields = List.of("idx", "field2", "field5");
-        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of(sourceIndexPattern), "field1", enrichFields);
+        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndexPattern), "field1", enrichFields);
         String policyName = "test1";
 
         final long createTime = randomNonNegativeLong();
@@ -247,7 +247,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
         final String sourceIndex = "source-index";
 
         List<String> enrichFields = List.of("field2", "field5");
-        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of(sourceIndex), "field1", enrichFields);
+        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "field1", enrichFields);
         String policyName = "test1";
 
         final long createTime = randomNonNegativeLong();
@@ -274,7 +274,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
         assertTrue(createResponse.isAcknowledged());
 
         List<String> enrichFields = List.of("field2", "field5");
-        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of(sourceIndex), "field1", enrichFields);
+        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "field1", enrichFields);
         String policyName = "test1";
 
         final long createTime = randomNonNegativeLong();
@@ -322,7 +322,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
 
         String policyName = "test1";
         List<String> enrichFields = List.of("field2");
-        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of(sourceIndex), "nesting.key", enrichFields);
+        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "nesting.key", enrichFields);
 
         final long createTime = randomNonNegativeLong();
         final AtomicReference<Exception> exception = new AtomicReference<>();
@@ -371,7 +371,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
 
         String policyName = "test1";
         List<String> enrichFields = List.of("nesting.field2", "missingField");
-        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of(sourceIndex), "key", enrichFields);
+        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "key", enrichFields);
 
         final long createTime = randomNonNegativeLong();
         final AtomicReference<Exception> exception = new AtomicReference<>();
@@ -451,7 +451,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
 
         String policyName = "test1";
         List<String> enrichFields = List.of("data.field2", "missingField");
-        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of(sourceIndex), "data.field1", enrichFields);
+        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "data.field1", enrichFields);
 
         final long createTime = randomNonNegativeLong();
         final AtomicReference<Exception> exception = new AtomicReference<>();
@@ -573,7 +573,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
 
         String policyName = "test1";
         List<String> enrichFields = List.of("data.field2", "missingField");
-        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of(sourceIndex), "data.field1", enrichFields);
+        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "data.field1", enrichFields);
 
         final long createTime = randomNonNegativeLong();
         final AtomicReference<Exception> exception = new AtomicReference<>();
@@ -702,7 +702,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
 
         String policyName = "test1";
         List<String> enrichFields = List.of("data.fields.field2", "missingField");
-        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of(sourceIndex), "data.fields.field1",
+        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "data.fields.field1",
             enrichFields);
 
         final long createTime = randomNonNegativeLong();
@@ -824,7 +824,7 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
 
         String policyName = "test1";
         List<String> enrichFields = List.of("data.field2", "missingField");
-        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of(sourceIndex), "data.field1", enrichFields);
+        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "data.field1", enrichFields);
 
         final long createTime = randomNonNegativeLong();
         final AtomicReference<Exception> exception = new AtomicReference<>();

+ 3 - 3
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java

@@ -37,7 +37,7 @@ public class EnrichPolicyUpdateTests extends ESSingleNodeTestCase {
         createIndex("index", Settings.EMPTY, "_doc", "key1", "type=keyword", "field1", "type=keyword");
 
         EnrichPolicy instance1 =
-            new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of("index"), "key1", List.of("field1"));
+            new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("index"), "key1", List.of("field1"));
         PutEnrichPolicyAction.Request putPolicyRequest = new PutEnrichPolicyAction.Request("my_policy", instance1);
         assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet());
         assertAcked(client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request("my_policy")).actionGet());
@@ -47,10 +47,10 @@ public class EnrichPolicyUpdateTests extends ESSingleNodeTestCase {
         PutPipelineRequest putPipelineRequest = new PutPipelineRequest("1", new BytesArray(pipelineConfig), XContentType.JSON);
         assertAcked(client().admin().cluster().putPipeline(putPipelineRequest).actionGet());
         Pipeline pipelineInstance1 = ingestService.getPipeline("1");
-        assertThat(pipelineInstance1.getProcessors().get(0), instanceOf(ExactMatchProcessor.class));
+        assertThat(pipelineInstance1.getProcessors().get(0), instanceOf(MatchProcessor.class));
 
         EnrichPolicy instance2 =
-            new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of("index"), "key2", List.of("field2"));
+            new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("index"), "key2", List.of("field2"));
         ResourceAlreadyExistsException exc = expectThrows(ResourceAlreadyExistsException.class, () ->
             client().execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request("my_policy", instance2)).actionGet());
         assertTrue(exc.getMessage().contains("policy [my_policy] already exists"));

+ 33 - 5
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactoryTests.java

@@ -30,7 +30,7 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
 
     public void testCreateProcessorInstance() throws Exception {
         List<String> enrichValues = List.of("globalRank", "tldRank", "tld");
-        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of("source_index"), "my_key",
+        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("source_index"), "my_key",
             enrichValues);
         EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
         factory.metaData = createMetaData("majestic", policy);
@@ -49,13 +49,19 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
             config.put("override", overrideEnabled);
         }
 
+        Integer maxMatches = null;
+        if (randomBoolean()) {
+            maxMatches = randomIntBetween(1, 128);
+            config.put("max_matches", maxMatches);
+        }
+
         int numRandomValues = randomIntBetween(1, 8);
         List<Tuple<String, String>> randomValues = new ArrayList<>(numRandomValues);
         for (int i = 0; i < numRandomValues; i++) {
             randomValues.add(new Tuple<>(randomFrom(enrichValues), randomAlphaOfLength(4)));
         }
 
-        ExactMatchProcessor result = (ExactMatchProcessor) factory.create(Collections.emptyMap(), "_tag", config);
+        MatchProcessor result = (MatchProcessor) factory.create(Collections.emptyMap(), "_tag", config);
         assertThat(result, notNullValue());
         assertThat(result.getPolicyName(), equalTo("majestic"));
         assertThat(result.getField(), equalTo("host"));
@@ -67,6 +73,11 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
         } else {
             assertThat(result.isOverrideEnabled(), is(true));
         }
+        if (maxMatches != null) {
+            assertThat(result.getMaxMatches(), equalTo(maxMatches));
+        } else {
+            assertThat(result.getMaxMatches(), equalTo(1));
+        }
     }
 
     public void testPolicyDoesNotExist() {
@@ -146,7 +157,7 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
 
     public void testCompactEnrichValuesFormat() throws Exception {
         List<String> enrichValues = List.of("globalRank", "tldRank", "tld");
-        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of("source_index"), "host",
+        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("source_index"), "host",
             enrichValues);
         EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
         factory.metaData = createMetaData("majestic", policy);
@@ -156,7 +167,7 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
         config.put("field", "host");
         config.put("target_field", "entry");
 
-        ExactMatchProcessor result = (ExactMatchProcessor) factory.create(Collections.emptyMap(), "_tag", config);
+        MatchProcessor result = (MatchProcessor) factory.create(Collections.emptyMap(), "_tag", config);
         assertThat(result, notNullValue());
         assertThat(result.getPolicyName(), equalTo("majestic"));
         assertThat(result.getField(), equalTo("host"));
@@ -165,7 +176,7 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
 
     public void testNoTargetField() throws Exception {
         List<String> enrichValues = List.of("globalRank", "tldRank", "tld");
-        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of("source_index"), "host",
+        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("source_index"), "host",
             enrichValues);
         EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
         factory.metaData = createMetaData("majestic", policy);
@@ -178,6 +189,23 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
         assertThat(e.getMessage(), equalTo("[target_field] required property is missing"));
     }
 
+    public void testIllegalMaxMatches() throws Exception {
+        List<String> enrichValues = List.of("globalRank", "tldRank", "tld");
+        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("source_index"), "my_key",
+            enrichValues);
+        EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
+        factory.metaData = createMetaData("majestic", policy);
+
+        Map<String, Object> config = new HashMap<>();
+        config.put("policy_name", "majestic");
+        config.put("field", "host");
+        config.put("target_field", "entry");
+        config.put("max_matches", randomBoolean() ? between(-2048, 0) : between(129, 2048));
+
+        Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create(Collections.emptyMap(), "_tag", config));
+        assertThat(e.getMessage(), equalTo("[max_matches] should be between 1 and 1024"));
+    }
+
     static MetaData createMetaData(String name, EnrichPolicy policy) throws IOException {
         Settings settings = Settings.builder()
             .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)

+ 1 - 1
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichRestartIT.java

@@ -36,7 +36,7 @@ public class EnrichRestartIT extends ESIntegTestCase {
         internalCluster().startNode();
 
         EnrichPolicy enrichPolicy =
-            new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of(SOURCE_INDEX_NAME), MATCH_FIELD, List.of(DECORATE_FIELDS));
+            new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(SOURCE_INDEX_NAME), MATCH_FIELD, List.of(DECORATE_FIELDS));
         for (int i = 0; i < numPolicies; i++) {
             String policyName = POLICY_NAME + i;
             PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);

+ 15 - 12
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/ExactMatchProcessorTests.java → x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/MatchProcessorTests.java

@@ -33,6 +33,7 @@ import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.function.BiConsumer;
 
@@ -42,11 +43,12 @@ import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 
-public class ExactMatchProcessorTests extends ESTestCase {
+public class MatchProcessorTests extends ESTestCase {
 
     public void testBasics() throws Exception {
+        int maxMatches = randomIntBetween(1, 8);
         MockSearchFunction mockSearch = mockedSearchFunction(Map.of("elastic.co", Map.of("globalRank", 451, "tldRank",23, "tld", "co")));
-        ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, true);
+        MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, true, maxMatches);
         IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
             Map.of("domain", "elastic.co"));
         // Run
@@ -58,7 +60,7 @@ public class ExactMatchProcessorTests extends ESTestCase {
         assertThat(request.indices().length, equalTo(1));
         assertThat(request.indices()[0], equalTo(".enrich-_name"));
         assertThat(request.preference(), equalTo(Preference.LOCAL.type()));
-        assertThat(request.source().size(), equalTo(1));
+        assertThat(request.source().size(), equalTo(maxMatches));
         assertThat(request.source().trackScores(), equalTo(false));
         assertThat(request.source().fetchSource().fetchSource(), equalTo(true));
         assertThat(request.source().fetchSource().excludes(), emptyArray());
@@ -69,7 +71,8 @@ public class ExactMatchProcessorTests extends ESTestCase {
         assertThat(termQueryBuilder.fieldName(), equalTo("domain"));
         assertThat(termQueryBuilder.value(), equalTo("elastic.co"));
         // Check result
-        Map<?, ?> entry = ingestDocument.getFieldValue("entry", Map.class);
+        List<?> entries = ingestDocument.getFieldValue("entry", List.class);
+        Map<?, ?> entry = (Map<?, ?>) entries.get(0);
         assertThat(entry.size(), equalTo(3));
         assertThat(entry.get("globalRank"), equalTo(451));
         assertThat(entry.get("tldRank"), equalTo(23));
@@ -78,7 +81,7 @@ public class ExactMatchProcessorTests extends ESTestCase {
 
     public void testNoMatch() throws Exception {
         MockSearchFunction mockSearch = mockedSearchFunction();
-        ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, true);
+        MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, true, 1);
         IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
             Map.of("domain", "elastic.com"));
         int numProperties = ingestDocument.getSourceAndMetadata().size();
@@ -108,7 +111,7 @@ public class ExactMatchProcessorTests extends ESTestCase {
     public void testSearchFailure() throws Exception {
         String indexName = ".enrich-_name";
         MockSearchFunction mockSearch = mockedSearchFunction(new IndexNotFoundException(indexName));
-        ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, true);
+        MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, true, 1);
         IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
             Map.of("domain", "elastic.com"));
         // Run
@@ -142,8 +145,8 @@ public class ExactMatchProcessorTests extends ESTestCase {
 
     public void testIgnoreKeyMissing() throws Exception {
         {
-            ExactMatchProcessor processor =
-                new ExactMatchProcessor("_tag", mockedSearchFunction(), "_name", "domain", "entry", "domain", true, true);
+            MatchProcessor processor =
+                new MatchProcessor("_tag", mockedSearchFunction(), "_name", "domain", "entry", "domain", true, true, 1);
             IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Map.of());
 
             assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6));
@@ -153,8 +156,8 @@ public class ExactMatchProcessorTests extends ESTestCase {
             assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6));
         }
         {
-            ExactMatchProcessor processor =
-                new ExactMatchProcessor("_tag", mockedSearchFunction(), "_name", "domain", "entry", "domain", false, true);
+            MatchProcessor processor =
+                new MatchProcessor("_tag", mockedSearchFunction(), "_name", "domain", "entry", "domain", false, true, 1);
             IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Map.of());
             IngestDocument[] resultHolder = new IngestDocument[1];
             Exception[] exceptionHolder = new Exception[1];
@@ -170,7 +173,7 @@ public class ExactMatchProcessorTests extends ESTestCase {
 
     public void testExistingFieldWithOverrideDisabled() throws Exception {
         MockSearchFunction mockSearch = mockedSearchFunction(Map.of("elastic.co", Map.of("globalRank", 451, "tldRank",23, "tld", "co")));
-        ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, false);
+        MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, false, 1);
 
         IngestDocument ingestDocument = new IngestDocument(new HashMap<>(Map.of("domain", "elastic.co", "tld", "tld")), Map.of());
         IngestDocument[] resultHolder = new IngestDocument[1];
@@ -186,7 +189,7 @@ public class ExactMatchProcessorTests extends ESTestCase {
 
     public void testExistingNullFieldWithOverrideDisabled() throws Exception {
         MockSearchFunction mockSearch = mockedSearchFunction(Map.of("elastic.co", Map.of("globalRank", 451, "tldRank",23, "tld", "co")));
-        ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, false);
+        MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, false, 1);
 
         Map<String, Object> source = new HashMap<>();
         source.put("domain", "elastic.co");