Browse Source

[8.x] Fix handling of bulk requests with semantic text fields and delete ops (#116960)

* Fix handling of bulk requests with semantic text fields and delete ops (#116942)

Previously, delete operations were not processed correctly when followed by operations containing semantic text fields. This issue caused the positions of subsequent operations in the items array to shift incorrectly by one.

This PR resolves the discrepancy and includes additional tests to ensure proper behavior.

* fix compilation
Jim Ferenczi 11 months ago
parent
commit
c6e4bb4465

+ 5 - 0
docs/changelog/116942.yaml

@@ -0,0 +1,5 @@
+pr: 116942
+summary: Fix handling of bulk requests with semantic text fields and delete ops
+area: Relevance
+type: bug
+issues: []

+ 17 - 6
x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilterIT.java

@@ -11,6 +11,7 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequestBuilder;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
@@ -30,8 +31,10 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 
 import static org.elasticsearch.xpack.inference.mapper.SemanticTextFieldTests.randomSemanticTextInput;
 import static org.hamcrest.Matchers.equalTo;
@@ -87,30 +90,38 @@ public class ShardBulkInferenceActionFilterIT extends ESIntegTestCase {
 
         int totalBulkReqs = randomIntBetween(2, 100);
         long totalDocs = 0;
+        Set<String> ids = new HashSet<>();
         for (int bulkReqs = 0; bulkReqs < totalBulkReqs; bulkReqs++) {
             BulkRequestBuilder bulkReqBuilder = client().prepareBulk();
             int totalBulkSize = randomIntBetween(1, 100);
             for (int bulkSize = 0; bulkSize < totalBulkSize; bulkSize++) {
-                String id = Long.toString(totalDocs);
+                if (ids.size() > 0 && rarely(random())) {
+                    String id = randomFrom(ids);
+                    ids.remove(id);
+                    DeleteRequestBuilder request = new DeleteRequestBuilder(client(), INDEX_NAME).setId(id);
+                    bulkReqBuilder.add(request);
+                    continue;
+                }
+                String id = Long.toString(totalDocs++);
                 boolean isIndexRequest = randomBoolean();
                 Map<String, Object> source = new HashMap<>();
                 source.put("sparse_field", isIndexRequest && rarely() ? null : randomSemanticTextInput());
                 source.put("dense_field", isIndexRequest && rarely() ? null : randomSemanticTextInput());
                 if (isIndexRequest) {
                     bulkReqBuilder.add(new IndexRequestBuilder(client()).setIndex(INDEX_NAME).setId(id).setSource(source));
-                    totalDocs++;
+                    ids.add(id);
                 } else {
                     boolean isUpsert = randomBoolean();
                     UpdateRequestBuilder request = new UpdateRequestBuilder(client()).setIndex(INDEX_NAME).setDoc(source);
-                    if (isUpsert || totalDocs == 0) {
+                    if (isUpsert || ids.size() == 0) {
                         request.setDocAsUpsert(true);
-                        totalDocs++;
                     } else {
                         // Update already existing document
-                        id = Long.toString(randomLongBetween(0, totalDocs - 1));
+                        id = randomFrom(ids);
                     }
                     request.setId(id);
                     bulkReqBuilder.add(request);
+                    ids.add(id);
                 }
             }
             BulkResponse bulkResponse = bulkReqBuilder.get();
@@ -135,7 +146,7 @@ public class ShardBulkInferenceActionFilterIT extends ESIntegTestCase {
         SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().size(0).trackTotalHits(true);
         SearchResponse searchResponse = client().search(new SearchRequest(INDEX_NAME).source(sourceBuilder)).get();
         try {
-            assertThat(searchResponse.getHits().getTotalHits().value, equalTo(totalDocs));
+            assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) ids.size()));
         } finally {
             searchResponse.decRef();
         }

+ 2 - 1
x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferenceFeatures.java

@@ -35,7 +35,8 @@ public class InferenceFeatures implements FeatureSpecification {
     public Set<NodeFeature> getTestFeatures() {
         return Set.of(
             SemanticTextFieldMapper.SEMANTIC_TEXT_IN_OBJECT_FIELD_FIX,
-            SemanticTextFieldMapper.SEMANTIC_TEXT_SINGLE_FIELD_UPDATE_FIX
+            SemanticTextFieldMapper.SEMANTIC_TEXT_SINGLE_FIELD_UPDATE_FIX,
+            SemanticTextFieldMapper.SEMANTIC_TEXT_DELETE_FIX
         );
     }
 }

+ 3 - 3
x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilter.java

@@ -413,8 +413,8 @@ public class ShardBulkInferenceActionFilter implements MappedActionFilter {
          */
         private Map<String, List<FieldInferenceRequest>> createFieldInferenceRequests(BulkShardRequest bulkShardRequest) {
             Map<String, List<FieldInferenceRequest>> fieldRequestsMap = new LinkedHashMap<>();
-            int itemIndex = 0;
-            for (var item : bulkShardRequest.items()) {
+            for (int itemIndex = 0; itemIndex < bulkShardRequest.items().length; itemIndex++) {
+                var item = bulkShardRequest.items()[itemIndex];
                 if (item.getPrimaryResponse() != null) {
                     // item was already aborted/processed by a filter in the chain upstream (e.g. security)
                     continue;
@@ -441,6 +441,7 @@ public class ShardBulkInferenceActionFilter implements MappedActionFilter {
                     // ignore delete request
                     continue;
                 }
+
                 final Map<String, Object> docMap = indexRequest.sourceAsMap();
                 for (var entry : fieldInferenceMap.values()) {
                     String field = entry.getName();
@@ -483,7 +484,6 @@ public class ShardBulkInferenceActionFilter implements MappedActionFilter {
                         }
                     }
                 }
-                itemIndex++;
             }
             return fieldRequestsMap;
         }

+ 1 - 1
x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/mapper/SemanticTextFieldMapper.java

@@ -89,8 +89,8 @@ public class SemanticTextFieldMapper extends FieldMapper implements InferenceFie
     public static final NodeFeature SEMANTIC_TEXT_SEARCH_INFERENCE_ID = new NodeFeature("semantic_text.search_inference_id");
     public static final NodeFeature SEMANTIC_TEXT_DEFAULT_ELSER_2 = new NodeFeature("semantic_text.default_elser_2");
     public static final NodeFeature SEMANTIC_TEXT_IN_OBJECT_FIELD_FIX = new NodeFeature("semantic_text.in_object_field_fix");
-
     public static final NodeFeature SEMANTIC_TEXT_SINGLE_FIELD_UPDATE_FIX = new NodeFeature("semantic_text.single_field_update_fix");
+    public static final NodeFeature SEMANTIC_TEXT_DELETE_FIX = new NodeFeature("semantic_text.delete_fix");
 
     public static final String CONTENT_TYPE = "semantic_text";
     public static final String DEFAULT_ELSER_2_INFERENCE_ID = DEFAULT_ELSER_ID;

+ 52 - 0
x-pack/plugin/inference/src/yamlRestTest/resources/rest-api-spec/test/inference/30_semantic_text_inference.yml

@@ -624,3 +624,55 @@ setup:
   - match: { _source.level_1.dense_field.text: "another inference test" }
   - exists: _source.level_1.dense_field.inference.chunks.0.embeddings
   - match: { _source.level_1.dense_field.inference.chunks.0.text: "another inference test" }
+
+---
+"Deletes on bulk operation":
+  - requires:
+      cluster_features: semantic_text.delete_fix
+      reason: Delete operations are properly applied when subsequent operations include a semantic text field.
+
+  - do:
+      bulk:
+        index: test-index
+        refresh: true
+        body: |
+          {"index":{"_id": "1"}}
+          {"dense_field": ["you know, for testing", "now with chunks"]}
+          {"index":{"_id": "2"}}
+          {"dense_field": ["some more tests", "that include chunks"]}
+
+  - do:
+      search:
+        index: test-index
+        body:
+          query:
+            semantic:
+              field: dense_field
+              query: "you know, for testing"
+
+  - match: { hits.total.value: 2 }
+  - match: { hits.total.relation: eq }
+  - match: { hits.hits.0._source.dense_field.text: ["you know, for testing", "now with chunks"] }
+  - match: { hits.hits.1._source.dense_field.text: ["some more tests", "that include chunks"] }
+
+  - do:
+      bulk:
+        index: test-index
+        refresh: true
+        body: |
+          {"delete":{ "_id": "2"}}
+          {"update":{"_id": "1"}}
+          {"doc":{"dense_field": "updated text"}}
+
+  - do:
+      search:
+        index: test-index
+        body:
+          query:
+            semantic:
+              field: dense_field
+              query: "you know, for testing"
+
+  - match: { hits.total.value: 1 }
+  - match: { hits.total.relation: eq }
+  - match: { hits.hits.0._source.dense_field.text: "updated text" }