Browse Source

Remove dangling AwaitsFix (#113941) (#114286)

The issue referenced seems to be closed, maybe this just wasn't removed
properly.
Christoph Büscher 1 year ago
parent
commit
029c6836d9

+ 253 - 230
server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java

@@ -258,11 +258,10 @@ public class SearchQueryThenFetchAsyncActionTests extends ESTestCase {
         }
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/101932")
     public void testMinimumVersionSameAsNewVersion() throws Exception {
         var newVersion = VersionInformation.CURRENT;
         var oldVersion = new VersionInformation(
-            VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(), VersionUtils.getPreviousVersion()),
+            VersionUtils.randomCompatibleVersion(random(), VersionUtils.getPreviousVersion()),
             IndexVersions.MINIMUM_COMPATIBLE,
             IndexVersionUtils.randomCompatibleVersion(random())
         );
@@ -340,65 +339,69 @@ public class SearchQueryThenFetchAsyncActionTests extends ESTestCase {
         SearchTransportService searchTransportService = new SearchTransportService(null, null, null);
         SearchPhaseController controller = new SearchPhaseController((t, r) -> InternalAggregationTestCase.emptyReduceContextBuilder());
         SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap());
-        QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer(
-            searchRequest,
-            EsExecutors.DIRECT_EXECUTOR_SERVICE,
-            new NoopCircuitBreaker(CircuitBreaker.REQUEST),
-            controller,
-            task::isCancelled,
-            task.getProgressListener(),
-            shardsIter.size(),
-            exc -> {}
-        );
-        final List<Object> responses = new ArrayList<>();
-        SearchQueryThenFetchAsyncAction newSearchAsyncAction = new SearchQueryThenFetchAsyncAction(
-            logger,
-            null,
-            searchTransportService,
-            (clusterAlias, node) -> lookup.get(node),
-            Collections.singletonMap("_na_", AliasFilter.EMPTY),
-            Collections.emptyMap(),
-            EsExecutors.DIRECT_EXECUTOR_SERVICE,
-            resultConsumer,
-            searchRequest,
-            new ActionListener<>() {
-                @Override
-                public void onFailure(Exception e) {
-                    responses.add(e);
-                }
+        try (
+            QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer(
+                searchRequest,
+                EsExecutors.DIRECT_EXECUTOR_SERVICE,
+                new NoopCircuitBreaker(CircuitBreaker.REQUEST),
+                controller,
+                task::isCancelled,
+                task.getProgressListener(),
+                shardsIter.size(),
+                exc -> {}
+            )
+        ) {
+            final List<Object> responses = new ArrayList<>();
+            SearchQueryThenFetchAsyncAction newSearchAsyncAction = new SearchQueryThenFetchAsyncAction(
+                logger,
+                null,
+                searchTransportService,
+                (clusterAlias, node) -> lookup.get(node),
+                Collections.singletonMap("_na_", AliasFilter.EMPTY),
+                Collections.emptyMap(),
+                EsExecutors.DIRECT_EXECUTOR_SERVICE,
+                resultConsumer,
+                searchRequest,
+                new ActionListener<>() {
+                    @Override
+                    public void onFailure(Exception e) {
+                        responses.add(e);
+                    }
 
-                public void onResponse(SearchResponse response) {
-                    responses.add(response);
-                };
-            },
-            shardsIter,
-            timeProvider,
-            new ClusterState.Builder(new ClusterName("test")).build(),
-            task,
-            SearchResponse.Clusters.EMPTY,
-            null
-        );
+                    public void onResponse(SearchResponse response) {
+                        responses.add(response);
+                    }
 
-        newSearchAsyncAction.start();
-        assertThat(responses, hasSize(1));
-        assertThat(responses.get(0), instanceOf(SearchPhaseExecutionException.class));
-        SearchPhaseExecutionException e = (SearchPhaseExecutionException) responses.get(0);
-        assertThat(e.getCause(), instanceOf(VersionMismatchException.class));
-        assertThat(
-            e.getCause().getMessage(),
-            equalTo("One of the shards is incompatible with the required minimum version [" + minVersion + "]")
-        );
+                ;
+                },
+                shardsIter,
+                timeProvider,
+                new ClusterState.Builder(new ClusterName("test")).build(),
+                task,
+                SearchResponse.Clusters.EMPTY,
+                null
+            );
+
+            newSearchAsyncAction.start();
+            assertThat(responses, hasSize(1));
+            assertThat(responses.get(0), instanceOf(SearchPhaseExecutionException.class));
+            SearchPhaseExecutionException e = (SearchPhaseExecutionException) responses.get(0);
+            assertThat(e.getCause(), instanceOf(VersionMismatchException.class));
+            assertThat(
+                e.getCause().getMessage(),
+                equalTo("One of the shards is incompatible with the required minimum version [" + minVersion + "]")
+            );
+        }
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/101932")
     public void testMinimumVersionSameAsOldVersion() throws Exception {
-        Version newVersion = Version.CURRENT;
-        Version oldVersion = VersionUtils.randomVersionBetween(
-            random(),
-            Version.CURRENT.minimumCompatibilityVersion(),
-            VersionUtils.getPreviousVersion(newVersion)
+        var newVersion = VersionInformation.CURRENT;
+        var oldVersion = new VersionInformation(
+            VersionUtils.randomCompatibleVersion(random(), VersionUtils.getPreviousVersion()),
+            IndexVersions.MINIMUM_COMPATIBLE,
+            IndexVersionUtils.randomCompatibleVersion(random())
         );
-        Version minVersion = oldVersion;
+        Version minVersion = oldVersion.nodeVersion();
 
         final TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(
             0,
@@ -456,98 +459,106 @@ public class SearchQueryThenFetchAsyncActionTests extends ESTestCase {
                     new SearchShardTarget("node1", new ShardId("idx", "na", shardId), null),
                     null
                 );
-                SortField sortField = new SortField("timestamp", SortField.Type.LONG);
-                if (shardId == 0) {
-                    queryResult.topDocs(
-                        new TopDocsAndMaxScore(
-                            new TopFieldDocs(
-                                new TotalHits(1, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO),
-                                new FieldDoc[] { new FieldDoc(randomInt(1000), Float.NaN, new Object[] { shardId }) },
-                                new SortField[] { sortField }
+                try {
+                    SortField sortField = new SortField("timestamp", SortField.Type.LONG);
+                    if (shardId == 0) {
+                        queryResult.topDocs(
+                            new TopDocsAndMaxScore(
+                                new TopFieldDocs(
+                                    new TotalHits(1, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO),
+                                    new FieldDoc[] { new FieldDoc(randomInt(1000), Float.NaN, new Object[] { shardId }) },
+                                    new SortField[] { sortField }
+                                ),
+                                Float.NaN
                             ),
-                            Float.NaN
-                        ),
-                        new DocValueFormat[] { DocValueFormat.RAW }
-                    );
-                } else if (shardId == 1) {
-                    queryResult.topDocs(
-                        new TopDocsAndMaxScore(
-                            new TopFieldDocs(
-                                new TotalHits(1, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO),
-                                new FieldDoc[] { new FieldDoc(randomInt(1000), Float.NaN, new Object[] { shardId }) },
-                                new SortField[] { sortField }
+                            new DocValueFormat[] { DocValueFormat.RAW }
+                        );
+                    } else if (shardId == 1) {
+                        queryResult.topDocs(
+                            new TopDocsAndMaxScore(
+                                new TopFieldDocs(
+                                    new TotalHits(1, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO),
+                                    new FieldDoc[] { new FieldDoc(randomInt(1000), Float.NaN, new Object[] { shardId }) },
+                                    new SortField[] { sortField }
+                                ),
+                                Float.NaN
                             ),
-                            Float.NaN
-                        ),
-                        new DocValueFormat[] { DocValueFormat.RAW }
-                    );
+                            new DocValueFormat[] { DocValueFormat.RAW }
+                        );
+                    }
+                    queryResult.from(0);
+                    queryResult.size(1);
+                    successfulOps.incrementAndGet();
+                    queryResult.incRef();
+                    new Thread(() -> ActionListener.respondAndRelease(listener, queryResult)).start();
+                } finally {
+                    queryResult.decRef();
                 }
-                queryResult.from(0);
-                queryResult.size(1);
-                successfulOps.incrementAndGet();
-                new Thread(() -> listener.onResponse(queryResult)).start();
             }
         };
         SearchPhaseController controller = new SearchPhaseController((t, r) -> InternalAggregationTestCase.emptyReduceContextBuilder());
         SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap());
-        QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer(
-            searchRequest,
-            EsExecutors.DIRECT_EXECUTOR_SERVICE,
-            new NoopCircuitBreaker(CircuitBreaker.REQUEST),
-            controller,
-            task::isCancelled,
-            task.getProgressListener(),
-            shardsIter.size(),
-            exc -> {}
-        );
-        CountDownLatch latch = new CountDownLatch(1);
-        SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction(
-            logger,
-            null,
-            searchTransportService,
-            (clusterAlias, node) -> lookup.get(node),
-            Collections.singletonMap("_na_", AliasFilter.EMPTY),
-            Collections.emptyMap(),
-            EsExecutors.DIRECT_EXECUTOR_SERVICE,
-            resultConsumer,
-            searchRequest,
-            null,
-            shardsIter,
-            timeProvider,
-            new ClusterState.Builder(new ClusterName("test")).build(),
-            task,
-            SearchResponse.Clusters.EMPTY,
-            null
+        try (
+            QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer(
+                searchRequest,
+                EsExecutors.DIRECT_EXECUTOR_SERVICE,
+                new NoopCircuitBreaker(CircuitBreaker.REQUEST),
+                controller,
+                task::isCancelled,
+                task.getProgressListener(),
+                shardsIter.size(),
+                exc -> {}
+            )
         ) {
-            @Override
-            protected SearchPhase getNextPhase(SearchPhaseResults<SearchPhaseResult> results, SearchPhaseContext context) {
-                return new SearchPhase("test") {
-                    @Override
-                    public void run() {
-                        latch.countDown();
-                    }
-                };
-            }
-        };
+            CountDownLatch latch = new CountDownLatch(1);
+            SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction(
+                logger,
+                null,
+                searchTransportService,
+                (clusterAlias, node) -> lookup.get(node),
+                Collections.singletonMap("_na_", AliasFilter.EMPTY),
+                Collections.emptyMap(),
+                EsExecutors.DIRECT_EXECUTOR_SERVICE,
+                resultConsumer,
+                searchRequest,
+                null,
+                shardsIter,
+                timeProvider,
+                new ClusterState.Builder(new ClusterName("test")).build(),
+                task,
+                SearchResponse.Clusters.EMPTY,
+                null
+            ) {
+                @Override
+                protected SearchPhase getNextPhase(SearchPhaseResults<SearchPhaseResult> results, SearchPhaseContext context) {
+                    return new SearchPhase("test") {
+                        @Override
+                        public void run() {
+                            latch.countDown();
+                        }
+                    };
+                }
+            };
 
-        action.start();
-        latch.await();
-        assertThat(successfulOps.get(), equalTo(2));
-        SearchPhaseController.ReducedQueryPhase phase = action.results.reduce();
-        assertThat(phase.numReducePhases(), greaterThanOrEqualTo(1));
-        assertThat(phase.totalHits().value, equalTo(2L));
-        assertThat(phase.totalHits().relation, equalTo(TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO));
+            action.start();
+            latch.await();
+            assertThat(successfulOps.get(), equalTo(2));
+            SearchPhaseController.ReducedQueryPhase phase = action.results.reduce();
+            assertThat(phase.numReducePhases(), greaterThanOrEqualTo(1));
+            assertThat(phase.totalHits().value, equalTo(2L));
+            assertThat(phase.totalHits().relation, equalTo(TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO));
+        }
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/101932")
     public void testMinimumVersionShardDuringPhaseExecution() throws Exception {
-        Version newVersion = Version.CURRENT;
-        Version oldVersion = VersionUtils.randomVersionBetween(
-            random(),
-            Version.CURRENT.minimumCompatibilityVersion(),
-            VersionUtils.getPreviousVersion(newVersion)
+        var newVersion = VersionInformation.CURRENT;
+        var oldVersion = new VersionInformation(
+            VersionUtils.randomCompatibleVersion(random(), VersionUtils.getPreviousVersion()),
+            IndexVersions.MINIMUM_COMPATIBLE,
+            IndexVersionUtils.randomCompatibleVersion(random())
         );
-        Version minVersion = newVersion;
+
+        Version minVersion = newVersion.nodeVersion();
 
         final TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(
             0,
@@ -607,111 +618,123 @@ public class SearchQueryThenFetchAsyncActionTests extends ESTestCase {
                     new SearchShardTarget("node1", new ShardId("idx", "na", shardId), null),
                     null
                 );
-                SortField sortField = new SortField("timestamp", SortField.Type.LONG);
-                if (shardId == 0) {
-                    queryResult.topDocs(
-                        new TopDocsAndMaxScore(
-                            new TopFieldDocs(
-                                new TotalHits(1, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO),
-                                new FieldDoc[] { new FieldDoc(randomInt(1000), Float.NaN, new Object[] { shardId }) },
-                                new SortField[] { sortField }
+                try {
+                    SortField sortField = new SortField("timestamp", SortField.Type.LONG);
+                    if (shardId == 0) {
+                        queryResult.topDocs(
+                            new TopDocsAndMaxScore(
+                                new TopFieldDocs(
+                                    new TotalHits(1, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO),
+                                    new FieldDoc[] { new FieldDoc(randomInt(1000), Float.NaN, new Object[] { shardId }) },
+                                    new SortField[] { sortField }
+                                ),
+                                Float.NaN
                             ),
-                            Float.NaN
-                        ),
-                        new DocValueFormat[] { DocValueFormat.RAW }
-                    );
-                } else if (shardId == 1) {
-                    queryResult.topDocs(
-                        new TopDocsAndMaxScore(
-                            new TopFieldDocs(
-                                new TotalHits(1, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO),
-                                new FieldDoc[] { new FieldDoc(randomInt(1000), Float.NaN, new Object[] { shardId }) },
-                                new SortField[] { sortField }
+                            new DocValueFormat[] { DocValueFormat.RAW }
+                        );
+                    } else if (shardId == 1) {
+                        queryResult.topDocs(
+                            new TopDocsAndMaxScore(
+                                new TopFieldDocs(
+                                    new TotalHits(1, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO),
+                                    new FieldDoc[] { new FieldDoc(randomInt(1000), Float.NaN, new Object[] { shardId }) },
+                                    new SortField[] { sortField }
+                                ),
+                                Float.NaN
                             ),
-                            Float.NaN
-                        ),
-                        new DocValueFormat[] { DocValueFormat.RAW }
-                    );
+                            new DocValueFormat[] { DocValueFormat.RAW }
+                        );
+                    }
+                    queryResult.from(0);
+                    queryResult.size(1);
+                    successfulOps.incrementAndGet();
+                    queryResult.incRef();
+                    new Thread(() -> ActionListener.respondAndRelease(listener, queryResult)).start();
+                } finally {
+                    queryResult.decRef();
                 }
-                queryResult.from(0);
-                queryResult.size(1);
-                successfulOps.incrementAndGet();
-                new Thread(() -> listener.onResponse(queryResult)).start();
             }
         };
         SearchPhaseController controller = new SearchPhaseController((t, r) -> InternalAggregationTestCase.emptyReduceContextBuilder());
         SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap());
-        QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer(
-            searchRequest,
-            EsExecutors.DIRECT_EXECUTOR_SERVICE,
-            new NoopCircuitBreaker(CircuitBreaker.REQUEST),
-            controller,
-            task::isCancelled,
-            task.getProgressListener(),
-            shardsIter.size(),
-            exc -> {}
-        );
+
         CountDownLatch latch = new CountDownLatch(1);
-        SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction(
-            logger,
-            null,
-            searchTransportService,
-            (clusterAlias, node) -> lookup.get(node),
-            Collections.singletonMap("_na_", AliasFilter.EMPTY),
-            Collections.emptyMap(),
-            EsExecutors.DIRECT_EXECUTOR_SERVICE,
-            resultConsumer,
-            searchRequest,
-            null,
-            shardsIter,
-            timeProvider,
-            new ClusterState.Builder(new ClusterName("test")).build(),
-            task,
-            SearchResponse.Clusters.EMPTY,
-            null
+        try (
+            QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer(
+                searchRequest,
+                EsExecutors.DIRECT_EXECUTOR_SERVICE,
+                new NoopCircuitBreaker(CircuitBreaker.REQUEST),
+                controller,
+                task::isCancelled,
+                task.getProgressListener(),
+                shardsIter.size(),
+                exc -> {}
+            )
         ) {
-            @Override
-            protected SearchPhase getNextPhase(SearchPhaseResults<SearchPhaseResult> results, SearchPhaseContext context) {
-                return new SearchPhase("test") {
-                    @Override
-                    public void run() {
-                        latch.countDown();
-                    }
-                };
-            }
-        };
-        ShardRouting routingOldVersionShard = ShardRouting.newUnassigned(
-            new ShardId(new Index("idx", "_na_"), 2),
-            true,
-            RecoverySource.EmptyStoreRecoverySource.INSTANCE,
-            new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar"),
-            ShardRouting.Role.DEFAULT
-        );
-        SearchShardIterator shardIt = new SearchShardIterator(
-            null,
-            new ShardId(new Index("idx", "_na_"), 2),
-            singletonList(routingOldVersionShard),
-            idx
-        );
-        routingOldVersionShard = routingOldVersionShard.initialize(oldVersionNode.getId(), "p2", 0);
-        routingOldVersionShard.started();
-        action.start();
-        latch.await();
-        assertThat(successfulOps.get(), equalTo(2));
-        SearchPhaseController.ReducedQueryPhase phase = action.results.reduce();
-        assertThat(phase.numReducePhases(), greaterThanOrEqualTo(1));
-        assertThat(phase.totalHits().value, equalTo(2L));
-        assertThat(phase.totalHits().relation, equalTo(TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO));
-
-        SearchShardTarget searchShardTarget = new SearchShardTarget("node3", shardIt.shardId(), null);
-        SearchActionListener<SearchPhaseResult> listener = new SearchActionListener<SearchPhaseResult>(searchShardTarget, 0) {
-            @Override
-            public void onFailure(Exception e) {}
+            SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction(
+                logger,
+                null,
+                searchTransportService,
+                (clusterAlias, node) -> lookup.get(node),
+                Collections.singletonMap("_na_", AliasFilter.EMPTY),
+                Collections.emptyMap(),
+                EsExecutors.DIRECT_EXECUTOR_SERVICE,
+                resultConsumer,
+                searchRequest,
+                null,
+                shardsIter,
+                timeProvider,
+                new ClusterState.Builder(new ClusterName("test")).build(),
+                task,
+                SearchResponse.Clusters.EMPTY,
+                null
+            ) {
+                @Override
+                protected SearchPhase getNextPhase(SearchPhaseResults<SearchPhaseResult> results, SearchPhaseContext context) {
+                    return new SearchPhase("test") {
+                        @Override
+                        public void run() {
+                            latch.countDown();
+                        }
+                    };
+                }
+            };
+            ShardRouting routingOldVersionShard = ShardRouting.newUnassigned(
+                new ShardId(new Index("idx", "_na_"), 2),
+                true,
+                RecoverySource.EmptyStoreRecoverySource.INSTANCE,
+                new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar"),
+                ShardRouting.Role.DEFAULT
+            );
+            SearchShardIterator shardIt = new SearchShardIterator(
+                null,
+                new ShardId(new Index("idx", "_na_"), 2),
+                singletonList(routingOldVersionShard),
+                idx
+            );
+            routingOldVersionShard = routingOldVersionShard.initialize(oldVersionNode.getId(), "p2", 0);
+            routingOldVersionShard.started();
+            action.start();
+            latch.await();
+            assertThat(successfulOps.get(), equalTo(2));
+            SearchPhaseController.ReducedQueryPhase phase = action.results.reduce();
+            assertThat(phase.numReducePhases(), greaterThanOrEqualTo(1));
+            assertThat(phase.totalHits().value, equalTo(2L));
+            assertThat(phase.totalHits().relation, equalTo(TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO));
 
-            @Override
-            protected void innerOnResponse(SearchPhaseResult response) {}
-        };
-        Exception e = expectThrows(VersionMismatchException.class, () -> action.executePhaseOnShard(shardIt, searchShardTarget, listener));
-        assertThat(e.getMessage(), equalTo("One of the shards is incompatible with the required minimum version [" + minVersion + "]"));
+            SearchShardTarget searchShardTarget = new SearchShardTarget("node3", shardIt.shardId(), null);
+            SearchActionListener<SearchPhaseResult> listener = new SearchActionListener<SearchPhaseResult>(searchShardTarget, 0) {
+                @Override
+                public void onFailure(Exception e) {}
+
+                @Override
+                protected void innerOnResponse(SearchPhaseResult response) {}
+            };
+            Exception e = expectThrows(
+                VersionMismatchException.class,
+                () -> action.executePhaseOnShard(shardIt, searchShardTarget, listener)
+            );
+            assertThat(e.getMessage(), equalTo("One of the shards is incompatible with the required minimum version [" + minVersion + "]"));
+        }
     }
 }