|
@@ -43,6 +43,9 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.io.InputStream;
|
|
|
+import java.nio.ByteBuffer;
|
|
|
+import java.nio.file.Files;
|
|
|
+import java.nio.file.Path;
|
|
|
import java.util.Collection;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
@@ -51,6 +54,8 @@ import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.BrokenBarrierException;
|
|
|
import java.util.concurrent.CyclicBarrier;
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
+import java.util.concurrent.Executors;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
@@ -171,6 +176,78 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ public void testCacheMissOnPopulate() throws Exception {
|
|
|
+ Settings settings = Settings.builder()
|
|
|
+ .put(NODE_NAME_SETTING.getKey(), "node")
|
|
|
+ .put(SharedBlobCacheService.SHARED_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(50)).getStringRep())
|
|
|
+ .put(SharedBlobCacheService.SHARED_CACHE_REGION_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(size(10)).getStringRep())
|
|
|
+ .put("path.home", createTempDir())
|
|
|
+ .build();
|
|
|
+ final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue();
|
|
|
+ RecordingMeterRegistry recordingMeterRegistry = new RecordingMeterRegistry();
|
|
|
+ BlobCacheMetrics metrics = new BlobCacheMetrics(recordingMeterRegistry);
|
|
|
+ ExecutorService ioExecutor = Executors.newCachedThreadPool();
|
|
|
+ try (
|
|
|
+ NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
|
|
|
+ var cacheService = new SharedBlobCacheService<>(environment, settings, taskQueue.getThreadPool(), ioExecutor, metrics)
|
|
|
+ ) {
|
|
|
+ ByteRange rangeRead = ByteRange.of(0L, 1L);
|
|
|
+ ByteRange rangeWrite = ByteRange.of(0L, 1L);
|
|
|
+ Path tempFile = createTempFile("test", "other");
|
|
|
+ String resourceDescription = tempFile.toAbsolutePath().toString();
|
|
|
+ final var cacheKey = generateCacheKey();
|
|
|
+ SharedBlobCacheService<Object>.CacheFile cacheFile = cacheService.getCacheFile(cacheKey, 1L);
|
|
|
+
|
|
|
+ ByteBuffer writeBuffer = ByteBuffer.allocate(1);
|
|
|
+
|
|
|
+ final int bytesRead = cacheFile.populateAndRead(
|
|
|
+ rangeRead,
|
|
|
+ rangeWrite,
|
|
|
+ (channel, pos, relativePos, len) -> len,
|
|
|
+ (channel, channelPos, streamFactory, relativePos, len, progressUpdater, completionListener) -> {
|
|
|
+ try (var in = Files.newInputStream(tempFile)) {
|
|
|
+ SharedBytes.copyToCacheFileAligned(channel, in, channelPos, progressUpdater, writeBuffer.clear());
|
|
|
+ }
|
|
|
+ ActionListener.completeWith(completionListener, () -> null);
|
|
|
+ },
|
|
|
+ resourceDescription
|
|
|
+ );
|
|
|
+ assertThat(bytesRead, is(1));
|
|
|
+ List<Measurement> measurements = recordingMeterRegistry.getRecorder()
|
|
|
+ .getMeasurements(InstrumentType.LONG_COUNTER, "es.blob_cache.miss_that_triggered_read.total");
|
|
|
+ Measurement first = measurements.getFirst();
|
|
|
+ assertThat(first.attributes().get("file_extension"), is("other"));
|
|
|
+ assertThat(first.value(), is(1L));
|
|
|
+
|
|
|
+ Path tempFile2 = createTempFile("test", "cfs");
|
|
|
+ resourceDescription = tempFile2.toAbsolutePath().toString();
|
|
|
+ cacheFile = cacheService.getCacheFile(generateCacheKey(), 1L);
|
|
|
+
|
|
|
+ ByteBuffer writeBuffer2 = ByteBuffer.allocate(1);
|
|
|
+
|
|
|
+ final int bytesRead2 = cacheFile.populateAndRead(
|
|
|
+ rangeRead,
|
|
|
+ rangeWrite,
|
|
|
+ (channel, pos, relativePos, len) -> len,
|
|
|
+ (channel, channelPos, streamFactory, relativePos, len, progressUpdater, completionListener) -> {
|
|
|
+ try (var in = Files.newInputStream(tempFile2)) {
|
|
|
+ SharedBytes.copyToCacheFileAligned(channel, in, channelPos, progressUpdater, writeBuffer2.clear());
|
|
|
+ }
|
|
|
+ ActionListener.completeWith(completionListener, () -> null);
|
|
|
+ },
|
|
|
+ resourceDescription
|
|
|
+ );
|
|
|
+ assertThat(bytesRead2, is(1));
|
|
|
+
|
|
|
+ measurements = recordingMeterRegistry.getRecorder()
|
|
|
+ .getMeasurements(InstrumentType.LONG_COUNTER, "es.blob_cache.miss_that_triggered_read.total");
|
|
|
+ Measurement measurement = measurements.get(1);
|
|
|
+ assertThat(measurement.attributes().get("file_extension"), is("cfs"));
|
|
|
+ assertThat(measurement.value(), is(1L));
|
|
|
+ }
|
|
|
+ ioExecutor.shutdown();
|
|
|
+ }
|
|
|
+
|
|
|
private static boolean tryEvict(SharedBlobCacheService.CacheFileRegion<Object> region1) {
|
|
|
if (randomBoolean()) {
|
|
|
return region1.tryEvict();
|