瀏覽代碼

Allow wait for future checkpoint when timeout is set (#96145)

Enables waiting for a future checkpoint when a timeout <= the default wait_for_checkpoint timeout is given.
Armin Braun 2 年之前
父節點
當前提交
4192fdc1b6

+ 3 - 2
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -3859,9 +3859,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
      * Add a listener for refreshes.
      *
      * @param checkpoint the seqNo checkpoint to listen for
+     * @param allowUnIssuedSequenceNumber whether to allow waiting for checkpoints larger than the processed local checkpoint
      * @param listener for the refresh.
      */
-    public void addRefreshListener(long checkpoint, ActionListener<Void> listener) {
+    public void addRefreshListener(long checkpoint, boolean allowUnIssuedSequenceNumber, ActionListener<Void> listener) {
         final boolean readAllowed;
         if (isReadAllowed()) {
             readAllowed = true;
@@ -3874,7 +3875,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
             }
         }
         if (readAllowed) {
-            refreshListeners.addOrNotify(checkpoint, listener);
+            refreshListeners.addOrNotify(checkpoint, allowUnIssuedSequenceNumber, listener);
         } else {
             // we're not yet ready for reads, fail to notify client
             listener.onFailure(new IllegalIndexShardStateException(shardId, state, "Read not allowed on IndexShard"));

+ 18 - 15
server/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java

@@ -168,31 +168,34 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
 
     /**
      * Add a listener for refreshes, calling it immediately if the location is already visible. If this runs out of listener slots then it
-     * fails the listener immediately. The checkpoint cannot be greater than the processed local checkpoint. This method does not respect
-     * the forceRefreshes state. It will NEVER force a refresh on the calling thread. Instead, it will simply add listeners or rejected
-     * them if too many listeners are already waiting.
+     * fails the listener immediately. The checkpoint cannot be greater than the processed local checkpoint if
+     * {@code allowUnIssuedSequenceNumber} is {@code false}. This method does not respect the forceRefreshes state. It will NEVER force a
+     * refresh on the calling thread. Instead, it will simply add listeners or rejected them if too many listeners are already waiting.
      *
      * @param checkpoint the seqNo checkpoint to listen for
+     * @param allowUnIssuedSequenceNumber if true allow listening for checkpoint newer than the processed local checkpoint
      * @param listener for the refresh.
      * @return did we call the listener (true) or register the listener to call later (false)?
      */
-    public boolean addOrNotify(long checkpoint, ActionListener<Void> listener) {
+    public boolean addOrNotify(long checkpoint, boolean allowUnIssuedSequenceNumber, ActionListener<Void> listener) {
         assert checkpoint >= SequenceNumbers.NO_OPS_PERFORMED;
         if (checkpoint <= lastRefreshedCheckpoint) {
             listener.onResponse(null);
             return true;
         }
-        long maxIssuedSequenceNumber = maxIssuedSeqNoSupplier.getAsLong();
-        if (checkpoint > maxIssuedSequenceNumber) {
-            IllegalArgumentException e = new IllegalArgumentException(
-                "Cannot wait for unissued seqNo checkpoint [wait_for_checkpoint="
-                    + checkpoint
-                    + ", max_issued_seqNo="
-                    + maxIssuedSequenceNumber
-                    + "]"
-            );
-            listener.onFailure(e);
-            return true;
+        if (allowUnIssuedSequenceNumber == false) {
+            long maxIssuedSequenceNumber = maxIssuedSeqNoSupplier.getAsLong();
+            if (checkpoint > maxIssuedSequenceNumber) {
+                IllegalArgumentException e = new IllegalArgumentException(
+                    "Cannot wait for unissued seqNo checkpoint [wait_for_checkpoint="
+                        + checkpoint
+                        + ", max_issued_seqNo="
+                        + maxIssuedSequenceNumber
+                        + "]"
+                );
+                listener.onFailure(e);
+                return true;
+            }
         }
 
         synchronized (this) {

+ 9 - 1
server/src/main/java/org/elasticsearch/search/SearchService.java

@@ -515,6 +515,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
             }
 
             final AtomicBoolean isDone = new AtomicBoolean(false);
+            // TODO: this logic should be improved, the timeout should be handled in a way that removes the listener from the logic in the
+            // index shard on timeout so that a timed-out listener does not use up any listener slots.
             final TimeValue timeout = request.getWaitForCheckpointsTimeout();
             final Scheduler.ScheduledCancellable timeoutTask = NO_TIMEOUT.equals(timeout) ? null : threadPool.schedule(() -> {
                 if (isDone.compareAndSet(false, true)) {
@@ -523,7 +525,13 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
                     );
                 }
             }, timeout, Names.SAME);
-            shard.addRefreshListener(waitForCheckpoint, new ActionListener<>() {
+
+            // allow waiting for not-yet-issued sequence number if shard isn't promotable to primary and the timeout is less than or equal
+            // to 30s
+            final boolean allowWaitForNotYetIssued = shard.routingEntry().isPromotableToPrimary() == false
+                && NO_TIMEOUT.equals(timeout) == false
+                && timeout.getSeconds() <= 30L;
+            shard.addRefreshListener(waitForCheckpoint, allowWaitForNotYetIssued, new ActionListener<>() {
                 @Override
                 public void onResponse(Void unused) {
                     // We must check that the sequence number is smaller than or equal to the global checkpoint. If it is not,

+ 3 - 3
server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -2954,7 +2954,7 @@ public class IndexShardTests extends IndexShardTestCase {
             });
 
             PlainActionFuture<Void> listener = PlainActionFuture.newFuture();
-            shard.addRefreshListener(10, listener);
+            shard.addRefreshListener(10, randomBoolean(), listener);
             expectThrows(IllegalIndexShardStateException.class, listener::actionGet);
         };
         IndexShard replica = newShard(primary.shardId(), false, "n2", metadata, null);
@@ -3801,7 +3801,7 @@ public class IndexShardTests extends IndexShardTestCase {
         if (randomBoolean()) {
             primary.addRefreshListener(doc.getTranslogLocation(), r -> latch.countDown());
         } else {
-            primary.addRefreshListener(doc.getSeqNo(), new ActionListener<Void>() {
+            primary.addRefreshListener(doc.getSeqNo(), randomBoolean(), new ActionListener<Void>() {
                 @Override
                 public void onResponse(Void unused) {
                     latch.countDown();
@@ -3827,7 +3827,7 @@ public class IndexShardTests extends IndexShardTestCase {
         if (randomBoolean()) {
             primary.addRefreshListener(doc.getTranslogLocation(), r -> latch1.countDown());
         } else {
-            primary.addRefreshListener(doc.getSeqNo(), ActionListener.running(latch1::countDown));
+            primary.addRefreshListener(doc.getSeqNo(), randomBoolean(), ActionListener.running(latch1::countDown));
         }
         assertEquals(1, latch1.getCount());
         assertTrue(primary.getEngine().refreshNeeded());

+ 28 - 17
server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java

@@ -178,7 +178,7 @@ public class RefreshListenersTests extends ESTestCase {
         assertEquals(1, listeners.pendingCount());
 
         TestSeqNoListener seqNoListener = new TestSeqNoListener();
-        assertFalse(listeners.addOrNotify(index.getSeqNo(), seqNoListener));
+        assertFalse(listeners.addOrNotify(index.getSeqNo(), randomBoolean(), seqNoListener));
         assertEquals(2, listeners.pendingCount());
         engine.refresh("I said so");
         assertFalse(listener.forcedRefresh.get());
@@ -201,7 +201,7 @@ public class RefreshListenersTests extends ESTestCase {
         assertTrue(listeners.addOrNotify(index.getTranslogLocation(), listener));
         assertFalse(listener.forcedRefresh.get());
         TestSeqNoListener seqNoListener = new TestSeqNoListener();
-        assertTrue(listeners.addOrNotify(index.getSeqNo(), seqNoListener));
+        assertTrue(listeners.addOrNotify(index.getSeqNo(), randomBoolean(), seqNoListener));
         assertTrue(seqNoListener.isDone.get());
         listener.assertNoError();
         assertEquals(0, listeners.pendingCount());
@@ -217,7 +217,7 @@ public class RefreshListenersTests extends ESTestCase {
                 assertEquals("foobar", threadPool.getThreadContext().getHeader("test"));
                 latch.countDown();
             }));
-            assertFalse(listeners.addOrNotify(index.getSeqNo(), new ActionListener<>() {
+            assertFalse(listeners.addOrNotify(index.getSeqNo(), randomBoolean(), new ActionListener<>() {
                 @Override
                 public void onResponse(Void unused) {
                     assertEquals("foobar", threadPool.getThreadContext().getHeader("test"));
@@ -252,7 +252,7 @@ public class RefreshListenersTests extends ESTestCase {
             } else {
                 TestSeqNoListener listener = new TestSeqNoListener();
                 nonSeqNoLocationListeners.add(listener);
-                listeners.addOrNotify(index.getSeqNo(), listener);
+                listeners.addOrNotify(index.getSeqNo(), randomBoolean(), listener);
             }
             assertTrue(listeners.refreshNeeded());
         }
@@ -268,7 +268,7 @@ public class RefreshListenersTests extends ESTestCase {
 
         // Checkpoint version produces error if too many listeners registered
         TestSeqNoListener rejectedListener = new TestSeqNoListener();
-        listeners.addOrNotify(index.getSeqNo(), rejectedListener);
+        listeners.addOrNotify(index.getSeqNo(), randomBoolean(), rejectedListener);
         Exception error = rejectedListener.error;
         assertThat(error, instanceOf(IllegalStateException.class));
 
@@ -303,7 +303,7 @@ public class RefreshListenersTests extends ESTestCase {
             assertNull(listener.forcedRefresh.get());
 
             TestSeqNoListener seqNoListener = new TestSeqNoListener();
-            assertFalse(listeners.addOrNotify(unrefreshedOperation.getSeqNo(), seqNoListener));
+            assertFalse(listeners.addOrNotify(unrefreshedOperation.getSeqNo(), randomBoolean(), seqNoListener));
             assertFalse(seqNoListener.isDone.get());
 
             listeners.close();
@@ -321,7 +321,7 @@ public class RefreshListenersTests extends ESTestCase {
             assertFalse(listener.forcedRefresh.get());
             listener.assertNoError();
             TestSeqNoListener seqNoListener = new TestSeqNoListener();
-            assertTrue(listeners.addOrNotify(refreshedOperation.getSeqNo(), seqNoListener));
+            assertTrue(listeners.addOrNotify(refreshedOperation.getSeqNo(), randomBoolean(), seqNoListener));
             assertTrue(seqNoListener.isDone.get());
 
             assertFalse(listeners.refreshNeeded());
@@ -339,7 +339,7 @@ public class RefreshListenersTests extends ESTestCase {
 
             // But adding a seqNo listener to a non-refreshed location will fail listener
             TestSeqNoListener seqNoListener = new TestSeqNoListener();
-            listeners.addOrNotify(unrefreshedOperation.getSeqNo(), seqNoListener);
+            listeners.addOrNotify(unrefreshedOperation.getSeqNo(), randomBoolean(), seqNoListener);
             assertEquals("can't wait for refresh on a closed index", seqNoListener.error.getMessage());
 
             assertFalse(listeners.refreshNeeded());
@@ -376,7 +376,7 @@ public class RefreshListenersTests extends ESTestCase {
                     }
                 } else {
                     TestSeqNoListener seqNoListener = new TestSeqNoListener();
-                    immediate = listeners.addOrNotify(index.getSeqNo(), seqNoListener);
+                    immediate = listeners.addOrNotify(index.getSeqNo(), randomBoolean(), seqNoListener);
                     doneSupplier = seqNoListener.isDone::get;
                 }
                 if (immediate == false) {
@@ -415,7 +415,7 @@ public class RefreshListenersTests extends ESTestCase {
                         listeners.addOrNotify(index.getTranslogLocation(), listener);
                         TestSeqNoListener seqNoListener = new TestSeqNoListener();
                         long processedLocalCheckpoint = engine.getProcessedLocalCheckpoint();
-                        listeners.addOrNotify(processedLocalCheckpoint, seqNoListener);
+                        listeners.addOrNotify(processedLocalCheckpoint, randomBoolean(), seqNoListener);
                         assertBusy(() -> {
                             assertNotNull("location listener never called", listener.forcedRefresh.get());
                             assertTrue("seqNo listener never called", seqNoListener.isDone.get() || seqNoListener.error != null);
@@ -461,7 +461,7 @@ public class RefreshListenersTests extends ESTestCase {
         TestLocationListener listener = new TestLocationListener();
         assertFalse(listeners.addOrNotify(index("1").getTranslogLocation(), listener));
         TestSeqNoListener seqNoListener = new TestSeqNoListener();
-        assertFalse(listeners.addOrNotify(index("1").getSeqNo(), seqNoListener));
+        assertFalse(listeners.addOrNotify(index("1").getSeqNo(), randomBoolean(), seqNoListener));
         engine.refresh("I said so");
         assertFalse(listener.forcedRefresh.get());
         listener.assertNoError();
@@ -474,7 +474,7 @@ public class RefreshListenersTests extends ESTestCase {
             listener.assertNoError();
             seqNoListener = new TestSeqNoListener();
             // SeqNo listeners are not forced
-            assertFalse(listeners.addOrNotify(index("1").getSeqNo(), seqNoListener));
+            assertFalse(listeners.addOrNotify(index("1").getSeqNo(), randomBoolean(), seqNoListener));
             assertFalse(seqNoListener.isDone.get());
             assertEquals(1, listeners.pendingCount());
 
@@ -485,7 +485,7 @@ public class RefreshListenersTests extends ESTestCase {
                 listener.assertNoError();
                 seqNoListener = new TestSeqNoListener();
                 // SeqNo listeners are not forced
-                assertFalse(listeners.addOrNotify(index("1").getSeqNo(), seqNoListener));
+                assertFalse(listeners.addOrNotify(index("1").getSeqNo(), randomBoolean(), seqNoListener));
                 assertFalse(seqNoListener.isDone.get());
                 assertEquals(1, listeners.pendingCount());
             }
@@ -496,7 +496,7 @@ public class RefreshListenersTests extends ESTestCase {
             listener.assertNoError();
             seqNoListener = new TestSeqNoListener();
             // SeqNo listeners are not forced
-            assertFalse(listeners.addOrNotify(index("1").getSeqNo(), seqNoListener));
+            assertFalse(listeners.addOrNotify(index("1").getSeqNo(), randomBoolean(), seqNoListener));
             assertFalse(seqNoListener.isDone.get());
             assertEquals(1, listeners.pendingCount());
         }
@@ -506,12 +506,12 @@ public class RefreshListenersTests extends ESTestCase {
         if (listeners.pendingCount() == maxListeners) {
             // Rejected
             TestSeqNoListener rejected = new TestSeqNoListener();
-            assertTrue(listeners.addOrNotify(index("1").getSeqNo(), rejected));
+            assertTrue(listeners.addOrNotify(index("1").getSeqNo(), randomBoolean(), rejected));
             assertNotNull(rejected.error);
             expectedPending = 2;
         } else {
             TestSeqNoListener acceptedListener = new TestSeqNoListener();
-            assertFalse(listeners.addOrNotify(index("1").getSeqNo(), acceptedListener));
+            assertFalse(listeners.addOrNotify(index("1").getSeqNo(), randomBoolean(), acceptedListener));
             assertFalse(acceptedListener.isDone.get());
             assertNull(acceptedListener.error);
             expectedPending = 3;
@@ -523,7 +523,7 @@ public class RefreshListenersTests extends ESTestCase {
         assertEquals(0, listeners.pendingCount());
         TestSeqNoListener seqNoListener = new TestSeqNoListener();
         long issued = index("1").getSeqNo();
-        assertTrue(listeners.addOrNotify(issued + 1, seqNoListener));
+        assertTrue(listeners.addOrNotify(issued + 1, false, seqNoListener));
         assertThat(seqNoListener.error, instanceOf(IllegalArgumentException.class));
         String message = "Cannot wait for unissued seqNo checkpoint [wait_for_checkpoint="
             + (issued + 1)
@@ -533,6 +533,17 @@ public class RefreshListenersTests extends ESTestCase {
         assertThat(seqNoListener.error.getMessage(), equalTo(message));
     }
 
+    public void testSkipSequenceNumberMustBeIssuedCheck() throws Exception {
+        assertEquals(0, listeners.pendingCount());
+        TestSeqNoListener seqNoListener = new TestSeqNoListener();
+        long issued = index("1").getSeqNo();
+        assertFalse(listeners.addOrNotify(issued + 1, true, seqNoListener));
+        index("2");
+        assertFalse(seqNoListener.isDone.get());
+        engine.refresh("test");
+        assertTrue(seqNoListener.isDone.get());
+    }
+
     private Engine.IndexResult index(String id) throws IOException {
         return index(id, "test");
     }