Browse Source

Fix retry_on_conflict parameter in update API to not retry indefinitely (#96262)

Felix Barnsteiner 2 years ago
parent
commit
22d2709bb6

+ 5 - 0
docs/changelog/96262.yaml

@@ -0,0 +1,5 @@
+pr: 96262
+summary: Fix `retry_on_conflict` parameter in update API to not retry indefinitely
+area: CRUD
+type: bug
+issues: []

+ 18 - 7
server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java

@@ -58,7 +58,7 @@ class BulkPrimaryExecutionContext {
     private ItemProcessingState currentItemState;
     private DocWriteRequest<?> requestToExecute;
     private BulkItemResponse executionResult;
-    private int retryCounter;
+    private int updateRetryCounter;
 
     BulkPrimaryExecutionContext(BulkShardRequest request, IndexShard primary) {
         this.request = request;
@@ -84,7 +84,7 @@ class BulkPrimaryExecutionContext {
             : "moving to next but current item wasn't completed (state: " + currentItemState + ")";
         currentItemState = ItemProcessingState.INITIAL;
         currentIndex = findNextNonAborted(currentIndex + 1);
-        retryCounter = 0;
+        updateRetryCounter = 0;
         requestToExecute = null;
         executionResult = null;
         assert assertInvariants(ItemProcessingState.INITIAL);
@@ -105,9 +105,9 @@ class BulkPrimaryExecutionContext {
         return executionResult;
     }
 
-    /** returns the number of times the current operation has been retried */
-    public int getRetryCounter() {
-        return retryCounter;
+    /** returns the number of times the current update operation has been retried */
+    public int getUpdateRetryCounter() {
+        return updateRetryCounter;
     }
 
     /** returns true if the request needs to wait for a mapping update to arrive from the master */
@@ -178,8 +178,19 @@ class BulkPrimaryExecutionContext {
         assert assertInvariants(ItemProcessingState.WAIT_FOR_MAPPING_UPDATE);
     }
 
+    public void resetForUpdateRetry() { // prepares the current item to be executed again after a conflicting update
+        assert assertInvariants(ItemProcessingState.EXECUTED); // only valid once the item has been executed
+        updateRetryCounter++; // counted so the caller can compare it against the request's retryOnConflict() limit
+        resetForExecutionRetry();
+    }
+
+    public void resetForMappingUpdateRetry() { // prepares the current item to be executed again after waiting for a mapping update
+        assert assertInvariants(ItemProcessingState.WAIT_FOR_MAPPING_UPDATE); // only valid while waiting for a mapping update
+        resetForExecutionRetry(); // updateRetryCounter is not incremented here, so this retry does not count as an update retry
+    }
+
     /** resets the current item state, preparing it for a new execution */
-    public void resetForExecutionForRetry() {
+    private void resetForExecutionRetry() {
         assert assertInvariants(ItemProcessingState.WAIT_FOR_MAPPING_UPDATE, ItemProcessingState.EXECUTED);
         currentItemState = ItemProcessingState.INITIAL;
         requestToExecute = null;
@@ -292,7 +303,7 @@ class BulkPrimaryExecutionContext {
         assert Arrays.asList(expectedCurrentState).contains(currentItemState)
             : "expected current state [" + currentItemState + "] to be one of " + Arrays.toString(expectedCurrentState);
         assert currentIndex >= 0 : currentIndex;
-        assert retryCounter >= 0 : retryCounter;
+        assert updateRetryCounter >= 0 : updateRetryCounter;
         switch (currentItemState) {
             case INITIAL:
                 assert requestToExecute == null : requestToExecute;

+ 3 - 3
server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

@@ -385,7 +385,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
                         @Override
                         public void onResponse(Void v) {
                             assert context.requiresWaitingForMappingUpdate();
-                            context.resetForExecutionForRetry();
+                            context.resetForMappingUpdateRetry();
                         }
 
                         @Override
@@ -425,8 +425,8 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
         if (isUpdate
             && isFailed
             && isConflictException(executionResult.getFailure().getCause())
-            && context.getRetryCounter() < ((UpdateRequest) docWriteRequest).retryOnConflict()) {
-            context.resetForExecutionForRetry();
+            && context.getUpdateRetryCounter() < ((UpdateRequest) docWriteRequest).retryOnConflict()) {
+            context.resetForUpdateRetry();
             return;
         }
         final BulkItemResponse response;

+ 16 - 10
server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java

@@ -277,6 +277,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
         }, listener -> listener.onResponse(null), ASSERTING_DONE_LISTENER);
         assertTrue(context.isInitial());
         assertTrue(context.hasMoreOperationsToExecute());
+        assertThat(context.getUpdateRetryCounter(), equalTo(0));
 
         assertThat("mappings were \"updated\" once", updateCalled.get(), equalTo(1));
 
@@ -572,7 +573,9 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
 
     public void testUpdateRequestWithConflictFailure() throws Exception {
         IndexSettings indexSettings = new IndexSettings(indexMetadata(), Settings.EMPTY);
-        DocWriteRequest<UpdateRequest> writeRequest = new UpdateRequest("index", "id").doc(Requests.INDEX_CONTENT_TYPE, "field", "value");
+        int retries = randomInt(4);
+        DocWriteRequest<UpdateRequest> writeRequest = new UpdateRequest("index", "id").doc(Requests.INDEX_CONTENT_TYPE, "field", "value")
+            .retryOnConflict(retries);
         BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest);
 
         IndexRequest updateResponse = new IndexRequest("index").id("id").source(Requests.INDEX_CONTENT_TYPE, "field", "value");
@@ -599,16 +602,19 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
         BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
 
         randomlySetIgnoredPrimaryResponse(primaryRequest);
-
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
-        TransportShardBulkAction.executeBulkItemRequest(
-            context,
-            updateHelper,
-            threadPool::absoluteTimeInMillis,
-            new NoopMappingUpdatePerformer(),
-            listener -> listener.onResponse(null),
-            ASSERTING_DONE_LISTENER
-        );
+
+        for (int i = 0; i < retries + 1; i++) {
+            assertTrue(context.hasMoreOperationsToExecute());
+            TransportShardBulkAction.executeBulkItemRequest(
+                context,
+                updateHelper,
+                threadPool::absoluteTimeInMillis,
+                new NoopMappingUpdatePerformer(),
+                listener -> listener.onResponse(null),
+                ASSERTING_DONE_LISTENER
+            );
+        }
         assertFalse(context.hasMoreOperationsToExecute());
 
         assertNull(context.getLocationToSync());