Browse Source

Add an FS repository that simulates network download latencies (#98164)

This commit adds a new repository type `latency-simulating` that
duplicates the behaviour of an fs repository but adds a configurable
latency to blobstore  reads, simulating the time that a call to a remote
object store would take.
Alan Woodward 2 years ago
parent
commit
71e9e7344d

+ 22 - 0
test/external-modules/latency-simulating-directory/build.gradle

@@ -0,0 +1,22 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+esplugin {
+  description 'A test module that simulates read latencies from an object-store based repository'
+  classname 'org.elasticsearch.test.simulatedlatencyrepo.LatencySimulatingRepositoryPlugin'
+}
+
+apply plugin: 'elasticsearch.internal-cluster-test'
+apply plugin: 'elasticsearch.internal-es-plugin'
+
+dependencies {
+  internalClusterTestImplementation project(path: ":x-pack:plugin:core")
+  internalClusterTestImplementation(testArtifact(project(":x-pack:plugin:core")))
+  internalClusterTestImplementation project(path: ":x-pack:plugin:searchable-snapshots")
+  internalClusterTestImplementation project(path: ":x-pack:plugin:blob-cache")
+}

+ 142 - 0
test/external-modules/latency-simulating-directory/src/internalClusterTest/java/org/elasticsearch/test/simulatedlatencyrepo/LatencySimulatingBlobStoreRepositoryTests.java

@@ -0,0 +1,142 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.test.simulatedlatencyrepo;
+
+import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
+import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.env.Environment;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.indices.recovery.RecoverySettings;
+import org.elasticsearch.license.LicenseSettings;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.plugins.RepositoryPlugin;
+import org.elasticsearch.repositories.Repository;
+import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
+import org.elasticsearch.snapshots.SnapshotInfo;
+import org.elasticsearch.xcontent.NamedXContentRegistry;
+import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction;
+import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest;
+import org.junit.AfterClass;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.LongAdder;
+
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+
+public class LatencySimulatingBlobStoreRepositoryTests extends AbstractSnapshotIntegTestCase {
+
+    @Override
+    protected boolean addMockInternalEngine() {
+        return false;
+    }
+
+    public static final String REPO_TYPE = "countingFs";
+
+    public static final LongAdder COUNTS = new LongAdder();
+
+    public static class TestPlugin extends Plugin implements RepositoryPlugin {
+
+        @Override
+        public Map<String, Repository.Factory> getRepositories(
+            Environment env,
+            NamedXContentRegistry namedXContentRegistry,
+            ClusterService clusterService,
+            BigArrays bigArrays,
+            RecoverySettings recoverySettings
+        ) {
+            return Map.of(
+                REPO_TYPE,
+                metadata -> new LatencySimulatingBlobStoreRepository(
+                    metadata,
+                    env,
+                    namedXContentRegistry,
+                    clusterService,
+                    bigArrays,
+                    recoverySettings,
+                    COUNTS::increment
+                )
+            );
+        }
+    }
+
+    @AfterClass
+    public static void resetCounts() {
+        COUNTS.reset();
+    }
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
+        plugins.add(TestPlugin.class);
+        plugins.add(LocalStateSearchableSnapshots.class);
+        return plugins;
+    }
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
+        Settings.Builder builder = Settings.builder();
+        builder.put(otherSettings);
+        builder.put(LicenseSettings.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial");
+        return builder.build();
+    }
+
+    public void testRetrieveSnapshots() throws Exception {
+        final Client client = client();
+        final String repositoryName = "test-repo";
+
+        logger.info("-->  creating repository");
+        createRepository(repositoryName, REPO_TYPE, Settings.builder().put("location", randomRepoPath()));
+
+        logger.info("--> creating an index and indexing documents");
+        final String indexName = "test-idx";
+        createIndex(indexName);
+        ensureGreen();
+        int numDocs = randomIntBetween(10, 20);
+        for (int i = 0; i < numDocs; i++) {
+            String id = Integer.toString(i);
+            client().prepareIndex(indexName).setId(id).setSource("text", "sometext").get();
+        }
+        indicesAdmin().prepareFlush(indexName).get();
+
+        SnapshotInfo si = createSnapshot(repositoryName, "test-snap-1", List.of(indexName));
+
+        logger.info("--> delete index");
+        assertAcked(client.admin().indices().prepareDelete("test-idx").get());
+
+        logger.info("--> mount snapshot");
+        final MountSearchableSnapshotRequest req = new MountSearchableSnapshotRequest(
+            "test-idx",
+            repositoryName,
+            si.snapshotId().getName(),
+            indexName,
+            Settings.EMPTY,
+            Strings.EMPTY_ARRAY,
+            true,
+            MountSearchableSnapshotRequest.Storage.FULL_COPY
+        );
+
+        final RestoreSnapshotResponse restoreSnapshotResponse = client().execute(MountSearchableSnapshotAction.INSTANCE, req).get();
+        assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));
+
+        logger.info("--> run a search");
+        var searchResponse = client.prepareSearch("test-idx").setQuery(QueryBuilders.termQuery("text", "sometext")).get();
+
+        assertThat(searchResponse.getHits().getTotalHits().value, greaterThan(0L));
+        assertThat(COUNTS.intValue(), greaterThan(0));
+    }
+}

