浏览代码

[ML] Convert DatafeedContext to a Record (#107009)

Refactor DatafeedContext from a POJO with a Builder to a Record.  This
is mostly to reduce code footprint and improve readability in the
`DatafeedContextProvider` class.  The lambda closures are now responsible
for passing the fields to the DatafeedContext, rather than the Builder.

Close #107008
Pat Whelan 1 年之前
父节点
当前提交
1c923a66bd

+ 6 - 66
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedContext.java

@@ -14,72 +14,12 @@ import org.elasticsearch.xpack.ml.job.persistence.RestartTimeInfo;
 
 import java.util.Objects;
 
-public class DatafeedContext {
+public record DatafeedContext(DatafeedConfig datafeedConfig, Job job, RestartTimeInfo restartTimeInfo, DatafeedTimingStats timingStats) {
 
-    private final DatafeedConfig datafeedConfig;
-    private final Job job;
-    private final RestartTimeInfo restartTimeInfo;
-    private final DatafeedTimingStats timingStats;
-
-    private DatafeedContext(DatafeedConfig datafeedConfig, Job job, RestartTimeInfo restartTimeInfo, DatafeedTimingStats timingStats) {
-        this.datafeedConfig = Objects.requireNonNull(datafeedConfig);
-        this.job = Objects.requireNonNull(job);
-        this.restartTimeInfo = Objects.requireNonNull(restartTimeInfo);
-        this.timingStats = Objects.requireNonNull(timingStats);
-    }
-
-    public DatafeedConfig getDatafeedConfig() {
-        return datafeedConfig;
-    }
-
-    public Job getJob() {
-        return job;
-    }
-
-    public RestartTimeInfo getRestartTimeInfo() {
-        return restartTimeInfo;
-    }
-
-    public DatafeedTimingStats getTimingStats() {
-        return timingStats;
-    }
-
-    static Builder builder() {
-        return new Builder();
-    }
-
-    static class Builder {
-        private volatile DatafeedConfig datafeedConfig;
-        private volatile Job job;
-        private volatile RestartTimeInfo restartTimeInfo;
-        private volatile DatafeedTimingStats timingStats;
-
-        Builder setDatafeedConfig(DatafeedConfig datafeedConfig) {
-            this.datafeedConfig = datafeedConfig;
-            return this;
-        }
-
-        Builder setJob(Job job) {
-            this.job = job;
-            return this;
-        }
-
-        Job getJob() {
-            return job;
-        }
-
-        Builder setRestartTimeInfo(RestartTimeInfo restartTimeInfo) {
-            this.restartTimeInfo = restartTimeInfo;
-            return this;
-        }
-
-        Builder setTimingStats(DatafeedTimingStats timingStats) {
-            this.timingStats = timingStats;
-            return this;
-        }
-
-        DatafeedContext build() {
-            return new DatafeedContext(datafeedConfig, job, restartTimeInfo, timingStats);
-        }
+    public DatafeedContext {
+        Objects.requireNonNull(datafeedConfig);
+        Objects.requireNonNull(job);
+        Objects.requireNonNull(restartTimeInfo);
+        Objects.requireNonNull(timingStats);
     }
 }

+ 2 - 8
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedContextProvider.java

@@ -32,18 +32,12 @@ public class DatafeedContextProvider {
     }
 
     public void buildDatafeedContext(String datafeedId, ActionListener<DatafeedContext> listener) {
-        DatafeedContext.Builder context = DatafeedContext.builder();
-
         datafeedConfigProvider.getDatafeedConfig(datafeedId, null, listener.delegateFailureAndWrap((delegate1, datafeedConfigBuilder) -> {
             DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
-            context.setDatafeedConfig(datafeedConfig);
             jobConfigProvider.getJob(datafeedConfig.getJobId(), null, delegate1.delegateFailureAndWrap((delegate2, jobBuilder) -> {
-                context.setJob(jobBuilder.build());
                 resultsProvider.getRestartTimeInfo(jobBuilder.getId(), delegate2.delegateFailureAndWrap((delegate3, restartTimeInfo) -> {
-                    context.setRestartTimeInfo(restartTimeInfo);
-                    resultsProvider.datafeedTimingStats(context.getJob().getId(), timingStats -> {
-                        context.setTimingStats(timingStats);
-                        delegate3.onResponse(context.build());
+                    resultsProvider.datafeedTimingStats(jobBuilder.getId(), timingStats -> {
+                        delegate3.onResponse(new DatafeedContext(datafeedConfig, jobBuilder.build(), restartTimeInfo, timingStats));
                     }, delegate3::onFailure);
                 }));
             }));

+ 8 - 8
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java

@@ -76,16 +76,16 @@ public class DatafeedJobBuilder {
 
     void build(TransportStartDatafeedAction.DatafeedTask task, DatafeedContext context, ActionListener<DatafeedJob> listener) {
         final ParentTaskAssigningClient parentTaskAssigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(), task);
-        final DatafeedConfig datafeedConfig = context.getDatafeedConfig();
-        final Job job = context.getJob();
-        final long latestFinalBucketEndMs = context.getRestartTimeInfo().getLatestFinalBucketTimeMs() == null
+        final DatafeedConfig datafeedConfig = context.datafeedConfig();
+        final Job job = context.job();
+        final long latestFinalBucketEndMs = context.restartTimeInfo().getLatestFinalBucketTimeMs() == null
             ? -1
-            : context.getRestartTimeInfo().getLatestFinalBucketTimeMs() + job.getAnalysisConfig().getBucketSpan().millis() - 1;
-        final long latestRecordTimeMs = context.getRestartTimeInfo().getLatestRecordTimeMs() == null
+            : context.restartTimeInfo().getLatestFinalBucketTimeMs() + job.getAnalysisConfig().getBucketSpan().millis() - 1;
+        final long latestRecordTimeMs = context.restartTimeInfo().getLatestRecordTimeMs() == null
             ? -1
-            : context.getRestartTimeInfo().getLatestRecordTimeMs();
+            : context.restartTimeInfo().getLatestRecordTimeMs();
         final DatafeedTimingStatsReporter timingStatsReporter = new DatafeedTimingStatsReporter(
-            context.getTimingStats(),
+            context.timingStats(),
             jobResultsPersister::persistDatafeedTimingStats
         );
 
@@ -130,7 +130,7 @@ public class DatafeedJobBuilder {
                 datafeedConfig.getMaxEmptySearches(),
                 latestFinalBucketEndMs,
                 latestRecordTimeMs,
-                context.getRestartTimeInfo().haveSeenDataPreviously(),
+                context.restartTimeInfo().haveSeenDataPreviously(),
                 delayedDataCheckFreq
             );
 

+ 24 - 24
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java

@@ -129,12 +129,12 @@ public class DatafeedJobBuilderTests extends ESTestCase {
             wasHandlerCalled.compareAndSet(false, true);
         });
 
-        DatafeedContext datafeedContext = DatafeedContext.builder()
-            .setDatafeedConfig(datafeed.build())
-            .setJob(jobBuilder.build())
-            .setRestartTimeInfo(new RestartTimeInfo(null, null, false))
-            .setTimingStats(new DatafeedTimingStats(jobBuilder.getId()))
-            .build();
+        DatafeedContext datafeedContext = new DatafeedContext(
+            datafeed.build(),
+            jobBuilder.build(),
+            new RestartTimeInfo(null, null, false),
+            new DatafeedTimingStats(jobBuilder.getId())
+        );
 
         TransportStartDatafeedAction.DatafeedTask datafeedTask = newDatafeedTask("datafeed1");
 
@@ -159,12 +159,12 @@ public class DatafeedJobBuilderTests extends ESTestCase {
             wasHandlerCalled.compareAndSet(false, true);
         });
 
-        DatafeedContext datafeedContext = DatafeedContext.builder()
-            .setDatafeedConfig(datafeed.build())
-            .setJob(jobBuilder.build())
-            .setRestartTimeInfo(new RestartTimeInfo(3_600_000L, 7_200_000L, false))
-            .setTimingStats(new DatafeedTimingStats(jobBuilder.getId()))
-            .build();
+        DatafeedContext datafeedContext = new DatafeedContext(
+            datafeed.build(),
+            jobBuilder.build(),
+            new RestartTimeInfo(3_800_000L, 7_200_000L, false),
+            new DatafeedTimingStats(jobBuilder.getId())
+        );
 
         TransportStartDatafeedAction.DatafeedTask datafeedTask = newDatafeedTask("datafeed1");
 
@@ -189,12 +189,12 @@ public class DatafeedJobBuilderTests extends ESTestCase {
             wasHandlerCalled.compareAndSet(false, true);
         });
 
-        DatafeedContext datafeedContext = DatafeedContext.builder()
-            .setDatafeedConfig(datafeed.build())
-            .setJob(jobBuilder.build())
-            .setRestartTimeInfo(new RestartTimeInfo(3_800_000L, 3_600_000L, false))
-            .setTimingStats(new DatafeedTimingStats(jobBuilder.getId()))
-            .build();
+        DatafeedContext datafeedContext = new DatafeedContext(
+            datafeed.build(),
+            jobBuilder.build(),
+            new RestartTimeInfo(3_800_000L, 3_600_000L, false),
+            new DatafeedTimingStats(jobBuilder.getId())
+        );
 
         TransportStartDatafeedAction.DatafeedTask datafeedTask = newDatafeedTask("datafeed1");
 
@@ -241,12 +241,12 @@ public class DatafeedJobBuilderTests extends ESTestCase {
             }
         );
 
-        DatafeedContext datafeedContext = DatafeedContext.builder()
-            .setDatafeedConfig(datafeed.build())
-            .setJob(jobBuilder.build())
-            .setRestartTimeInfo(new RestartTimeInfo(null, null, false))
-            .setTimingStats(new DatafeedTimingStats(jobBuilder.getId()))
-            .build();
+        DatafeedContext datafeedContext = new DatafeedContext(
+            datafeed.build(),
+            jobBuilder.build(),
+            new RestartTimeInfo(null, null, false),
+            new DatafeedTimingStats(jobBuilder.getId())
+        );
 
         TransportStartDatafeedAction.DatafeedTask datafeedTask = newDatafeedTask("datafeed1");
 

+ 4 - 9
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedRunnerTests.java

@@ -522,15 +522,10 @@ public class DatafeedRunnerTests extends ESTestCase {
 
     private void givenDatafeedHasNeverRunBefore(Job job, DatafeedConfig datafeed) {
         doAnswer(invocationOnMock -> {
-            @SuppressWarnings("unchecked")
-            ActionListener<DatafeedContext> datafeedContextListener = (ActionListener<DatafeedContext>) invocationOnMock.getArguments()[1];
-            DatafeedContext datafeedContext = DatafeedContext.builder()
-                .setJob(job)
-                .setDatafeedConfig(datafeed)
-                .setRestartTimeInfo(new RestartTimeInfo(null, null, false))
-                .setTimingStats(new DatafeedTimingStats(job.getId()))
-                .build();
-            datafeedContextListener.onResponse(datafeedContext);
+            ActionListener<DatafeedContext> datafeedContextListener = invocationOnMock.getArgument(1);
+            datafeedContextListener.onResponse(
+                new DatafeedContext(datafeed, job, new RestartTimeInfo(null, null, false), new DatafeedTimingStats(job.getId()))
+            );
             return null;
         }).when(datafeedContextProvider).buildDatafeedContext(eq(DATAFEED_ID), any());
     }