|
@@ -57,9 +57,13 @@ import java.util.concurrent.atomic.AtomicReference;
|
|
|
import java.util.function.Consumer;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
+import static org.elasticsearch.repositories.blobstore.testkit.ContendedRegisterAnalyzeAction.longFromBytes;
|
|
|
import static org.elasticsearch.repositories.blobstore.testkit.RepositoryAnalysisFailureIT.isContendedRegisterKey;
|
|
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
|
|
+import static org.hamcrest.Matchers.allOf;
|
|
|
import static org.hamcrest.Matchers.equalTo;
|
|
|
+import static org.hamcrest.Matchers.greaterThan;
|
|
|
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
|
|
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
|
|
import static org.hamcrest.Matchers.nullValue;
|
|
|
import static org.hamcrest.Matchers.startsWith;
|
|
@@ -110,6 +114,11 @@ public class RepositoryAnalysisSuccessIT extends AbstractSnapshotIntegTestCase {
|
|
|
blobStore.setMaxBlobCount(request.getBlobCount());
|
|
|
}
|
|
|
|
|
|
+ if (randomBoolean()) {
|
|
|
+ request.registerOperationCount(between(internalCluster().size(), request.getRegisterOperationCount() * 2));
|
|
|
+ blobStore.setExpectedRegisterOperationCount(request.getRegisterOperationCount());
|
|
|
+ }
|
|
|
+
|
|
|
if (request.getBlobCount() > 3 || randomBoolean()) {
|
|
|
// only use the default blob size of 10MB if writing a small number of blobs, since this is all in-memory
|
|
|
request.maxBlobSize(ByteSizeValue.ofBytes(between(1, 2048)));
|
|
@@ -209,6 +218,7 @@ public class RepositoryAnalysisSuccessIT extends AbstractSnapshotIntegTestCase {
|
|
|
private int maxBlobCount = new RepositoryAnalyzeAction.Request("dummy").getBlobCount();
|
|
|
private long maxBlobSize = new RepositoryAnalyzeAction.Request("dummy").getMaxBlobSize().getBytes();
|
|
|
private long maxTotalBlobSize = new RepositoryAnalyzeAction.Request("dummy").getMaxTotalDataSize().getBytes();
|
|
|
+ private int expectedRegisterOperationCount = new RepositoryAnalyzeAction.Request("dummy").getRegisterOperationCount();
|
|
|
|
|
|
AssertingBlobStore(@Nullable String basePath) {
|
|
|
this.pathPrefix = basePath == null ? "" : basePath + "/";
|
|
@@ -227,7 +237,8 @@ public class RepositoryAnalysisSuccessIT extends AbstractSnapshotIntegTestCase {
|
|
|
writeSemaphore,
|
|
|
maxBlobCount,
|
|
|
maxBlobSize,
|
|
|
- maxTotalBlobSize
|
|
|
+ maxTotalBlobSize,
|
|
|
+ expectedRegisterOperationCount
|
|
|
);
|
|
|
}
|
|
|
assertThat(path.buildAsString(), equalTo(currentPath));
|
|
@@ -266,6 +277,10 @@ public class RepositoryAnalysisSuccessIT extends AbstractSnapshotIntegTestCase {
|
|
|
public void setMaxTotalBlobSize(long maxTotalBlobSize) {
|
|
|
this.maxTotalBlobSize = maxTotalBlobSize;
|
|
|
}
|
|
|
+
|
|
|
+ public void setExpectedRegisterOperationCount(int expectedRegisterOperationCount) {
|
|
|
+ this.expectedRegisterOperationCount = expectedRegisterOperationCount;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
static class AssertingBlobContainer implements BlobContainer {
|
|
@@ -278,18 +293,24 @@ public class RepositoryAnalysisSuccessIT extends AbstractSnapshotIntegTestCase {
|
|
|
private final int maxBlobCount;
|
|
|
private final long maxBlobSize;
|
|
|
private final long maxTotalBlobSize;
|
|
|
+ private final long expectedRegisterOperationCount;
|
|
|
private final Map<String, byte[]> blobs = ConcurrentCollections.newConcurrentMap();
|
|
|
private final AtomicLong totalBytesWritten = new AtomicLong();
|
|
|
private final Map<String, BytesRegister> registers = ConcurrentCollections.newConcurrentMap();
|
|
|
private final AtomicBoolean firstRegisterRead = new AtomicBoolean(true);
|
|
|
|
|
|
+ private final Object registerMutex = new Object();
|
|
|
+ private long contendedRegisterValue = 0L;
|
|
|
+ private long uncontendedRegisterValue = 0L;
|
|
|
+
|
|
|
AssertingBlobContainer(
|
|
|
BlobPath path,
|
|
|
Consumer<AssertingBlobContainer> deleteContainer,
|
|
|
Semaphore writeSemaphore,
|
|
|
int maxBlobCount,
|
|
|
long maxBlobSize,
|
|
|
- long maxTotalBlobSize
|
|
|
+ long maxTotalBlobSize,
|
|
|
+ long expectedRegisterOperationCount
|
|
|
) {
|
|
|
this.path = path;
|
|
|
this.deleteContainer = deleteContainer;
|
|
@@ -297,6 +318,7 @@ public class RepositoryAnalysisSuccessIT extends AbstractSnapshotIntegTestCase {
|
|
|
this.maxBlobCount = maxBlobCount;
|
|
|
this.maxBlobSize = maxBlobSize;
|
|
|
this.maxTotalBlobSize = maxTotalBlobSize;
|
|
|
+ this.expectedRegisterOperationCount = expectedRegisterOperationCount;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -406,6 +428,10 @@ public class RepositoryAnalysisSuccessIT extends AbstractSnapshotIntegTestCase {
|
|
|
@Override
|
|
|
public DeleteResult delete(OperationPurpose purpose) {
|
|
|
assertPurpose(purpose);
|
|
|
+ synchronized (registerMutex) {
|
|
|
+ assertThat(contendedRegisterValue, equalTo(expectedRegisterOperationCount));
|
|
|
+ assertThat(uncontendedRegisterValue, greaterThanOrEqualTo(expectedRegisterOperationCount));
|
|
|
+ }
|
|
|
deleteContainer.accept(this);
|
|
|
final DeleteResult deleteResult = new DeleteResult(blobs.size(), blobs.values().stream().mapToLong(b -> b.length).sum());
|
|
|
blobs.clear();
|
|
@@ -477,11 +503,38 @@ public class RepositoryAnalysisSuccessIT extends AbstractSnapshotIntegTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- listener.onResponse(
|
|
|
- OptionalBytesReference.of(
|
|
|
- registers.computeIfAbsent(key, ignored -> new BytesRegister()).compareAndExchange(expected, updated)
|
|
|
- )
|
|
|
- );
|
|
|
+ final BytesReference witness;
|
|
|
+ synchronized (registerMutex) {
|
|
|
+ // synchronized to avoid concurrent updates from interfering with the assertions which follow this update, but NB we aren't
|
|
|
+ // testing the atomicity of this particular compareAndExchange() operation (itself implemented with a lock), we're testing
|
|
|
+ // the sequence of how these operations are executed, so the mutex here is fine.
|
|
|
+
|
|
|
+ witness = registers.computeIfAbsent(key, ignored -> new BytesRegister()).compareAndExchange(expected, updated);
|
|
|
+
|
|
|
+ if (isContendedRegisterKey(key)) {
|
|
|
+ if (expected.equals(witness) // CAS succeeded
|
|
|
+ && expected.equals(updated) == false // CAS was a genuine update
|
|
|
+ && updated.length() != 1 // CAS was not the final verification step, which sometimes writes {0xff}
|
|
|
+ ) {
|
|
|
+ final var updatedValue = longFromBytes(updated);
|
|
|
+ assertThat(
|
|
|
+ updatedValue,
|
|
|
+ allOf(greaterThan(0L), lessThanOrEqualTo(expectedRegisterOperationCount), equalTo(contendedRegisterValue + 1))
|
|
|
+ );
|
|
|
+ contendedRegisterValue = updatedValue;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ assertEquals(expected, witness); // uncontended writes always succeed
|
|
|
+ assertNotEquals(expected, updated); // uncontended register sees only updates
|
|
|
+ if (updated.length() != 0) {
|
|
|
+ final var updatedValue = longFromBytes(updated);
|
|
|
+ assertThat(updatedValue, allOf(greaterThan(0L), equalTo(uncontendedRegisterValue + 1)));
|
|
|
+ uncontendedRegisterValue = updatedValue;
|
|
|
+ } // else this was the final step which writes an empty register
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ listener.onResponse(OptionalBytesReference.of(witness));
|
|
|
}
|
|
|
}
|
|
|
|