|
@@ -24,6 +24,7 @@ import org.elasticsearch.indices.recovery.RecoveryState;
|
|
|
import org.elasticsearch.repositories.fs.FsRepository;
|
|
|
import org.elasticsearch.snapshots.SnapshotInfo;
|
|
|
import org.elasticsearch.test.BackgroundIndexer;
|
|
|
+import org.elasticsearch.test.ESIntegTestCase;
|
|
|
import org.elasticsearch.test.InternalTestCluster;
|
|
|
import org.elasticsearch.xpack.searchablesnapshots.BaseSearchableSnapshotsIntegTestCase;
|
|
|
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots;
|
|
@@ -33,6 +34,7 @@ import java.nio.file.DirectoryStream;
|
|
|
import java.nio.file.Files;
|
|
|
import java.nio.file.Path;
|
|
|
import java.util.HashSet;
|
|
|
+import java.util.List;
|
|
|
import java.util.Locale;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
@@ -46,6 +48,7 @@ import static org.hamcrest.Matchers.equalTo;
|
|
|
import static org.hamcrest.Matchers.greaterThan;
|
|
|
import static org.hamcrest.Matchers.notNullValue;
|
|
|
|
|
|
+@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
|
|
|
public class SearchableSnapshotsPersistentCacheIntegTests extends BaseSearchableSnapshotsIntegTestCase {
|
|
|
|
|
|
@Override
|
|
@@ -87,6 +90,9 @@ public class SearchableSnapshotsPersistentCacheIntegTests extends BaseSearchable
|
|
|
);
|
|
|
ensureGreen(restoredIndexName);
|
|
|
|
|
|
+ assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
|
|
|
+ assertExecutorIsIdle(SearchableSnapshots.CACHE_PREWARMING_THREAD_POOL_NAME);
|
|
|
+
|
|
|
final Index restoredIndex = client().admin()
|
|
|
.cluster()
|
|
|
.prepareState()
|
|
@@ -147,20 +153,20 @@ public class SearchableSnapshotsPersistentCacheIntegTests extends BaseSearchable
|
|
|
}
|
|
|
});
|
|
|
|
|
|
+ ensureGreen(restoredIndexName);
|
|
|
+
|
|
|
+ assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
|
|
|
+ assertExecutorIsIdle(SearchableSnapshots.CACHE_PREWARMING_THREAD_POOL_NAME);
|
|
|
+
|
|
|
final CacheService cacheServiceAfterRestart = internalCluster().getInstance(CacheService.class, dataNode);
|
|
|
final PersistentCache persistentCacheAfterRestart = cacheServiceAfterRestart.getPersistentCache();
|
|
|
- ensureGreen(restoredIndexName);
|
|
|
|
|
|
cacheFiles.forEach(cacheFile -> assertTrue(cacheFile + " should have survived node restart", Files.exists(cacheFile)));
|
|
|
assertThat("Cache files should be loaded in cache", persistentCacheAfterRestart.getNumDocs(), equalTo((long) cacheFiles.size()));
|
|
|
|
|
|
assertAcked(client().admin().indices().prepareDelete(restoredIndexName));
|
|
|
-
|
|
|
- assertBusy(() -> {
|
|
|
- cacheFiles.forEach(cacheFile -> assertFalse(cacheFile + " should have been cleaned up", Files.exists(cacheFile)));
|
|
|
- cacheServiceAfterRestart.synchronizeCache();
|
|
|
- assertThat(persistentCacheAfterRestart.getNumDocs(), equalTo(0L));
|
|
|
- });
|
|
|
+ assertBusy(() -> cacheFiles.forEach(cacheFile -> assertFalse(cacheFile + " should have been cleaned up", Files.exists(cacheFile))));
|
|
|
+ assertEmptyPersistentCacheOnDataNodes();
|
|
|
}
|
|
|
|
|
|
public void testPersistentCacheCleanUpAfterRelocation() throws Exception {
|
|
@@ -186,6 +192,7 @@ public class SearchableSnapshotsPersistentCacheIntegTests extends BaseSearchable
|
|
|
final int numDocs = scaledRandomIntBetween(1_000, 5_000);
|
|
|
try (BackgroundIndexer indexer = new BackgroundIndexer(indexName, "_doc", client(), numDocs)) {
|
|
|
waitForDocs(numDocs, indexer);
|
|
|
+ indexer.stopAndAwaitStopped();
|
|
|
}
|
|
|
refresh(indexName);
|
|
|
|
|
@@ -212,7 +219,6 @@ public class SearchableSnapshotsPersistentCacheIntegTests extends BaseSearchable
|
|
|
.cluster()
|
|
|
.prepareState()
|
|
|
.clear()
|
|
|
- .setRoutingTable(true)
|
|
|
.setMetadata(true)
|
|
|
.setIndices(mountedIndexName)
|
|
|
.get();
|
|
@@ -246,6 +252,9 @@ public class SearchableSnapshotsPersistentCacheIntegTests extends BaseSearchable
|
|
|
|
|
|
ensureGreen(mountedIndexName);
|
|
|
|
|
|
+ assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
|
|
|
+ assertExecutorIsIdle(SearchableSnapshots.CACHE_PREWARMING_THREAD_POOL_NAME);
|
|
|
+
|
|
|
recoveryResponse = client().admin().indices().prepareRecoveries(mountedIndexName).get();
|
|
|
assertTrue(recoveryResponse.shardRecoveryStates().containsKey(mountedIndexName));
|
|
|
assertTrue(
|
|
@@ -269,12 +278,26 @@ public class SearchableSnapshotsPersistentCacheIntegTests extends BaseSearchable
|
|
|
|
|
|
logger.info("--> deleting mounted index {}", mountedIndex);
|
|
|
assertAcked(client().admin().indices().prepareDelete(mountedIndexName));
|
|
|
+ assertEmptyPersistentCacheOnDataNodes();
|
|
|
+ }
|
|
|
|
|
|
- assertBusy(() -> {
|
|
|
- for (CacheService cacheService : internalCluster().getDataNodeInstances(CacheService.class)) {
|
|
|
- cacheService.synchronizeCache();
|
|
|
- assertThat(cacheService.getPersistentCache().getNumDocs(), equalTo(0L));
|
|
|
- }
|
|
|
- });
|
|
|
+ private void assertEmptyPersistentCacheOnDataNodes() throws Exception {
|
|
|
+ final Set<DiscoveryNode> dataNodes = new HashSet<>(getDiscoveryNodes().getDataNodes().values());
|
|
|
+ logger.info("--> verifying persistent caches are empty on nodes... {}", dataNodes);
|
|
|
+ try {
|
|
|
+ assertBusy(() -> {
|
|
|
+ for (DiscoveryNode node : List.copyOf(dataNodes)) {
|
|
|
+ final CacheService cacheService = internalCluster().getInstance(CacheService.class, node.getName());
|
|
|
+ cacheService.synchronizeCache();
|
|
|
+ assertThat(cacheService.getPersistentCache().getNumDocs(), equalTo(0L));
|
|
|
+ logger.info("--> persistent cache is empty on node {}", node);
|
|
|
+ dataNodes.remove(node);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ logger.info("--> all persistent caches are empty");
|
|
|
+ } catch (AssertionError ae) {
|
|
|
+ logger.error("--> persistent caches not empty on nodes: {}", dataNodes);
|
|
|
+ throw ae;
|
|
|
+ }
|
|
|
}
|
|
|
}
|