+ 43 - 0
test/external-modules/latency-simulating-directory/src/internalClusterTest/java/org/elasticsearch/test/simulatedlatencyrepo/LatencySimulatingRespositoryPluginTests.java

@@ -0,0 +1,43 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.test.simulatedlatencyrepo;
+
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.repositories.Repository;
+import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.hamcrest.Matchers.instanceOf;
+
+public class LatencySimulatingRespositoryPluginTests extends AbstractSnapshotIntegTestCase {
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
+        plugins.add(LatencySimulatingRepositoryPlugin.class);
+        return plugins;
+    }
+
+    public void testRepositoryCanBeConfigured() {
+        String dataNode = internalCluster().startDataOnlyNode();
+        final String repositoryName = "test-repo";
+
+        logger.info("-->  creating repository");
+        createRepository(repositoryName, "latency-simulating", Settings.builder().put("location", randomRepoPath()).put("latency", 150));
+
+        Repository repo = getRepositoryOnNode(repositoryName, dataNode);
+        assertThat(repo, instanceOf(LatencySimulatingBlobStoreRepository.class));
+
+        disableRepoConsistencyCheck("This test checks an empty repository");
+    }
+}

+ 57 - 0
test/external-modules/latency-simulating-directory/src/internalClusterTest/java/org/elasticsearch/test/simulatedlatencyrepo/LocalStateSearchableSnapshots.java

@@ -0,0 +1,57 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.test.simulatedlatencyrepo;
+
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.indices.SystemIndexDescriptor;
+import org.elasticsearch.license.License;
+import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.license.internal.XPackLicenseStatus;
+import org.elasticsearch.plugins.SystemIndexPlugin;
+import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
+import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots;
+
+import java.nio.file.Path;
+import java.util.Collection;
+
+public class LocalStateSearchableSnapshots extends LocalStateCompositeXPackPlugin implements SystemIndexPlugin {
+
+    private final SearchableSnapshots plugin;
+
+    public LocalStateSearchableSnapshots(final Settings settings, final Path configPath) {
+        super(settings, configPath);
+        this.plugin = new SearchableSnapshots(settings) {
+
+            @Override
+            protected XPackLicenseState getLicenseState() {
+                return new XPackLicenseState(
+                    () -> getEpochMillisSupplier().getAsLong(),
+                    new XPackLicenseStatus(License.OperationMode.TRIAL, true, null)
+                );
+            }
+
+        };
+        plugins.add(plugin);
+    }
+
+    @Override
+    public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
+        return plugin.getSystemIndexDescriptors(settings);
+    }
+
+    @Override
+    public String getFeatureName() {
+        return plugin.getFeatureName();
+    }
+
+    @Override
+    public String getFeatureDescription() {
+        return plugin.getFeatureDescription();
+    }
+}

+ 83 - 0
test/external-modules/latency-simulating-directory/src/main/java/org/elasticsearch/test/simulatedlatencyrepo/LatencySimulatingBlobStoreRepository.java

