Browse Source

Wait for dynamic mapping update more precisely (#110187)

We ran into a situation where dynamic mapping updates were retried in a
fairly hot loop. The problem that triggered this was waiting for any cluster state
update in this logic. This is mostly fine but adds a lot of overhead for
retries when there are other actions running at a higher priority than the
mapping update. Let's make it specific so that we at least wait for there
to be any mapping and for its version to be different from the version
that made us request a mapping update in the first place.
Also added a breakout in case the index got concurrently deleted so we
don't run out the clock in that case.
Armin Braun 1 year ago
parent
commit
7e81229b7f

+ 11 - 6
server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

@@ -70,6 +70,7 @@ import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 import java.util.function.LongSupplier;
+import java.util.function.ObjLongConsumer;
 
 import static org.elasticsearch.core.Strings.format;
 
@@ -150,7 +151,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
             assert update != null;
             assert shardId != null;
             mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), update, mappingListener);
-        }, mappingUpdateListener -> observer.waitForNextChange(new ClusterStateObserver.Listener() {
+        }, (mappingUpdateListener, initialMappingVersion) -> observer.waitForNextChange(new ClusterStateObserver.Listener() {
             @Override
             public void onNewClusterState(ClusterState state) {
                 mappingUpdateListener.onResponse(null);
@@ -165,6 +166,9 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
             public void onTimeout(TimeValue timeout) {
                 mappingUpdateListener.onFailure(new MapperException("timed out while waiting for a dynamic mapping update"));
             }
+        }, clusterState -> {
+            var indexMetadata = clusterState.metadata().index(primary.shardId().getIndex());
+            return indexMetadata == null || (indexMetadata.mapping() != null && indexMetadata.getMappingVersion() != initialMappingVersion);
         }), listener, executor(primary), postWriteRefresh, postWriteAction, documentParsingProvider);
     }
 
@@ -184,7 +188,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
         UpdateHelper updateHelper,
         LongSupplier nowInMillisSupplier,
         MappingUpdatePerformer mappingUpdater,
-        Consumer<ActionListener<Void>> waitForMappingUpdate,
+        ObjLongConsumer<ActionListener<Void>> waitForMappingUpdate,
         ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener,
         Executor executor
     ) {
@@ -209,7 +213,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
         UpdateHelper updateHelper,
         LongSupplier nowInMillisSupplier,
         MappingUpdatePerformer mappingUpdater,
-        Consumer<ActionListener<Void>> waitForMappingUpdate,
+        ObjLongConsumer<ActionListener<Void>> waitForMappingUpdate,
         ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener,
         Executor executor,
         @Nullable PostWriteRefresh postWriteRefresh,
@@ -308,7 +312,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
         UpdateHelper updateHelper,
         LongSupplier nowInMillisSupplier,
         MappingUpdatePerformer mappingUpdater,
-        Consumer<ActionListener<Void>> waitForMappingUpdate,
+        ObjLongConsumer<ActionListener<Void>> waitForMappingUpdate,
         ActionListener<Void> itemDoneListener,
         DocumentParsingProvider documentParsingProvider
     ) throws Exception {
@@ -398,7 +402,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
     private static boolean handleMappingUpdateRequired(
         BulkPrimaryExecutionContext context,
         MappingUpdatePerformer mappingUpdater,
-        Consumer<ActionListener<Void>> waitForMappingUpdate,
+        ObjLongConsumer<ActionListener<Void>> waitForMappingUpdate,
         ActionListener<Void> itemDoneListener,
         IndexShard primary,
         Engine.Result result,
@@ -406,6 +410,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
         UpdateHelper.Result updateResult
     ) {
         final var mapperService = primary.mapperService();
+        final long initialMappingVersion = mapperService.mappingVersion();
         try {
             CompressedXContent mergedSource = mapperService.merge(
                 MapperService.SINGLE_MAPPING_NAME,
@@ -439,7 +444,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
                     public void onFailure(Exception e) {
                         context.failOnMappingUpdate(e);
                     }
-                }, () -> itemDoneListener.onResponse(null)));
+                }, () -> itemDoneListener.onResponse(null)), initialMappingVersion);
             }
 
             @Override

+ 21 - 21
server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java

@@ -120,7 +120,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
             null,
             threadPool::absoluteTimeInMillis,
             new NoopMappingUpdatePerformer(),
-            listener -> {},
+            (listener, mappingVersion) -> {},
             ASSERTING_DONE_LISTENER,
             DocumentParsingProvider.EMPTY_INSTANCE
         );
@@ -152,7 +152,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
             null,
             threadPool::absoluteTimeInMillis,
             new ThrowingMappingUpdatePerformer(new RuntimeException("fail")),
