|
@@ -42,6 +42,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
|
|
|
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
|
|
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
|
|
import org.elasticsearch.common.util.concurrent.ThrottledIterator;
|
|
|
+import org.elasticsearch.core.CheckedConsumer;
|
|
|
import org.elasticsearch.core.Releasable;
|
|
|
import org.elasticsearch.core.Releasables;
|
|
|
import org.elasticsearch.core.TimeValue;
|
|
@@ -647,67 +648,110 @@ public class RepositoryAnalyzeAction extends HandledTransportAction<RepositoryAn
|
|
|
}
|
|
|
|
|
|
private Runnable finalRegisterValueVerifier(String registerName, Random random, Releasable ref) {
|
|
|
- return () -> {
|
|
|
- if (isRunning()) {
|
|
|
- final var expectedFinalRegisterValue = expectedRegisterValue.get();
|
|
|
- transportService.getThreadPool()
|
|
|
- .executor(ThreadPool.Names.SNAPSHOT)
|
|
|
- .execute(ActionRunnable.wrap(ActionListener.releaseAfter(new ActionListener<OptionalBytesReference>() {
|
|
|
- @Override
|
|
|
- public void onResponse(OptionalBytesReference actualFinalRegisterValue) {
|
|
|
- if (actualFinalRegisterValue.isPresent() == false
|
|
|
- || longFromBytes(actualFinalRegisterValue.bytesReference()) != expectedFinalRegisterValue) {
|
|
|
- fail(
|
|
|
- new RepositoryVerificationException(
|
|
|
- request.getRepositoryName(),
|
|
|
- Strings.format(
|
|
|
- "register [%s] should have value [%d] but instead had value [%s]",
|
|
|
- registerName,
|
|
|
- expectedFinalRegisterValue,
|
|
|
- actualFinalRegisterValue
|
|
|
+ return new Runnable() {
|
|
|
+
|
|
|
+ final CheckedConsumer<ActionListener<OptionalBytesReference>, Exception> finalValueReader = switch (random.nextInt(3)) {
|
|
|
+ case 0 -> new CheckedConsumer<ActionListener<OptionalBytesReference>, Exception>() {
|
|
|
+ @Override
|
|
|
+ public void accept(ActionListener<OptionalBytesReference> listener) {
|
|
|
+ getBlobContainer().getRegister(OperationPurpose.REPOSITORY_ANALYSIS, registerName, listener);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String toString() {
|
|
|
+ return "getRegister";
|
|
|
+ }
|
|
|
+ };
|
|
|
+ case 1 -> new CheckedConsumer<ActionListener<OptionalBytesReference>, Exception>() {
|
|
|
+ @Override
|
|
|
+ public void accept(ActionListener<OptionalBytesReference> listener) {
|
|
|
+ getBlobContainer().compareAndExchangeRegister(
|
|
|
+ OperationPurpose.REPOSITORY_ANALYSIS,
|
|
|
+ registerName,
|
|
|
+ bytesFromLong(expectedFinalRegisterValue),
|
|
|
+ new BytesArray(new byte[] { (byte) 0xff }),
|
|
|
+ listener
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String toString() {
|
|
|
+ return "compareAndExchangeRegister";
|
|
|
+ }
|
|
|
+ };
|
|
|
+ case 2 -> new CheckedConsumer<ActionListener<OptionalBytesReference>, Exception>() {
|
|
|
+ @Override
|
|
|
+ public void accept(ActionListener<OptionalBytesReference> listener) {
|
|
|
+ getBlobContainer().compareAndSetRegister(
|
|
|
+ OperationPurpose.REPOSITORY_ANALYSIS,
|
|
|
+ registerName,
|
|
|
+ bytesFromLong(expectedFinalRegisterValue),
|
|
|
+ new BytesArray(new byte[] { (byte) 0xff }),
|
|
|
+ listener.map(
|
|
|
+ b -> b
|
|
|
+ ? OptionalBytesReference.of(bytesFromLong(expectedFinalRegisterValue))
|
|
|
+ : OptionalBytesReference.MISSING
|
|
|
+ )
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String toString() {
|
|
|
+ return "compareAndSetRegister";
|
|
|
+ }
|
|
|
+ };
|
|
|
+ default -> {
|
|
|
+ assert false;
|
|
|
+ throw new IllegalStateException();
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ long expectedFinalRegisterValue = Long.MIN_VALUE;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ if (isRunning()) {
|
|
|
+ expectedFinalRegisterValue = expectedRegisterValue.get();
|
|
|
+ transportService.getThreadPool()
|
|
|
+ .executor(ThreadPool.Names.SNAPSHOT)
|
|
|
+ .execute(ActionRunnable.wrap(ActionListener.releaseAfter(new ActionListener<>() {
|
|
|
+ @Override
|
|
|
+ public void onResponse(OptionalBytesReference actualFinalRegisterValue) {
|
|
|
+ if (actualFinalRegisterValue.isPresent() == false
|
|
|
+ || longFromBytes(actualFinalRegisterValue.bytesReference()) != expectedFinalRegisterValue) {
|
|
|
+ fail(
|
|
|
+ new RepositoryVerificationException(
|
|
|
+ request.getRepositoryName(),
|
|
|
+ Strings.format(
|
|
|
+ """
|
|
|
+ Successfully completed all [%d] atomic increments of register [%s] so its expected \
|
|
|
+ value is [%s], but reading its value with [%s] unexpectedly yielded [%s]. This \
|
|
|
+ anomaly may indicate an atomicity failure amongst concurrent compare-and-exchange \
|
|
|
+ operations on registers in this repository.""",
|
|
|
+ expectedFinalRegisterValue,
|
|
|
+ registerName,
|
|
|
+ OptionalBytesReference.of(bytesFromLong(expectedFinalRegisterValue)),
|
|
|
+ finalValueReader.toString(),
|
|
|
+ actualFinalRegisterValue
|
|
|
+ )
|
|
|
)
|
|
|
- )
|
|
|
- );
|
|
|
+ );
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- @Override
|
|
|
- public void onFailure(Exception exp) {
|
|
|
- // Registers are not supported on all repository types, and that's ok.
|
|
|
- if (exp instanceof UnsupportedOperationException == false) {
|
|
|
- fail(exp);
|
|
|
- }
|
|
|
- }
|
|
|
- }, ref), listener -> {
|
|
|
- switch (random.nextInt(3)) {
|
|
|
- case 0 -> getBlobContainer().getRegister(OperationPurpose.REPOSITORY_ANALYSIS, registerName, listener);
|
|
|
- case 1 -> getBlobContainer().compareAndExchangeRegister(
|
|
|
- OperationPurpose.REPOSITORY_ANALYSIS,
|
|
|
- registerName,
|
|
|
- bytesFromLong(expectedFinalRegisterValue),
|
|
|
- new BytesArray(new byte[] { (byte) 0xff }),
|
|
|
- listener
|
|
|
- );
|
|
|
- case 2 -> getBlobContainer().compareAndSetRegister(
|
|
|
- OperationPurpose.REPOSITORY_ANALYSIS,
|
|
|
- registerName,
|
|
|
- bytesFromLong(expectedFinalRegisterValue),
|
|
|
- new BytesArray(new byte[] { (byte) 0xff }),
|
|
|
- listener.map(
|
|
|
- b -> b
|
|
|
- ? OptionalBytesReference.of(bytesFromLong(expectedFinalRegisterValue))
|
|
|
- : OptionalBytesReference.MISSING
|
|
|
- )
|
|
|
- );
|
|
|
- default -> {
|
|
|
- assert false;
|
|
|
- throw new IllegalStateException();
|
|
|
+ @Override
|
|
|
+ public void onFailure(Exception exp) {
|
|
|
+ // Registers are not supported on all repository types, and that's ok.
|
|
|
+ if (exp instanceof UnsupportedOperationException == false) {
|
|
|
+ fail(exp);
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
- }));
|
|
|
- } else {
|
|
|
- ref.close();
|
|
|
+ }, ref), finalValueReader));
|
|
|
+ } else {
|
|
|
+ ref.close();
|
|
|
+ }
|
|
|
}
|
|
|
+
|
|
|
};
|
|
|
}
|
|
|
|