Browse Source

[ML] Update running process when global calendar changes (#83044)

Adding events to global calendars did not update open 
jobs as the special _all job Id was not checked.
David Kyle 3 years ago
parent
commit
c61881ce0e

+ 5 - 0
docs/changelog/83044.yaml

@@ -0,0 +1,5 @@
+pr: 83044
+summary: Update running process when global calendar changes
+area: Machine Learning
+type: bug
+issues: []

+ 6 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java

@@ -7,6 +7,7 @@
 package org.elasticsearch.xpack.core.ml.job.config;
 
 import org.elasticsearch.Version;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -395,6 +396,11 @@ public class JobUpdate implements Writeable, ToXContentObject {
         return builder;
     }
 
+    @Override
+    public String toString() {
+        return Strings.toString(this::toXContent);
+    }
+
     public Set<String> getUpdateFields() {
         Set<String> updateFields = new TreeSet<>();
         if (groups != null) {

+ 47 - 13
x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java

@@ -33,6 +33,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
@@ -372,7 +373,10 @@ public class ScheduledEventsIT extends MlNativeAutodetectIntegTestCase {
     }
 
     /**
-     * An open job that later gets added to a calendar, should take the scheduled events into account
+     * Add a global calendar then create a job that will pick
+     * up the calendar.
+     * Add a new scheduled event to the calendar, the open
+     * job should pick up the new event
      */
     public void testNewJobWithGlobalCalendar() throws Exception {
         String calendarId = "test-global-calendar";
@@ -381,28 +385,56 @@ public class ScheduledEventsIT extends MlNativeAutodetectIntegTestCase {
         putCalendar(calendarId, Collections.singletonList(Metadata.ALL), "testNewJobWithGlobalCalendar calendar");
 
         long startTime = 1514764800000L;
-        final int bucketCount = 3;
+        final int bucketCount = 6;
         TimeValue bucketSpan = TimeValue.timeValueMinutes(30);
 
         // Put events in the calendar
-        List<ScheduledEvent> events = new ArrayList<>();
+        List<ScheduledEvent> preOpenEvents = new ArrayList<>();
         long eventStartTime = startTime;
         long eventEndTime = eventStartTime + (long) (1.5 * bucketSpan.millis());
-        events.add(
-            new ScheduledEvent.Builder().description("Some Event")
+        preOpenEvents.add(
+            new ScheduledEvent.Builder().description("Pre open Event")
                 .startTime((Instant.ofEpochMilli(eventStartTime)))
                 .endTime((Instant.ofEpochMilli(eventEndTime)))
                 .calendarId(calendarId)
                 .build()
         );
 
-        postScheduledEvents(calendarId, events);
-
-        Job.Builder job = createJob("scheduled-events-add-to-new-job--with-global-calendar", bucketSpan);
+        postScheduledEvents(calendarId, preOpenEvents);
 
         // Open the job
+        Job.Builder job = createJob("scheduled-events-add-to-new-job--with-global-calendar", bucketSpan);
         openJob(job.getId());
 
+        // Add another event after the job is opened
+        List<ScheduledEvent> postOpenJobEvents = new ArrayList<>();
+        eventStartTime = eventEndTime + (3 * bucketSpan.millis());
+        eventEndTime = eventStartTime + bucketSpan.millis();
+        postOpenJobEvents.add(
+            new ScheduledEvent.Builder().description("Event added after job is opened")
+                .startTime((Instant.ofEpochMilli(eventStartTime)))
+                .endTime((Instant.ofEpochMilli(eventEndTime)))
+                .calendarId(calendarId)
+                .build()
+        );
+        postScheduledEvents(calendarId, postOpenJobEvents);
+
+        // Wait until the notification that the job was updated is indexed
+        assertBusy(() -> {
+            SearchResponse searchResponse = client().prepareSearch(NotificationsIndex.NOTIFICATIONS_INDEX)
+                .setSize(1)
+                .addSort("timestamp", SortOrder.DESC)
+                .setQuery(
+                    QueryBuilders.boolQuery()
+                        .filter(QueryBuilders.termQuery("job_id", job.getId()))
+                        .filter(QueryBuilders.termQuery("level", "info"))
+                )
+                .get();
+            SearchHit[] hits = searchResponse.getHits().getHits();
+            assertThat(hits.length, equalTo(1));
+            assertThat(hits[0].getSourceAsMap().get("message"), equalTo("Updated calendars in running process"));
+        });
+
         // write some buckets of data
         postData(
             job.getId(),
@@ -416,12 +448,14 @@ public class ScheduledEventsIT extends MlNativeAutodetectIntegTestCase {
         GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(job.getId());
         List<Bucket> buckets = getBuckets(getBucketsRequest);
 
-        // 1st and 2nd buckets have the event but the last one does not
-        assertEquals(1, buckets.get(0).getScheduledEvents().size());
-        assertEquals("Some Event", buckets.get(0).getScheduledEvents().get(0));
-        assertEquals(1, buckets.get(1).getScheduledEvents().size());
-        assertEquals("Some Event", buckets.get(1).getScheduledEvents().get(0));
+        // 1st and 2nd buckets have the first event
+        // 5th and 6th buckets have the second event
+        assertThat(buckets.get(0).getScheduledEvents(), contains("Pre open Event"));
+        assertThat(buckets.get(1).getScheduledEvents(), contains("Pre open Event"));
         assertEquals(0, buckets.get(2).getScheduledEvents().size());
+        assertEquals(0, buckets.get(3).getScheduledEvents().size());
+        assertThat(buckets.get(4).getScheduledEvents(), contains("Event added after job is opened"));
+        assertThat(buckets.get(5).getScheduledEvents(), contains("Event added after job is opened"));
     }
 
     private Job.Builder createJob(String jobId, TimeValue bucketSpan) {

+ 38 - 31
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.job;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.Version;
@@ -27,9 +28,6 @@ import org.elasticsearch.index.analysis.AnalysisRegistry;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
-import org.elasticsearch.xcontent.ToXContent;
-import org.elasticsearch.xcontent.XContentBuilder;
-import org.elasticsearch.xcontent.XContentFactory;
 import org.elasticsearch.xpack.core.action.util.QueryPage;
 import org.elasticsearch.xpack.core.ml.MlConfigIndex;
 import org.elasticsearch.xpack.core.ml.MlTasks;
@@ -63,6 +61,7 @@ import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
 import org.elasticsearch.xpack.ml.utils.VoidChainTaskExecutor;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
@@ -426,28 +425,28 @@ public class JobManager {
 
     private void postJobUpdate(UpdateJobAction.Request request, Job updatedJob, ActionListener<PutJobAction.Response> actionListener) {
         // Autodetect must be updated if the fields that the C++ uses are changed
-        if (request.getJobUpdate().isAutodetectProcessUpdate()) {
-            JobUpdate jobUpdate = request.getJobUpdate();
+        JobUpdate jobUpdate = request.getJobUpdate();
+        if (jobUpdate.isAutodetectProcessUpdate()) {
             if (isJobOpen(clusterService.state(), request.getJobId())) {
                 updateJobProcessNotifier.submitJobUpdate(UpdateParams.fromJobUpdate(jobUpdate), ActionListener.wrap(isUpdated -> {
                     if (isUpdated) {
                         auditJobUpdatedIfNotInternal(request);
+                    } else {
+                        logger.error("[{}] Updating autodetect failed for job update [{}]", jobUpdate.getJobId(), jobUpdate);
                     }
                 }, e -> {
-                    // No need to do anything
+                    logger.error(
+                        new ParameterizedMessage(
+                            "[{}] Updating autodetect failed with an exception, job update [{}] ",
+                            jobUpdate.getJobId(),
+                            jobUpdate
+                        ),
+                        e
+                    );
                 }));
             }
         } else {
-            logger.debug("[{}] No process update required for job update: {}", request::getJobId, () -> {
-                try {
-                    XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
-                    request.getJobUpdate().toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
-                    return Strings.toString(jsonBuilder);
-                } catch (IOException e) {
-                    return "(unprintable due to " + e.getMessage() + ")";
-                }
-            });
-
+            logger.debug("[{}] No process update required for job update: {}", jobUpdate::getJobId, jobUpdate::toString);
             auditJobUpdatedIfNotInternal(request);
         }
 
@@ -610,32 +609,40 @@ public class JobManager {
             return;
         }
 
+        boolean appliesToAllJobs = calendarJobIds.stream().anyMatch(Metadata.ALL::equals);
+        if (appliesToAllJobs) {
+            submitJobEventUpdate(openJobIds, updateListener);
+            return;
+        }
+
         // calendarJobIds may be a group or job
         jobConfigProvider.expandGroupIds(
             calendarJobIds,
             ActionListener.wrap(expandedIds -> threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> {
-                // Merge the expended group members with the request Ids.
+                // Merge the expanded group members with the request Ids.
                 // Ids that aren't jobs will be filtered by isJobOpen()
                 expandedIds.addAll(calendarJobIds);
 
-                for (String jobId : expandedIds) {
-                    if (isJobOpen(clusterState, jobId)) {
-                        updateJobProcessNotifier.submitJobUpdate(
-                            UpdateParams.scheduledEventsUpdate(jobId),
-                            ActionListener.wrap(isUpdated -> {
-                                if (isUpdated) {
-                                    auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_CALENDARS_UPDATED_ON_PROCESS));
-                                }
-                            }, e -> logger.error("[" + jobId + "] failed submitting process update on calendar change", e))
-                        );
-                    }
-                }
-
-                updateListener.onResponse(Boolean.TRUE);
+                openJobIds.retainAll(expandedIds);
+                submitJobEventUpdate(openJobIds, updateListener);
             }), updateListener::onFailure)
         );
     }
 
+    private void submitJobEventUpdate(Collection<String> jobIds, ActionListener<Boolean> updateListener) {
+        for (String jobId : jobIds) {
+            updateJobProcessNotifier.submitJobUpdate(
+                UpdateParams.scheduledEventsUpdate(jobId),
+                ActionListener.wrap(
+                    isUpdated -> { auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_CALENDARS_UPDATED_ON_PROCESS)); },
+                    e -> logger.error("[" + jobId + "] failed submitting process update on calendar change", e)
+                )
+            );
+        }
+
+        updateListener.onResponse(Boolean.TRUE);
+    }
+
     public void revertSnapshot(
         RevertModelSnapshotAction.Request request,
         ActionListener<RevertModelSnapshotAction.Response> actionListener,