Kaynağa Gözat

[Transform] Reset health status on successful noop (#135653)

When there are no results for the current checkpoint, the checkpoint is
considered successful, and the transform will reset its failure count.

This fixes a bug where transform will continue to report unhealthy if it
previously failed to search the source index.

Resolve #135650
Pat Whelan 1 hafta önce
ebeveyn
işleme
4e549cbe4e

+ 6 - 0
docs/changelog/135653.yaml

@@ -0,0 +1,6 @@
+pr: 135653
+summary: Reset health status on successful empty checkpoint
+area: Machine Learning
+type: bug
+issues:
+ - 135650

+ 47 - 9
x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java

@@ -217,7 +217,6 @@ public class TransformIT extends TransformRestTestCase {
      * Verify the basic stats API, which includes state, health, and optionally progress (if it exists).
      * These are required for Kibana 8.13+.
      */
-    @SuppressWarnings("unchecked")
     public void testBasicContinuousTransformStats() throws Exception {
         var transformId = "transform-continuous-basic-stats";
         createContinuousTransform("continuous-basic-stats-reviews", transformId, "reviews-by-user-business-day");
@@ -234,6 +233,50 @@ public class TransformIT extends TransformRestTestCase {
         deleteTransform(transformId);
     }
 
+    public void testEmptySourceIndexClearsErrors() throws Exception {
+        var sourceIndexName = "source-empty-reviews";
+        var destIndexName = "destination-empty-reviews";
+        var transformId = "transform-empty-source-index";
+
+        createReviewsIndexMappings(sourceIndexName, null);
+
+        var config = createTransformConfigBuilder(transformId, destIndexName, QueryConfig.matchAll(), sourceIndexName).setPivotConfig(
+            createPivotConfig(groupByUserOnly(), aggregateScoresAndTimes())
+        )
+            .setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1)))
+            .setSettings(new SettingsConfig.Builder().setUnattended(true).build())
+            .build();
+
+        putTransform(transformId, Strings.toString(config), RequestOptions.DEFAULT);
+        startTransform(config.getId(), RequestOptions.DEFAULT);
+
+        waitUntilCheckpoint(config.getId(), 1L);
+        assertEquals("green", getTransformHealthStatus(transformId));
+
+        // this will cause the transform to fail to search
+        assertAcknowledged(adminClient().performRequest(new Request("PUT", sourceIndexName + "/_block/read")));
+        assertBusy(() -> assertThat(getTransformHealthStatus(transformId), oneOf("yellow", "red")), 30, TimeUnit.SECONDS);
+
+        // unblock reads on the search index and the transform should recover
+        assertAcknowledged(adminClient().performRequest(new Request("DELETE", sourceIndexName + "/_block/read")));
+        assertBusy(() -> assertEquals("green", getTransformHealthStatus(transformId)), 30, TimeUnit.SECONDS);
+
+        stopTransform(transformId);
+        deleteTransform(transformId);
+        deleteIndex(sourceIndexName);
+        deleteIndex(destIndexName);
+    }
+
+    private Map<String, SingleGroupSource> groupByUserOnly() {
+        return Map.of("by-user", new TermsGroupSource("user_id", null, false));
+    }
+
+    private AggregatorFactories.Builder aggregateScoresAndTimes() {
+        return AggregatorFactories.builder()
+            .addAggregator(AggregationBuilders.avg("review_score").field("stars"))
+            .addAggregator(AggregationBuilders.max("timestamp").field("timestamp"));
+    }
+
     public void testDestinationIndexBlocked() throws Exception {
         var transformId = "transform-continuous-blocked-destination";
         var sourceIndexName = "source-reviews";
@@ -385,12 +428,8 @@ public class TransformIT extends TransformRestTestCase {
         String indexName = "continuous-reviews-update";
         createReviewsIndex(indexName, 10, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow);
 
-        Map<String, SingleGroupSource> groups = new HashMap<>();
-        groups.put("by-user", new TermsGroupSource("user_id", null, false));
-
-        AggregatorFactories.Builder aggs = AggregatorFactories.builder()
-            .addAggregator(AggregationBuilders.avg("review_score").field("stars"))
-            .addAggregator(AggregationBuilders.max("timestamp").field("timestamp"));
+        var groups = groupByUserOnly();
+        var aggs = aggregateScoresAndTimes();
 
         String id = "transform-to-update";
         String dest = "reviews-by-user-business-day-to-update";
@@ -481,8 +520,7 @@ public class TransformIT extends TransformRestTestCase {
         String dest = "retention-policy-dest";
         createReviewsIndex(indexName, 10, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow);
 
-        Map<String, SingleGroupSource> groups = new HashMap<>();
-        groups.put("by-user", new TermsGroupSource("user_id", null, false));
+        var groups = groupByUserOnly();
 
         AggregatorFactories.Builder aggs = AggregatorFactories.builder()
             .addAggregator(AggregationBuilders.max("timestamp").field("timestamp"));

+ 38 - 31
x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java

@@ -223,6 +223,10 @@ public abstract class TransformRestTestCase extends TransformCommonRestTestCase
         return (String) getBasicTransformStats(id).get("state");
     }
 
+    protected String getTransformHealthStatus(String id) throws IOException {
+        return (String) XContentMapValues.extractValue("health.status", getBasicTransformStats(id));
+    }
+
     @SuppressWarnings("unchecked")
     protected Map<String, Object> getTransform(String id) throws IOException {
         var request = new Request("GET", TRANSFORM_ENDPOINT + id);
@@ -390,7 +394,40 @@ public abstract class TransformRestTestCase extends TransformCommonRestTestCase
     ) throws Exception {
         assert numUsers > 0;
 
-        // create mapping
+        createReviewsIndexMappings(indexName, defaultPipeline);
+
+        // create index
+        StringBuilder sourceBuilder = new StringBuilder();
+        for (int i = 0; i < numDocs; i++) {
+            Integer user = userIdProvider.apply(i);
+            int stars = i % 5;
+            long business = i % 50;
+            String dateString = dateStringProvider.apply(i);
+
+            sourceBuilder.append(Strings.format("""
+                {"create":{"_index":"%s"}}
+                """, indexName));
+
+            sourceBuilder.append("{");
+            if (user != null) {
+                sourceBuilder.append("\"user_id\":\"").append("user_").append(user).append("\",");
+            }
+            sourceBuilder.append(Strings.format("""
+                "count":%s,"business_id":"business_%s","stars":%s,"comment":"Great stuff, deserves %s stars","regular_object":\
+                {"foo": 42},"nested_object":{"bar": 43},"timestamp":"%s"}
+                """, i, business, stars, stars, dateString));
+
+            if (i % 100 == 0) {
+                sourceBuilder.append("\r\n");
+                doBulk(sourceBuilder.toString(), false);
+                sourceBuilder.setLength(0);
+            }
+        }
+        sourceBuilder.append("\r\n");
+        doBulk(sourceBuilder.toString(), true);
+    }
+
+    protected void createReviewsIndexMappings(String indexName, String defaultPipeline) throws IOException {
         try (XContentBuilder builder = jsonBuilder()) {
             builder.startObject();
             {
@@ -439,36 +476,6 @@ public abstract class TransformRestTestCase extends TransformCommonRestTestCase
             req.setOptions(RequestOptions.DEFAULT);
             assertOKAndConsume(adminClient().performRequest(req));
         }
-
-        // create index
-        StringBuilder sourceBuilder = new StringBuilder();
-        for (int i = 0; i < numDocs; i++) {
-            Integer user = userIdProvider.apply(i);
-            int stars = i % 5;
-            long business = i % 50;
-            String dateString = dateStringProvider.apply(i);
-
-            sourceBuilder.append(Strings.format("""
-                {"create":{"_index":"%s"}}
-                """, indexName));
-
-            sourceBuilder.append("{");
-            if (user != null) {
-                sourceBuilder.append("\"user_id\":\"").append("user_").append(user).append("\",");
-            }
-            sourceBuilder.append(Strings.format("""
-                "count":%s,"business_id":"business_%s","stars":%s,"comment":"Great stuff, deserves %s stars","regular_object":\
-                {"foo": 42},"nested_object":{"bar": 43},"timestamp":"%s"}
-                """, i, business, stars, stars, dateString));
-
-            if (i % 100 == 0) {
-                sourceBuilder.append("\r\n");
-                doBulk(sourceBuilder.toString(), false);
-                sourceBuilder.setLength(0);
-            }
-        }
-        sourceBuilder.append("\r\n");
-        doBulk(sourceBuilder.toString(), true);
     }
 
     protected void doBulk(String bulkDocuments, boolean refresh) throws IOException {

+ 1 - 0
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java

@@ -483,6 +483,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
             if (context.shouldStopAtCheckpoint()) {
                 stop();
             }
+            context.resetReasonAndFailureCounter();
             listener.onResponse(null);
             return;
         }