Pārlūkot izejas kodu

Remove Test-Only waitForProcessedOpsToComplete from LocalCheckpointTracker (#70070)

We only used this method in tests and it's somewhat needless to have a potentially
slow `notifyAll` in the hot path for assertions only when we can just busy assert in tests instead.

closes #69963
Armin Braun 4 gadi atpakaļ
vecāks
revīzija
5697643c3e

+ 20 - 41
server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java

@@ -9,7 +9,6 @@
 package org.elasticsearch.index.seqno;
 
 import com.carrotsearch.hppc.LongObjectHashMap;
-import org.elasticsearch.common.SuppressForbidden;
 
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -162,20 +161,6 @@ public class LocalCheckpointTracker {
         return new SeqNoStats(getMaxSeqNo(), getPersistedCheckpoint(), globalCheckpoint);
     }
 
-    /**
-     * Waits for all operations up to the provided sequence number to complete.
-     *
-     * @param seqNo the sequence number that the checkpoint must advance to before this method returns
-     * @throws InterruptedException if the thread was interrupted while blocking on the condition
-     */
-    @SuppressForbidden(reason = "Object#wait")
-    public synchronized void waitForProcessedOpsToComplete(final long seqNo) throws InterruptedException {
-        while (processedCheckpoint.get() < seqNo) {
-            // notified by updateCheckpoint
-            this.wait();
-        }
-    }
-
     /**
      * Checks if the given sequence number was marked as processed in this tracker.
      */
@@ -203,37 +188,31 @@ public class LocalCheckpointTracker {
      * Moves the checkpoint to the last consecutively processed sequence number. This method assumes that the sequence number
      * following the current checkpoint is processed.
      */
-    @SuppressForbidden(reason = "Object#notifyAll")
     private void updateCheckpoint(AtomicLong checkPoint, LongObjectHashMap<CountedBitSet> bitSetMap) {
         assert Thread.holdsLock(this);
         assert getBitSetForSeqNo(bitSetMap, checkPoint.get() + 1).get(seqNoToBitSetOffset(checkPoint.get() + 1)) :
-            "updateCheckpoint is called but the bit following the checkpoint is not set";
-        try {
-            // keep it simple for now, get the checkpoint one by one; in the future we can optimize and read words
-            long bitSetKey = getBitSetKey(checkPoint.get());
-            CountedBitSet current = bitSetMap.get(bitSetKey);
-            if (current == null) {
-                // the bit set corresponding to the checkpoint has already been removed, set ourselves up for the next bit set
-                assert checkPoint.get() % BIT_SET_SIZE == BIT_SET_SIZE - 1;
+                "updateCheckpoint is called but the bit following the checkpoint is not set";
+        // keep it simple for now, get the checkpoint one by one; in the future we can optimize and read words
+        long bitSetKey = getBitSetKey(checkPoint.get());
+        CountedBitSet current = bitSetMap.get(bitSetKey);
+        if (current == null) {
+            // the bit set corresponding to the checkpoint has already been removed, set ourselves up for the next bit set
+            assert checkPoint.get() % BIT_SET_SIZE == BIT_SET_SIZE - 1;
+            current = bitSetMap.get(++bitSetKey);
+        }
+        do {
+            checkPoint.incrementAndGet();
+            /*
+             * The checkpoint always falls in the current bit set or we have already cleaned it; if it falls on the last bit of the
+             * current bit set, we can clean it.
+             */
+            if (checkPoint.get() == lastSeqNoInBitSet(bitSetKey)) {
+                assert current != null;
+                final CountedBitSet removed = bitSetMap.remove(bitSetKey);
+                assert removed == current;
                 current = bitSetMap.get(++bitSetKey);
             }
-            do {
-                checkPoint.incrementAndGet();
-                /*
-                 * The checkpoint always falls in the current bit set or we have already cleaned it; if it falls on the last bit of the
-                 * current bit set, we can clean it.
-                 */
-                if (checkPoint.get() == lastSeqNoInBitSet(bitSetKey)) {
-                    assert current != null;
-                    final CountedBitSet removed = bitSetMap.remove(bitSetKey);
-                    assert removed == current;
-                    current = bitSetMap.get(++bitSetKey);
-                }
-            } while (current != null && current.get(seqNoToBitSetOffset(checkPoint.get() + 1)));
-        } finally {
-            // notifies waiters in waitForProcessedOpsToComplete
-            this.notifyAll();
-        }
+        } while (current != null && current.get(seqNoToBitSetOffset(checkPoint.get() + 1)));
     }
 
     private static long lastSeqNoInBitSet(final long bitSetKey) {

+ 4 - 1
server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java

@@ -35,6 +35,7 @@ import java.util.Collections;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.instanceOf;
 
 public class NoOpEngineTests extends EngineTestCase {
@@ -123,7 +124,9 @@ public class NoOpEngineTests extends EngineTestCase {
                         deletions += 1;
                     }
                 }
-                engine.getLocalCheckpointTracker().waitForProcessedOpsToComplete(numDocs + deletions - 1);
+                final long awaitedCheckpoint = numDocs + deletions - 1;
+                assertBusy(() ->
+                        assertThat(engine.getLocalCheckpointTracker().getProcessedCheckpoint(), greaterThanOrEqualTo(awaitedCheckpoint)));
                 engine.flush(true, true);
             }
 

+ 0 - 40
server/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java

@@ -9,7 +9,6 @@
 package org.elasticsearch.index.seqno;
 
 import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.test.ESTestCase;
 import org.junit.Before;
@@ -19,9 +18,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -252,43 +249,6 @@ public class LocalCheckpointTrackerTests extends ESTestCase {
         }
     }
 
-    public void testWaitForOpsToComplete() throws BrokenBarrierException, InterruptedException {
-        final int seqNo = randomIntBetween(0, 32);
-        final CyclicBarrier barrier = new CyclicBarrier(2);
-        final AtomicBoolean complete = new AtomicBoolean();
-        final Thread thread = new Thread(() -> {
-            try {
-                // sychronize starting with the test thread
-                barrier.await();
-                tracker.waitForProcessedOpsToComplete(seqNo);
-                complete.set(true);
-                // synchronize with the test thread checking if we are no longer waiting
-                barrier.await();
-            } catch (BrokenBarrierException | InterruptedException e) {
-                throw new RuntimeException(e);
-            }
-        });
-
-        thread.start();
-
-        // synchronize starting with the waiting thread
-        barrier.await();
-
-        final List<Integer> elements = IntStream.rangeClosed(0, seqNo).boxed().collect(Collectors.toList());
-        Randomness.shuffle(elements);
-        for (int i = 0; i < elements.size() - 1; i++) {
-            tracker.markSeqNoAsProcessed(elements.get(i));
-            assertFalse(complete.get());
-        }
-
-        tracker.markSeqNoAsProcessed(elements.get(elements.size() - 1));
-        // synchronize with the waiting thread to mark that it is complete
-        barrier.await();
-        assertTrue(complete.get());
-
-        thread.join();
-    }
-
     public void testContains() {
         final long maxSeqNo = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, 100);
         final long localCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, maxSeqNo);

+ 3 - 2
test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

@@ -1212,8 +1212,9 @@ public abstract class EngineTestCase extends ESTestCase {
      * @param seqNo the sequence number that the checkpoint must advance to before this method returns
      * @throws InterruptedException if the thread was interrupted while blocking on the condition
      */
-    public static void waitForOpsToComplete(InternalEngine engine, long seqNo) throws InterruptedException {
-        engine.getLocalCheckpointTracker().waitForProcessedOpsToComplete(seqNo);
+    public static void waitForOpsToComplete(InternalEngine engine, long seqNo) throws Exception {
+        assertBusy(() ->
+                assertThat(engine.getLocalCheckpointTracker().getProcessedCheckpoint(), greaterThanOrEqualTo(seqNo)));
     }
 
     public static boolean hasSnapshottedCommits(Engine engine) {