-            listener -> {},
+            (listener, mappingVersion) -> {},
             ASSERTING_DONE_LISTENER,
             DocumentParsingProvider.EMPTY_INSTANCE
         );
@@ -209,7 +209,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
             null,
             threadPool::absoluteTimeInMillis,
             new NoopMappingUpdatePerformer(),
-            listener -> {},
+            (listener, mappingVersion) -> {},
             ActionListener.runAfter(ActionTestUtils.assertNoFailureListener(result -> {
                 // since at least 1 item passed, the tran log location should exist,
                 assertThat(((WritePrimaryResult<BulkShardRequest, BulkShardResponse>) result).location, notNullValue());
@@ -285,7 +285,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
             assertNotNull(update);
             updateCalled.incrementAndGet();
             listener.onResponse(null);
-        }, listener -> listener.onResponse(null), ASSERTING_DONE_LISTENER, DocumentParsingProvider.EMPTY_INSTANCE);
+        }, (listener, mappingVersion) -> listener.onResponse(null), ASSERTING_DONE_LISTENER, DocumentParsingProvider.EMPTY_INSTANCE);
         assertTrue(context.isInitial());
         assertTrue(context.hasMoreOperationsToExecute());
         assertThat(context.getUpdateRetryCounter(), equalTo(0));
@@ -304,7 +304,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
             null,
             threadPool::absoluteTimeInMillis,
             (update, shardId, listener) -> fail("should not have had to update the mappings"),
-            listener -> {},
+            (listener, mappingVersion) -> {},
             ASSERTING_DONE_LISTENER,
             DocumentParsingProvider.EMPTY_INSTANCE
         );
@@ -345,7 +345,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
             null,
             threadPool::absoluteTimeInMillis,
             errorOnWait == false ? new ThrowingMappingUpdatePerformer(err) : new NoopMappingUpdatePerformer(),
-            errorOnWait ? listener -> listener.onFailure(err) : listener -> listener.onResponse(null),
+            errorOnWait ? (listener, mappingVersion) -> listener.onFailure(err) : (listener, mappingVersion) -> listener.onResponse(null),
             new LatchedActionListener<>(new ActionListener<Void>() {
                 @Override
                 public void onResponse(Void aVoid) {}
@@ -398,7 +398,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
             null,
             threadPool::absoluteTimeInMillis,
             new NoopMappingUpdatePerformer(),
-            listener -> {},
+            (listener, mappingVersion) -> {},
             ASSERTING_DONE_LISTENER,
             DocumentParsingProvider.EMPTY_INSTANCE
         );
@@ -446,7 +446,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
             null,
             threadPool::absoluteTimeInMillis,
             new NoopMappingUpdatePerformer(),
-            listener -> {},
+            (listener, mappingVersion) -> {},
             ASSERTING_DONE_LISTENER,
             DocumentParsingProvider.EMPTY_INSTANCE
         );
@@ -510,7 +510,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
             updateHelper,
             threadPool::absoluteTimeInMillis,
             new NoopMappingUpdatePerformer(),
-            listener -> {},
+            (listener, mappingVersion) -> {},
             ASSERTING_DONE_LISTENER,
             DocumentParsingProvider.EMPTY_INSTANCE
         );
@@ -566,7 +566,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
             updateHelper,
             threadPool::absoluteTimeInMillis,
             new NoopMappingUpdatePerformer(),
-            listener -> {},
+            (listener, mappingVersion) -> {},
             ASSERTING_DONE_LISTENER,
             DocumentParsingProvider.EMPTY_INSTANCE
         );
@@ -631,7 +631,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
                 updateHelper,
                 threadPool::absoluteTimeInMillis,
                 new NoopMappingUpdatePerformer(),
-                listener -> listener.onResponse(null),
+                (listener, mappingVersion) -> listener.onResponse(null),
                 ASSERTING_DONE_LISTENER,
                 documentParsingProvider
             );
@@ -697,7 +697,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
             updateHelper,
             threadPool::absoluteTimeInMillis,
             new NoopMappingUpdatePerformer(),
-            listener -> {},
+            (listener, mappingVersion) -> {},
             ASSERTING_DONE_LISTENER,
             documentParsingProvider
         );
@@ -756,7 +756,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
             updateHelper,
             threadPool::absoluteTimeInMillis,
             new NoopMappingUpdatePerformer(),
-            listener -> listener.onResponse(null),
+            (listener, mappingVersion) -> listener.onResponse(null),
             ASSERTING_DONE_LISTENER,
             DocumentParsingProvider.EMPTY_INSTANCE
         );
