소스 검색

Fix BackGroundInder Thread-Leak on Close (#80399)

More of a cosmetic issue but I noticed that this thing still has background
indexing threads running after the `close` returns in many cases. This is a little
confusing and also randomly causes leaked thread warnings after tests here and there.

-> I think it's much cleaner to move this to waiting for full stoppage on close
Armin Braun 4 년 전
부모
커밋
f003bd01bd
13개의 변경된 파일22개의 추가작업 그리고 31개의 파일을 삭제
  1. 1 1
      server/src/internalClusterTest/java/org/elasticsearch/blocks/SimpleBlocksIT.java
  2. 0 1
      server/src/internalClusterTest/java/org/elasticsearch/discovery/DiskDisruptionIT.java
  3. 1 1
      server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
  4. 1 1
      server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/ReplicaToPrimaryPromotionIT.java
  5. 2 2
      server/src/internalClusterTest/java/org/elasticsearch/indices/state/CloseIndexIT.java
  6. 1 1
      server/src/internalClusterTest/java/org/elasticsearch/indices/state/CloseWhileRelocatingShardsIT.java
  7. 4 4
      server/src/internalClusterTest/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java
  8. 1 1
      server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java
  9. 2 2
      test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java
  10. 5 9
      test/framework/src/main/java/org/elasticsearch/test/BackgroundIndexer.java
  11. 2 6
      x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java
  12. 1 1
      x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/SearchableSnapshotsPersistentCacheIntegTests.java
  13. 1 1
      x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/shared/NodesCachesStatsIntegTests.java

+ 1 - 1
server/src/internalClusterTest/java/org/elasticsearch/blocks/SimpleBlocksIT.java

@@ -395,7 +395,7 @@ public class SimpleBlocksIT extends ESIntegTestCase {
         final APIBlock block = randomAddableBlock();
         int nbDocs = 0;
         try {
-            try (BackgroundIndexer indexer = new BackgroundIndexer(indexName, "_doc", client(), 1000)) {
+            try (BackgroundIndexer indexer = new BackgroundIndexer(indexName, client(), 1000)) {
                 indexer.setFailureAssertion(t -> {
                     Throwable cause = ExceptionsHelper.unwrapCause(t);
                     assertThat(cause, instanceOf(ClusterBlockException.class));

+ 0 - 1
server/src/internalClusterTest/java/org/elasticsearch/discovery/DiskDisruptionIT.java

@@ -128,7 +128,6 @@ public class DiskDisruptionIT extends AbstractDisruptionTestCase {
         try (
             BackgroundIndexer indexer = new BackgroundIndexer(
                 "test",
-                "_doc",
                 client(),
                 -1,
                 RandomizedTest.scaledRandomIntBetween(2, 5),

+ 1 - 1
server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java

@@ -282,7 +282,7 @@ public class IndexRecoveryIT extends AbstractIndexRecoveryIntegTestCase {
         ensureGreen(INDEX_NAME);
 
         final int numOfDocs = scaledRandomIntBetween(0, 200);
-        try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), numOfDocs)) {
+        try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, client(), numOfDocs)) {
             waitForDocs(numOfDocs, indexer);
         }
 

+ 1 - 1
server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/ReplicaToPrimaryPromotionIT.java

@@ -38,7 +38,7 @@ public class ReplicaToPrimaryPromotionIT extends ESIntegTestCase {
 
         final int numOfDocs = scaledRandomIntBetween(0, 200);
         if (numOfDocs > 0) {
-            try (BackgroundIndexer indexer = new BackgroundIndexer(indexName, "_doc", client(), numOfDocs)) {
+            try (BackgroundIndexer indexer = new BackgroundIndexer(indexName, client(), numOfDocs)) {
                 waitForDocs(numOfDocs, indexer);
             }
             refresh(indexName);

+ 2 - 2
server/src/internalClusterTest/java/org/elasticsearch/indices/state/CloseIndexIT.java

@@ -220,7 +220,7 @@ public class CloseIndexIT extends ESIntegTestCase {
         createIndex(indexName);
 
         int nbDocs = 0;
-        try (BackgroundIndexer indexer = new BackgroundIndexer(indexName, "_doc", client(), MAX_DOCS)) {
+        try (BackgroundIndexer indexer = new BackgroundIndexer(indexName, client(), MAX_DOCS)) {
             indexer.setFailureAssertion(t -> assertException(t, indexName));
 
             waitForDocs(randomIntBetween(10, 50), indexer);
@@ -298,7 +298,7 @@ public class CloseIndexIT extends ESIntegTestCase {
         final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
         createIndex(indexName);
 
-        final BackgroundIndexer indexer = new BackgroundIndexer(indexName, "_doc", client(), MAX_DOCS);
+        final BackgroundIndexer indexer = new BackgroundIndexer(indexName, client(), MAX_DOCS);
         indexer.setFailureAssertion(e -> {});
         waitForDocs(1, indexer);
 

+ 1 - 1
server/src/internalClusterTest/java/org/elasticsearch/indices/state/CloseWhileRelocatingShardsIT.java

@@ -102,7 +102,7 @@ public class CloseWhileRelocatingShardsIT extends ESIntegTestCase {
                     break;
                 default:
                     logger.debug("creating index {} with background indexing", indexName);
-                    final BackgroundIndexer indexer = new BackgroundIndexer(indexName, "_doc", client(), -1, 1);
+                    final BackgroundIndexer indexer = new BackgroundIndexer(indexName, client(), -1, 1);
                     indexers.put(indexName, indexer);
                     indexer.setFailureAssertion(t -> assertException(t, indexName));
                     waitForDocs(1, indexer);

+ 4 - 4
server/src/internalClusterTest/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java

@@ -82,7 +82,7 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
         final int totalNumDocs = scaledRandomIntBetween(200, 10000);
         int waitFor = totalNumDocs / 10;
         int extraDocs = waitFor;
-        try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client(), extraDocs)) {
+        try (BackgroundIndexer indexer = new BackgroundIndexer("test", client(), extraDocs)) {
             logger.info("--> waiting for {} docs to be indexed ...", waitFor);
             waitForDocs(waitFor, indexer);
             indexer.assertNoFailures();
@@ -146,7 +146,7 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
         final int totalNumDocs = scaledRandomIntBetween(200, 10000);
         int waitFor = totalNumDocs / 10;
         int extraDocs = waitFor;
-        try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client(), extraDocs)) {
+        try (BackgroundIndexer indexer = new BackgroundIndexer("test", client(), extraDocs)) {
             logger.info("--> waiting for {} docs to be indexed ...", waitFor);
             waitForDocs(waitFor, indexer);
             indexer.assertNoFailures();
@@ -207,7 +207,7 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
         final int totalNumDocs = scaledRandomIntBetween(200, 10000);
         int waitFor = totalNumDocs / 10;
         int extraDocs = waitFor;
-        try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client(), extraDocs)) {
+        try (BackgroundIndexer indexer = new BackgroundIndexer("test", client(), extraDocs)) {
             logger.info("--> waiting for {} docs to be indexed ...", waitFor);
             waitForDocs(waitFor, indexer);
             indexer.assertNoFailures();
@@ -323,7 +323,7 @@ public class RecoveryWhileUnderLoadIT extends ESIntegTestCase {
 
         final int numDocs = scaledRandomIntBetween(200, 9999);
 
-        try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client(), numDocs)) {
+        try (BackgroundIndexer indexer = new BackgroundIndexer("test", client(), numDocs)) {
 
             for (int i = 0; i < numDocs; i += scaledRandomIntBetween(100, Math.min(1000, numDocs))) {
                 indexer.assertNoFailures();

+ 1 - 1
server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java

@@ -203,7 +203,7 @@ public class RelocationIT extends ESIntegTestCase {
         }
 
         int numDocs = scaledRandomIntBetween(200, 2500);
-        try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type1", client(), numDocs)) {
+        try (BackgroundIndexer indexer = new BackgroundIndexer("test", client(), numDocs)) {
             logger.info("--> waiting for {} docs to be indexed ...", numDocs);
             waitForDocs(numDocs, indexer);
             logger.info("--> {} docs indexed", numDocs);

+ 2 - 2
test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java

@@ -162,7 +162,7 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR
         );
 
         final long nbDocs = randomLongBetween(10_000L, 20_000L);
-        try (BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client(), (int) nbDocs)) {
+        try (BackgroundIndexer indexer = new BackgroundIndexer(index, client(), (int) nbDocs)) {
             waitForDocs(nbDocs, indexer);
         }
 
@@ -194,7 +194,7 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR
         );
 
         final long nbDocs = randomLongBetween(10_000L, 20_000L);
-        try (BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client(), (int) nbDocs)) {
+        try (BackgroundIndexer indexer = new BackgroundIndexer(index, client(), (int) nbDocs)) {
             waitForDocs(nbDocs, indexer);
         }
 

+ 5 - 9
test/framework/src/main/java/org/elasticsearch/test/BackgroundIndexer.java

@@ -68,12 +68,11 @@ public class BackgroundIndexer implements AutoCloseable {
      * been indexed.
      *
      * @param index     index name to index into
-     * @param type      document type
      * @param client    client to use
      * @param numOfDocs number of document to index before pausing. Set to -1 to have no limit.
      */
-    public BackgroundIndexer(String index, String type, Client client, int numOfDocs) {
-        this(index, type, client, numOfDocs, RandomizedTest.scaledRandomIntBetween(2, 5));
+    public BackgroundIndexer(String index, Client client, int numOfDocs) {
+        this(index, client, numOfDocs, RandomizedTest.scaledRandomIntBetween(2, 5));
     }
 
     /**
@@ -81,13 +80,12 @@ public class BackgroundIndexer implements AutoCloseable {
      * been indexed.
      *
      * @param index       index name to index into
-     * @param type        document type
      * @param client      client to use
      * @param numOfDocs   number of document to index before pausing. Set to -1 to have no limit.
      * @param writerCount number of indexing threads to use
      */
-    public BackgroundIndexer(String index, String type, Client client, int numOfDocs, final int writerCount) {
-        this(index, type, client, numOfDocs, writerCount, true, null);
+    public BackgroundIndexer(String index, Client client, int numOfDocs, final int writerCount) {
+        this(index, client, numOfDocs, writerCount, true, null);
     }
 
     /**
@@ -95,7 +93,6 @@ public class BackgroundIndexer implements AutoCloseable {
      * been indexed.
      *
      * @param index       index name to index into
-     * @param type        document type
      * @param client      client to use
      * @param numOfDocs   number of document to index before pausing. Set to -1 to have no limit.
      * @param writerCount number of indexing threads to use
@@ -104,7 +101,6 @@ public class BackgroundIndexer implements AutoCloseable {
      */
     public BackgroundIndexer(
         final String index,
-        final String type,
         final Client client,
         final int numOfDocs,
         final int writerCount,
@@ -352,7 +348,7 @@ public class BackgroundIndexer implements AutoCloseable {
 
     @Override
     public void close() throws Exception {
-        stop();
+        stopAndAwaitStopped();
     }
 
     public Client getClient() {

+ 2 - 6
x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java

@@ -169,9 +169,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
         }
 
         logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs);
-        try (
-            BackgroundIndexer indexer = new BackgroundIndexer("index1", "_doc", leaderClient(), firstBatchNumDocs, randomIntBetween(1, 5))
-        ) {
+        try (BackgroundIndexer indexer = new BackgroundIndexer("index1", leaderClient(), firstBatchNumDocs, randomIntBetween(1, 5))) {
             waitForDocs(randomInt(firstBatchNumDocs), indexer);
             leaderClient().admin().indices().prepareFlush("index1").setWaitIfOngoing(true).get();
             waitForDocs(firstBatchNumDocs, indexer);
@@ -1502,9 +1500,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
         assertTrue(response.isIndexFollowingStarted());
 
         logger.info("Indexing [{}] docs while updating remote config", firstBatchNumDocs);
-        try (
-            BackgroundIndexer indexer = new BackgroundIndexer("index1", "_doc", leaderClient(), firstBatchNumDocs, randomIntBetween(1, 5))
-        ) {
+        try (BackgroundIndexer indexer = new BackgroundIndexer("index1", leaderClient(), firstBatchNumDocs, randomIntBetween(1, 5))) {
 
             ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest().masterNodeTimeout(TimeValue.MAX_VALUE);
             String address = getLeaderCluster().getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString();

+ 1 - 1
x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/SearchableSnapshotsPersistentCacheIntegTests.java

@@ -190,7 +190,7 @@ public class SearchableSnapshotsPersistentCacheIntegTests extends BaseSearchable
         ensureGreen(indexName);
 
         final int numDocs = scaledRandomIntBetween(1_000, 5_000);
-        try (BackgroundIndexer indexer = new BackgroundIndexer(indexName, "_doc", client(), numDocs)) {
+        try (BackgroundIndexer indexer = new BackgroundIndexer(indexName, client(), numDocs)) {
             waitForDocs(numDocs, indexer);
             indexer.stopAndAwaitStopped();
         }

+ 1 - 1
x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/shared/NodesCachesStatsIntegTests.java

@@ -49,7 +49,7 @@ public class NodesCachesStatsIntegTests extends BaseFrozenSearchableSnapshotsInt
         createIndex(index, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
 
         final int nbDocs = randomIntBetween(1_000, 10_000);
-        try (BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client(), nbDocs)) {
+        try (BackgroundIndexer indexer = new BackgroundIndexer(index, client(), nbDocs)) {
             waitForDocs(nbDocs, indexer);
         }
         refresh(index);