Browse Source

[Transform] Implement robustness test that constantly creates/starts/stops/deletes a continuous transform (#106313)

Przemysław Witek 1 year ago
parent
commit
bb520eb9d4

+ 141 - 6
x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java

@@ -22,11 +22,14 @@ import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xpack.core.transform.TransformConfigVersion;
+import org.elasticsearch.xpack.core.transform.TransformField;
 import org.elasticsearch.xpack.core.transform.transforms.QueryConfig;
 import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
+import org.elasticsearch.xpack.core.transform.transforms.SyncConfig;
 import org.elasticsearch.xpack.core.transform.transforms.TimeRetentionPolicyConfig;
 import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig;
 import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
+import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.TermsGroupSource;
 import org.junit.After;
@@ -34,7 +37,9 @@ import org.junit.Before;
 
 import java.io.IOException;
 import java.time.Instant;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -42,9 +47,12 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static org.elasticsearch.core.Strings.format;
 import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.oneOf;
@@ -225,6 +233,133 @@ public class TransformIT extends TransformRestTestCase {
         deleteTransform(transformId);
     }
 
+    public void testTransformLifecycleInALoop() throws Exception {
+        String transformId = "lifecycle-in-a-loop";
+        String indexName = transformId + "-src";
+        createReviewsIndex(indexName, 100, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow);
+
+        String destIndex = transformId + "-dest";
+        String config = createConfig(transformId, indexName, destIndex);
+        for (int i = 0; i < 100; ++i) {
+            long sleepAfterStartMillis = randomLongBetween(0, 5_000);
+            boolean force = randomBoolean();
+            try {
+                // Create the continuous transform
+                putTransform(transformId, config, RequestOptions.DEFAULT);
+                assertThat(getTransformTasks(), is(empty()));
+                assertThatTransformTaskDoesNotExist(transformId);
+
+                startTransform(transformId, RequestOptions.DEFAULT);
+                // There is 1 transform task after start
+                assertThat(getTransformTasks(), hasSize(1));
+                assertThatTransformTaskExists(transformId);
+
+                Thread.sleep(sleepAfterStartMillis);
+                // There should still be 1 transform task as the transform is continuous
+                assertThat(getTransformTasks(), hasSize(1));
+                assertThatTransformTaskExists(transformId);
+
+                // Stop the transform with force set randomly
+                stopTransform(transformId, true, null, false, force);
+                // After the transform is stopped, there should be no transform task left
+                assertThat(getTransformTasks(), is(empty()));
+                assertThatTransformTaskDoesNotExist(transformId);
+
+                // Delete the transform
+                deleteTransform(transformId);
+            } catch (AssertionError | Exception e) {
+                throw new AssertionError(
+                    format(
+                        "Failure at iteration %d (sleepAfterStartMillis=%s,force=%s): %s",
+                        i,
+                        sleepAfterStartMillis,
+                        force,
+                        e.getMessage()
+                    ),
+                    e
+                );
+            }
+        }
+    }
+
+    private String createConfig(String transformId, String sourceIndex, String destIndex) throws Exception {
+        Map<String, SingleGroupSource> groups = new HashMap<>();
+        groups.put("by-day", createDateHistogramGroupSourceWithCalendarInterval("timestamp", DateHistogramInterval.DAY, null));
+        groups.put("by-user", new TermsGroupSource("user_id", null, false));
+        groups.put("by-business", new TermsGroupSource("business_id", null, false));
+
+        AggregatorFactories.Builder aggs = AggregatorFactories.builder()
+            .addAggregator(AggregationBuilders.avg("review_score").field("stars"))
+            .addAggregator(AggregationBuilders.max("timestamp").field("timestamp"));
+
+        PivotConfig pivotConfig = createPivotConfig(groups, aggs);
+
+        SyncConfig syncConfig = new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1));
+
+        TransformConfig config = createTransformConfigBuilder(transformId, destIndex, QueryConfig.matchAll(), sourceIndex).setFrequency(
+            TimeValue.timeValueSeconds(1)
+        ).setSyncConfig(syncConfig).setPivotConfig(pivotConfig).build();
+
+        return Strings.toString(config);
+    }
+
+    /**
+     * Returns the list of transform tasks as reported by _tasks API.
+     */
+    @SuppressWarnings("unchecked")
+    protected List<String> getTransformTasks() throws IOException {
+        final Request tasksRequest = new Request("GET", "/_tasks");
+        tasksRequest.addParameter("actions", TransformField.TASK_NAME + "*");
+        final Map<String, Object> tasksResponse = entityAsMap(client().performRequest(tasksRequest));
+
+        Map<String, Object> nodes = (Map<String, Object>) tasksResponse.get("nodes");
+        if (nodes == null) {
+            return List.of();
+        }
+
+        List<String> foundTasks = new ArrayList<>();
+        for (Map.Entry<String, Object> node : nodes.entrySet()) {
+            Map<String, Object> nodeInfo = (Map<String, Object>) node.getValue();
+            Map<String, Object> tasks = (Map<String, Object>) nodeInfo.get("tasks");
+            if (tasks != null) {
+                foundTasks.addAll(tasks.keySet());
+            }
+        }
+        return foundTasks;
+    }
+
+    /**
+     * Verifies that the given transform task exists in cluster state.
+     */
+    private void assertThatTransformTaskExists(String transformId) throws IOException {
+        assertThatTransformTaskCountIsEqualTo(transformId, 1);
+    }
+
+    /**
+     * Verifies that the given transform task does not exist in cluster state.
+     */
+    private void assertThatTransformTaskDoesNotExist(String transformId) throws IOException {
+        assertThatTransformTaskCountIsEqualTo(transformId, 0);
+    }
+
+    /**
+     * Verifies that the number of transform tasks in cluster state for the given transform is as expected.
+     */
+    @SuppressWarnings("unchecked")
+    private void assertThatTransformTaskCountIsEqualTo(String transformId, int expectedCount) throws IOException {
+        Request request = new Request("GET", "_cluster/state");
+        Map<String, Object> response = entityAsMap(adminClient().performRequest(request));
+
+        List<Map<String, Object>> tasks = (List<Map<String, Object>>) XContentMapValues.extractValue(
+            response,
+            "metadata",
+            "persistent_tasks",
+            "tasks"
+        );
+
+        assertThat("Tasks were: " + tasks, tasks.stream().filter(t -> transformId.equals(t.get("id"))).toList(), hasSize(expectedCount));
+    }
+
     public void testContinuousTransformUpdate() throws Exception {
         String indexName = "continuous-reviews-update";
         createReviewsIndex(indexName, 10, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow);
@@ -271,7 +406,7 @@ public class TransformIT extends TransformRestTestCase {
         putPipeline.setEntity(new StringEntity(Strings.toString(pipelineBuilder), ContentType.APPLICATION_JSON));
         assertOK(client().performRequest(putPipeline));
 
-        String update = Strings.format("""
+        String update = format("""
             {
                 "description": "updated config",
                 "dest": {
@@ -382,7 +517,7 @@ public class TransformIT extends TransformRestTestCase {
         });
 
         // waitForCheckpoint: true should make the transform continue until we hit the first checkpoint, then it will stop
-        stopTransform(transformId, false, null, true);
+        stopTransform(transformId, false, null, true, false);
 
         // Wait until the first checkpoint
         waitUntilCheckpoint(config.getId(), 1L);
@@ -416,7 +551,7 @@ public class TransformIT extends TransformRestTestCase {
             });
 
             var waitForCompletion = randomBoolean();
-            stopTransform(transformId, waitForCompletion, null, true);
+            stopTransform(transformId, waitForCompletion, null, true, false);
             assertBusy(() -> {
                 var stateAndStats = getBasicTransformStats(transformId);
                 assertThat(stateAndStats.get("state"), equalTo("stopped"));
@@ -467,7 +602,7 @@ public class TransformIT extends TransformRestTestCase {
         // test randomly: with explicit settings and reset to default
         String reqsPerSec = randomBoolean() ? "1000" : "null";
         String maxPageSize = randomBoolean() ? "1000" : "null";
-        String update = Strings.format("""
+        String update = format("""
             {
                 "settings" : {
                     "docs_per_second": %s,
@@ -556,14 +691,14 @@ public class TransformIT extends TransformRestTestCase {
     private void indexMoreDocs(long timestamp, long userId, String index) throws Exception {
         StringBuilder bulkBuilder = new StringBuilder();
         for (int i = 0; i < 25; i++) {
-            bulkBuilder.append(Strings.format("""
+            bulkBuilder.append(format("""
                 {"create":{"_index":"%s"}}
                 """, index));
 
             int stars = (i + 20) % 5;
             long business = (i + 100) % 50;
 
-            String source = Strings.format("""
+            String source = format("""
                 {"user_id":"user_%s","count":%s,"business_id":"business_%s","stars":%s,"timestamp":%s}
                 """, userId, i, business, stars, timestamp);
             bulkBuilder.append(source);

+ 13 - 4
x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java

@@ -153,11 +153,16 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
     }
 
     protected void stopTransform(String id) throws IOException {
-        stopTransform(id, true, null, false);
+        stopTransform(id, true, null, false, false);
     }
 
-    protected void stopTransform(String id, boolean waitForCompletion, @Nullable TimeValue timeout, boolean waitForCheckpoint)
-        throws IOException {
+    protected void stopTransform(
+        String id,
+        boolean waitForCompletion,
+        @Nullable TimeValue timeout,
+        boolean waitForCheckpoint,
+        boolean force
+    ) throws IOException {
 
         final Request stopTransformRequest = new Request("POST", TRANSFORM_ENDPOINT + id + "/_stop");
         stopTransformRequest.addParameter(TransformField.WAIT_FOR_COMPLETION.getPreferredName(), Boolean.toString(waitForCompletion));
@@ -165,6 +170,9 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
         if (timeout != null) {
             stopTransformRequest.addParameter(TransformField.TIMEOUT.getPreferredName(), timeout.getStringRep());
         }
+        if (force) {
+            stopTransformRequest.addParameter(TransformField.FORCE.getPreferredName(), "true");
+        }
         assertAcknowledged(client().performRequest(stopTransformRequest));
     }
 
@@ -215,9 +223,10 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
     protected void deleteTransform(String id, boolean force) throws IOException {
         Request request = new Request("DELETE", TRANSFORM_ENDPOINT + id);
         if (force) {
-            request.addParameter("force", "true");
+            request.addParameter(TransformField.FORCE.getPreferredName(), "true");
         }
         assertOK(adminClient().performRequest(request));
+        createdTransformIds.remove(id);
     }
 
     protected Response putTransform(String id, String config, RequestOptions options) throws IOException {