|
|
@@ -54,6 +54,7 @@ import java.util.Set;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ConcurrentMap;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
+import java.util.concurrent.Semaphore;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
@@ -762,6 +763,11 @@ public class ClusterServiceIT extends ESIntegTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ int numberOfThreads = randomIntBetween(2, 8);
|
|
|
+ int tasksSubmittedPerThread = randomIntBetween(1, 1024);
|
|
|
+ int numberOfExecutors = Math.max(1, numberOfThreads / 4);
|
|
|
+ final Semaphore semaphore = new Semaphore(numberOfExecutors);
|
|
|
+
|
|
|
class TaskExecutor implements ClusterStateTaskExecutor<Task> {
|
|
|
private AtomicInteger counter = new AtomicInteger();
|
|
|
private AtomicInteger batches = new AtomicInteger();
|
|
|
@@ -775,6 +781,7 @@ public class ClusterServiceIT extends ESIntegTestCase {
|
|
|
if (randomBoolean()) {
|
|
|
maybeUpdatedClusterState = ClusterState.builder(currentState).build();
|
|
|
batches.incrementAndGet();
|
|
|
+ semaphore.acquire();
|
|
|
}
|
|
|
return BatchResult.<Task>builder().successes(tasks).build(maybeUpdatedClusterState);
|
|
|
}
|
|
|
@@ -787,10 +794,9 @@ public class ClusterServiceIT extends ESIntegTestCase {
|
|
|
@Override
|
|
|
public void clusterStatePublished(ClusterState newClusterState) {
|
|
|
published.incrementAndGet();
|
|
|
+ semaphore.release();
|
|
|
}
|
|
|
}
|
|
|
- int numberOfThreads = randomIntBetween(2, 8);
|
|
|
- int tasksSubmittedPerThread = randomIntBetween(1, 1024);
|
|
|
|
|
|
ConcurrentMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();
|
|
|
CountDownLatch updateLatch = new CountDownLatch(numberOfThreads * tasksSubmittedPerThread);
|
|
|
@@ -807,7 +813,6 @@ public class ClusterServiceIT extends ESIntegTestCase {
|
|
|
}
|
|
|
};
|
|
|
|
|
|
- int numberOfExecutors = Math.max(1, numberOfThreads / 4);
|
|
|
List<TaskExecutor> executors = new ArrayList<>();
|
|
|
for (int i = 0; i < numberOfExecutors; i++) {
|
|
|
executors.add(new TaskExecutor());
|
|
|
@@ -853,6 +858,8 @@ public class ClusterServiceIT extends ESIntegTestCase {
|
|
|
|
|
|
// wait until all the cluster state updates have been processed
|
|
|
updateLatch.await();
|
|
|
+ // and until all of the publication callbacks have completed
|
|
|
+ semaphore.acquire(numberOfExecutors);
|
|
|
|
|
|
// assert the number of executed tasks is correct
|
|
|
assertEquals(numberOfThreads * tasksSubmittedPerThread, counter.get());
|