소스 검색

Disallow persisting any documents when datafeed is isolated (#46485)

Przemysław Witek 6 년 전
부모
커밋
2fe7a0929e

+ 1 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java

@@ -97,6 +97,7 @@ class DatafeedJob {
 
     void isolate() {
         isIsolated = true;
+        timingStatsReporter.disallowPersisting();
     }
 
     boolean isIsolated() {

+ 11 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java

@@ -33,12 +33,15 @@ public class DatafeedTimingStatsReporter {
     private volatile DatafeedTimingStats currentTimingStats;
     /** Object used to persist current timing stats. */
     private final DatafeedTimingStatsPersister persister;
+    /** Whether or not timing stats will be persisted by the persister object. */
+    private volatile boolean allowedPersisting;
 
     public DatafeedTimingStatsReporter(DatafeedTimingStats timingStats, DatafeedTimingStatsPersister persister) {
         Objects.requireNonNull(timingStats);
         this.persistedTimingStats = new DatafeedTimingStats(timingStats);
         this.currentTimingStats = new DatafeedTimingStats(timingStats);
         this.persister = Objects.requireNonNull(persister);
+        this.allowedPersisting = true;
     }
 
     /** Gets current timing stats. */
@@ -79,6 +82,11 @@ public class DatafeedTimingStatsReporter {
         }
     }
 
+    /** Disallows persisting timing stats. After this call finishes, no document will be persisted. */
+    public void disallowPersisting() {
+        allowedPersisting = false;
+    }
+
     private void flushIfDifferSignificantly() {
         if (differSignificantly(currentTimingStats, persistedTimingStats)) {
             flush(WriteRequest.RefreshPolicy.NONE);
@@ -87,7 +95,9 @@ public class DatafeedTimingStatsReporter {
 
     private void flush(WriteRequest.RefreshPolicy refreshPolicy) {
         persistedTimingStats = new DatafeedTimingStats(currentTimingStats);
-        persister.persistDatafeedTimingStats(persistedTimingStats, refreshPolicy);
+        if (allowedPersisting) {
+            persister.persistDatafeedTimingStats(persistedTimingStats, refreshPolicy);
+        }
     }
 
     /**

+ 9 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporterTests.java

@@ -132,6 +132,15 @@ public class DatafeedTimingStatsReporterTests extends ESTestCase {
         verifyNoMoreInteractions(timingStatsPersister);
     }
 
+    public void testDisallowPersisting() {
+        DatafeedTimingStatsReporter reporter = createReporter(createDatafeedTimingStats(JOB_ID, 0, 0, 0.0));
+        reporter.disallowPersisting();
+        // This call would normally trigger persisting but because of the "disallowPersisting" call above it will not.
+        reporter.reportSearchDuration(ONE_SECOND);
+
+        verifyZeroInteractions(timingStatsPersister);
+    }
+
     public void testTimingStatsDifferSignificantly() {
         assertThat(
             DatafeedTimingStatsReporter.differSignificantly(