@@ -0,0 +1,83 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.test.simulatedlatencyrepo;
+
+import org.elasticsearch.cluster.metadata.RepositoryMetadata;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.blobstore.BlobContainer;
+import org.elasticsearch.common.blobstore.BlobPath;
+import org.elasticsearch.common.blobstore.BlobStore;
+import org.elasticsearch.common.blobstore.support.FilterBlobContainer;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.env.Environment;
+import org.elasticsearch.indices.recovery.RecoverySettings;
+import org.elasticsearch.repositories.fs.FsRepository;
+import org.elasticsearch.xcontent.NamedXContentRegistry;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+class LatencySimulatingBlobStoreRepository extends FsRepository {
+
+    private final Runnable simulator;
+
+    protected LatencySimulatingBlobStoreRepository(
+        RepositoryMetadata metadata,
+        Environment env,
+        NamedXContentRegistry namedXContentRegistry,
+        ClusterService clusterService,
+        BigArrays bigArrays,
+        RecoverySettings recoverySettings,
+        Runnable simulator
+    ) {
+        super(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
+        this.simulator = simulator;
+    }
+
+    @Override
+    protected BlobStore createBlobStore() throws Exception {
+        BlobStore fsBlobStore = super.createBlobStore();
+        return new BlobStore() {
+            @Override
+            public BlobContainer blobContainer(BlobPath path) {
+                BlobContainer blobContainer = fsBlobStore.blobContainer(path);
+                return new LatencySimulatingBlobContainer(blobContainer);
+            }
+
+            @Override
+            public void close() throws IOException {
+                fsBlobStore.close();
+            }
+        };
+    }
+
+    private class LatencySimulatingBlobContainer extends FilterBlobContainer {
+
+        LatencySimulatingBlobContainer(BlobContainer delegate) {
+            super(delegate);
+        }
+
+        @Override
+        public InputStream readBlob(String blobName) throws IOException {
+            simulator.run();
+            return super.readBlob(blobName);
+        }
+
+        @Override
+        public InputStream readBlob(String blobName, long position, long length) throws IOException {
+            simulator.run();
+            return super.readBlob(blobName, position, length);
+        }
+
+        @Override
+        protected BlobContainer wrapChild(BlobContainer child) {
+            return new LatencySimulatingBlobContainer(child);
+        }
+    }
+}

+ 66 - 0
test/external-modules/latency-simulating-directory/src/main/java/org/elasticsearch/test/simulatedlatencyrepo/LatencySimulatingRepositoryPlugin.java

@@ -0,0 +1,66 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.test.simulatedlatencyrepo;
+
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.env.Environment;
+import org.elasticsearch.indices.recovery.RecoverySettings;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.plugins.RepositoryPlugin;
+import org.elasticsearch.repositories.Repository;
+import org.elasticsearch.xcontent.NamedXContentRegistry;
+
+import java.util.Map;
+
+/**
+ * Repository wrapping an FsRepository and adding a configurable latency to each
+ * blob store read.
+ */
+public class LatencySimulatingRepositoryPlugin extends Plugin implements RepositoryPlugin {
+
+    public static final String TYPE = "latency-simulating";
+
+    @Override
+    public Map<String, Repository.Factory> getRepositories(
+        Environment env,
+        NamedXContentRegistry namedXContentRegistry,
+        ClusterService clusterService,
+        BigArrays bigArrays,
+        RecoverySettings recoverySettings
+    ) {
+        return Map.of(
+            TYPE,
+            metadata -> new LatencySimulatingBlobStoreRepository(
+                metadata,
+                env,
+                namedXContentRegistry,
+                clusterService,
+                bigArrays,
+                recoverySettings,
+                buildSimulator(metadata.settings())
+            )
+        );
+    }
+
+    private static Runnable buildSimulator(Settings settings) {
+        long sleepyTime = settings.getAsLong("latency", 0L);
+        if (sleepyTime == 0L) {
+            return () -> {};
+        }
+        return () -> {
+            try {
+                Thread.sleep(sleepyTime);
+            } catch (InterruptedException e) {
+                throw new AssertionError("BlobRepository read interrupted!");
+            }
+        };
+    }
+}