|
@@ -11,8 +11,8 @@ import org.apache.logging.log4j.LogManager;
|
|
|
import org.apache.logging.log4j.Logger;
|
|
|
import org.elasticsearch.ElasticsearchTimeoutException;
|
|
|
import org.elasticsearch.ExceptionsHelper;
|
|
|
+import org.elasticsearch.TransportVersion;
|
|
|
import org.elasticsearch.TransportVersions;
|
|
|
-import org.elasticsearch.Version;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
import org.elasticsearch.action.ActionListenerResponseHandler;
|
|
|
import org.elasticsearch.action.ActionRequest;
|
|
@@ -74,6 +74,7 @@ import java.util.Queue;
|
|
|
import java.util.Random;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.Semaphore;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
import java.util.function.Consumer;
|
|
@@ -98,6 +99,7 @@ public class RepositoryAnalyzeAction extends ActionType<RepositoryAnalyzeAction.
|
|
|
public static final RepositoryAnalyzeAction INSTANCE = new RepositoryAnalyzeAction();
|
|
|
public static final String NAME = "cluster:admin/repository/analyze";
|
|
|
|
|
|
+ static final String UNCONTENDED_REGISTER_NAME_PREFIX = "test-register-uncontended-";
|
|
|
static final String CONTENDED_REGISTER_NAME_PREFIX = "test-register-contended-";
|
|
|
|
|
|
private RepositoryAnalyzeAction() {
|
|
@@ -147,6 +149,7 @@ public class RepositoryAnalyzeAction extends ActionType<RepositoryAnalyzeAction.
|
|
|
(CancellableTask) task,
|
|
|
request,
|
|
|
state.nodes(),
|
|
|
+ state.getMinTransportVersion(),
|
|
|
threadPool::relativeTimeInMillis,
|
|
|
listener
|
|
|
).run();
|
|
@@ -368,6 +371,7 @@ public class RepositoryAnalyzeAction extends ActionType<RepositoryAnalyzeAction.
|
|
|
private final CancellableTask task;
|
|
|
private final Request request;
|
|
|
private final DiscoveryNodes discoveryNodes;
|
|
|
+ private final TransportVersion minClusterTransportVersion;
|
|
|
private final LongSupplier currentTimeMillisSupplier;
|
|
|
private final ActionListener<Response> listener;
|
|
|
private final SubscribableListener<Void> cancellationListener;
|
|
@@ -391,6 +395,7 @@ public class RepositoryAnalyzeAction extends ActionType<RepositoryAnalyzeAction.
|
|
|
CancellableTask task,
|
|
|
Request request,
|
|
|
DiscoveryNodes discoveryNodes,
|
|
|
+ TransportVersion minClusterTransportVersion,
|
|
|
LongSupplier currentTimeMillisSupplier,
|
|
|
ActionListener<Response> listener
|
|
|
) {
|
|
@@ -399,6 +404,7 @@ public class RepositoryAnalyzeAction extends ActionType<RepositoryAnalyzeAction.
|
|
|
this.task = task;
|
|
|
this.request = request;
|
|
|
this.discoveryNodes = discoveryNodes;
|
|
|
+ this.minClusterTransportVersion = minClusterTransportVersion;
|
|
|
this.currentTimeMillisSupplier = currentTimeMillisSupplier;
|
|
|
this.timeoutTimeMillis = currentTimeMillisSupplier.getAsLong() + request.getTimeout().millis();
|
|
|
|
|
@@ -482,22 +488,35 @@ public class RepositoryAnalyzeAction extends ActionType<RepositoryAnalyzeAction.
|
|
|
final Random random = new Random(request.getSeed());
|
|
|
final List<DiscoveryNode> nodes = getSnapshotNodes(discoveryNodes);
|
|
|
|
|
|
- final String contendedRegisterName = CONTENDED_REGISTER_NAME_PREFIX + UUIDs.randomBase64UUID(random);
|
|
|
- try (
|
|
|
- var registerRefs = new RefCountingRunnable(finalRegisterValueVerifier(contendedRegisterName, random, requestRefs.acquire()))
|
|
|
- ) {
|
|
|
- final int registerOperations = Math.max(nodes.size(), request.getConcurrency());
|
|
|
- for (int i = 0; i < registerOperations; i++) {
|
|
|
- final ContendedRegisterAnalyzeAction.Request registerAnalyzeRequest = new ContendedRegisterAnalyzeAction.Request(
|
|
|
- request.getRepositoryName(),
|
|
|
- blobPath,
|
|
|
- contendedRegisterName,
|
|
|
- registerOperations,
|
|
|
- random.nextInt((registerOperations + 1) * 2)
|
|
|
- );
|
|
|
- final DiscoveryNode node = nodes.get(i < nodes.size() ? i : random.nextInt(nodes.size()));
|
|
|
- final Releasable registerRef = registerRefs.acquire();
|
|
|
- queue.add(ref -> runContendedRegisterAnalysis(Releasables.wrap(registerRef, ref), registerAnalyzeRequest, node));
|
|
|
+ if (minClusterTransportVersion.onOrAfter(TransportVersions.V_8_8_0)) {
|
|
|
+ final String contendedRegisterName = CONTENDED_REGISTER_NAME_PREFIX + UUIDs.randomBase64UUID(random);
|
|
|
+ final AtomicBoolean contendedRegisterAnalysisComplete = new AtomicBoolean();
|
|
|
+ try (
|
|
|
+ var registerRefs = new RefCountingRunnable(
|
|
|
+ finalRegisterValueVerifier(
|
|
|
+ contendedRegisterName,
|
|
|
+ random,
|
|
|
+ Releasables.wrap(requestRefs.acquire(), () -> contendedRegisterAnalysisComplete.set(true))
|
|
|
+ )
|
|
|
+ )
|
|
|
+ ) {
|
|
|
+ final int registerOperations = Math.max(nodes.size(), request.getConcurrency());
|
|
|
+ for (int i = 0; i < registerOperations; i++) {
|
|
|
+ final ContendedRegisterAnalyzeAction.Request registerAnalyzeRequest = new ContendedRegisterAnalyzeAction.Request(
|
|
|
+ request.getRepositoryName(),
|
|
|
+ blobPath,
|
|
|
+ contendedRegisterName,
|
|
|
+ registerOperations,
|
|
|
+ random.nextInt((registerOperations + 1) * 2)
|
|
|
+ );
|
|
|
+ final DiscoveryNode node = nodes.get(i < nodes.size() ? i : random.nextInt(nodes.size()));
|
|
|
+ final Releasable registerRef = registerRefs.acquire();
|
|
|
+ queue.add(ref -> runContendedRegisterAnalysis(Releasables.wrap(registerRef, ref), registerAnalyzeRequest, node));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (minClusterTransportVersion.onOrAfter(TransportVersions.UNCONTENDED_REGISTER_ANALYSIS_ADDED)) {
|
|
|
+ new UncontendedRegisterAnalysis(new Random(random.nextLong()), nodes, contendedRegisterAnalysisComplete).run();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -600,7 +619,7 @@ public class RepositoryAnalyzeAction extends ActionType<RepositoryAnalyzeAction.
|
|
|
}
|
|
|
|
|
|
private void runContendedRegisterAnalysis(Releasable ref, ContendedRegisterAnalyzeAction.Request request, DiscoveryNode node) {
|
|
|
- if (node.getVersion().onOrAfter(Version.V_8_8_0) && isRunning()) {
|
|
|
+ if (isRunning()) {
|
|
|
transportService.sendChildRequest(
|
|
|
node,
|
|
|
ContendedRegisterAnalyzeAction.NAME,
|
|
@@ -690,6 +709,59 @@ public class RepositoryAnalyzeAction extends ActionType<RepositoryAnalyzeAction.
|
|
|
};
|
|
|
}
|
|
|
|
|
|
+ private class UncontendedRegisterAnalysis implements Runnable {
|
|
|
+ private final Random random;
|
|
|
+ private final String registerName;
|
|
|
+ private final List<DiscoveryNode> nodes;
|
|
|
+ private final AtomicBoolean otherAnalysisComplete;
|
|
|
+ private int currentValue; // actions run in strict sequence so no need for synchronization
|
|
|
+
|
|
|
+ UncontendedRegisterAnalysis(Random random, List<DiscoveryNode> nodes, AtomicBoolean otherAnalysisComplete) {
|
|
|
+ this.random = random;
|
|
|
+ this.registerName = UNCONTENDED_REGISTER_NAME_PREFIX + UUIDs.randomBase64UUID(random);
|
|
|
+ this.nodes = nodes;
|
|
|
+ this.otherAnalysisComplete = otherAnalysisComplete;
|
|
|
+ }
|
|
|
+
|
|
|
+ private final ActionListener<ActionResponse.Empty> stepListener = new ActionListener<>() {
|
|
|
+ @Override
|
|
|
+ public void onResponse(ActionResponse.Empty ignored) {
|
|
|
+ currentValue += 1;
|
|
|
+ run();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onFailure(Exception e) {
|
|
|
+ fail(e);
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ if (isRunning() == false) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // complete at least request.getConcurrency() steps, but we may as well keep running for longer too
|
|
|
+ if (currentValue > request.getConcurrency() && otherAnalysisComplete.get()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ transportService.sendChildRequest(
|
|
|
+ nodes.get(currentValue < nodes.size() ? currentValue : random.nextInt(nodes.size())),
|
|
|
+ UncontendedRegisterAnalyzeAction.NAME,
|
|
|
+ new UncontendedRegisterAnalyzeAction.Request(request.getRepositoryName(), blobPath, registerName, currentValue),
|
|
|
+ task,
|
|
|
+ TransportRequestOptions.EMPTY,
|
|
|
+ new ActionListenerResponseHandler<>(
|
|
|
+ ActionListener.releaseAfter(stepListener, requestRefs.acquire()),
|
|
|
+ in -> ActionResponse.Empty.INSTANCE,
|
|
|
+ TransportResponseHandler.TRANSPORT_WORKER
|
|
|
+ )
|
|
|
+ );
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private void runCleanUp() {
|
|
|
transportService.getThreadPool().executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
|
|
|
final long listingStartTimeNanos = System.nanoTime();
|