فهرست منبع

Introduce safeAcquire with permits for tests (#109793)

Iraklis Psaroudakis 1 سال پیش
والد
کامیت
52e53e25e7

+ 1 - 1
server/src/test/java/org/elasticsearch/cluster/coordination/JoinValidationServiceTests.java

@@ -234,7 +234,7 @@ public class JoinValidationServiceTests extends ESTestCase {
             for (final var thread : threads) {
                 thread.join();
             }
-            assertTrue(validationPermits.tryAcquire(permitCount, 10, TimeUnit.SECONDS));
+            safeAcquire(permitCount, validationPermits);
             assertBusy(() -> assertTrue(joinValidationService.isIdle()));
         } finally {
             Collections.reverse(releasables);

+ 3 - 3
server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java

@@ -1927,9 +1927,9 @@ public class MasterServiceTests extends ESTestCase {
             );
 
             barrier.await(10, TimeUnit.SECONDS);
-            assertTrue(smallBatchExecutor.semaphore.tryAcquire(4, 10, TimeUnit.SECONDS));
-            assertTrue(manySourceExecutor.semaphore.tryAcquire(2048, 10, TimeUnit.SECONDS));
-            assertTrue(manyTasksPerSourceExecutor.semaphore.tryAcquire(2048, 10, TimeUnit.SECONDS));
+            safeAcquire(4, smallBatchExecutor.semaphore);
+            safeAcquire(2048, manySourceExecutor.semaphore);
+            safeAcquire(2048, manyTasksPerSourceExecutor.semaphore);
             mockLog.assertAllExpectationsMatched();
         }
     }

+ 2 - 2
server/src/test/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessorTests.java

@@ -78,7 +78,7 @@ public class AsyncIOProcessorTests extends ESTestCase {
         for (int i = 0; i < thread.length; i++) {
             thread[i].join();
         }
-        assertTrue(semaphore.tryAcquire(Integer.MAX_VALUE, 10, TimeUnit.SECONDS));
+        safeAcquire(10, semaphore);
         assertEquals(count * thread.length, received.get());
     }
 
@@ -131,7 +131,7 @@ public class AsyncIOProcessorTests extends ESTestCase {
         for (int i = 0; i < thread.length; i++) {
             thread[i].join();
         }
-        assertTrue(semaphore.tryAcquire(Integer.MAX_VALUE, 10, TimeUnit.SECONDS));
+        safeAcquire(Integer.MAX_VALUE, semaphore);
         assertEquals(count * thread.length, received.get());
         assertEquals(actualFailed.get(), failed.get());
     }

+ 2 - 2
server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -1546,7 +1546,7 @@ public class IndexShardTests extends IndexShardTestCase {
                 thread[i].join();
             }
         }
-        assertTrue(semaphore.tryAcquire(Integer.MAX_VALUE, 10, TimeUnit.SECONDS));
+        safeAcquire(Integer.MAX_VALUE, semaphore);
 
         closeShards(shard);
     }
@@ -1604,7 +1604,7 @@ public class IndexShardTests extends IndexShardTestCase {
         for (int i = 0; i < thread.length; i++) {
             thread[i].join();
         }
-        assertTrue(semaphore.tryAcquire(Integer.MAX_VALUE, 10, TimeUnit.SECONDS));
+        safeAcquire(Integer.MAX_VALUE, semaphore);
         assertEquals(shard.getLastKnownGlobalCheckpoint(), shard.getLastSyncedGlobalCheckpoint());
 
         closeShards(shard);

+ 2 - 2
server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java

@@ -309,7 +309,7 @@ public class ClusterConnectionManagerTests extends ESTestCase {
 
         if (nodeConnectedCount.get() == 0) {
             // Any successful connections were closed
-            assertTrue(pendingCloses.tryAcquire(threadCount, 10, TimeUnit.SECONDS));
+            safeAcquire(threadCount, pendingCloses);
             pendingCloses.release(threadCount);
             assertTrue(connections.stream().allMatch(Transport.Connection::isClosed));
             assertEquals(0, connectionManager.size());
@@ -320,7 +320,7 @@ public class ClusterConnectionManagerTests extends ESTestCase {
 
         if (randomBoolean()) {
             Releasables.close(releasables);
-            assertTrue(pendingCloses.tryAcquire(threadCount, 10, TimeUnit.SECONDS));
+            safeAcquire(threadCount, pendingCloses);
             pendingCloses.release(threadCount);
             assertEquals(0, connectionManager.size());
             assertTrue(connections.stream().allMatch(Transport.Connection::isClosed));

+ 1 - 1
server/src/test/java/org/elasticsearch/transport/TransportServiceLifecycleTests.java

@@ -120,7 +120,7 @@ public class TransportServiceLifecycleTests extends ESTestCase {
             // every handler is completed even if the request or response are being handled concurrently with shutdown
 
             keepGoing.set(false);
-            assertTrue(requestPermits.tryAcquire(Integer.MAX_VALUE, 10, TimeUnit.SECONDS));
+            safeAcquire(Integer.MAX_VALUE, requestPermits);
             for (final var thread : threads) {
                 thread.join();
             }

+ 6 - 2
test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java

@@ -2211,14 +2211,18 @@ public abstract class ESTestCase extends LuceneTestCase {
     }
 
     public static void safeAcquire(Semaphore semaphore) {
+        safeAcquire(1, semaphore);
+    }
+
+    public static void safeAcquire(int permits, Semaphore semaphore) {
         try {
             assertTrue(
                 "safeAcquire: Semaphore did not acquire permit within the timeout",
-                semaphore.tryAcquire(SAFE_AWAIT_TIMEOUT.millis(), TimeUnit.MILLISECONDS)
+                semaphore.tryAcquire(permits, SAFE_AWAIT_TIMEOUT.millis(), TimeUnit.MILLISECONDS)
             );
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
-            fail(e, "safeAcquire: interrupted waiting for Semaphore to acquire permit");
+            fail(e, "safeAcquire: interrupted waiting for Semaphore to acquire " + permits + " permit(s)");
         }
     }
 

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java

@@ -1257,7 +1257,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         }
         waitForever.countDown();
         doneWaitingForever.await();
-        assertTrue(inFlight.tryAcquire(Integer.MAX_VALUE, 10, TimeUnit.SECONDS));
+        safeAcquire(Integer.MAX_VALUE, inFlight);
     }
 
     @TestLogging(