|
@@ -77,7 +77,6 @@ import java.util.Random;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.Semaphore;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
-import java.util.concurrent.atomic.AtomicLong;
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
import java.util.function.Consumer;
|
|
|
import java.util.function.LongSupplier;
|
|
@@ -380,7 +379,6 @@ public class RepositoryAnalyzeAction extends HandledTransportAction<RepositoryAn
|
|
|
// choose the blob path nondeterministically to avoid clashes, assuming that the actual path doesn't matter for reproduction
|
|
|
private final String blobPath = "temp-analysis-" + UUIDs.randomBase64UUID();
|
|
|
|
|
|
- private final AtomicLong expectedRegisterValue = new AtomicLong();
|
|
|
private final Queue<Consumer<Releasable>> queue = ConcurrentCollections.newQueue();
|
|
|
private final AtomicReference<Exception> failure = new AtomicReference<>();
|
|
|
private final Semaphore innerFailures = new Semaphore(5); // limit the number of suppressed failures
|
|
@@ -486,16 +484,17 @@ public class RepositoryAnalyzeAction extends HandledTransportAction<RepositoryAn
|
|
|
if (minClusterTransportVersion.onOrAfter(TransportVersions.V_8_8_0)) {
|
|
|
final String contendedRegisterName = CONTENDED_REGISTER_NAME_PREFIX + UUIDs.randomBase64UUID(random);
|
|
|
final AtomicBoolean contendedRegisterAnalysisComplete = new AtomicBoolean();
|
|
|
+ final int registerOperations = Math.max(nodes.size(), request.getRegisterOperationCount());
|
|
|
try (
|
|
|
var registerRefs = new RefCountingRunnable(
|
|
|
finalRegisterValueVerifier(
|
|
|
contendedRegisterName,
|
|
|
+ registerOperations,
|
|
|
random,
|
|
|
Releasables.wrap(requestRefs.acquire(), () -> contendedRegisterAnalysisComplete.set(true))
|
|
|
)
|
|
|
)
|
|
|
) {
|
|
|
- final int registerOperations = Math.max(nodes.size(), request.getRegisterOperationCount());
|
|
|
for (int i = 0; i < registerOperations; i++) {
|
|
|
final ContendedRegisterAnalyzeAction.Request registerAnalyzeRequest = new ContendedRegisterAnalyzeAction.Request(
|
|
|
request.getRepositoryName(),
|
|
@@ -631,9 +630,7 @@ public class RepositoryAnalyzeAction extends HandledTransportAction<RepositoryAn
|
|
|
TransportRequestOptions.EMPTY,
|
|
|
new ActionListenerResponseHandler<>(ActionListener.releaseAfter(new ActionListener<>() {
|
|
|
@Override
|
|
|
- public void onResponse(ActionResponse.Empty response) {
|
|
|
- expectedRegisterValue.incrementAndGet();
|
|
|
- }
|
|
|
+ public void onResponse(ActionResponse.Empty response) {}
|
|
|
|
|
|
@Override
|
|
|
public void onFailure(Exception exp) {
|
|
@@ -647,7 +644,7 @@ public class RepositoryAnalyzeAction extends HandledTransportAction<RepositoryAn
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private Runnable finalRegisterValueVerifier(String registerName, Random random, Releasable ref) {
|
|
|
+ private Runnable finalRegisterValueVerifier(String registerName, int expectedFinalRegisterValue, Random random, Releasable ref) {
|
|
|
return new Runnable() {
|
|
|
|
|
|
final CheckedConsumer<ActionListener<OptionalBytesReference>, Exception> finalValueReader = switch (random.nextInt(3)) {
|
|
@@ -706,12 +703,9 @@ public class RepositoryAnalyzeAction extends HandledTransportAction<RepositoryAn
|
|
|
}
|
|
|
};
|
|
|
|
|
|
- long expectedFinalRegisterValue = Long.MIN_VALUE;
|
|
|
-
|
|
|
@Override
|
|
|
public void run() {
|
|
|
if (isRunning()) {
|
|
|
- expectedFinalRegisterValue = expectedRegisterValue.get();
|
|
|
transportService.getThreadPool()
|
|
|
.executor(ThreadPool.Names.SNAPSHOT)
|
|
|
.execute(ActionRunnable.wrap(ActionListener.releaseAfter(new ActionListener<>() {
|