Browse Source

[Transform] Extract common test code to TransformCommonRestTestCase class (#107103)

Przemysław Witek 1 year ago
parent
commit
ee667c40d7

+ 8 - 0
x-pack/plugin/transform/qa/common/build.gradle

@@ -0,0 +1,8 @@
+apply plugin: 'elasticsearch.internal-java-rest-test'
+
+dependencies {
+  api project(':libs:elasticsearch-x-content')
+  api project(':test:framework')
+  api project(xpackModule('core'))
+}
+

+ 127 - 0
x-pack/plugin/transform/qa/common/src/main/java/org/elasticsearch/xpack/transform/integration/common/TransformCommonRestTestCase.java

@@ -0,0 +1,127 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.transform.integration.common;
+
+import org.apache.logging.log4j.Level;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseException;
+import org.elasticsearch.common.xcontent.support.XContentMapValues;
+import org.elasticsearch.test.rest.ESRestTestCase;
+import org.elasticsearch.xpack.core.transform.TransformField;
+import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public abstract class TransformCommonRestTestCase extends ESRestTestCase {
+
+    protected static final String TRANSFORM_ENDPOINT = TransformField.REST_BASE_PATH_TRANSFORMS;
+    protected static final String AUTH_KEY = "Authorization";
+    protected static final String SECONDARY_AUTH_KEY = "es-secondary-authorization";
+
+    protected static String getTransformEndpoint() {
+        return TRANSFORM_ENDPOINT;
+    }
+
+    /**
+     * Returns the list of transform tasks as reported by the _tasks API.
+     */
+    @SuppressWarnings("unchecked")
+    protected List<String> getTransformTasks() throws IOException {
+        Request tasksRequest = new Request("GET", "/_tasks");
+        tasksRequest.addParameter("actions", TransformField.TASK_NAME + "*");
+        Map<String, Object> tasksResponse = entityAsMap(client().performRequest(tasksRequest));
+
+        Map<String, Object> nodes = (Map<String, Object>) tasksResponse.get("nodes");
+        if (nodes == null) {
+            return List.of();
+        }
+
+        List<String> foundTasks = new ArrayList<>();
+        for (Map.Entry<String, Object> node : nodes.entrySet()) {
+            Map<String, Object> nodeInfo = (Map<String, Object>) node.getValue();
+            Map<String, Object> tasks = (Map<String, Object>) nodeInfo.get("tasks");
+            if (tasks != null) {
+                foundTasks.addAll(tasks.keySet());
+            }
+        }
+        return foundTasks;
+    }
+
+    /**
+     * Returns the list of transform tasks for the given transform as reported by the _cluster/state API.
+     */
+    @SuppressWarnings("unchecked")
+    protected List<String> getTransformTasksFromClusterState(String transformId) throws IOException {
+        Request request = new Request("GET", "_cluster/state");
+        Map<String, Object> response = entityAsMap(adminClient().performRequest(request));
+
+        List<Map<String, Object>> tasks = (List<Map<String, Object>>) XContentMapValues.extractValue(
+            response,
+            "metadata",
+            "persistent_tasks",
+            "tasks"
+        );
+
+        return tasks.stream().map(t -> (String) t.get("id")).filter(transformId::equals).toList();
+    }
+
+    @SuppressWarnings("unchecked")
+    protected void logAudits() throws Exception {
+        logger.info("writing audit messages to the log");
+        Request searchRequest = new Request("GET", TransformInternalIndexConstants.AUDIT_INDEX + "/_search?ignore_unavailable=true");
+        searchRequest.setJsonEntity("""
+            {
+              "size": 100,
+              "sort": [ { "timestamp": { "order": "asc" } } ]
+            }""");
+
+        assertBusy(() -> {
+            try {
+                refreshIndex(TransformInternalIndexConstants.AUDIT_INDEX_PATTERN);
+                Response searchResponse = client().performRequest(searchRequest);
+
+                Map<String, Object> searchResult = entityAsMap(searchResponse);
+                List<Map<String, Object>> searchHits = (List<Map<String, Object>>) XContentMapValues.extractValue(
+                    "hits.hits",
+                    searchResult
+                );
+
+                for (Map<String, Object> hit : searchHits) {
+                    Map<String, Object> source = (Map<String, Object>) XContentMapValues.extractValue("_source", hit);
+                    String level = (String) source.getOrDefault("level", "info");
+                    logger.log(
+                        Level.getLevel(level.toUpperCase(Locale.ROOT)),
+                        "Transform audit: [{}] [{}] [{}] [{}]",
+                        Instant.ofEpochMilli((long) source.getOrDefault("timestamp", 0)),
+                        source.getOrDefault("transform_id", "n/a"),
+                        source.getOrDefault("message", "n/a"),
+                        source.getOrDefault("node_name", "n/a")
+                    );
+                }
+            } catch (ResponseException e) {
+                // see gh#54810, wrap temporary 503's as assertion error for retry
+                if (e.getResponse().getStatusLine().getStatusCode() != 503) {
+                    throw e;
+                }
+                throw new AssertionError("Failed to retrieve audit logs", e);
+            }
+        }, 5, TimeUnit.SECONDS);
+    }
+
+    protected void refreshIndex(String index) throws IOException {
+        Request refreshRequest = new Request("POST", index + "/_refresh");
+        assertOK(adminClient().performRequest(refreshRequest));
+    }
+}

+ 1 - 0
x-pack/plugin/transform/qa/multi-node-tests/build.gradle

@@ -3,6 +3,7 @@ apply plugin: 'elasticsearch.legacy-java-rest-test'
 dependencies {
   javaRestTestImplementation(testArtifact(project(xpackModule('core'))))
   javaRestTestImplementation project(path: xpackModule('transform'))
+  javaRestTestImplementation project(path: xpackModule('transform:qa:common'))
 }
 
 // location for keys and certificates

+ 1 - 1
x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/LatestIT.java

@@ -126,7 +126,7 @@ public class LatestIT extends TransformRestTestCase {
         waitUntilCheckpoint(transformConfig.getId(), 1L);
         stopTransform(transformConfig.getId());
 
-        refreshIndex(destIndexName, RequestOptions.DEFAULT);
+        refreshIndex(destIndexName);
         var mappings = getIndexMapping(destIndexName, RequestOptions.DEFAULT);
         assertThat(
             (Map<String, Object>) XContentMapValues.extractValue(destIndexName + ".mappings", mappings),

+ 5 - 65
x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java

@@ -22,7 +22,6 @@ import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xpack.core.transform.TransformConfigVersion;
-import org.elasticsearch.xpack.core.transform.TransformField;
 import org.elasticsearch.xpack.core.transform.transforms.QueryConfig;
 import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
 import org.elasticsearch.xpack.core.transform.transforms.SyncConfig;
@@ -37,9 +36,7 @@ import org.junit.Before;
 
 import java.io.IOException;
 import java.time.Instant;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -247,23 +244,23 @@ public class TransformIT extends TransformRestTestCase {
                 // Create the continuous transform
                 putTransform(transformId, config, RequestOptions.DEFAULT);
                 assertThat(getTransformTasks(), is(empty()));
-                assertThatTransformTaskDoesNotExist(transformId);
+                assertThat(getTransformTasksFromClusterState(transformId), is(empty()));
 
                 startTransform(transformId, RequestOptions.DEFAULT);
                 // There is 1 transform task after start
                 assertThat(getTransformTasks(), hasSize(1));
-                assertThatTransformTaskExists(transformId);
+                assertThat(getTransformTasksFromClusterState(transformId), hasSize(1));
 
                 Thread.sleep(sleepAfterStartMillis);
                 // There should still be 1 transform task as the transform is continuous
                 assertThat(getTransformTasks(), hasSize(1));
-                assertThatTransformTaskExists(transformId);
+                assertThat(getTransformTasksFromClusterState(transformId), hasSize(1));
 
                 // Stop the transform with force set randomly
                 stopTransform(transformId, true, null, false, force);
                 // After the transform is stopped, there should be no transform task left
                 assertThat(getTransformTasks(), is(empty()));
-                assertThatTransformTaskDoesNotExist(transformId);
+                assertThat(getTransformTasksFromClusterState(transformId), is(empty()));
 
                 // Delete the transform
                 deleteTransform(transformId);
@@ -303,63 +300,6 @@ public class TransformIT extends TransformRestTestCase {
         return Strings.toString(config);
     }
 
-    /**
-     * Returns the list of transform tasks as reported by _tasks API.
-     */
-    @SuppressWarnings("unchecked")
-    protected List<String> getTransformTasks() throws IOException {
-        final Request tasksRequest = new Request("GET", "/_tasks");
-        tasksRequest.addParameter("actions", TransformField.TASK_NAME + "*");
-        final Map<String, Object> tasksResponse = entityAsMap(client().performRequest(tasksRequest));
-
-        Map<String, Object> nodes = (Map<String, Object>) tasksResponse.get("nodes");
-        if (nodes == null) {
-            return List.of();
-        }
-
-        List<String> foundTasks = new ArrayList<>();
-        for (Map.Entry<String, Object> node : nodes.entrySet()) {
-            Map<String, Object> nodeInfo = (Map<String, Object>) node.getValue();
-            Map<String, Object> tasks = (Map<String, Object>) nodeInfo.get("tasks");
-            if (tasks != null) {
-                foundTasks.addAll(tasks.keySet());
-            }
-        }
-        return foundTasks;
-    }
-
-    /**
-     * Verifies that the given transform task exists in cluster state.
-     */
-    private void assertThatTransformTaskExists(String transformId) throws IOException {
-        assertThatTransformTaskCountIsEqualTo(transformId, 1);
-    }
-
-    /**
-     * Verifies that the given transform task does not exist in cluster state.
-     */
-    private void assertThatTransformTaskDoesNotExist(String transformId) throws IOException {
-        assertThatTransformTaskCountIsEqualTo(transformId, 0);
-    }
-
-    /**
-     * Verifies that the number of transform tasks in cluster state for the given transform is as expected.
-     */
-    @SuppressWarnings("unchecked")
-    private void assertThatTransformTaskCountIsEqualTo(String transformId, int expectedCount) throws IOException {
-        Request request = new Request("GET", "_cluster/state");
-        Map<String, Object> response = entityAsMap(adminClient().performRequest(request));
-
-        List<Map<String, Object>> tasks = (List<Map<String, Object>>) XContentMapValues.extractValue(
-            response,
-            "metadata",
-            "persistent_tasks",
-            "tasks"
-        );
-
-        assertThat("Tasks were: " + tasks, tasks.stream().filter(t -> transformId.equals(t.get("id"))).toList(), hasSize(expectedCount));
-    }
-
     public void testContinuousTransformUpdate() throws Exception {
         String indexName = "continuous-reviews-update";
         createReviewsIndex(indexName, 10, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow);
@@ -447,7 +387,7 @@ public class TransformIT extends TransformRestTestCase {
             assertOK(searchResponse);
             var responseMap = entityAsMap(searchResponse);
             assertThat((Integer) XContentMapValues.extractValue("hits.total.value", responseMap), greaterThan(0));
-            refreshIndex(dest, RequestOptions.DEFAULT);
+            refreshIndex(dest);
         }, 30, TimeUnit.SECONDS);
 
         stopTransform(config.getId());

+ 2 - 56
x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java

@@ -10,7 +10,6 @@ package org.elasticsearch.xpack.transform.integration;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.entity.ContentType;
 import org.apache.http.entity.StringEntity;
-import org.apache.logging.log4j.Level;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.Response;
@@ -27,7 +26,6 @@ import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchModule;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
-import org.elasticsearch.test.rest.ESRestTestCase;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
 import org.elasticsearch.xcontent.ToXContent;
 import org.elasticsearch.xcontent.XContentBuilder;
@@ -40,22 +38,20 @@ import org.elasticsearch.xpack.core.transform.transforms.DestConfig;
 import org.elasticsearch.xpack.core.transform.transforms.QueryConfig;
 import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
 import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
-import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.AggregationConfig;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.DateHistogramGroupSource;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfig;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource;
+import org.elasticsearch.xpack.transform.integration.common.TransformCommonRestTestCase;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import java.time.Instant;
 import java.time.ZoneId;
 import java.util.Base64;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -67,9 +63,8 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.core.Is.is;
 
-public abstract class TransformRestTestCase extends ESRestTestCase {
+public abstract class TransformRestTestCase extends TransformCommonRestTestCase {
 
-    protected static String TRANSFORM_ENDPOINT = "/_transform/";
     protected static final String AUTH_KEY = "Authorization";
     protected static final String SECONDARY_AUTH_KEY = "es-secondary-authorization";
 
@@ -81,49 +76,6 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
         waitForPendingTasks();
     }
 
-    @SuppressWarnings("unchecked")
-    private void logAudits() throws Exception {
-        logger.info("writing audit messages to the log");
-        Request searchRequest = new Request("GET", TransformInternalIndexConstants.AUDIT_INDEX + "/_search?ignore_unavailable=true");
-        searchRequest.setJsonEntity("""
-            {
-              "size": 100,
-              "sort": [ { "timestamp": { "order": "asc" } } ]
-            }""");
-
-        assertBusy(() -> {
-            try {
-                refreshIndex(TransformInternalIndexConstants.AUDIT_INDEX_PATTERN, RequestOptions.DEFAULT);
-                Response searchResponse = client().performRequest(searchRequest);
-
-                Map<String, Object> searchResult = entityAsMap(searchResponse);
-                List<Map<String, Object>> searchHits = (List<Map<String, Object>>) XContentMapValues.extractValue(
-                    "hits.hits",
-                    searchResult
-                );
-
-                for (Map<String, Object> hit : searchHits) {
-                    Map<String, Object> source = (Map<String, Object>) XContentMapValues.extractValue("_source", hit);
-                    String level = (String) source.getOrDefault("level", "info");
-                    logger.log(
-                        Level.getLevel(level.toUpperCase(Locale.ROOT)),
-                        "Transform audit: [{}] [{}] [{}] [{}]",
-                        Instant.ofEpochMilli((long) source.getOrDefault("timestamp", 0)),
-                        source.getOrDefault("transform_id", "n/a"),
-                        source.getOrDefault("message", "n/a"),
-                        source.getOrDefault("node_name", "n/a")
-                    );
-                }
-            } catch (ResponseException e) {
-                // see gh#54810, wrap temporary 503's as assertion error for retry
-                if (e.getResponse().getStatusLine().getStatusCode() != 503) {
-                    throw e;
-                }
-                throw new AssertionError("Failed to retrieve audit logs", e);
-            }
-        }, 5, TimeUnit.SECONDS);
-    }
-
     protected void cleanUpTransforms() throws IOException {
         for (String id : createdTransformIds) {
             try {
@@ -140,12 +92,6 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
         createdTransformIds.clear();
     }
 
-    protected void refreshIndex(String index, RequestOptions options) throws IOException {
-        var r = new Request("POST", index + "/_refresh");
-        r.setOptions(options);
-        assertOK(adminClient().performRequest(r));
-    }
-
     protected Map<String, Object> getIndexMapping(String index, RequestOptions options) throws IOException {
         var r = new Request("GET", "/" + index + "/_mapping");
         r.setOptions(options);

+ 2 - 2
x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUsingSearchRuntimeFieldsIT.java

@@ -138,7 +138,7 @@ public class TransformUsingSearchRuntimeFieldsIT extends TransformRestTestCase {
         stopTransform(config.getId());
         assertBusy(() -> { assertEquals("stopped", getTransformState(config.getId())); });
 
-        refreshIndex(destIndexName, RequestOptions.DEFAULT);
+        refreshIndex(destIndexName);
         // Verify destination index mappings
         var mappings = (Map<String, Object>) XContentMapValues.extractValue(
             destIndexName + ".mappings",
@@ -235,7 +235,7 @@ public class TransformUsingSearchRuntimeFieldsIT extends TransformRestTestCase {
         stopTransform(configWithRuntimeFields.getId());
         assertBusy(() -> { assertEquals("stopped", getTransformState(configWithRuntimeFields.getId())); });
 
-        refreshIndex(destIndexName, RequestOptions.DEFAULT);
+        refreshIndex(destIndexName);
         // Verify destination index mappings
         var destIndexMapping = getIndexMapping(destIndexName, RequestOptions.DEFAULT);
 

+ 1 - 1
x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TransformContinuousIT.java

@@ -254,7 +254,7 @@ public class TransformContinuousIT extends TransformRestTestCase {
                 source.append("\r\n");
                 doBulk(source.toString(), false);
             }
-            refreshIndex(sourceIndexName, RequestOptions.DEFAULT);
+            refreshIndex(sourceIndexName);
 
             // start all transforms, wait until the processed all data and stop them
             startTransforms();

+ 1 - 0
x-pack/plugin/transform/qa/single-node-tests/build.gradle

@@ -4,6 +4,7 @@ apply plugin: 'elasticsearch.legacy-java-rest-test'
 dependencies {
   javaRestTestImplementation(testArtifact(project(xpackModule('core'))))
   javaRestTestImplementation project(path: xpackModule('transform'))
+  javaRestTestImplementation project(path: xpackModule('transform:qa:common'))
 }
 
 testClusters.configureEach {

+ 3 - 83
x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java

@@ -9,33 +9,27 @@ package org.elasticsearch.xpack.transform.integration;
 
 import org.apache.http.entity.ContentType;
 import org.apache.http.entity.StringEntity;
-import org.apache.logging.log4j.Level;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.Response;
-import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.SecureString;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
 import org.elasticsearch.rest.RestStatus;
-import org.elasticsearch.test.rest.ESRestTestCase;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xpack.core.transform.TransformField;
 import org.elasticsearch.xpack.core.transform.transforms.DestAlias;
 import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
-import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
+import org.elasticsearch.xpack.transform.integration.common.TransformCommonRestTestCase;
 import org.junit.After;
 import org.junit.AfterClass;
 
 import java.io.IOException;
-import java.time.Instant;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -44,10 +38,9 @@ import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 
-public abstract class TransformRestTestCase extends ESRestTestCase {
+public abstract class TransformRestTestCase extends TransformCommonRestTestCase {
 
     protected static final String TEST_PASSWORD = "x-pack-test-password";
-    private static final String SECONDARY_AUTH_KEY = "es-secondary-authorization";
     protected static final SecureString TEST_PASSWORD_SECURE_STRING = new SecureString(TEST_PASSWORD.toCharArray());
     private static final String BASIC_AUTH_VALUE_SUPER_USER = basicAuthHeaderValue("x_pack_rest_user", TEST_PASSWORD_SECURE_STRING);
 
@@ -538,7 +531,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
 
         RequestOptions.Builder options = request.getOptions().toBuilder();
         if (authHeader != null) {
-            options.addHeader("Authorization", authHeader);
+            options.addHeader(AUTH_KEY, authHeader);
         }
         if (secondaryAuthHeader != null) {
             options.addHeader(SECONDARY_AUTH_KEY, secondaryAuthHeader);
@@ -563,10 +556,6 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
         }, 30, TimeUnit.SECONDS);
     }
 
-    void refreshIndex(String index) throws IOException {
-        assertOK(client().performRequest(new Request("POST", index + "/_refresh")));
-    }
-
     @SuppressWarnings("unchecked")
     protected static List<Map<String, Object>> getTransforms(List<Map<String, String>> expectedErrors) throws IOException {
         Request request = new Request("GET", getTransformEndpoint() + "_all");
@@ -688,73 +677,4 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
         int actual = (Integer) ((List<?>) XContentMapValues.extractValue(field, searchResult)).get(0);
         assertEquals(expected, actual);
     }
-
-    protected static String getTransformEndpoint() {
-        return TransformField.REST_BASE_PATH_TRANSFORMS;
-    }
-
-    @SuppressWarnings("unchecked")
-    private void logAudits() throws Exception {
-        logger.info("writing audit messages to the log");
-        Request searchRequest = new Request("GET", TransformInternalIndexConstants.AUDIT_INDEX + "/_search?ignore_unavailable=true");
-        searchRequest.setJsonEntity("""
-            {
-              "size": 100,
-              "sort": [ { "timestamp": { "order": "asc" } } ]
-            }""");
-
-        assertBusy(() -> {
-            try {
-                refreshIndex(TransformInternalIndexConstants.AUDIT_INDEX_PATTERN);
-                Response searchResponse = client().performRequest(searchRequest);
-
-                Map<String, Object> searchResult = entityAsMap(searchResponse);
-                List<Map<String, Object>> searchHits = (List<Map<String, Object>>) XContentMapValues.extractValue(
-                    "hits.hits",
-                    searchResult
-                );
-
-                for (Map<String, Object> hit : searchHits) {
-                    Map<String, Object> source = (Map<String, Object>) XContentMapValues.extractValue("_source", hit);
-                    String level = (String) source.getOrDefault("level", "info");
-                    logger.log(
-                        Level.getLevel(level.toUpperCase(Locale.ROOT)),
-                        "Transform audit: [{}] [{}] [{}] [{}]",
-                        Instant.ofEpochMilli((long) source.getOrDefault("timestamp", 0)),
-                        source.getOrDefault("transform_id", "n/a"),
-                        source.getOrDefault("message", "n/a"),
-                        source.getOrDefault("node_name", "n/a")
-                    );
-                }
-            } catch (ResponseException e) {
-                // see gh#54810, wrap temporary 503's as assertion error for retry
-                if (e.getResponse().getStatusLine().getStatusCode() != 503) {
-                    throw e;
-                }
-                throw new AssertionError("Failed to retrieve audit logs", e);
-            }
-        }, 5, TimeUnit.SECONDS);
-    }
-
-    @SuppressWarnings("unchecked")
-    protected List<String> getTransformTasks() throws IOException {
-        final Request tasksRequest = new Request("GET", "/_tasks");
-        tasksRequest.addParameter("actions", TransformField.TASK_NAME + "*");
-        Map<String, Object> tasksResponse = entityAsMap(client().performRequest(tasksRequest));
-
-        Map<String, Object> nodes = (Map<String, Object>) tasksResponse.get("nodes");
-        if (nodes == null) {
-            return List.of();
-        }
-
-        List<String> foundTasks = new ArrayList<>();
-        for (Map.Entry<String, Object> node : nodes.entrySet()) {
-            Map<String, Object> nodeInfo = (Map<String, Object>) node.getValue();
-            Map<String, Object> tasks = (Map<String, Object>) nodeInfo.get("tasks");
-            if (tasks != null) {
-                foundTasks.addAll(tasks.keySet());
-            }
-        }
-        return foundTasks;
-    }
 }

+ 6 - 0
x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRobustnessIT.java

@@ -95,10 +95,16 @@ public class TransformRobustnessIT extends TransformRestTestCase {
             try {
                 // Create the batch transform
                 createPivotReviewsTransform(transformId, destIndex, null);
+                assertThat(getTransformTasks(), is(empty()));
+                assertThat(getTransformTasksFromClusterState(transformId), is(empty()));
+
                 // Wait until the transform finishes
                 startAndWaitForTransform(transformId, destIndex);
+
                 // After the transform finishes, there should be no transform task left
                 assertThat(getTransformTasks(), is(empty()));
+                assertThat(getTransformTasksFromClusterState(transformId), is(empty()));
+
                 // Delete the transform
                 deleteTransform(transformId);
             } catch (AssertionError | Exception e) {