瀏覽代碼

AbstractSearchableSnapshotsRestTestCase.testClearCache() should wait for cache writes to complete (#70541)

The test AbstractSearchableSnapshotsRestTestCase.testClearCache() 
sometimes fails because it assumes that all cache writes are completed 
when retrieving and comparing the searchable snapshots stat 
cached_bytes_written.

This commit changes the test so that it now waits for searchable 
snapshots thread pools to finish to process cache related tasks 
before retrieving the stats.

Closes #70130
Tanguy Leroux 4 年之前
父節點
當前提交
1e461af9b1

+ 37 - 0
x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsRestTestCase.java

@@ -31,12 +31,15 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.notNullValue;
 
 public abstract class AbstractSearchableSnapshotsRestTestCase extends ESRestTestCase {
 
@@ -212,6 +215,8 @@ public abstract class AbstractSearchableSnapshotsRestTestCase extends ESRestTest
             Map<String, Object> searchResults = search(restoredIndexName, QueryBuilders.matchAllQuery(), Boolean.TRUE);
             assertThat(extractValue(searchResults, "hits.total.value"), equalTo(numDocs));
 
+            waitForIdlingSearchableSnapshotsThreadPools();
+
             final long bytesInCacheBeforeClear = sumCachedBytesWritten.apply(searchableSnapshotStats(restoredIndexName));
             assertThat(bytesInCacheBeforeClear, greaterThan(0L));
 
@@ -223,6 +228,8 @@ public abstract class AbstractSearchableSnapshotsRestTestCase extends ESRestTest
             searchResults = search(restoredIndexName, QueryBuilders.matchAllQuery(), Boolean.TRUE);
             assertThat(extractValue(searchResults, "hits.total.value"), equalTo(numDocs));
 
+            waitForIdlingSearchableSnapshotsThreadPools();
+
             assertBusy(() -> {
                 final long bytesInCacheAfterSearch = sumCachedBytesWritten.apply(searchableSnapshotStats(restoredIndexName));
                 assertThat(bytesInCacheAfterSearch, greaterThan(bytesInCacheBeforeClear));
@@ -458,6 +465,36 @@ public abstract class AbstractSearchableSnapshotsRestTestCase extends ESRestTest
         return extractValue(responseAsMap(response), index + ".settings");
     }
 
+    @SuppressWarnings("unchecked")
+    protected static void waitForIdlingSearchableSnapshotsThreadPools() throws Exception {
+        final Set<String> searchableSnapshotsThreadPools = Set.of(
+            SearchableSnapshotsConstants.CACHE_FETCH_ASYNC_THREAD_POOL_NAME,
+            SearchableSnapshotsConstants.CACHE_PREWARMING_THREAD_POOL_NAME
+        );
+        assertBusy(() -> {
+            final Response response = client().performRequest(new Request(HttpGet.METHOD_NAME, "/_nodes/stats/thread_pool"));
+            assertThat(response.getStatusLine().getStatusCode(), equalTo(RestStatus.OK.getStatus()));
+
+            final Map<String, Object> nodes = extractValue(responseAsMap(response), "nodes");
+            assertThat(nodes, notNullValue());
+
+            for (String node : nodes.keySet()) {
+                final Map<String, Object> threadPools = extractValue((Map<String, Object>) nodes.get(node), "thread_pool");
+                searchableSnapshotsThreadPools.forEach(threadPoolName -> {
+                    final Map<String, Object> threadPoolStats = (Map<String, Object>) threadPools.get(threadPoolName);
+                    assertThat(threadPoolStats, notNullValue());
+
+                    final Number active = extractValue(threadPoolStats, "active");
+                    assertThat(threadPoolName + " has still active tasks", active.longValue(), equalTo(0L));
+
+                    final Number queue = extractValue(threadPoolStats, "queue");
+                    assertThat(threadPoolName + " has still enqueued tasks", queue.longValue(), equalTo(0L));
+
+                });
+            }
+        }, 30L, TimeUnit.SECONDS);
+    }
+
     @SuppressWarnings("unchecked")
     protected static <T> T extractValue(Map<String, Object> map, String path) {
         return (T) XContentMapValues.extractValue(path, map);