@@ -794,7 +794,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
             updateHelper,
             threadPool::absoluteTimeInMillis,
             new NoopMappingUpdatePerformer(),
-            listener -> {},
+            (listener, mappingVersion) -> {},
             ASSERTING_DONE_LISTENER,
             DocumentParsingProvider.EMPTY_INSTANCE
         );
@@ -834,7 +834,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
                     null,
                     threadPool::absoluteTimeInMillis,
                     new NoopMappingUpdatePerformer(),
-                    listener -> {},
+                    (listener, mappingVersion) -> {},
                     ASSERTING_DONE_LISTENER,
                     DocumentParsingProvider.EMPTY_INSTANCE
                 );
@@ -937,7 +937,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
             updateHelper,
             threadPool::absoluteTimeInMillis,
             new NoopMappingUpdatePerformer(),
-            listener -> listener.onResponse(null),
+            (listener, mappingVersion) -> listener.onResponse(null),
             new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> {
                 assertThat(((WritePrimaryResult<BulkShardRequest, BulkShardResponse>) result).location, equalTo(resultLocation));
                 BulkItemResponse primaryResponse = result.replicaRequest().items()[0].getPrimaryResponse();
@@ -1034,7 +1034,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
                         throw new IllegalStateException(e);
                     }
                 },
-                listener -> listener.onResponse(null),
+                (listener, mappingVersion) -> listener.onResponse(null),
                 new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result ->
                 // Assert that we still need to fsync the location that was successfully written
                 assertThat(((WritePrimaryResult<BulkShardRequest, BulkShardResponse>) result).location, equalTo(resultLocation1))), latch),
@@ -1096,7 +1096,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
                     listener.onResponse(null);
                 }
             },
-            listener -> listener.onFailure(new IllegalStateException("no failure expected")),
+            (listener, mappingVersion) -> listener.onFailure(new IllegalStateException("no failure expected")),
             new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> {
                 try {
                     BulkStats bulkStats = shard.bulkStats();
@@ -1156,7 +1156,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
                 updateHelper,
                 threadPool::absoluteTimeInMillis,
                 (update, shardId, listener) -> fail("the master should not be contacted as the operation yielded a noop mapping update"),
-                listener -> listener.onResponse(null),
+                (listener, mappingVersion) -> listener.onResponse(null),
                 ActionTestUtils.assertNoFailureListener(result -> {}),
                 threadPool.executor(Names.WRITE)
             )
@@ -1200,7 +1200,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
         when(mapperService.merge(any(), any(CompressedXContent.class), any())).thenReturn(documentMapper);
         // on the second invocation, the mapping version is incremented
         // so that the second mapping update attempt doesn't trigger the infinite loop prevention
-        when(mapperService.mappingVersion()).thenReturn(0L, 1L);
+        when(mapperService.mappingVersion()).thenReturn(0L, 0L, 1L);
 
         UpdateHelper updateHelper = mock(UpdateHelper.class);
         when(updateHelper.prepare(any(), eq(shard), any())).thenReturn(
@@ -1223,7 +1223,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
             updateHelper,
             threadPool::absoluteTimeInMillis,
             (update, shardId, listener) -> fail("the master should not be contacted as the operation yielded a noop mapping update"),
-            listener -> listener.onFailure(new IllegalStateException("no failure expected")),
+            (listener, mappingVersion) -> listener.onFailure(new IllegalStateException("no failure expected")),
             new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> {
                 BulkItemResponse primaryResponse = result.replicaRequest().items()[0].getPrimaryResponse();
                 assertFalse(primaryResponse.isFailed());

+ 2 - 2
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java

@@ -203,7 +203,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
-import java.util.function.Consumer;
+import java.util.function.ObjLongConsumer;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 
@@ -1571,7 +1571,7 @@ public class AuthorizationServiceTests extends ESTestCase {
         authorize(authentication, TransportShardBulkAction.ACTION_NAME, request);
 
         MappingUpdatePerformer mappingUpdater = (m, s, l) -> l.onResponse(null);
-        Consumer<ActionListener<Void>> waitForMappingUpdate = l -> l.onResponse(null);
+        ObjLongConsumer<ActionListener<Void>> waitForMappingUpdate = (l, mappingVersion) -> l.onResponse(null);
         PlainActionFuture<TransportReplicationAction.PrimaryResult<BulkShardRequest, BulkShardResponse>> future = new PlainActionFuture<>();
         IndexShard indexShard = mock(IndexShard.class);
         when(indexShard.getBulkOperationListener()).thenReturn(new BulkOperationListener() {