Browse Source

[ML] Rename JobProvider to JobResultsProvider (#32551)

David Kyle 7 years ago
parent
commit
15679315e3
33 changed files with 251 additions and 239 deletions
  1. 6 6
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
  2. 6 5
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarAction.java
  3. 5 5
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarEventAction.java
  4. 6 5
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteModelSnapshotAction.java
  5. 5 5
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportForecastJobAction.java
  6. 6 5
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetBucketsAction.java
  7. 8 7
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarEventsAction.java
  8. 6 6
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarsAction.java
  9. 6 5
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCategoriesAction.java
  10. 6 5
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetInfluencersAction.java
  11. 7 7
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java
  12. 5 5
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetModelSnapshotsAction.java
  13. 6 5
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetRecordsAction.java
  14. 6 5
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java
  15. 6 5
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java
  16. 14 10
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java
  17. 5 5
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateCalendarJobAction.java
  18. 5 5
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateModelSnapshotAction.java
  19. 6 6
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java
  20. 12 16
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java
  21. 3 3
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java
  22. 8 8
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java
  23. 12 10
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java
  24. 7 7
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java
  25. 9 9
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java
  26. 12 12
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java
  27. 4 4
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java
  28. 4 4
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java
  29. 6 5
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java
  30. 25 25
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java
  31. 8 8
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java
  32. 14 14
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java
  33. 7 7
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdaterTests.java

+ 6 - 6
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

@@ -162,7 +162,7 @@ import org.elasticsearch.xpack.ml.job.UpdateJobProcessNotifier;
 import org.elasticsearch.xpack.ml.job.categorization.MlClassicTokenizer;
 import org.elasticsearch.xpack.ml.job.categorization.MlClassicTokenizerFactory;
 import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
-import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
 import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
 import org.elasticsearch.xpack.ml.job.process.NativeController;
@@ -357,9 +357,9 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
         }
 
         Auditor auditor = new Auditor(client, clusterService.nodeName());
-        JobProvider jobProvider = new JobProvider(client, settings);
+        JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings);
         UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(settings, client, clusterService, threadPool);
-        JobManager jobManager = new JobManager(env, settings, jobProvider, clusterService, auditor, client, notifier);
+        JobManager jobManager = new JobManager(env, settings, jobResultsProvider, clusterService, auditor, client, notifier);
 
         JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(settings, client);
         JobResultsPersister jobResultsPersister = new JobResultsPersister(settings, client);
@@ -390,10 +390,10 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
         NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory,
                 threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME));
         AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(env, settings, client, threadPool,
-                jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
+                jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
                 normalizerFactory, xContentRegistry, auditor);
         this.autodetectProcessManager.set(autodetectProcessManager);
-        DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, jobProvider, auditor, System::currentTimeMillis);
+        DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, jobResultsProvider, auditor, System::currentTimeMillis);
         DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder,
                 System::currentTimeMillis, auditor);
         this.datafeedManager.set(datafeedManager);
@@ -408,7 +408,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
 
         return Arrays.asList(
                 mlLifeCycleService,
-                jobProvider,
+                jobResultsProvider,
                 jobManager,
                 autodetectProcessManager,
                 new MlInitializationService(settings, threadPool, clusterService, client),

+ 6 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarAction.java

@@ -24,7 +24,7 @@ import org.elasticsearch.xpack.core.ml.MlMetaIndex;
 import org.elasticsearch.xpack.core.ml.action.DeleteCalendarAction;
 import org.elasticsearch.xpack.core.ml.calendars.Calendar;
 import org.elasticsearch.xpack.ml.job.JobManager;
-import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 
 import java.util.function.Supplier;
 
@@ -35,16 +35,17 @@ public class TransportDeleteCalendarAction extends HandledTransportAction<Delete
 
     private final Client client;
     private final JobManager jobManager;
-    private final JobProvider jobProvider;
+    private final JobResultsProvider jobResultsProvider;
 
     @Inject
     public TransportDeleteCalendarAction(Settings settings, TransportService transportService,
-                                         ActionFilters actionFilters, Client client, JobManager jobManager, JobProvider jobProvider) {
+                                         ActionFilters actionFilters, Client client, JobManager jobManager,
+                                         JobResultsProvider jobResultsProvider) {
         super(settings, DeleteCalendarAction.NAME, transportService, actionFilters,
             (Supplier<DeleteCalendarAction.Request>) DeleteCalendarAction.Request::new);
         this.client = client;
         this.jobManager = jobManager;
-        this.jobProvider = jobProvider;
+        this.jobResultsProvider = jobResultsProvider;
     }
 
     @Override
@@ -70,7 +71,7 @@ public class TransportDeleteCalendarAction extends HandledTransportAction<Delete
                 listener::onFailure
         );
 
-        jobProvider.calendar(calendarId, calendarListener);
+        jobResultsProvider.calendar(calendarId, calendarListener);
     }
 
     private DeleteByQueryRequest buildDeleteByQuery(String calendarId) {

+ 5 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarEventAction.java

@@ -26,7 +26,7 @@ import org.elasticsearch.xpack.core.ml.action.DeleteCalendarEventAction;
 import org.elasticsearch.xpack.core.ml.calendars.Calendar;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.ml.job.JobManager;
-import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 
 import java.util.Map;
 
@@ -37,16 +37,16 @@ public class TransportDeleteCalendarEventAction extends HandledTransportAction<D
         DeleteCalendarEventAction.Response> {
 
     private final Client client;
-    private final JobProvider jobProvider;
+    private final JobResultsProvider jobResultsProvider;
     private final JobManager jobManager;
 
     @Inject
     public TransportDeleteCalendarEventAction(Settings settings, TransportService transportService, ActionFilters actionFilters,
-                                              Client client, JobProvider jobProvider, JobManager jobManager) {
+                                              Client client, JobResultsProvider jobResultsProvider, JobManager jobManager) {
         super(settings, DeleteCalendarEventAction.NAME, transportService, actionFilters,
               DeleteCalendarEventAction.Request::new);
         this.client = client;
-        this.jobProvider = jobProvider;
+        this.jobResultsProvider = jobResultsProvider;
         this.jobManager = jobManager;
     }
 
@@ -87,7 +87,7 @@ public class TransportDeleteCalendarEventAction extends HandledTransportAction<D
                 }, listener::onFailure);
 
         // Get the calendar first so we check the calendar exists before checking the event exists
-        jobProvider.calendar(request.getCalendarId(), calendarListener);
+        jobResultsProvider.calendar(request.getCalendarId(), calendarListener);
     }
 
     private void deleteEvent(String eventId, Calendar calendar, ActionListener<DeleteCalendarEventAction.Response> listener) {

+ 6 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteModelSnapshotAction.java

@@ -22,7 +22,7 @@ import org.elasticsearch.xpack.core.ml.job.messages.Messages;
 import org.elasticsearch.xpack.core.ml.job.persistence.JobDataDeleter;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.ml.job.JobManager;
-import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 import org.elasticsearch.xpack.ml.notifications.Auditor;
 
 import java.util.Collections;
@@ -32,17 +32,18 @@ public class TransportDeleteModelSnapshotAction extends HandledTransportAction<D
         DeleteModelSnapshotAction.Response> {
 
     private final Client client;
-    private final JobProvider jobProvider;
+    private final JobResultsProvider jobResultsProvider;
     private final ClusterService clusterService;
     private final Auditor auditor;
 
     @Inject
     public TransportDeleteModelSnapshotAction(Settings settings, TransportService transportService, ActionFilters actionFilters,
-                                              JobProvider jobProvider, ClusterService clusterService, Client client, Auditor auditor) {
+                                              JobResultsProvider jobResultsProvider, ClusterService clusterService, Client client,
+                                              Auditor auditor) {
         super(settings, DeleteModelSnapshotAction.NAME, transportService, actionFilters,
               DeleteModelSnapshotAction.Request::new);
         this.client = client;
-        this.jobProvider = jobProvider;
+        this.jobResultsProvider = jobResultsProvider;
         this.clusterService = clusterService;
         this.auditor = auditor;
     }
@@ -51,7 +52,7 @@ public class TransportDeleteModelSnapshotAction extends HandledTransportAction<D
     protected void doExecute(Task task, DeleteModelSnapshotAction.Request request,
                              ActionListener<DeleteModelSnapshotAction.Response> listener) {
         // Verify the snapshot exists
-        jobProvider.modelSnapshots(
+        jobResultsProvider.modelSnapshots(
                 request.getJobId(), 0, 1, null, null, null, true, request.getSnapshotId(),
                 page -> {
                     List<ModelSnapshot> deleteCandidates = page.results();

+ 5 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportForecastJobAction.java

@@ -24,7 +24,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.ml.job.JobManager;
-import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
 
@@ -40,15 +40,15 @@ public class TransportForecastJobAction extends TransportJobTaskAction<ForecastJ
 
     private static final ByteSizeValue FORECAST_LOCAL_STORAGE_LIMIT = new ByteSizeValue(500, ByteSizeUnit.MB);
 
-    private final JobProvider jobProvider;
+    private final JobResultsProvider jobResultsProvider;
     @Inject
     public TransportForecastJobAction(Settings settings, TransportService transportService,
                                       ClusterService clusterService, ActionFilters actionFilters,
-                                      JobProvider jobProvider, AutodetectProcessManager processManager) {
+                                      JobResultsProvider jobResultsProvider, AutodetectProcessManager processManager) {
         super(settings, ForecastJobAction.NAME, clusterService, transportService, actionFilters,
             ForecastJobAction.Request::new, ForecastJobAction.Response::new,
                 ThreadPool.Names.SAME, processManager);
-        this.jobProvider = jobProvider;
+        this.jobResultsProvider = jobResultsProvider;
         // ThreadPool.Names.SAME, because operations is executed by autodetect worker thread
     }
 
@@ -107,7 +107,7 @@ public class TransportForecastJobAction extends TransportJobTaskAction<ForecastJ
                     }
                 };
 
-                jobProvider.getForecastRequestStats(request.getJobId(), params.getForecastId(),
+                jobResultsProvider.getForecastRequestStats(request.getJobId(), params.getForecastId(),
                         forecastRequestStatsHandler, listener::onFailure);
             } else {
                 listener.onFailure(e);

+ 6 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetBucketsAction.java

@@ -16,22 +16,23 @@ import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
 import org.elasticsearch.xpack.ml.job.JobManager;
 import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
-import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 
 import java.util.function.Supplier;
 
 public class TransportGetBucketsAction extends HandledTransportAction<GetBucketsAction.Request, GetBucketsAction.Response> {
 
-    private final JobProvider jobProvider;
+    private final JobResultsProvider jobResultsProvider;
     private final JobManager jobManager;
     private final Client client;
 
     @Inject
     public TransportGetBucketsAction(Settings settings, TransportService transportService,
-                                     ActionFilters actionFilters, JobProvider jobProvider, JobManager jobManager, Client client) {
+                                     ActionFilters actionFilters, JobResultsProvider jobResultsProvider,
+                                     JobManager jobManager, Client client) {
         super(settings, GetBucketsAction.NAME, transportService, actionFilters,
             (Supplier<GetBucketsAction.Request>) GetBucketsAction.Request::new);
-        this.jobProvider = jobProvider;
+        this.jobResultsProvider = jobResultsProvider;
         this.jobManager = jobManager;
         this.client = client;
     }
@@ -59,7 +60,7 @@ public class TransportGetBucketsAction extends HandledTransportAction<GetBuckets
             query.start(request.getStart());
             query.end(request.getEnd());
         }
-        jobProvider.buckets(request.getJobId(), query, q ->
+        jobResultsProvider.buckets(request.getJobId(), query, q ->
                 listener.onResponse(new GetBucketsAction.Response(q)), listener::onFailure, client);
     }
 }

+ 8 - 7
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarEventsAction.java

@@ -21,7 +21,7 @@ import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
 import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
-import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder;
 
 import java.util.Collections;
@@ -31,15 +31,16 @@ import java.util.function.Supplier;
 public class TransportGetCalendarEventsAction extends HandledTransportAction<GetCalendarEventsAction.Request,
         GetCalendarEventsAction.Response> {
 
-    private final JobProvider jobProvider;
+    private final JobResultsProvider jobResultsProvider;
     private final ClusterService clusterService;
 
     @Inject
     public TransportGetCalendarEventsAction(Settings settings, TransportService transportService,
-                                            ActionFilters actionFilters, ClusterService clusterService, JobProvider jobProvider) {
+                                            ActionFilters actionFilters, ClusterService clusterService,
+                                            JobResultsProvider jobResultsProvider) {
         super(settings, GetCalendarEventsAction.NAME, transportService, actionFilters,
             (Supplier<GetCalendarEventsAction.Request>) GetCalendarEventsAction.Request::new);
-        this.jobProvider = jobProvider;
+        this.jobResultsProvider = jobResultsProvider;
         this.clusterService = clusterService;
     }
 
@@ -85,9 +86,9 @@ public class TransportGetCalendarEventsAction extends HandledTransportAction<Get
                             jobGroups = job.getGroups();
                         }
 
-                        jobProvider.scheduledEventsForJob(requestId, jobGroups, query, eventsListener);
+                        jobResultsProvider.scheduledEventsForJob(requestId, jobGroups, query, eventsListener);
                     } else {
-                        jobProvider.scheduledEvents(query, eventsListener);
+                        jobResultsProvider.scheduledEvents(query, eventsListener);
                     }
                 },
                 listener::onFailure);
@@ -101,7 +102,7 @@ public class TransportGetCalendarEventsAction extends HandledTransportAction<Get
             return;
         }
 
