Browse Source

[Transform] Integrate "sourceHasChanged" call into failure handling and retry logic. (#92762)

Przemysław Witek 2 years ago
parent
commit
cff3d7b9e9

+ 6 - 0
docs/changelog/92762.yaml

@@ -0,0 +1,6 @@
+pr: 92762
+summary: Integrate "sourceHasChanged" call into failure handling and retry logic
+area: Transform
+type: bug
+issues:
+ - 92133

+ 1 - 15
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java

@@ -357,7 +357,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
         // If we are not on the initial batch checkpoint and its the first pass of whatever continuous checkpoint we are on,
         // If we are not on the initial batch checkpoint and its the first pass of whatever continuous checkpoint we are on,
         // we should verify if there are local changes based on the sync config. If not, do not proceed further and exit.
         // we should verify if there are local changes based on the sync config. If not, do not proceed further and exit.
         if (context.getCheckpoint() > 0 && initialRun()) {
         if (context.getCheckpoint() > 0 && initialRun()) {
-            sourceHasChanged(ActionListener.wrap(hasChanged -> {
+            checkpointProvider.sourceHasChanged(getLastCheckpoint(), ActionListener.wrap(hasChanged -> {
                 context.setLastSearchTime(instantOfTrigger);
                 context.setLastSearchTime(instantOfTrigger);
                 hasSourceChanged = hasChanged;
                 hasSourceChanged = hasChanged;
                 if (hasChanged) {
                 if (hasChanged) {
@@ -959,20 +959,6 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
         }
         }
     }
     }
 
 
-    private void sourceHasChanged(ActionListener<Boolean> hasChangedListener) {
-        checkpointProvider.sourceHasChanged(getLastCheckpoint(), ActionListener.wrap(hasChanged -> {
-            logger.trace("[{}] change detected [{}].", getJobId(), hasChanged);
-            hasChangedListener.onResponse(hasChanged);
-        }, e -> {
-            logger.warn(() -> "[" + getJobId() + "] failed to detect changes for transform. Skipping update till next check.", e);
-            auditor.warning(
-                getJobId(),
-                "Failed to detect changes for transform, skipping update till next check. Exception: " + e.getMessage()
-            );
-            hasChangedListener.onResponse(false);
-        }));
-    }
-
     private IterationResult<TransformIndexerPosition> processBuckets(final SearchResponse searchResponse) {
     private IterationResult<TransformIndexerPosition> processBuckets(final SearchResponse searchResponse) {
         Tuple<Stream<IndexRequest>, Map<String, Object>> indexRequestStreamAndCursor = function.processSearchResponse(
         Tuple<Stream<IndexRequest>, Map<String, Object>> indexRequestStreamAndCursor = function.processSearchResponse(
             searchResponse,
             searchResponse,