Ver código fonte

Fix failing test `RollupActionSingleNodeTests` `testCannotRollupToExistingIndex` (#89025)

The root cause of this failure was that test testCannotRollupWhileOtherRollupInProgress
would finish before the asynchronously submitted rollup action had not completed. In this
case the test would finish and delete the rollup index, while the rollup process was still trying
 to populate or replicate it. It looks like using the ActionListener.NOOP was not a good choice.

Fixes https://github.com/elastic/elasticsearch/issues/88844
Christos Soulios 3 anos atrás
pai
commit
c741b8c531

+ 94 - 71
x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java

@@ -94,9 +94,11 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.index.mapper.TimeSeriesParams.TIME_SERIES_METRIC_PARAM;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
 import static org.hamcrest.Matchers.containsString;
 
@@ -163,57 +165,59 @@ public class RollupActionSingleNodeTests extends ESSingleNodeTestCase {
          * check that the value of the label (last value) matches the value
          * of the corresponding metric which uses a last_value metric type.
          */
-        client().admin()
-            .indices()
-            .prepareCreate(sourceIndex)
-            .setSettings(
-                Settings.builder()
-                    .put("index.number_of_shards", numOfShards)
-                    .put("index.number_of_replicas", numOfReplicas)
-                    .put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES)
-                    .putList(IndexMetadata.INDEX_ROUTING_PATH.getKey(), List.of(FIELD_DIMENSION_1))
-                    .put(
-                        IndexSettings.TIME_SERIES_START_TIME.getKey(),
-                        DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(Instant.ofEpochMilli(startTime).toEpochMilli())
-                    )
-                    .put(IndexSettings.TIME_SERIES_END_TIME.getKey(), "2106-01-08T23:40:53.384Z")
-                    .build()
-            )
-            .setMapping(
-                FIELD_TIMESTAMP,
-                "type=date",
-                FIELD_DIMENSION_1,
-                "type=keyword,time_series_dimension=true",
-                FIELD_DIMENSION_2,
-                "type=long,time_series_dimension=true",
-                FIELD_NUMERIC_1,
-                "type=long,time_series_metric=gauge",
-                FIELD_NUMERIC_2,
-                "type=double,time_series_metric=counter",
-                FIELD_LABEL_DOUBLE,
-                "type=double",
-                FIELD_LABEL_INTEGER,
-                "type=integer",
-                FIELD_LABEL_KEYWORD,
-                "type=keyword",
-                FIELD_LABEL_TEXT,
-                "type=text",
-                FIELD_LABEL_BOOLEAN,
-                "type=boolean",
-                FIELD_METRIC_LABEL_DOUBLE, /* numeric label indexed as a metric */
-                "type=double,time_series_metric=counter",
-                FIELD_LABEL_IPv4_ADDRESS,
-                "type=ip",
-                FIELD_LABEL_IPv6_ADDRESS,
-                "type=ip",
-                FIELD_LABEL_DATE,
-                "type=date,format=date_optional_time",
-                FIELD_LABEL_KEYWORD_ARRAY,
-                "type=keyword",
-                FIELD_LABEL_DOUBLE_ARRAY,
-                "type=double"
-            )
-            .get();
+        assertAcked(
+            client().admin()
+                .indices()
+                .prepareCreate(sourceIndex)
+                .setSettings(
+                    Settings.builder()
+                        .put("index.number_of_shards", numOfShards)
+                        .put("index.number_of_replicas", numOfReplicas)
+                        .put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES)
+                        .putList(IndexMetadata.INDEX_ROUTING_PATH.getKey(), List.of(FIELD_DIMENSION_1))
+                        .put(
+                            IndexSettings.TIME_SERIES_START_TIME.getKey(),
+                            DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(Instant.ofEpochMilli(startTime).toEpochMilli())
+                        )
+                        .put(IndexSettings.TIME_SERIES_END_TIME.getKey(), "2106-01-08T23:40:53.384Z")
+                        .build()
+                )
+                .setMapping(
+                    FIELD_TIMESTAMP,
+                    "type=date",
+                    FIELD_DIMENSION_1,
+                    "type=keyword,time_series_dimension=true",
+                    FIELD_DIMENSION_2,
+                    "type=long,time_series_dimension=true",
+                    FIELD_NUMERIC_1,
+                    "type=long,time_series_metric=gauge",
+                    FIELD_NUMERIC_2,
+                    "type=double,time_series_metric=counter",
+                    FIELD_LABEL_DOUBLE,
+                    "type=double",
+                    FIELD_LABEL_INTEGER,
+                    "type=integer",
+                    FIELD_LABEL_KEYWORD,
+                    "type=keyword",
+                    FIELD_LABEL_TEXT,
+                    "type=text",
+                    FIELD_LABEL_BOOLEAN,
+                    "type=boolean",
+                    FIELD_METRIC_LABEL_DOUBLE, /* numeric label indexed as a metric */
+                    "type=double,time_series_metric=counter",
+                    FIELD_LABEL_IPv4_ADDRESS,
+                    "type=ip",
+                    FIELD_LABEL_IPv6_ADDRESS,
+                    "type=ip",
+                    FIELD_LABEL_DATE,
+                    "type=date,format=date_optional_time",
+                    FIELD_LABEL_KEYWORD_ARRAY,
+                    "type=keyword",
+                    FIELD_LABEL_DOUBLE_ARRAY,
+                    "type=double"
+                )
+                .get()
+        );
     }
 
     public void testRollupIndex() throws IOException {
@@ -285,8 +289,7 @@ public class RollupActionSingleNodeTests extends ESSingleNodeTestCase {
         logger.info("Updating index [{}] with settings [{}]", sourceIndex, settings);
 
         var updateSettingsReq = new UpdateSettingsRequest(settings, sourceIndex);
-        var r = client().admin().indices().updateSettings(updateSettingsReq).actionGet();
-        assertTrue("Update settings not acked", r.isAcknowledged());
+        assertAcked(client().admin().indices().updateSettings(updateSettingsReq).actionGet());
 
         RollupActionConfig config = new RollupActionConfig(randomInterval());
         SourceSupplier sourceSupplier = () -> {
@@ -361,7 +364,13 @@ public class RollupActionSingleNodeTests extends ESSingleNodeTestCase {
         prepareSourceIndex(sourceIndex);
 
         // Create an empty index with the same name as the rollup index
-        client().admin().indices().prepareCreate(rollupIndex).get();
+        assertAcked(
+            client().admin()
+                .indices()
+                .prepareCreate(rollupIndex)
+                .setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).build())
+                .get()
+        );
         ResourceAlreadyExistsException exception = expectThrows(
             ResourceAlreadyExistsException.class,
             () -> rollup(sourceIndex, rollupIndex, config)
@@ -433,12 +442,31 @@ public class RollupActionSingleNodeTests extends ESSingleNodeTestCase {
             .endObject();
         bulkIndex(sourceSupplier);
         prepareSourceIndex(sourceIndex);
-        client().execute(RollupAction.INSTANCE, new RollupAction.Request(sourceIndex, rollupIndex, config), ActionListener.noop());
+        var rollupListener = new ActionListener<AcknowledgedResponse>() {
+            boolean success;
+
+            @Override
+            public void onResponse(AcknowledgedResponse acknowledgedResponse) {
+                if (acknowledgedResponse.isAcknowledged()) {
+                    success = true;
+                } else {
+                    fail("Failed to receive rollup acknowledgement");
+                }
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                fail("Rollup failed: " + e.getMessage());
+            }
+        };
+        client().execute(RollupAction.INSTANCE, new RollupAction.Request(sourceIndex, rollupIndex, config), rollupListener);
         ResourceAlreadyExistsException exception = expectThrows(
             ResourceAlreadyExistsException.class,
             () -> rollup(sourceIndex, rollupIndex, config)
         );
         assertThat(exception.getMessage(), containsString(rollupIndex));
+        // We must wait until the in-progress rollup ends, otherwise data will not be cleaned up
+        assertBusy(() -> assertTrue("In progress rollup did not complete", rollupListener.success), 60, TimeUnit.SECONDS);
     }
 
     @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/88800")
@@ -521,23 +549,22 @@ public class RollupActionSingleNodeTests extends ESSingleNodeTestCase {
 
     private void prepareSourceIndex(String sourceIndex) {
         // Set the source index to read-only state
-        AcknowledgedResponse r = client().admin()
-            .indices()
-            .prepareUpdateSettings(sourceIndex)
-            .setSettings(Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), true).build())
-            .get();
-        assertTrue(r.isAcknowledged());
+        assertAcked(
+            client().admin()
+                .indices()
+                .prepareUpdateSettings(sourceIndex)
+                .setSettings(Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), true).build())
+                .get()
+        );
     }
 
     private void rollup(String sourceIndex, String rollupIndex, RollupActionConfig config) {
-        AcknowledgedResponse response = client().execute(RollupAction.INSTANCE, new RollupAction.Request(sourceIndex, rollupIndex, config))
-            .actionGet();
-        assertTrue(response.isAcknowledged());
+        assertAcked(client().execute(RollupAction.INSTANCE, new RollupAction.Request(sourceIndex, rollupIndex, config)).actionGet());
     }
 
     private RolloverResponse rollover(String dataStreamName) throws ExecutionException, InterruptedException {
         RolloverResponse response = client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null)).get();
-        assertTrue(response.isAcknowledged());
+        assertAcked(response);
         return response;
     }
 
@@ -887,12 +914,8 @@ public class RollupActionSingleNodeTests extends ESSingleNodeTestCase {
         );
         PutComposableIndexTemplateAction.Request request = new PutComposableIndexTemplateAction.Request(dataStreamName + "_template")
             .indexTemplate(template);
-        AcknowledgedResponse response = client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet();
-
-        assertTrue(response.isAcknowledged());
-        assertTrue(
-            client().execute(CreateDataStreamAction.INSTANCE, new CreateDataStreamAction.Request(dataStreamName)).get().isAcknowledged()
-        );
+        assertAcked(client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet());
+        assertAcked(client().execute(CreateDataStreamAction.INSTANCE, new CreateDataStreamAction.Request(dataStreamName)).get());
         return dataStreamName;
     }
 }