-        jobProvider.calendar(calendarId, ActionListener.wrap(
+        jobResultsProvider.calendar(calendarId, ActionListener.wrap(
                 c -> listener.onResponse(true),
                 listener::onFailure
         ));

+ 6 - 6
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarsAction.java

@@ -17,20 +17,20 @@ import org.elasticsearch.xpack.core.ml.action.util.PageParams;
 import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
 import org.elasticsearch.xpack.core.ml.calendars.Calendar;
 import org.elasticsearch.xpack.ml.job.persistence.CalendarQueryBuilder;
-import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 
 import java.util.Collections;
 
 public class TransportGetCalendarsAction extends HandledTransportAction<GetCalendarsAction.Request, GetCalendarsAction.Response> {
 
-    private final JobProvider jobProvider;
+    private final JobResultsProvider jobResultsProvider;
 
     @Inject
     public TransportGetCalendarsAction(Settings settings, TransportService transportService,
-                                       ActionFilters actionFilters, JobProvider jobProvider) {
+                                       ActionFilters actionFilters, JobResultsProvider jobResultsProvider) {
         super(settings, GetCalendarsAction.NAME, transportService, actionFilters,
             GetCalendarsAction.Request::new);
-        this.jobProvider = jobProvider;
+        this.jobResultsProvider = jobResultsProvider;
     }
 
     @Override
@@ -49,7 +49,7 @@ public class TransportGetCalendarsAction extends HandledTransportAction<GetCalen
 
     private void getCalendar(String calendarId, ActionListener<GetCalendarsAction.Response> listener) {
 
-        jobProvider.calendar(calendarId, ActionListener.wrap(
+        jobResultsProvider.calendar(calendarId, ActionListener.wrap(
                 calendar -> {
                     QueryPage<Calendar> page = new QueryPage<>(Collections.singletonList(calendar), 1, Calendar.RESULTS_FIELD);
                     listener.onResponse(new GetCalendarsAction.Response(page));
@@ -60,7 +60,7 @@ public class TransportGetCalendarsAction extends HandledTransportAction<GetCalen
 
     private void getCalendars(PageParams pageParams, ActionListener<GetCalendarsAction.Response> listener) {
         CalendarQueryBuilder query = new CalendarQueryBuilder().pageParams(pageParams).sort(true);
-        jobProvider.calendars(query, ActionListener.wrap(
+        jobResultsProvider.calendars(query, ActionListener.wrap(
                 calendars -> {
                     listener.onResponse(new GetCalendarsAction.Response(calendars));
                 },

+ 6 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCategoriesAction.java

@@ -15,22 +15,23 @@ import org.elasticsearch.tasks.Task;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.ml.action.GetCategoriesAction;
 import org.elasticsearch.xpack.ml.job.JobManager;
-import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 
 import java.util.function.Supplier;
 
 public class TransportGetCategoriesAction extends HandledTransportAction<GetCategoriesAction.Request, GetCategoriesAction.Response> {
 
-    private final JobProvider jobProvider;
+    private final JobResultsProvider jobResultsProvider;
     private final Client client;
     private final JobManager jobManager;
 
     @Inject
     public TransportGetCategoriesAction(Settings settings, TransportService transportService,
-                                        ActionFilters actionFilters, JobProvider jobProvider, Client client, JobManager jobManager) {
+                                        ActionFilters actionFilters, JobResultsProvider jobResultsProvider,
+                                        Client client, JobManager jobManager) {
         super(settings, GetCategoriesAction.NAME, transportService, actionFilters,
             (Supplier<GetCategoriesAction.Request>) GetCategoriesAction.Request::new);
-        this.jobProvider = jobProvider;
+        this.jobResultsProvider = jobResultsProvider;
         this.client = client;
         this.jobManager = jobManager;
     }
@@ -41,7 +42,7 @@ public class TransportGetCategoriesAction extends HandledTransportAction<GetCate
 
         Integer from = request.getPageParams() != null ? request.getPageParams().getFrom() : null;
         Integer size = request.getPageParams() != null ? request.getPageParams().getSize() : null;
-        jobProvider.categoryDefinitions(request.getJobId(), request.getCategoryId(), true, from, size,
+        jobResultsProvider.categoryDefinitions(request.getJobId(), request.getCategoryId(), true, from, size,
                 r -> listener.onResponse(new GetCategoriesAction.Response(r)), listener::onFailure, client);
     }
 }

+ 6 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetInfluencersAction.java

@@ -16,22 +16,23 @@ import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.ml.action.GetInfluencersAction;
 import org.elasticsearch.xpack.ml.job.JobManager;
 import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder;
-import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 
 import java.util.function.Supplier;
 
 public class TransportGetInfluencersAction extends HandledTransportAction<GetInfluencersAction.Request, GetInfluencersAction.Response> {
 
-    private final JobProvider jobProvider;
+    private final JobResultsProvider jobResultsProvider;
     private final Client client;
     private final JobManager jobManager;
 
     @Inject
     public TransportGetInfluencersAction(Settings settings, TransportService transportService,
-                                         ActionFilters actionFilters, JobProvider jobProvider, Client client, JobManager jobManager) {
+                                         ActionFilters actionFilters, JobResultsProvider jobResultsProvider,
+                                         Client client, JobManager jobManager) {
         super(settings, GetInfluencersAction.NAME, transportService, actionFilters,
             (Supplier<GetInfluencersAction.Request>) GetInfluencersAction.Request::new);
-        this.jobProvider = jobProvider;
+        this.jobResultsProvider = jobResultsProvider;
         this.client = client;
         this.jobManager = jobManager;
     }
@@ -49,7 +50,7 @@ public class TransportGetInfluencersAction extends HandledTransportAction<GetInf
                 .influencerScoreThreshold(request.getInfluencerScore())
                 .sortField(request.getSort())
                 .sortDescending(request.isDescending()).build();
-        jobProvider.influencers(request.getJobId(), query,
+        jobResultsProvider.influencers(request.getJobId(), query,
                 page -> listener.onResponse(new GetInfluencersAction.Response(page)), listener::onFailure, client);
     }
 }

+ 7 - 7
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java

@@ -32,7 +32,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
 import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
-import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
 
 import java.io.IOException;
@@ -52,18 +52,18 @@ public class TransportGetJobsStatsAction extends TransportTasksAction<TransportO
 
     private final ClusterService clusterService;
     private final AutodetectProcessManager processManager;
-    private final JobProvider jobProvider;
+    private final JobResultsProvider jobResultsProvider;
 
     @Inject
     public TransportGetJobsStatsAction(Settings settings, TransportService transportService,
                                        ActionFilters actionFilters, ClusterService clusterService,
-                                       AutodetectProcessManager processManager, JobProvider jobProvider) {
+                                       AutodetectProcessManager processManager, JobResultsProvider jobResultsProvider) {
         super(settings, GetJobsStatsAction.NAME, clusterService, transportService, actionFilters,
             GetJobsStatsAction.Request::new, GetJobsStatsAction.Response::new,
                 ThreadPool.Names.MANAGEMENT);
         this.clusterService = clusterService;
         this.processManager = processManager;
-        this.jobProvider = jobProvider;
+        this.jobResultsProvider = jobResultsProvider;
     }
 
     @Override
@@ -158,13 +158,13 @@ public class TransportGetJobsStatsAction extends TransportTasksAction<TransportO
     }
 
     void gatherForecastStats(String jobId, Consumer<ForecastStats> handler, Consumer<Exception> errorHandler) {
-        jobProvider.getForecastStats(jobId, handler, errorHandler);
+        jobResultsProvider.getForecastStats(jobId, handler, errorHandler);
     }
     
     void gatherDataCountsAndModelSizeStats(String jobId, BiConsumer<DataCounts, ModelSizeStats> handler,
                                                    Consumer<Exception> errorHandler) {
-        jobProvider.dataCounts(jobId, dataCounts -> {
-            jobProvider.modelSizeStats(jobId, modelSizeStats -> {
+        jobResultsProvider.dataCounts(jobId, dataCounts -> {
+            jobResultsProvider.modelSizeStats(jobId, modelSizeStats -> {
                 handler.accept(dataCounts, modelSizeStats);
             }, errorHandler);
         }, errorHandler);

+ 5 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetModelSnapshotsAction.java

@@ -16,22 +16,22 @@ import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction;
 import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.ml.job.JobManager;
-import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 
 import java.util.stream.Collectors;
 
 public class TransportGetModelSnapshotsAction extends HandledTransportAction<GetModelSnapshotsAction.Request,
         GetModelSnapshotsAction.Response> {
 
-    private final JobProvider jobProvider;
+    private final JobResultsProvider jobResultsProvider;
     private final JobManager jobManager;
 
     @Inject
     public TransportGetModelSnapshotsAction(Settings settings, TransportService transportService,
-                                            ActionFilters actionFilters, JobProvider jobProvider, JobManager jobManager) {
+                                            ActionFilters actionFilters, JobResultsProvider jobResultsProvider, JobManager jobManager) {
         super(settings, GetModelSnapshotsAction.NAME, transportService, actionFilters,
             GetModelSnapshotsAction.Request::new);
-        this.jobProvider = jobProvider;
+        this.jobResultsProvider = jobResultsProvider;
         this.jobManager = jobManager;
     }
 
@@ -45,7 +45,7 @@ public class TransportGetModelSnapshotsAction extends HandledTransportAction<Get
 
         jobManager.getJobOrThrowIfUnknown(request.getJobId());
 
-        jobProvider.modelSnapshots(request.getJobId(), request.getPageParams().getFrom(), request.getPageParams().getSize(),
+        jobResultsProvider.modelSnapshots(request.getJobId(), request.getPageParams().getFrom(), request.getPageParams().getSize(),
                 request.getStart(), request.getEnd(), request.getSort(), request.getDescOrder(), request.getSnapshotId(),
                 page -> {
                     listener.onResponse(new GetModelSnapshotsAction.Response(clearQuantiles(page)));

+ 6 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetRecordsAction.java

@@ -15,23 +15,24 @@ import org.elasticsearch.tasks.Task;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.ml.action.GetRecordsAction;
 import org.elasticsearch.xpack.ml.job.JobManager;
-import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 import org.elasticsearch.xpack.ml.job.persistence.RecordsQueryBuilder;
 
 import java.util.function.Supplier;
 
 public class TransportGetRecordsAction extends HandledTransportAction<GetRecordsAction.Request, GetRecordsAction.Response> {
 
-    private final JobProvider jobProvider;
+    private final JobResultsProvider jobResultsProvider;
     private final JobManager jobManager;
     private final Client client;
 
     @Inject
     public TransportGetRecordsAction(Settings settings, TransportService transportService,
-                                     ActionFilters actionFilters, JobProvider jobProvider, JobManager jobManager, Client client) {
+                                     ActionFilters actionFilters, JobResultsProvider jobResultsProvider,
+                                     JobManager jobManager, Client client) {
         super(settings, GetRecordsAction.NAME, transportService, actionFilters,
             (Supplier<GetRecordsAction.Request>) GetRecordsAction.Request::new);
-        this.jobProvider = jobProvider;
+        this.jobResultsProvider = jobResultsProvider;
         this.jobManager = jobManager;
         this.client = client;
     }
@@ -50,7 +51,7 @@ public class TransportGetRecordsAction extends HandledTransportAction<GetRecords
                 .recordScore(request.getRecordScoreFilter())
                 .sortField(request.getSort())
                 .sortDescending(request.isDescending());
-        jobProvider.records(request.getJobId(), query, page ->
+        jobResultsProvider.records(request.getJobId(), query, page ->
                         listener.onResponse(new GetRecordsAction.Response(page)), listener::onFailure, client);
     }
 }

+ 6 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

@@ -64,7 +64,7 @@ import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
 import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.ml.MachineLearning;
-import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
 
 import java.io.IOException;
@@ -95,19 +95,20 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
     private final XPackLicenseState licenseState;
     private final PersistentTasksService persistentTasksService;
     private final Client client;
-    private final JobProvider jobProvider;
+    private final JobResultsProvider jobResultsProvider;
 
     @Inject
     public TransportOpenJobAction(Settings settings, TransportService transportService, ThreadPool threadPool,
                                   XPackLicenseState licenseState, ClusterService clusterService,
                                   PersistentTasksService persistentTasksService, ActionFilters actionFilters,
-                                  IndexNameExpressionResolver indexNameExpressionResolver, Client client, JobProvider jobProvider) {
+                                  IndexNameExpressionResolver indexNameExpressionResolver, Client client,
+                                  JobResultsProvider jobResultsProvider) {
         super(settings, OpenJobAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,
                 OpenJobAction.Request::new);
         this.licenseState = licenseState;
         this.persistentTasksService = persistentTasksService;
         this.client = client;
-        this.jobProvider = jobProvider;
+        this.jobResultsProvider = jobResultsProvider;
     }
 
     /**
@@ -504,7 +505,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
                             Long jobEstablishedModelMemory = job.getEstablishedModelMemory();
                             if ((jobVersion == null || jobVersion.before(Version.V_6_1_0))
                                     && (jobEstablishedModelMemory == null || jobEstablishedModelMemory == 0)) {
-                                jobProvider.getEstablishedMemoryUsage(job.getId(), null, null, establishedModelMemory -> {
+                                jobResultsProvider.getEstablishedMemoryUsage(job.getId(), null, null, establishedModelMemory -> {
                                     if (establishedModelMemory != null && establishedModelMemory > 0) {
                                         JobUpdate update = new JobUpdate.Builder(job.getId())
                                                 .setEstablishedModelMemory(establishedModelMemory).build();

+ 6 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostCalendarEventsAction.java

@@ -27,7 +27,7 @@ import org.elasticsearch.xpack.core.ml.calendars.Calendar;
 import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.ml.job.JobManager;
-import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -40,16 +40,17 @@ public class TransportPostCalendarEventsAction extends HandledTransportAction<Po
         PostCalendarEventsAction.Response> {
 
     private final Client client;
-    private final JobProvider jobProvider;
+    private final JobResultsProvider jobResultsProvider;
     private final JobManager jobManager;
 
     @Inject
     public TransportPostCalendarEventsAction(Settings settings, TransportService transportService,
-                                             ActionFilters actionFilters, Client client, JobProvider jobProvider, JobManager jobManager) {
+                                             ActionFilters actionFilters, Client client,
+                                             JobResultsProvider jobResultsProvider, JobManager jobManager) {
         super(settings, PostCalendarEventsAction.NAME, transportService, actionFilters,
             PostCalendarEventsAction.Request::new);
         this.client = client;
-        this.jobProvider = jobProvider;
+        this.jobResultsProvider = jobResultsProvider;
         this.jobManager = jobManager;
     }
 
@@ -92,6 +93,6 @@ public class TransportPostCalendarEventsAction extends HandledTransportAction<Po
                 },
                 listener::onFailure);
 
-        jobProvider.calendar(request.getCalendarId(), calendarListener);
+        jobResultsProvider.calendar(request.getCalendarId(), calendarListener);
     }
 }

+ 14 - 10
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java

@@ -17,8 +17,10 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.config.JobState;
@@ -28,7 +30,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapsho
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.ml.job.JobManager;
 import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
-import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 
 import java.util.Date;
 import java.util.function.Consumer;
@@ -38,19 +40,19 @@ public class TransportRevertModelSnapshotAction extends TransportMasterNodeActio
 
     private final Client client;
     private final JobManager jobManager;
-    private final JobProvider jobProvider;
+    private final JobResultsProvider jobResultsProvider;
     private final JobDataCountsPersister jobDataCountsPersister;
 
     @Inject
     public TransportRevertModelSnapshotAction(Settings settings, ThreadPool threadPool, TransportService transportService,
                                               ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
-                                              JobManager jobManager, JobProvider jobProvider,
+                                              JobManager jobManager, JobResultsProvider jobResultsProvider,
                                               ClusterService clusterService, Client client, JobDataCountsPersister jobDataCountsPersister) {
         super(settings, RevertModelSnapshotAction.NAME, transportService, clusterService, threadPool, actionFilters,
                 indexNameExpressionResolver, RevertModelSnapshotAction.Request::new);
         this.client = client;
         this.jobManager = jobManager;
-        this.jobProvider = jobProvider;
+        this.jobResultsProvider = jobResultsProvider;
         this.jobDataCountsPersister = jobDataCountsPersister;
     }
 
@@ -66,17 +68,19 @@ public class TransportRevertModelSnapshotAction extends TransportMasterNodeActio
 
     @Override
     protected void masterOperation(RevertModelSnapshotAction.Request request, ClusterState state,
-                                   ActionListener<RevertModelSnapshotAction.Response> listener) throws Exception {
+                                   ActionListener<RevertModelSnapshotAction.Response> listener) {
         logger.debug("Received request to revert to snapshot id '{}' for job '{}', deleting intervening results: {}",
                 request.getSnapshotId(), request.getJobId(), request.getDeleteInterveningResults());
 
-        Job job = JobManager.getJobOrThrowIfUnknown(request.getJobId(), clusterService.state());
-        JobState jobState = jobManager.getJobState(job.getId());
+        Job job = JobManager.getJobOrThrowIfUnknown(request.getJobId(), state);
+        PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
+        JobState jobState = MlTasks.getJobState(job.getId(), tasks);
+
         if (jobState.equals(JobState.CLOSED) == false) {
             throw ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.REST_JOB_NOT_CLOSED_REVERT));
         }
 
-        getModelSnapshot(request, jobProvider, modelSnapshot -> {
+        getModelSnapshot(request, jobResultsProvider, modelSnapshot -> {
             ActionListener<RevertModelSnapshotAction.Response> wrappedListener = listener;
             if (request.getDeleteInterveningResults()) {
                 wrappedListener = wrapDeleteOldDataListener(wrappedListener, modelSnapshot, request.getJobId());
@@ -86,7 +90,7 @@ public class TransportRevertModelSnapshotAction extends TransportMasterNodeActio
         }, listener::onFailure);
     }
 
-    private void getModelSnapshot(RevertModelSnapshotAction.Request request, JobProvider provider, Consumer<ModelSnapshot> handler,
+    private void getModelSnapshot(RevertModelSnapshotAction.Request request, JobResultsProvider provider, Consumer<ModelSnapshot> handler,
                                   Consumer<Exception> errorHandler) {
         logger.info("Reverting to snapshot '" + request.getSnapshotId() + "'");
 
@@ -134,7 +138,7 @@ public class TransportRevertModelSnapshotAction extends TransportMasterNodeActio
 
 
         return ActionListener.wrap(response -> {
-            jobProvider.dataCounts(jobId, counts -> {
+            jobResultsProvider.dataCounts(jobId, counts -> {
                 counts.setLatestRecordTimeStamp(modelSnapshot.getLatestRecordTimeStamp());
                 jobDataCountsPersister.persistDataCounts(jobId, counts, new ActionListener<Boolean>() {
                     @Override

+ 5 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateCalendarJobAction.java

@@ -16,20 +16,20 @@ import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.ml.action.PutCalendarAction;
 import org.elasticsearch.xpack.core.ml.action.UpdateCalendarJobAction;
 import org.elasticsearch.xpack.ml.job.JobManager;
-import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 
 import java.util.Set;
 
 public class TransportUpdateCalendarJobAction extends HandledTransportAction<UpdateCalendarJobAction.Request, PutCalendarAction.Response> {
 
-    private final JobProvider jobProvider;
+    private final JobResultsProvider jobResultsProvider;
     private final JobManager jobManager;
 
     @Inject
     public TransportUpdateCalendarJobAction(Settings settings, TransportService transportService,
-                                            ActionFilters actionFilters, JobProvider jobProvider, JobManager jobManager) {
+                                            ActionFilters actionFilters, JobResultsProvider jobResultsProvider, JobManager jobManager) {
         super(settings, UpdateCalendarJobAction.NAME, transportService, actionFilters, UpdateCalendarJobAction.Request::new);
-        this.jobProvider = jobProvider;
+        this.jobResultsProvider = jobResultsProvider;
         this.jobManager = jobManager;
     }
 
@@ -38,7 +38,7 @@ public class TransportUpdateCalendarJobAction extends HandledTransportAction<Upd
         Set<String> jobIdsToAdd = Strings.tokenizeByCommaToSet(request.getJobIdsToAddExpression());
         Set<String> jobIdsToRemove = Strings.tokenizeByCommaToSet(request.getJobIdsToRemoveExpression());
 
-        jobProvider.updateCalendar(request.getCalendarId(), jobIdsToAdd, jobIdsToRemove,
+        jobResultsProvider.updateCalendar(request.getCalendarId(), jobIdsToAdd, jobIdsToRemove,
                 c -> {
                     jobManager.updateProcessOnCalendarChanged(c.getJobIds());
                     listener.onResponse(new PutCalendarAction.Response(c));

+ 5 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateModelSnapshotAction.java

@@ -27,7 +27,7 @@ import org.elasticsearch.xpack.core.ml.job.messages.Messages;
 import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.core.ml.job.results.Result;
-import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 
 import java.io.IOException;
 import java.util.function.Consumer;
@@ -38,15 +38,15 @@ import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
 public class TransportUpdateModelSnapshotAction extends HandledTransportAction<UpdateModelSnapshotAction.Request,
         UpdateModelSnapshotAction.Response> {
 
-    private final JobProvider jobProvider;
+    private final JobResultsProvider jobResultsProvider;
     private final Client client;
 
     @Inject
     public TransportUpdateModelSnapshotAction(Settings settings, TransportService transportService,
-                                              ActionFilters actionFilters, JobProvider jobProvider, Client client) {
+                                              ActionFilters actionFilters, JobResultsProvider jobResultsProvider, Client client) {
         super(settings, UpdateModelSnapshotAction.NAME, transportService, actionFilters,
             UpdateModelSnapshotAction.Request::new);
-        this.jobProvider = jobProvider;
+        this.jobResultsProvider = jobResultsProvider;
         this.client = client;
     }
 
@@ -54,7 +54,7 @@ public class TransportUpdateModelSnapshotAction extends HandledTransportAction<U
     protected void doExecute(Task task, UpdateModelSnapshotAction.Request request,
                              ActionListener<UpdateModelSnapshotAction.Response> listener) {
         logger.debug("Received request to update model snapshot [{}] for job [{}]", request.getSnapshotId(), request.getJobId());
-        jobProvider.getModelSnapshot(request.getJobId(), request.getSnapshotId(), modelSnapshot -> {
+        jobResultsProvider.getModelSnapshot(request.getJobId(), request.getSnapshotId(), modelSnapshot -> {
             if (modelSnapshot == null) {
                 listener.onFailure(new ResourceNotFoundException(Messages.getMessage(
                         Messages.REST_NO_SUCH_MODEL_SNAPSHOT, request.getSnapshotId(), request.getJobId())));

+ 6 - 6
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java

@@ -18,7 +18,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
 import org.elasticsearch.xpack.core.ml.job.results.Bucket;
 import org.elasticsearch.xpack.core.ml.job.results.Result;
 import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
-import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 import org.elasticsearch.xpack.ml.notifications.Auditor;
 
 import java.util.Collections;
@@ -29,13 +29,13 @@ import java.util.function.Supplier;
 public class DatafeedJobBuilder {
 
     private final Client client;
-    private final JobProvider jobProvider;
+    private final JobResultsProvider jobResultsProvider;
     private final Auditor auditor;
     private final Supplier<Long> currentTimeSupplier;
 
-    public DatafeedJobBuilder(Client client, JobProvider jobProvider, Auditor auditor, Supplier<Long> currentTimeSupplier) {
+    public DatafeedJobBuilder(Client client, JobResultsProvider jobResultsProvider, Auditor auditor, Supplier<Long> currentTimeSupplier) {
         this.client = client;
-        this.jobProvider = Objects.requireNonNull(jobProvider);
+        this.jobResultsProvider = Objects.requireNonNull(jobResultsProvider);
         this.auditor = Objects.requireNonNull(auditor);
         this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier);
     }
@@ -79,7 +79,7 @@ public class DatafeedJobBuilder {
                 TimeValue bucketSpan = job.getAnalysisConfig().getBucketSpan();
                 context.latestFinalBucketEndMs = buckets.results().get(0).getTimestamp().getTime() + bucketSpan.millis() - 1;
             }
-            jobProvider.dataCounts(job.getId(), dataCountsHandler, listener::onFailure);
+            jobResultsProvider.dataCounts(job.getId(), dataCountsHandler, listener::onFailure);
         };
 
         // Step 1. Collect latest bucket
@@ -87,7 +87,7 @@ public class DatafeedJobBuilder {
                 .sortField(Result.TIMESTAMP.getPreferredName())
                 .sortDescending(true).size(1)
                 .includeInterim(false);
-        jobProvider.bucketsViaInternalClient(job.getId(), latestBucketQuery, bucketsHandler, e -> {
+        jobResultsProvider.bucketsViaInternalClient(job.getId(), latestBucketQuery, bucketsHandler, e -> {
             if (e instanceof ResourceNotFoundException) {
                 QueryPage<Bucket> empty = new QueryPage<>(Collections.emptyList(), 0, Bucket.RESULT_TYPE_FIELD);
                 bucketsHandler.accept(empty);

+ 12 - 16
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java

@@ -52,7 +52,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapsho
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer;
-import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
 import org.elasticsearch.xpack.ml.job.process.autodetect.UpdateParams;
 import org.elasticsearch.xpack.ml.notifications.Auditor;
@@ -86,7 +86,7 @@ public class JobManager extends AbstractComponent {
             new DeprecationLogger(Loggers.getLogger(JobManager.class));
 
     private final Environment environment;
-    private final JobProvider jobProvider;
+    private final JobResultsProvider jobResultsProvider;
     private final ClusterService clusterService;
     private final Auditor auditor;
     private final Client client;
@@ -97,11 +97,12 @@ public class JobManager extends AbstractComponent {
     /**
      * Create a JobManager
      */
-    public JobManager(Environment environment, Settings settings, JobProvider jobProvider, ClusterService clusterService, Auditor auditor,
+    public JobManager(Environment environment, Settings settings, JobResultsProvider jobResultsProvider,
+                      ClusterService clusterService, Auditor auditor,
                       Client client, UpdateJobProcessNotifier updateJobProcessNotifier) {
         super(settings);
         this.environment = environment;
-        this.jobProvider = Objects.requireNonNull(jobProvider);
+        this.jobResultsProvider = Objects.requireNonNull(jobResultsProvider);
         this.clusterService = Objects.requireNonNull(clusterService);
         this.auditor = Objects.requireNonNull(auditor);
         this.client = Objects.requireNonNull(client);
@@ -167,11 +168,6 @@ public class JobManager extends AbstractComponent {
         return new QueryPage<>(jobs, jobs.size(), Job.RESULTS_FIELD);
     }
 
-    public JobState getJobState(String jobId) {
-        PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
-        return MlTasks.getJobState(jobId, tasks);
-    }
-
     /**
      * Validate the char filter/tokenizer/token filter names used in the categorization analyzer config (if any).
      * This validation has to be done server-side; it cannot be done in a client as that won't have loaded the
@@ -249,12 +245,12 @@ public class JobManager extends AbstractComponent {
 
         ActionListener<Boolean> checkForLeftOverDocs = ActionListener.wrap(
                 response -> {
-                    jobProvider.createJobResultIndex(job, state, putJobListener);
+                    jobResultsProvider.createJobResultIndex(job, state, putJobListener);
                 },
                 actionListener::onFailure
         );
 
-        jobProvider.checkForLeftOverDocuments(job, checkForLeftOverDocs);
+        jobResultsProvider.checkForLeftOverDocuments(job, checkForLeftOverDocs);
     }
 
     public void updateJob(UpdateJobAction.Request request, ActionListener<PutJobAction.Response> actionListener) {
@@ -275,14 +271,14 @@ public class JobManager extends AbstractComponent {
     private void validateModelSnapshotIdUpdate(Job job, String modelSnapshotId, ChainTaskExecutor chainTaskExecutor) {
         if (modelSnapshotId != null) {
             chainTaskExecutor.add(listener -> {
-                jobProvider.getModelSnapshot(job.getId(), modelSnapshotId, newModelSnapshot -> {
+                jobResultsProvider.getModelSnapshot(job.getId(), modelSnapshotId, newModelSnapshot -> {
                     if (newModelSnapshot == null) {
                         String message = Messages.getMessage(Messages.REST_NO_SUCH_MODEL_SNAPSHOT, modelSnapshotId,
                                 job.getId());
                         listener.onFailure(new ResourceNotFoundException(message));
                         return;
                     }
-                    jobProvider.getModelSnapshot(job.getId(), job.getModelSnapshotId(), oldModelSnapshot -> {
+                    jobResultsProvider.getModelSnapshot(job.getId(), job.getModelSnapshotId(), oldModelSnapshot -> {
                         if (oldModelSnapshot != null
                                 && newModelSnapshot.result.getTimestamp().before(oldModelSnapshot.result.getTimestamp())) {
                             String message = "Job [" + job.getId() + "] has a more recent model snapshot [" +
@@ -307,7 +303,7 @@ public class JobManager extends AbstractComponent {
                         + " while the job is open"));
                 return;
             }
-            jobProvider.modelSizeStats(job.getId(), modelSizeStats -> {
+            jobResultsProvider.modelSizeStats(job.getId(), modelSizeStats -> {
                 if (modelSizeStats != null) {
                     ByteSizeValue modelSize = new ByteSizeValue(modelSizeStats.getModelBytes(), ByteSizeUnit.BYTES);
                     if (newModelMemoryLimit < modelSize.getMb()) {
@@ -539,7 +535,7 @@ public class JobManager extends AbstractComponent {
 
         // Step 2. Remove the job from any calendars
         CheckedConsumer<Boolean, Exception> removeFromCalendarsHandler = response -> {
-            jobProvider.removeJobFromCalendars(jobId, ActionListener.<Boolean>wrap(deleteJobStateHandler::accept,
+            jobResultsProvider.removeJobFromCalendars(jobId, ActionListener.<Boolean>wrap(deleteJobStateHandler::accept,
                     actionListener::onFailure ));
         };
 
@@ -607,7 +603,7 @@ public class JobManager extends AbstractComponent {
 
         // Step 0. Find the appropriate established model memory for the reverted job
         // -------
-        jobProvider.getEstablishedMemoryUsage(request.getJobId(), modelSizeStats.getTimestamp(), modelSizeStats, clusterStateHandler,
+        jobResultsProvider.getEstablishedMemoryUsage(request.getJobId(), modelSizeStats.getTimestamp(), modelSizeStats, clusterStateHandler,
                 actionListener::onFailure);
     }
 

+ 3 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java → x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java

@@ -129,8 +129,8 @@ import static org.elasticsearch.xpack.core.ClientHelper.clientWithOrigin;
 import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
 import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin;
 
-public class JobProvider {
-    private static final Logger LOGGER = Loggers.getLogger(JobProvider.class);
+public class JobResultsProvider {
+    private static final Logger LOGGER = Loggers.getLogger(JobResultsProvider.class);
 
     private static final int RECORDS_SIZE_PARAM = 10000;
     public static final int BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE = 20;
@@ -139,7 +139,7 @@ public class JobProvider {
     private final Client client;
     private final Settings settings;
 
-    public JobProvider(Client client, Settings settings) {
+    public JobResultsProvider(Client client, Settings settings) {
         this.client = Objects.requireNonNull(client);
         this.settings = settings;
     }

+ 8 - 8
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java

@@ -44,7 +44,7 @@ import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.action.TransportOpenJobAction.JobTask;
 import org.elasticsearch.xpack.ml.job.JobManager;
 import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
-import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister;
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
 import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder;
@@ -112,7 +112,7 @@ public class AutodetectProcessManager extends AbstractComponent {
     private final Environment environment;
     private final ThreadPool threadPool;
     private final JobManager jobManager;
-    private final JobProvider jobProvider;
+    private final JobResultsProvider jobResultsProvider;
     private final AutodetectProcessFactory autodetectProcessFactory;
     private final NormalizerFactory normalizerFactory;
 
@@ -132,7 +132,7 @@ public class AutodetectProcessManager extends AbstractComponent {
     private final Auditor auditor;
 
     public AutodetectProcessManager(Environment environment, Settings settings, Client client, ThreadPool threadPool,
-                                    JobManager jobManager, JobProvider jobProvider, JobResultsPersister jobResultsPersister,
+                                    JobManager jobManager, JobResultsProvider jobResultsProvider, JobResultsPersister jobResultsPersister,
                                     JobDataCountsPersister jobDataCountsPersister,
                                     AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory,
                                     NamedXContentRegistry xContentRegistry, Auditor auditor) {
@@ -145,7 +145,7 @@ public class AutodetectProcessManager extends AbstractComponent {
         this.autodetectProcessFactory = autodetectProcessFactory;
         this.normalizerFactory = normalizerFactory;
         this.jobManager = jobManager;
-        this.jobProvider = jobProvider;
+        this.jobResultsProvider = jobResultsProvider;
         this.jobResultsPersister = jobResultsPersister;
         this.jobDataCountsPersister = jobDataCountsPersister;
         this.auditor = auditor;
@@ -362,7 +362,7 @@ public class AutodetectProcessManager extends AbstractComponent {
                         Job job = jobManager.getJobOrThrowIfUnknown(jobTask.getJobId());
                         DataCounts dataCounts = getStatistics(jobTask).get().v1();
                         ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder().start(job.earliestValidTimestamp(dataCounts));
-                        jobProvider.scheduledEventsForJob(jobTask.getJobId(), job.getGroups(), query, eventsListener);
+                        jobResultsProvider.scheduledEventsForJob(jobTask.getJobId(), job.getGroups(), query, eventsListener);
                     } else {
                         eventsListener.onResponse(null);
                     }
@@ -403,7 +403,7 @@ public class AutodetectProcessManager extends AbstractComponent {
 
         logger.info("Opening job [{}]", jobId);
         processByAllocation.putIfAbsent(jobTask.getAllocationId(), new ProcessContext(jobTask));
-        jobProvider.getAutodetectParams(job, params -> {
+        jobResultsProvider.getAutodetectParams(job, params -> {
             // We need to fork, otherwise we restore model state from a network thread (several GET api calls):
             threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() {
                 @Override
@@ -495,7 +495,7 @@ public class AutodetectProcessManager extends AbstractComponent {
         ExecutorService autoDetectExecutorService = threadPool.executor(MachineLearning.AUTODETECT_THREAD_POOL_NAME);
         DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, autodetectParams.dataCounts(),
                 jobDataCountsPersister);
-        ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobProvider,
+        ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobResultsProvider,
                 new JobRenormalizedResultsPersister(job.getId(), settings, client), normalizerFactory);
         ExecutorService renormalizerExecutorService = threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME);
         Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdater,
@@ -504,7 +504,7 @@ public class AutodetectProcessManager extends AbstractComponent {
         AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams, autoDetectExecutorService,
                 onProcessCrash(jobTask));
         AutoDetectResultProcessor processor = new AutoDetectResultProcessor(
-                client, auditor, jobId, renormalizer, jobResultsPersister, jobProvider, autodetectParams.modelSizeStats(),
+                client, auditor, jobId, renormalizer, jobResultsPersister, jobResultsProvider, autodetectParams.modelSizeStats(),
                 autodetectParams.modelSnapshot() != null);
         ExecutorService autodetectWorkerExecutor;
         try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) {

+ 12 - 10
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java

@@ -34,7 +34,7 @@ import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
 import org.elasticsearch.xpack.core.ml.job.results.Influencer;
 import org.elasticsearch.xpack.core.ml.job.results.ModelPlot;
 import org.elasticsearch.xpack.ml.MachineLearning;
-import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
 import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
 import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
@@ -88,7 +88,7 @@ public class AutoDetectResultProcessor {
     private final String jobId;
     private final Renormalizer renormalizer;
     private final JobResultsPersister persister;
-    private final JobProvider jobProvider;
+    private final JobResultsProvider jobResultsProvider;
     private final boolean restoredSnapshot;
 
     final CountDownLatch completionLatch = new CountDownLatch(1);
@@ -107,20 +107,22 @@ public class AutoDetectResultProcessor {
     private volatile boolean haveNewLatestModelSizeStats;
     private Future<?> scheduledEstablishedModelMemoryUpdate; // only accessed in synchronized methods
 
-    public AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer, JobResultsPersister persister,
-                                     JobProvider jobProvider, ModelSizeStats latestModelSizeStats, boolean restoredSnapshot) {
-        this(client, auditor, jobId, renormalizer, persister, jobProvider, latestModelSizeStats, restoredSnapshot, new FlushListener());
+    public AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer,
+                                     JobResultsPersister persister, JobResultsProvider jobResultsProvider,
+                                     ModelSizeStats latestModelSizeStats, boolean restoredSnapshot) {
+        this(client, auditor, jobId, renormalizer, persister, jobResultsProvider, latestModelSizeStats,
+                restoredSnapshot, new FlushListener());
     }
 
     AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer, JobResultsPersister persister,
-                              JobProvider jobProvider, ModelSizeStats latestModelSizeStats, boolean restoredSnapshot,
+                              JobResultsProvider jobResultsProvider, ModelSizeStats latestModelSizeStats, boolean restoredSnapshot,
                               FlushListener flushListener) {
         this.client = Objects.requireNonNull(client);
         this.auditor = Objects.requireNonNull(auditor);
         this.jobId = Objects.requireNonNull(jobId);
         this.renormalizer = Objects.requireNonNull(renormalizer);
         this.persister = Objects.requireNonNull(persister);
-        this.jobProvider = Objects.requireNonNull(jobProvider);
+        this.jobResultsProvider = Objects.requireNonNull(jobResultsProvider);
         this.flushListener = Objects.requireNonNull(flushListener);
         this.latestModelSizeStats = Objects.requireNonNull(latestModelSizeStats);
         this.restoredSnapshot = restoredSnapshot;
@@ -214,7 +216,7 @@ public class AutoDetectResultProcessor {
 
             // if we haven't previously set established model memory, consider trying again after
             // a reasonable number of buckets have elapsed since the last model size stats update
-            long minEstablishedTimespanMs = JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE * bucket.getBucketSpan() * 1000L;
+            long minEstablishedTimespanMs = JobResultsProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE * bucket.getBucketSpan() * 1000L;
             if (haveNewLatestModelSizeStats && latestEstablishedModelMemory == 0 && latestDateForEstablishedModelMemoryCalc.getTime()
                 > latestModelSizeStats.getTimestamp().getTime() + minEstablishedTimespanMs) {
                 scheduleEstablishedModelMemoryUpdate(ESTABLISHED_MODEL_MEMORY_UPDATE_DELAY);
@@ -314,7 +316,7 @@ public class AutoDetectResultProcessor {
         // This is a crude way to NOT refresh the index and NOT attempt to update established model memory during the first 20 buckets
         // because this is when the model size stats are likely to be least stable and lots of updates will be coming through, and
         // we'll NEVER consider memory usage to be established during this period
-        if (restoredSnapshot || bucketCount >= JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE) {
+        if (restoredSnapshot || bucketCount >= JobResultsProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE) {
             scheduleEstablishedModelMemoryUpdate(ESTABLISHED_MODEL_MEMORY_UPDATE_DELAY);
         }
     }
@@ -429,7 +431,7 @@ public class AutoDetectResultProcessor {
         // We need to make all results written up to and including these stats available for the established memory calculation
         persister.commitResultWrites(jobId);
 
-        jobProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, modelSizeStatsForCalc, establishedModelMemory -> {
+        jobResultsProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, modelSizeStatsForCalc, establishedModelMemory -> {
             if (latestEstablishedModelMemory != establishedModelMemory) {
                 JobUpdate update = new JobUpdate.Builder(jobId).setEstablishedModelMemory(establishedModelMemory).build();
                 UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update);

+ 7 - 7
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java

@@ -13,7 +13,7 @@ import org.elasticsearch.xpack.core.ml.job.results.Bucket;
 import org.elasticsearch.xpack.core.ml.job.results.Influencer;
 import org.elasticsearch.xpack.core.ml.job.results.Result;
 import org.elasticsearch.xpack.ml.job.persistence.BatchedDocumentsIterator;
-import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister;
 
 import java.util.ArrayList;
@@ -43,17 +43,17 @@ public class ScoresUpdater {
     private static final long MILLISECONDS_IN_SECOND = 1000;
 
     private final String jobId;
-    private final JobProvider jobProvider;
+    private final JobResultsProvider jobResultsProvider;
     private final JobRenormalizedResultsPersister updatesPersister;
     private final NormalizerFactory normalizerFactory;
     private int bucketSpan;
     private long normalizationWindow;
     private volatile boolean shutdown;
 
-    public ScoresUpdater(Job job, JobProvider jobProvider, JobRenormalizedResultsPersister jobRenormalizedResultsPersister,
+    public ScoresUpdater(Job job, JobResultsProvider jobResultsProvider, JobRenormalizedResultsPersister jobRenormalizedResultsPersister,
                          NormalizerFactory normalizerFactory) {
         jobId = job.getId();
-        this.jobProvider = Objects.requireNonNull(jobProvider);
+        this.jobResultsProvider = Objects.requireNonNull(jobResultsProvider);
         updatesPersister = Objects.requireNonNull(jobRenormalizedResultsPersister);
         this.normalizerFactory = Objects.requireNonNull(normalizerFactory);
         bucketSpan = ((Long) job.getAnalysisConfig().getBucketSpan().seconds()).intValue();
@@ -96,7 +96,7 @@ public class ScoresUpdater {
     private void updateBuckets(Normalizer normalizer, String quantilesState, long endBucketEpochMs,
                                long windowExtensionMs, int[] counts, boolean perPartitionNormalization) {
         BatchedDocumentsIterator<Result<Bucket>> bucketsIterator =
-                jobProvider.newBatchedBucketsIterator(jobId)
+                jobResultsProvider.newBatchedBucketsIterator(jobId)
                         .timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs)
                         .includeInterim(false);
 
@@ -145,7 +145,7 @@ public class ScoresUpdater {
 
     private void updateRecords(Normalizer normalizer, String quantilesState, long endBucketEpochMs,
                                long windowExtensionMs, int[] counts, boolean perPartitionNormalization) {
-        BatchedDocumentsIterator<Result<AnomalyRecord>> recordsIterator = jobProvider.newBatchedRecordsIterator(jobId)
+        BatchedDocumentsIterator<Result<AnomalyRecord>> recordsIterator = jobResultsProvider.newBatchedRecordsIterator(jobId)
                 .timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs)
                 .includeInterim(false);
 
@@ -168,7 +168,7 @@ public class ScoresUpdater {
 
     private void updateInfluencers(Normalizer normalizer, String quantilesState, long endBucketEpochMs,
                                    long windowExtensionMs, int[] counts, boolean perPartitionNormalization) {
-        BatchedDocumentsIterator<Result<Influencer>> influencersIterator = jobProvider.newBatchedInfluencersIterator(jobId)
+        BatchedDocumentsIterator<Result<Influencer>> influencersIterator = jobResultsProvider.newBatchedInfluencersIterator(jobId)
                 .timeRange(calcNormalizationWindowStart(endBucketEpochMs, windowExtensionMs), endBucketEpochMs)
                 .includeInterim(false);
 

+ 9 - 9
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilderTests.java

@@ -19,7 +19,7 @@ import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
 import org.elasticsearch.xpack.core.ml.job.results.Bucket;
-import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 import org.elasticsearch.xpack.ml.notifications.Auditor;
 import org.junit.Before;
 
@@ -41,7 +41,7 @@ public class DatafeedJobBuilderTests extends ESTestCase {
 
     private Client client;
     private Auditor auditor;
-    private JobProvider jobProvider;
+    private JobResultsProvider jobResultsProvider;
     private Consumer<Exception> taskHandler;
 
     private DatafeedJobBuilder datafeedJobBuilder;
@@ -54,9 +54,9 @@ public class DatafeedJobBuilderTests extends ESTestCase {
         when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
         when(client.settings()).thenReturn(Settings.EMPTY);
         auditor = mock(Auditor.class);
-        jobProvider = mock(JobProvider.class);
+        jobResultsProvider = mock(JobResultsProvider.class);
         taskHandler = mock(Consumer.class);
-        datafeedJobBuilder = new DatafeedJobBuilder(client, jobProvider, auditor, System::currentTimeMillis);
+        datafeedJobBuilder = new DatafeedJobBuilder(client, jobResultsProvider, auditor, System::currentTimeMillis);
 
         Mockito.doAnswer(invocationOnMock -> {
             String jobId = (String) invocationOnMock.getArguments()[0];
@@ -64,14 +64,14 @@ public class DatafeedJobBuilderTests extends ESTestCase {
             Consumer<DataCounts> handler = (Consumer<DataCounts>) invocationOnMock.getArguments()[1];
             handler.accept(new DataCounts(jobId));
             return null;
-        }).when(jobProvider).dataCounts(any(), any(), any());
+        }).when(jobResultsProvider).dataCounts(any(), any(), any());
 
         doAnswer(invocationOnMock -> {
             @SuppressWarnings("unchecked")
             Consumer<ResourceNotFoundException> consumer = (Consumer<ResourceNotFoundException>) invocationOnMock.getArguments()[3];
             consumer.accept(new ResourceNotFoundException("dummy"));
             return null;
-        }).when(jobProvider).bucketsViaInternalClient(any(), any(), any(), any());
+        }).when(jobResultsProvider).bucketsViaInternalClient(any(), any(), any(), any());
     }
 
     public void testBuild_GivenScrollDatafeedAndNewJob() throws Exception {
@@ -157,7 +157,7 @@ public class DatafeedJobBuilderTests extends ESTestCase {
             Consumer<Exception> consumer = (Consumer<Exception>) invocationOnMock.getArguments()[3];
             consumer.accept(error);
             return null;
-        }).when(jobProvider).bucketsViaInternalClient(any(), any(), any(), any());
+        }).when(jobResultsProvider).bucketsViaInternalClient(any(), any(), any(), any());
 
         datafeedJobBuilder.build(jobBuilder.build(new Date()), datafeed, ActionListener.wrap(datafeedJob -> fail(), taskHandler));
 
@@ -173,7 +173,7 @@ public class DatafeedJobBuilderTests extends ESTestCase {
             dataCounts.setLatestRecordTimeStamp(new Date(latestRecordTimestamp));
             handler.accept(dataCounts);
             return null;
-        }).when(jobProvider).dataCounts(any(), any(), any());
+        }).when(jobResultsProvider).dataCounts(any(), any(), any());
 
         doAnswer(invocationOnMock -> {
             @SuppressWarnings("unchecked")
@@ -183,6 +183,6 @@ public class DatafeedJobBuilderTests extends ESTestCase {
             QueryPage<Bucket> bucketQueryPage = new QueryPage<Bucket>(Collections.singletonList(bucket), 1, Bucket.RESULTS_FIELD);
             consumer.accept(bucketQueryPage);
             return null;
-        }).when(jobProvider).bucketsViaInternalClient(any(), any(), any(), any());
+        }).when(jobResultsProvider).bucketsViaInternalClient(any(), any(), any(), any());
     }
 }

+ 12 - 12
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java

@@ -36,7 +36,7 @@ import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.MlSingleNodeTestCase;
 import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
 import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder;
-import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
 import org.elasticsearch.xpack.ml.job.persistence.RecordsQueryBuilder;
 import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
@@ -72,7 +72,7 @@ import static org.mockito.Mockito.when;
 public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
     private static final String JOB_ID = "autodetect-result-processor-it-job";
 
-    private JobProvider jobProvider;
+    private JobResultsProvider jobResultsProvider;
     private List<ModelSnapshot> capturedUpdateModelSnapshotOnJobRequests;
     private AutoDetectResultProcessor resultProcessor;
     private Renormalizer renormalizer;
@@ -98,11 +98,11 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
         Settings.Builder builder = Settings.builder()
                 .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(1));
         Auditor auditor = new Auditor(client(), "test_node");
-        jobProvider = new JobProvider(client(), builder.build());
+        jobResultsProvider = new JobResultsProvider(client(), builder.build());
         renormalizer = mock(Renormalizer.class);
         capturedUpdateModelSnapshotOnJobRequests = new ArrayList<>();
         resultProcessor = new AutoDetectResultProcessor(client(), auditor, JOB_ID, renormalizer,
-                new JobResultsPersister(nodeSettings(), client()), jobProvider, new ModelSizeStats.Builder(JOB_ID).build(), false) {
+                new JobResultsPersister(nodeSettings(), client()), jobResultsProvider, new ModelSizeStats.Builder(JOB_ID).build(), false) {
             @Override
             protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) {
                 capturedUpdateModelSnapshotOnJobRequests.add(modelSnapshot);
@@ -159,7 +159,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
         assertEquals(1, persistedDefinition.count());
         assertEquals(categoryDefinition, persistedDefinition.results().get(0));
 
-        QueryPage<ModelPlot> persistedModelPlot = jobProvider.modelPlot(JOB_ID, 0, 100);
+        QueryPage<ModelPlot> persistedModelPlot = jobResultsProvider.modelPlot(JOB_ID, 0, 100);
         assertEquals(1, persistedModelPlot.count());
         assertEquals(modelPlot, persistedModelPlot.results().get(0));
 
@@ -443,7 +443,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
         AtomicReference<Exception> errorHolder = new AtomicReference<>();
         AtomicReference<QueryPage<Bucket>> resultHolder = new AtomicReference<>();
         CountDownLatch latch = new CountDownLatch(1);
-        jobProvider.buckets(JOB_ID, bucketsQuery, r -> {
+        jobResultsProvider.buckets(JOB_ID, bucketsQuery, r -> {
             resultHolder.set(r);
             latch.countDown();
         }, e -> {
@@ -461,7 +461,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
         AtomicReference<Exception> errorHolder = new AtomicReference<>();
         AtomicReference<QueryPage<CategoryDefinition>> resultHolder = new AtomicReference<>();
         CountDownLatch latch = new CountDownLatch(1);
-        jobProvider.categoryDefinitions(JOB_ID, categoryId, false, null, null, r -> {
+        jobResultsProvider.categoryDefinitions(JOB_ID, categoryId, false, null, null, r -> {
             resultHolder.set(r);
             latch.countDown();
         }, e -> {
@@ -479,7 +479,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
         AtomicReference<Exception> errorHolder = new AtomicReference<>();
         AtomicReference<ModelSizeStats> resultHolder = new AtomicReference<>();
         CountDownLatch latch = new CountDownLatch(1);
-        jobProvider.modelSizeStats(JOB_ID, modelSizeStats -> {
+        jobResultsProvider.modelSizeStats(JOB_ID, modelSizeStats -> {
             resultHolder.set(modelSizeStats);
             latch.countDown();
         }, e -> {
@@ -497,7 +497,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
         AtomicReference<Exception> errorHolder = new AtomicReference<>();
         AtomicReference<QueryPage<Influencer>> resultHolder = new AtomicReference<>();
         CountDownLatch latch = new CountDownLatch(1);
-        jobProvider.influencers(JOB_ID, new InfluencersQueryBuilder().build(), page -> {
+        jobResultsProvider.influencers(JOB_ID, new InfluencersQueryBuilder().build(), page -> {
             resultHolder.set(page);
             latch.countDown();
         }, e -> {
@@ -515,7 +515,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
         AtomicReference<Exception> errorHolder = new AtomicReference<>();
         AtomicReference<QueryPage<AnomalyRecord>> resultHolder = new AtomicReference<>();
         CountDownLatch latch = new CountDownLatch(1);
-        jobProvider.records(JOB_ID, recordsQuery, page -> {
+        jobResultsProvider.records(JOB_ID, recordsQuery, page -> {
             resultHolder.set(page);
             latch.countDown();
         }, e -> {
@@ -533,7 +533,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
         AtomicReference<Exception> errorHolder = new AtomicReference<>();
         AtomicReference<QueryPage<ModelSnapshot>> resultHolder = new AtomicReference<>();
         CountDownLatch latch = new CountDownLatch(1);
-        jobProvider.modelSnapshots(JOB_ID, 0, 100, page -> {
+        jobResultsProvider.modelSnapshots(JOB_ID, 0, 100, page -> {
             resultHolder.set(page);
             latch.countDown();
         }, e -> {
@@ -551,7 +551,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
         AtomicReference<Exception> errorHolder = new AtomicReference<>();
         AtomicReference<Optional<Quantiles>> resultHolder = new AtomicReference<>();
         CountDownLatch latch = new CountDownLatch(1);
-        jobProvider.getAutodetectParams(JobTests.buildJobBuilder(JOB_ID).build(),params -> {
+        jobResultsProvider.getAutodetectParams(JobTests.buildJobBuilder(JOB_ID).build(), params -> {
             resultHolder.set(Optional.ofNullable(params.quantiles()));
             latch.countDown();
         }, e -> {

+ 4 - 4
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java

@@ -11,7 +11,7 @@ import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
 import org.elasticsearch.xpack.core.ml.job.results.Bucket;
-import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
 import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
 import org.junit.Before;
@@ -26,13 +26,13 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase {
 
     private long bucketSpan = AnalysisConfig.Builder.DEFAULT_BUCKET_SPAN.getMillis();
 
-    private JobProvider jobProvider;
+    private JobResultsProvider jobResultsProvider;
     private JobResultsPersister jobResultsPersister;
 
     @Before
     public void createComponents() {
         Settings settings = nodeSettings(0);
-        jobProvider = new JobProvider(client(), settings);
+        jobResultsProvider = new JobResultsProvider(client(), settings);
         jobResultsPersister = new JobResultsPersister(settings, client());
     }
 
@@ -251,7 +251,7 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase {
         CountDownLatch latch = new CountDownLatch(1);
 
         Date latestBucketTimestamp = (bucketNum != null) ? new Date(bucketSpan * bucketNum) : null;
-        jobProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, latestModelSizeStats, memUse -> {
+        jobResultsProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, latestModelSizeStats, memUse -> {
                     establishedModelMemoryUsage.set(memUse);
                     latch.countDown();
                 }, e -> {

+ 4 - 4
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobProviderIT.java → x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java

@@ -45,7 +45,7 @@ import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.MlSingleNodeTestCase;
 import org.elasticsearch.xpack.ml.job.persistence.CalendarQueryBuilder;
 import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
-import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
 import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
@@ -72,9 +72,9 @@ import static org.hamcrest.collection.IsEmptyCollection.empty;
 import static org.hamcrest.core.Is.is;
 
 
-public class JobProviderIT extends MlSingleNodeTestCase {
+public class JobResultsProviderIT extends MlSingleNodeTestCase {
 
-    private JobProvider jobProvider;
+    private JobResultsProvider jobProvider;
 
     @Override
     protected Settings nodeSettings()  {
@@ -95,7 +95,7 @@ public class JobProviderIT extends MlSingleNodeTestCase {
     public void createComponents() throws Exception {
         Settings.Builder builder = Settings.builder()
                 .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(1));
-        jobProvider = new JobProvider(client(), builder.build());
+        jobProvider = new JobResultsProvider(client(), builder.build());
         waitForMlTemplates();
     }
 

+ 6 - 5
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java

@@ -34,7 +34,7 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState;
 import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
 import org.elasticsearch.xpack.core.ml.job.config.RuleScope;
 import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzerTests;
-import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 import org.elasticsearch.xpack.ml.job.process.autodetect.UpdateParams;
 import org.elasticsearch.xpack.ml.notifications.Auditor;
 import org.junit.Before;
@@ -68,7 +68,7 @@ public class JobManagerTests extends ESTestCase {
     private AnalysisRegistry analysisRegistry;
     private Client client;
     private ClusterService clusterService;
-    private JobProvider jobProvider;
+    private JobResultsProvider jobResultsProvider;
     private Auditor auditor;
     private UpdateJobProcessNotifier updateJobProcessNotifier;
 
@@ -79,7 +79,7 @@ public class JobManagerTests extends ESTestCase {
         analysisRegistry = CategorizationAnalyzerTests.buildTestAnalysisRegistry(environment);
         client = mock(Client.class);
         clusterService = mock(ClusterService.class);
-        jobProvider = mock(JobProvider.class);
+        jobResultsProvider = mock(JobResultsProvider.class);
         auditor = mock(Auditor.class);
         updateJobProcessNotifier = mock(UpdateJobProcessNotifier.class);
     }
@@ -131,7 +131,7 @@ public class JobManagerTests extends ESTestCase {
             ActionListener<Boolean> listener = (ActionListener<Boolean>) invocation.getArguments()[2];
             listener.onResponse(true);
             return null;
-        }).when(jobProvider).createJobResultIndex(requestCaptor.capture(), any(ClusterState.class), any(ActionListener.class));
+        }).when(jobResultsProvider).createJobResultIndex(requestCaptor.capture(), any(ClusterState.class), any(ActionListener.class));
 
         ClusterState clusterState = createClusterState();
 
@@ -404,7 +404,8 @@ public class JobManagerTests extends ESTestCase {
         ClusterSettings clusterSettings = new ClusterSettings(environment.settings(),
                 Collections.singleton(MachineLearningField.MAX_MODEL_MEMORY_LIMIT));
         when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
-        return new JobManager(environment, environment.settings(), jobProvider, clusterService, auditor, client, updateJobProcessNotifier);
+        return new JobManager(environment, environment.settings(), jobResultsProvider, clusterService,
+                auditor, client, updateJobProcessNotifier);
     }
 
     private ClusterState createClusterState() {

+ 25 - 25
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java → x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java

@@ -72,7 +72,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-public class JobProviderTests extends ESTestCase {
+public class JobResultsProviderTests extends ESTestCase {
     private static final String CLUSTER_NAME = "myCluster";
 
     @SuppressWarnings("unchecked")
@@ -87,7 +87,7 @@ public class JobProviderTests extends ESTestCase {
         clientBuilder.prepareAlias(resultsIndexName, AnomalyDetectorsIndex.resultsWriteAlias("foo"));
 
         Job.Builder job = buildJobBuilder("foo");
-        JobProvider provider = createProvider(clientBuilder.build());
+        JobResultsProvider provider = createProvider(clientBuilder.build());
         AtomicReference<Boolean> resultHolder = new AtomicReference<>();
 
         ClusterState cs = ClusterState.builder(new ClusterName("_name"))
@@ -141,7 +141,7 @@ public class JobProviderTests extends ESTestCase {
 
         Job.Builder job = buildJobBuilder("foo123");
         job.setResultsIndexName("foo");
-        JobProvider provider = createProvider(clientBuilder.build());
+        JobResultsProvider provider = createProvider(clientBuilder.build());
 
         Index index = mock(Index.class);
         when(index.getName()).thenReturn(AnomalyDetectorsIndex.jobResultsAliasedName("foo"));
@@ -202,7 +202,7 @@ public class JobProviderTests extends ESTestCase {
         Job.Builder job = buildJobBuilder("foo");
         job.setResultsIndexName("bar");
         Client client = clientBuilder.build();
-        JobProvider provider = createProvider(client);
+        JobResultsProvider provider = createProvider(client);
 
         ImmutableOpenMap<String, IndexMetaData> indexMap = ImmutableOpenMap.<String, IndexMetaData>builder().build();
 
@@ -248,7 +248,7 @@ public class JobProviderTests extends ESTestCase {
         int from = 0;
         int size = 10;
         Client client = getMockedClient(queryBuilder -> queryBuilderHolder[0] = queryBuilder, response);
-        JobProvider provider = createProvider(client);
+        JobResultsProvider provider = createProvider(client);
 
         BucketsQueryBuilder bq = new BucketsQueryBuilder().from(from).size(size).anomalyScoreThreshold(1.0);
 
@@ -281,7 +281,7 @@ public class JobProviderTests extends ESTestCase {
         int size = 17;
 
         Client client = getMockedClient(queryBuilder -> queryBuilderHolder[0] = queryBuilder, response);
-        JobProvider provider = createProvider(client);
+        JobResultsProvider provider = createProvider(client);
 
         BucketsQueryBuilder bq = new BucketsQueryBuilder().from(from).size(size).anomalyScoreThreshold(5.1)
                 .includeInterim(true);
@@ -314,7 +314,7 @@ public class JobProviderTests extends ESTestCase {
         int size = 17;
 
         Client client = getMockedClient(queryBuilder -> queryBuilderHolder[0] = queryBuilder, response);
-        JobProvider provider = createProvider(client);
+        JobResultsProvider provider = createProvider(client);
 
         BucketsQueryBuilder bq = new BucketsQueryBuilder();
         bq.from(from);
@@ -341,7 +341,7 @@ public class JobProviderTests extends ESTestCase {
         SearchResponse response = createSearchResponse(source);
 
         Client client = getMockedClient(queryBuilder -> {}, response);
-        JobProvider provider = createProvider(client);
+        JobResultsProvider provider = createProvider(client);
 
         BucketsQueryBuilder bq = new BucketsQueryBuilder();
         bq.timestamp(Long.toString(timestamp));
@@ -363,7 +363,7 @@ public class JobProviderTests extends ESTestCase {
 
         SearchResponse response = createSearchResponse(source);
         Client client = getMockedClient(queryBuilder -> {}, response);
-        JobProvider provider = createProvider(client);
+        JobResultsProvider provider = createProvider(client);
 
         BucketsQueryBuilder bq = new BucketsQueryBuilder();
         bq.timestamp(Long.toString(now.getTime()));
@@ -403,7 +403,7 @@ public class JobProviderTests extends ESTestCase {
         String sortfield = "minefield";
         SearchResponse response = createSearchResponse(source);
         Client client = getMockedClient(qb -> {}, response);
-        JobProvider provider = createProvider(client);
+        JobResultsProvider provider = createProvider(client);
 
         RecordsQueryBuilder rqb = new RecordsQueryBuilder().from(from).size(size).epochStart(String.valueOf(now.getTime()))
                 .epochEnd(String.valueOf(now.getTime())).includeInterim(true).sortField(sortfield)
@@ -451,7 +451,7 @@ public class JobProviderTests extends ESTestCase {
         SearchResponse response = createSearchResponse(source);
 
         Client client = getMockedClient(qb -> {}, response);
-        JobProvider provider = createProvider(client);
+        JobResultsProvider provider = createProvider(client);
 
         RecordsQueryBuilder rqb = new RecordsQueryBuilder();
         rqb.from(from);
@@ -505,7 +505,7 @@ public class JobProviderTests extends ESTestCase {
         String sortfield = "minefield";
         SearchResponse response = createSearchResponse(source);
         Client client = getMockedClient(qb -> {}, response);
-        JobProvider provider = createProvider(client);
+        JobResultsProvider provider = createProvider(client);
 
         @SuppressWarnings({"unchecked"})
         QueryPage<AnomalyRecord>[] holder = new QueryPage[1];
@@ -542,7 +542,7 @@ public class JobProviderTests extends ESTestCase {
 
         SearchResponse response = createSearchResponse(source);
         Client client = getMockedClient(qb -> {}, response);
-        JobProvider provider = createProvider(client);
+        JobResultsProvider provider = createProvider(client);
 
         Integer[] holder = new Integer[1];
         provider.expandBucket(jobId, false, bucket, records -> holder[0] = records, RuntimeException::new, client);
@@ -567,7 +567,7 @@ public class JobProviderTests extends ESTestCase {
         int size = 10;
         Client client = getMockedClient(q -> {}, response);
 
-        JobProvider provider = createProvider(client);
+        JobResultsProvider provider = createProvider(client);
         @SuppressWarnings({"unchecked"})
         QueryPage<CategoryDefinition>[] holder = new QueryPage[1];
         provider.categoryDefinitions(jobId, null, false, from, size, r -> holder[0] = r,
@@ -589,7 +589,7 @@ public class JobProviderTests extends ESTestCase {
 
         SearchResponse response = createSearchResponse(Collections.singletonList(source));
         Client client = getMockedClient(q -> {}, response);
-        JobProvider provider = createProvider(client);
+        JobResultsProvider provider = createProvider(client);
         @SuppressWarnings({"unchecked"})
         QueryPage<CategoryDefinition>[] holder = new QueryPage[1];
         provider.categoryDefinitions(jobId, categoryId, false, null, null,
@@ -630,7 +630,7 @@ public class JobProviderTests extends ESTestCase {
         QueryBuilder[] qbHolder = new QueryBuilder[1];
         SearchResponse response = createSearchResponse(source);
         Client client = getMockedClient(q -> qbHolder[0] = q, response);
-        JobProvider provider = createProvider(client);
+        JobResultsProvider provider = createProvider(client);
 
         @SuppressWarnings({"unchecked"})
         QueryPage<Influencer>[] holder = new QueryPage[1];
@@ -690,7 +690,7 @@ public class JobProviderTests extends ESTestCase {
         QueryBuilder[] qbHolder = new QueryBuilder[1];
         SearchResponse response = createSearchResponse(source);
         Client client = getMockedClient(q -> qbHolder[0] = q, response);
-        JobProvider provider = createProvider(client);
+        JobResultsProvider provider = createProvider(client);
 
         @SuppressWarnings({"unchecked"})
         QueryPage<Influencer>[] holder = new QueryPage[1];
@@ -745,7 +745,7 @@ public class JobProviderTests extends ESTestCase {
         int size = 3;
         SearchResponse response = createSearchResponse(source);
         Client client = getMockedClient(qb -> {}, response);
-        JobProvider provider = createProvider(client);
+        JobResultsProvider provider = createProvider(client);
 
         @SuppressWarnings({"unchecked"})
         QueryPage<ModelSnapshot>[] holder = new QueryPage[1];
@@ -783,11 +783,11 @@ public class JobProviderTests extends ESTestCase {
         MetaData metaData = MetaData.builder()
                 .put(indexMetaData1)
                 .build();
-        boolean result = JobProvider.violatedFieldCountLimit("index1", 0, 10,
+        boolean result = JobResultsProvider.violatedFieldCountLimit("index1", 0, 10,
                 ClusterState.builder(new ClusterName("_name")).metaData(metaData).build());
         assertFalse(result);
 
-        result = JobProvider.violatedFieldCountLimit("index1", 1, 10,
+        result = JobResultsProvider.violatedFieldCountLimit("index1", 1, 10,
                 ClusterState.builder(new ClusterName("_name")).metaData(metaData).build());
         assertTrue(result);
 
@@ -801,7 +801,7 @@ public class JobProviderTests extends ESTestCase {
         metaData = MetaData.builder()
                 .put(indexMetaData2)
                 .build();
-        result = JobProvider.violatedFieldCountLimit("index1", 0, 19,
+        result = JobResultsProvider.violatedFieldCountLimit("index1", 0, 19,
                 ClusterState.builder(new ClusterName("_name")).metaData(metaData).build());
         assertTrue(result);
     }
@@ -811,7 +811,7 @@ public class JobProviderTests extends ESTestCase {
         mapping.put("field1", Collections.singletonMap("type", "string"));
         mapping.put("field2", Collections.singletonMap("type", "string"));
         mapping.put("field3", Collections.singletonMap("type", "string"));
-        assertEquals(3, JobProvider.countFields(Collections.singletonMap("properties", mapping)));
+        assertEquals(3, JobResultsProvider.countFields(Collections.singletonMap("properties", mapping)));
 
         Map<String, Object> objectProperties = new HashMap<>();
         objectProperties.put("field4", Collections.singletonMap("type", "string"));
@@ -822,15 +822,15 @@ public class JobProviderTests extends ESTestCase {
         objectField.put("properties", objectProperties);
 
         mapping.put("field4", objectField);
-        assertEquals(7, JobProvider.countFields(Collections.singletonMap("properties", mapping)));
+        assertEquals(7, JobResultsProvider.countFields(Collections.singletonMap("properties", mapping)));
     }
 
     private Bucket createBucketAtEpochTime(long epoch) {
         return new Bucket("foo", new Date(epoch), 123);
     }
 
-    private JobProvider createProvider(Client client) {
-        return new JobProvider(client, Settings.EMPTY);
+    private JobResultsProvider createProvider(Client client) {
+        return new JobResultsProvider(client, Settings.EMPTY);
     }
 
     private static GetResponse createGetResponse(boolean exists, Map<String, Object> source) throws IOException {

+ 8 - 8
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java

@@ -39,7 +39,7 @@ import org.elasticsearch.xpack.ml.action.TransportOpenJobAction.JobTask;
 import org.elasticsearch.xpack.ml.job.JobManager;
 import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzerTests;
 import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
-import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
 import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
@@ -101,7 +101,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
     private Environment environment;
     private AnalysisRegistry analysisRegistry;
     private JobManager jobManager;
-    private JobProvider jobProvider;
+    private JobResultsProvider jobResultsProvider;
     private JobResultsPersister jobResultsPersister;
     private JobDataCountsPersister jobDataCountsPersister;
     private NormalizerFactory normalizerFactory;
@@ -119,7 +119,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
         environment = TestEnvironment.newEnvironment(settings);
         analysisRegistry = CategorizationAnalyzerTests.buildTestAnalysisRegistry(environment);
         jobManager = mock(JobManager.class);
-        jobProvider = mock(JobProvider.class);
+        jobResultsProvider = mock(JobResultsProvider.class);
         jobResultsPersister = mock(JobResultsPersister.class);
         when(jobResultsPersister.bulkPersisterBuilder(any())).thenReturn(mock(JobResultsPersister.Builder.class));
         jobDataCountsPersister = mock(JobDataCountsPersister.class);
@@ -132,7 +132,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
             Consumer<AutodetectParams> handler = (Consumer<AutodetectParams>) invocationOnMock.getArguments()[1];
             handler.accept(buildAutodetectParams());
             return null;
-        }).when(jobProvider).getAutodetectParams(any(), any(), any());
+        }).when(jobResultsProvider).getAutodetectParams(any(), any(), any());
     }
 
     public void testMaxOpenJobsSetting_givenDefault() {
@@ -227,7 +227,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
         Settings.Builder settings = Settings.builder();
         settings.put(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), 3);
         AutodetectProcessManager manager = spy(new AutodetectProcessManager(environment, settings.build(), client, threadPool,
-                jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
+                jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
                 normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor));
         doReturn(executorService).when(manager).createAutodetectExecutorService(any());
 
@@ -582,7 +582,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
         AutodetectProcessFactory autodetectProcessFactory =
                 (j, autodetectParams, e, onProcessCrash) -> autodetectProcess;
         AutodetectProcessManager manager = new AutodetectProcessManager(environment, Settings.EMPTY,
-                client, threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
+                client, threadPool, jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
                 normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor);
 
         JobTask jobTask = mock(JobTask.class);
@@ -655,7 +655,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
         AutodetectProcessFactory autodetectProcessFactory =
                 (j, autodetectParams, e, onProcessCrash) -> autodetectProcess;
         return new AutodetectProcessManager(environment, Settings.EMPTY, client, threadPool, jobManager,
-                jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
+                jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
                 normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor);
     }
 
@@ -680,7 +680,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
         when(threadPool.executor(anyString())).thenReturn(EsExecutors.newDirectExecutorService());
         AutodetectProcessFactory autodetectProcessFactory = mock(AutodetectProcessFactory.class);
         AutodetectProcessManager manager = new AutodetectProcessManager(environment, Settings.EMPTY,
-                client, threadPool, jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister,
+                client, threadPool, jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister,
                 autodetectProcessFactory, normalizerFactory,
                 new NamedXContentRegistry(Collections.emptyList()), auditor);
         manager = spy(manager);

+ 14 - 14
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java

@@ -29,7 +29,7 @@ import org.elasticsearch.xpack.core.ml.job.results.Bucket;
 import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition;
 import org.elasticsearch.xpack.core.ml.job.results.Influencer;
 import org.elasticsearch.xpack.core.ml.job.results.ModelPlot;
-import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
 import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
 import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
@@ -75,7 +75,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
     private Auditor auditor;
     private Renormalizer renormalizer;
     private JobResultsPersister persister;
-    private JobProvider jobProvider;
+    private JobResultsProvider jobResultsProvider;
     private FlushListener flushListener;
     private AutoDetectResultProcessor processorUnderTest;
     private ScheduledThreadPoolExecutor executor;
@@ -90,9 +90,9 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
         when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
         renormalizer = mock(Renormalizer.class);
         persister = mock(JobResultsPersister.class);
-        jobProvider = mock(JobProvider.class);
+        jobResultsProvider = mock(JobResultsProvider.class);
         flushListener = mock(FlushListener.class);
-        processorUnderTest = new AutoDetectResultProcessor(client, auditor, JOB_ID, renormalizer, persister, jobProvider,
+        processorUnderTest = new AutoDetectResultProcessor(client, auditor, JOB_ID, renormalizer, persister, jobResultsProvider,
                 new ModelSizeStats.Builder(JOB_ID).setTimestamp(new Date(BUCKET_SPAN_MS)).build(), false, flushListener);
     }
 
@@ -294,8 +294,8 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
 
         verify(persister, times(1)).persistModelSizeStats(modelSizeStats);
         verifyNoMoreInteractions(persister);
-        // No interactions with the jobProvider confirms that the established memory calculation did not run
-        verifyNoMoreInteractions(jobProvider, auditor);
+        // No interactions with the jobResultsProvider confirms that the established memory calculation did not run
+        verifyNoMoreInteractions(jobResultsProvider, auditor);
         assertEquals(modelSizeStats, processorUnderTest.modelSizeStats());
     }
 
@@ -347,7 +347,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
 
         AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
         context.deleteInterimRequired = false;
-        for (int i = 0; i < JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE; ++i) {
+        for (int i = 0; i < JobResultsProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE; ++i) {
             AutodetectResult result = mock(AutodetectResult.class);
             Bucket bucket = mock(Bucket.class);
             when(result.getBucket()).thenReturn(bucket);
@@ -366,9 +366,9 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
             verify(persister, times(1)).persistModelSizeStats(modelSizeStats);
             verify(persister, times(1)).commitResultWrites(JOB_ID);
             verifyNoMoreInteractions(persister);
-            verify(jobProvider, times(1)).getEstablishedMemoryUsage(eq(JOB_ID), eq(timestamp), eq(modelSizeStats), any(Consumer.class),
-                any(Consumer.class));
-            verifyNoMoreInteractions(jobProvider);
+            verify(jobResultsProvider, times(1)).getEstablishedMemoryUsage(eq(JOB_ID), eq(timestamp),
+                    eq(modelSizeStats), any(Consumer.class), any(Consumer.class));
+            verifyNoMoreInteractions(jobResultsProvider);
             assertEquals(modelSizeStats, processorUnderTest.modelSizeStats());
         });
     }
@@ -383,13 +383,13 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
         AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder);
         context.deleteInterimRequired = false;
         ModelSizeStats modelSizeStats = null;
-        for (int i = 1; i <= JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE + 5; ++i) {
+        for (int i = 1; i <= JobResultsProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE + 5; ++i) {
             AutodetectResult result = mock(AutodetectResult.class);
             Bucket bucket = mock(Bucket.class);
             when(bucket.getTimestamp()).thenReturn(new Date(BUCKET_SPAN_MS * i));
             when(result.getBucket()).thenReturn(bucket);
             processorUnderTest.processResult(context, result);
-            if (i > JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE) {
+            if (i > JobResultsProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE) {
                 result = mock(AutodetectResult.class);
                 modelSizeStats = mock(ModelSizeStats.class);
                 when(modelSizeStats.getTimestamp()).thenReturn(new Date(BUCKET_SPAN_MS * i));
@@ -409,9 +409,9 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
             // ...but only the last should trigger an established model memory update
             verify(persister, times(1)).commitResultWrites(JOB_ID);
             verifyNoMoreInteractions(persister);
-            verify(jobProvider, times(1)).getEstablishedMemoryUsage(eq(JOB_ID), eq(lastTimestamp), eq(lastModelSizeStats),
+            verify(jobResultsProvider, times(1)).getEstablishedMemoryUsage(eq(JOB_ID), eq(lastTimestamp), eq(lastModelSizeStats),
                 any(Consumer.class), any(Consumer.class));
-            verifyNoMoreInteractions(jobProvider);
+            verifyNoMoreInteractions(jobResultsProvider);
             assertEquals(lastModelSizeStats, processorUnderTest.modelSizeStats());
         });
     }

+ 7 - 7
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdaterTests.java

@@ -16,7 +16,7 @@ import org.elasticsearch.xpack.core.ml.job.results.Bucket;
 import org.elasticsearch.xpack.core.ml.job.results.BucketInfluencer;
 import org.elasticsearch.xpack.core.ml.job.results.Influencer;
 import org.elasticsearch.xpack.core.ml.job.results.Result;
-import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister;
 import org.elasticsearch.xpack.ml.job.persistence.MockBatchedDocumentsIterator;
 import org.junit.Before;
@@ -52,7 +52,7 @@ public class ScoresUpdaterTests extends ESTestCase {
     private static final long DEFAULT_BUCKET_SPAN = 3600;
     private static final long DEFAULT_START_TIME = 0;
 
-    private JobProvider jobProvider = mock(JobProvider.class);
+    private JobResultsProvider jobResultsProvider = mock(JobResultsProvider.class);
     private JobRenormalizedResultsPersister jobRenormalizedResultsPersister = mock(JobRenormalizedResultsPersister.class);
     private Normalizer normalizer = mock(Normalizer.class);
     private NormalizerFactory normalizerFactory = mock(NormalizerFactory.class);
@@ -78,7 +78,7 @@ public class ScoresUpdaterTests extends ESTestCase {
 
         job = jobBuilder.build(new Date());
 
-        scoresUpdater = new ScoresUpdater(job, jobProvider, jobRenormalizedResultsPersister, normalizerFactory);
+        scoresUpdater = new ScoresUpdater(job, jobResultsProvider, jobRenormalizedResultsPersister, normalizerFactory);
 
         givenProviderReturnsNoBuckets();
         givenProviderReturnsNoRecords();
@@ -210,7 +210,7 @@ public class ScoresUpdaterTests extends ESTestCase {
         MockBatchedDocumentsIterator<AnomalyRecord> recordIter = new MockBatchedDocumentsIterator<>(
                 recordBatches, AnomalyRecord.RESULT_TYPE_VALUE);
         recordIter.requireIncludeInterim(false);
-        when(jobProvider.newBatchedRecordsIterator(JOB_ID)).thenReturn(recordIter);
+        when(jobResultsProvider.newBatchedRecordsIterator(JOB_ID)).thenReturn(recordIter);
 
         scoresUpdater.update(QUANTILES_STATE, 3600, 0, false);
 
@@ -376,7 +376,7 @@ public class ScoresUpdaterTests extends ESTestCase {
 
         MockBatchedDocumentsIterator<Bucket> bucketIter = new MockBatchedDocumentsIterator<>(batchesWithIndex, Bucket.RESULT_TYPE_VALUE);
         bucketIter.requireIncludeInterim(false);
-        when(jobProvider.newBatchedBucketsIterator(JOB_ID)).thenReturn(bucketIter);
+        when(jobResultsProvider.newBatchedBucketsIterator(JOB_ID)).thenReturn(bucketIter);
     }
 
     private void givenProviderReturnsNoRecords() {
@@ -394,7 +394,7 @@ public class ScoresUpdaterTests extends ESTestCase {
         MockBatchedDocumentsIterator<AnomalyRecord> recordIter = new MockBatchedDocumentsIterator<>(
                 batches, AnomalyRecord.RESULT_TYPE_VALUE);
         recordIter.requireIncludeInterim(false);
-        when(jobProvider.newBatchedRecordsIterator(JOB_ID)).thenReturn(recordIter);
+        when(jobResultsProvider.newBatchedRecordsIterator(JOB_ID)).thenReturn(recordIter);
     }
 
     private void givenProviderReturnsNoInfluencers() {
@@ -410,7 +410,7 @@ public class ScoresUpdaterTests extends ESTestCase {
         batches.add(queue);
         MockBatchedDocumentsIterator<Influencer> iterator = new MockBatchedDocumentsIterator<>(batches, Influencer.RESULT_TYPE_VALUE);
         iterator.requireIncludeInterim(false);
-        when(jobProvider.newBatchedInfluencersIterator(JOB_ID)).thenReturn(iterator);
+        when(jobResultsProvider.newBatchedInfluencersIterator(JOB_ID)).thenReturn(iterator);
     }
 
     private void verifyNormalizerWasInvoked(int times) throws IOException {