Browse Source

Remove the ability to update datafeed's job_id. (#44752)

Przemysław Witek 6 years ago
parent
commit
3980cab461

+ 3 - 20
client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdate.java

@@ -18,7 +18,6 @@
  */
 package org.elasticsearch.client.ml.datafeed;
 
-import org.elasticsearch.client.ml.job.config.Job;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
@@ -57,7 +56,6 @@ public class DatafeedUpdate implements ToXContentObject {
     static {
         PARSER.declareString(ConstructingObjectParser.constructorArg(), DatafeedConfig.ID);
 
-        PARSER.declareString(Builder::setJobId, Job.ID);
         PARSER.declareStringArray(Builder::setIndices, DatafeedConfig.INDEXES);
         PARSER.declareStringArray(Builder::setIndices, DatafeedConfig.INDICES);
         PARSER.declareString((builder, val) -> builder.setQueryDelay(
@@ -88,7 +86,6 @@ public class DatafeedUpdate implements ToXContentObject {
     }
 
     private final String id;
-    private final String jobId;
     private final TimeValue queryDelay;
     private final TimeValue frequency;
     private final List<String> indices;
@@ -99,11 +96,10 @@ public class DatafeedUpdate implements ToXContentObject {
     private final ChunkingConfig chunkingConfig;
     private final DelayedDataCheckConfig delayedDataCheckConfig;
 
-    private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices, BytesReference query,
+    private DatafeedUpdate(String id, TimeValue queryDelay, TimeValue frequency, List<String> indices, BytesReference query,
                            BytesReference aggregations, List<SearchSourceBuilder.ScriptField> scriptFields, Integer scrollSize,
                            ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig) {
         this.id = id;
-        this.jobId = jobId;
         this.queryDelay = queryDelay;
         this.frequency = frequency;
         this.indices = indices;
@@ -126,7 +122,6 @@ public class DatafeedUpdate implements ToXContentObject {
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject();
         builder.field(DatafeedConfig.ID.getPreferredName(), id);
-        addOptionalField(builder, Job.ID, jobId);
         if (queryDelay != null) {
             builder.field(DatafeedConfig.QUERY_DELAY.getPreferredName(), queryDelay.getStringRep());
         }
@@ -162,10 +157,6 @@ public class DatafeedUpdate implements ToXContentObject {
         }
     }
 
-    public String getJobId() {
-        return jobId;
-    }
-
     public TimeValue getQueryDelay() {
         return queryDelay;
     }
@@ -228,7 +219,6 @@ public class DatafeedUpdate implements ToXContentObject {
         DatafeedUpdate that = (DatafeedUpdate) other;
 
         return Objects.equals(this.id, that.id)
-            && Objects.equals(this.jobId, that.jobId)
             && Objects.equals(this.frequency, that.frequency)
             && Objects.equals(this.queryDelay, that.queryDelay)
             && Objects.equals(this.indices, that.indices)
@@ -247,7 +237,7 @@ public class DatafeedUpdate implements ToXContentObject {
      */
     @Override
     public int hashCode() {
-        return Objects.hash(id, jobId, frequency, queryDelay, indices, asMap(query), scrollSize, asMap(aggregations), scriptFields,
+        return Objects.hash(id, frequency, queryDelay, indices, asMap(query), scrollSize, asMap(aggregations), scriptFields,
             chunkingConfig, delayedDataCheckConfig);
     }
 
@@ -258,7 +248,6 @@ public class DatafeedUpdate implements ToXContentObject {
     public static class Builder {
 
         private String id;
-        private String jobId;
         private TimeValue queryDelay;
         private TimeValue frequency;
         private List<String> indices;
@@ -275,7 +264,6 @@ public class DatafeedUpdate implements ToXContentObject {
 
         public Builder(DatafeedUpdate config) {
             this.id = config.id;
-            this.jobId = config.jobId;
             this.queryDelay = config.queryDelay;
             this.frequency = config.frequency;
             this.indices = config.indices;
@@ -287,11 +275,6 @@ public class DatafeedUpdate implements ToXContentObject {
             this.delayedDataCheckConfig = config.delayedDataCheckConfig;
         }
 
-        public Builder setJobId(String jobId) {
-            this.jobId = jobId;
-            return this;
-        }
-
         public Builder setIndices(List<String> indices) {
             this.indices = indices;
             return this;
@@ -364,7 +347,7 @@ public class DatafeedUpdate implements ToXContentObject {
         }
 
         public DatafeedUpdate build() {
-            return new DatafeedUpdate(id, jobId, queryDelay, frequency, indices, query, aggregations, scriptFields, scrollSize,
+            return new DatafeedUpdate(id, queryDelay, frequency, indices, query, aggregations, scriptFields, scrollSize,
                 chunkingConfig, delayedDataCheckConfig);
         }
 

+ 0 - 25
client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java

@@ -463,31 +463,6 @@ public class MachineLearningIT extends ESRestHighLevelClientTestCase {
         assertThat(datafeedUpdate.getScrollSize(), equalTo(updatedDatafeed.getScrollSize()));
     }
 
-    public void testUpdateDatafeed_UpdatingJobIdIsDeprecated() throws Exception {
-        MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
-
-        String jobId = randomValidJobId();
-        Job job = buildJob(jobId);
-        execute(new PutJobRequest(job), machineLearningClient::putJob, machineLearningClient::putJobAsync);
-
-        String anotherJobId = randomValidJobId();
-        Job anotherJob = buildJob(anotherJobId);
-        execute(new PutJobRequest(anotherJob), machineLearningClient::putJob, machineLearningClient::putJobAsync);
-
-        String datafeedId = "datafeed-" + jobId;
-        DatafeedConfig datafeedConfig = DatafeedConfig.builder(datafeedId, jobId).setIndices("some_data_index").build();
-        execute(new PutDatafeedRequest(datafeedConfig), machineLearningClient::putDatafeed, machineLearningClient::putDatafeedAsync);
-
-        DatafeedUpdate datafeedUpdateWithChangedJobId = DatafeedUpdate.builder(datafeedId).setJobId(anotherJobId).build();
-        WarningFailureException exception = expectThrows(
-            WarningFailureException.class,
-            () -> execute(
-                new UpdateDatafeedRequest(datafeedUpdateWithChangedJobId),
-                machineLearningClient::updateDatafeed,
-                machineLearningClient::updateDatafeedAsync));
-        assertThat(exception.getResponse().getWarnings(), contains("The ability to update a datafeed's job_id is deprecated."));
-    }
-
     public void testGetDatafeed() throws Exception {
         String jobId1 = "test-get-datafeed-job-1";
         String jobId2 = "test-get-datafeed-job-2";

+ 0 - 3
client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdateTests.java

@@ -34,9 +34,6 @@ public class DatafeedUpdateTests extends AbstractXContentTestCase<DatafeedUpdate
 
     public static DatafeedUpdate createRandom() {
         DatafeedUpdate.Builder builder = new DatafeedUpdate.Builder(DatafeedConfigTests.randomValidDatafeedId());
-        if (randomBoolean()) {
-            builder.setJobId(randomAlphaOfLength(10));
-        }
         if (randomBoolean()) {
             builder.setQueryDelay(TimeValue.timeValueMillis(randomIntBetween(1, Integer.MAX_VALUE)));
         }

+ 4 - 7
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java

@@ -5,14 +5,12 @@
  */
 package org.elasticsearch.xpack.core.ml.datafeed;
 
-import org.apache.logging.log4j.LogManager;
 import org.elasticsearch.Version;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
-import org.elasticsearch.common.logging.DeprecationLogger;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.ObjectParser;
@@ -46,8 +44,7 @@ import java.util.stream.Collectors;
  */
 public class DatafeedUpdate implements Writeable, ToXContentObject {
 
-    private static final DeprecationLogger deprecationLogger = new DeprecationLogger(LogManager.getLogger(DatafeedUpdate.class));
-    private static final String DEPRECATION_MESSAGE_ON_JOB_ID_UPDATE = "The ability to update a datafeed's job_id is deprecated.";
+    static final String ERROR_MESSAGE_ON_JOB_ID_UPDATE = "Datafeed's job_id cannot be changed.";
 
     public static final ObjectParser<Builder, Void> PARSER = new ObjectParser<>("datafeed_update", Builder::new);
 
@@ -110,9 +107,6 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
         this.scrollSize = scrollSize;
         this.chunkingConfig = chunkingConfig;
         this.delayedDataCheckConfig = delayedDataCheckConfig;
-        if (jobId != null) {
-            deprecationLogger.deprecated(DEPRECATION_MESSAGE_ON_JOB_ID_UPDATE);
-        }
     }
 
     public DatafeedUpdate(StreamInput in) throws IOException {
@@ -298,6 +292,9 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
 
         DatafeedConfig.Builder builder = new DatafeedConfig.Builder(datafeedConfig);
         if (jobId != null) {
+            if (datafeedConfig.getJobId() != null && datafeedConfig.getJobId().equals(jobId) == false) {
+                throw ExceptionsHelper.badRequestException(ERROR_MESSAGE_ON_JOB_ID_UPDATE);
+            }
             builder.setJobId(jobId);
         }
         if (queryDelay != null) {

+ 1 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedActionRequestTests.java

@@ -30,7 +30,7 @@ public class UpdateDatafeedActionRequestTests extends AbstractSerializingTestCas
 
     @Override
     protected Request createTestInstance() {
-        return new Request(DatafeedUpdateTests.createRandomized(datafeedId, null, false));
+        return new Request(DatafeedUpdateTests.createRandomized(datafeedId));
     }
 
     @Override

+ 27 - 14
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java

@@ -5,6 +5,7 @@
  */
 package org.elasticsearch.xpack.core.ml.datafeed;
 
+import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.Version;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -23,6 +24,7 @@ import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.TermQueryBuilder;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.script.Script;
 import org.elasticsearch.search.SearchModule;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
@@ -77,14 +79,11 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
     }
 
     public static DatafeedUpdate createRandomized(String datafeedId) {
-        return createRandomized(datafeedId, null, true);
+        return createRandomized(datafeedId, null);
     }
 
-    public static DatafeedUpdate createRandomized(String datafeedId, @Nullable DatafeedConfig datafeed, boolean canSetJobId) {
+    public static DatafeedUpdate createRandomized(String datafeedId, @Nullable DatafeedConfig datafeed) {
         DatafeedUpdate.Builder builder = new DatafeedUpdate.Builder(datafeedId);
-        if (randomBoolean() && datafeed == null && canSetJobId) {
-            builder.setJobId(randomAlphaOfLength(10));
-        }
         if (randomBoolean()) {
             builder.setQueryDelay(TimeValue.timeValueMillis(randomIntBetween(1, Integer.MAX_VALUE)));
         }
@@ -197,6 +196,24 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
         expectThrows(IllegalArgumentException.class, () -> createRandomized(datafeed.getId() + "_2").apply(datafeed, null));
     }
 
+    public void testApply_failBecauseJobIdChanged() {
+        DatafeedConfig datafeed = DatafeedConfigTests.createRandomizedDatafeedConfig("foo");
+
+        DatafeedUpdate datafeedUpdateWithUnchangedJobId = new DatafeedUpdate.Builder(datafeed.getId())
+            .setJobId("foo")
+            .build();
+        DatafeedConfig updatedDatafeed = datafeedUpdateWithUnchangedJobId.apply(datafeed, Collections.emptyMap());
+        assertThat(updatedDatafeed, equalTo(datafeed));
+
+        DatafeedUpdate datafeedUpdateWithChangedJobId = new DatafeedUpdate.Builder(datafeed.getId())
+            .setJobId("bar")
+            .build();
+        ElasticsearchStatusException ex = expectThrows(
+            ElasticsearchStatusException.class, () -> datafeedUpdateWithChangedJobId.apply(datafeed, Collections.emptyMap()));
+        assertThat(ex.status(), equalTo(RestStatus.BAD_REQUEST));
+        assertThat(ex.getMessage(), equalTo(DatafeedUpdate.ERROR_MESSAGE_ON_JOB_ID_UPDATE));
+    }
+
     public void testApply_givenEmptyUpdate() {
         DatafeedConfig datafeed = DatafeedConfigTests.createRandomizedDatafeedConfig("foo");
         DatafeedConfig updatedDatafeed = new DatafeedUpdate.Builder(datafeed.getId()).build().apply(datafeed, Collections.emptyMap());
@@ -223,7 +240,6 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
         DatafeedConfig datafeed = datafeedBuilder.build();
         QueryProvider queryProvider = createRandomValidQueryProvider("a", "b");
         DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeed.getId());
-        update.setJobId("bar");
         update.setIndices(Collections.singletonList("i_2"));
         update.setQueryDelay(TimeValue.timeValueSeconds(42));
         update.setFrequency(TimeValue.timeValueSeconds(142));
@@ -235,7 +251,7 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
 
         DatafeedConfig updatedDatafeed = update.build().apply(datafeed, Collections.emptyMap());
 
-        assertThat(updatedDatafeed.getJobId(), equalTo("bar"));
+        assertThat(updatedDatafeed.getJobId(), equalTo("foo-feed"));
         assertThat(updatedDatafeed.getIndices(), equalTo(Collections.singletonList("i_2")));
         assertThat(updatedDatafeed.getQueryDelay(), equalTo(TimeValue.timeValueSeconds(42)));
         assertThat(updatedDatafeed.getFrequency(), equalTo(TimeValue.timeValueSeconds(142)));
@@ -276,9 +292,9 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
                 withoutAggs.setAggProvider(null);
                 datafeed = withoutAggs.build();
             }
-            DatafeedUpdate update = createRandomized(datafeed.getId(), datafeed, true);
+            DatafeedUpdate update = createRandomized(datafeed.getId(), datafeed);
             while (update.isNoop(datafeed)) {
-                update = createRandomized(datafeed.getId(), datafeed, true);
+                update = createRandomized(datafeed.getId(), datafeed);
             }
 
             DatafeedConfig updatedDatafeed = update.apply(datafeed, Collections.emptyMap());
@@ -339,12 +355,9 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
     @Override
     protected DatafeedUpdate mutateInstance(DatafeedUpdate instance) throws IOException {
         DatafeedUpdate.Builder builder = new DatafeedUpdate.Builder(instance);
-        switch (between(0, 9)) {
-        case 0:
-            builder.setId(instance.getId() + DatafeedConfigTests.randomValidDatafeedId());
-            break;
+        switch (between(1, 9)) {
         case 1:
-            builder.setJobId(instance.getJobId() + randomAlphaOfLength(5));
+            builder.setId(instance.getId() + DatafeedConfigTests.randomValidDatafeedId());
             break;
         case 2:
             if (instance.getQueryDelay() == null) {

+ 0 - 43
x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java

@@ -10,7 +10,6 @@ import org.elasticsearch.Version;
 import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads;
 import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
-import org.elasticsearch.common.CheckedConsumer;
 import org.elasticsearch.common.CheckedRunnable;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@@ -137,48 +136,6 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
         openAndRunJob.run();
     }
 
-    public void testDatafeedTimingStats_DatafeedJobIdUpdated() throws Exception {
-        client().admin().indices().prepareCreate("data")
-            .addMapping("type", "time", "type=date")
-            .get();
-        long numDocs = randomIntBetween(32, 2048);
-        Instant now = Instant.now();
-        indexDocs(logger, "data", numDocs, now.minus(Duration.ofDays(14)).toEpochMilli(), now.toEpochMilli());
-
-        Job.Builder jobA = createScheduledJob("lookback-job-jobid-updated");
-        Job.Builder jobB = createScheduledJob("other-lookback-job-jobid-updated");
-        for (Job.Builder job : Arrays.asList(jobA, jobB)) {
-            registerJob(job);
-            putJob(job);
-        }
-
-        String datafeedId = "lookback-datafeed";
-        DatafeedConfig datafeedConfig = createDatafeed(datafeedId, jobA.getId(), Arrays.asList("data"));
-        registerDatafeed(datafeedConfig);
-        putDatafeed(datafeedConfig);
-
-        CheckedConsumer<Job.Builder, Exception> openAndRunJob = job -> {
-            openJob(job.getId());
-            assertBusy(() -> assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED));
-            // Bind datafeedId to the current job on the list, timing stats are wiped out.
-            // Datafeed did not do anything yet, hence search_count is equal to 0.
-            assertDatafeedStats(datafeedId, DatafeedState.STOPPED, job.getId(), equalTo(0L));
-            startDatafeed(datafeedId, 0L, now.toEpochMilli());
-            assertBusy(() -> {
-                assertThat(getDataCounts(job.getId()).getProcessedRecordCount(), equalTo(numDocs));
-                // Datafeed processed numDocs documents so search_count must be greater than 0.
-                assertDatafeedStats(datafeedId, DatafeedState.STOPPED, job.getId(), greaterThan(0L));
-            }, 60, TimeUnit.SECONDS);
-            waitUntilJobIsClosed(job.getId());
-        };
-
-        openAndRunJob.accept(jobA);
-        updateDatafeed(new DatafeedUpdate.Builder(datafeedId).setJobId(jobB.getId()).build());  // wipes out timing stats
-        openAndRunJob.accept(jobB);
-        updateDatafeed(new DatafeedUpdate.Builder(datafeedId).setJobId(jobA.getId()).build());  // wipes out timing stats
-        openAndRunJob.accept(jobA);
-    }
-
     public void testDatafeedTimingStats_QueryDelayUpdated_TimingStatsNotReset() throws Exception {
         client().admin().indices().prepareCreate("data")
             .addMapping("type", "time", "type=date")

+ 8 - 67
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java

@@ -14,12 +14,10 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.common.CheckedConsumer;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
-import org.elasticsearch.index.reindex.BulkByScrollResponse;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -33,10 +31,8 @@ import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
 import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
 import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
-import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.Map;
 
 public class TransportUpdateDatafeedAction extends
@@ -91,69 +87,14 @@ public class TransportUpdateDatafeedAction extends
             return;
         }
 
-        String datafeedId = request.getUpdate().getId();
-
-        CheckedConsumer<BulkByScrollResponse, Exception> updateConsumer =
-            unused -> {
-                datafeedConfigProvider.updateDatefeedConfig(
-                    datafeedId,
-                    request.getUpdate(),
-                    headers,
-                    jobConfigProvider::validateDatafeedJob,
-                    ActionListener.wrap(
-                        updatedConfig -> listener.onResponse(new PutDatafeedAction.Response(updatedConfig)),
-                        listener::onFailure));
-            };
-
-        CheckedConsumer<Boolean, Exception> deleteTimingStatsAndUpdateConsumer =
-            unused -> {
-                datafeedConfigProvider.getDatafeedConfig(
-                    datafeedId,
-                    ActionListener.wrap(
-                        datafeedConfigBuilder -> {
-                            String jobId = datafeedConfigBuilder.build().getJobId();
-                            if (jobId.equals(request.getUpdate().getJobId())) {
-                                // Datafeed's jobId didn't change, no point in deleting datafeed timing stats.
-                                updateConsumer.accept(null);
-                            } else {
-                                JobDataDeleter jobDataDeleter = new JobDataDeleter(client, jobId);
-                                jobDataDeleter.deleteDatafeedTimingStats(ActionListener.wrap(updateConsumer, listener::onFailure));
-                            }
-                        },
-                        listener::onFailure));
-            };
-
-
-        if (request.getUpdate().getJobId() != null) {
-            checkJobDoesNotHaveADifferentDatafeed(
-                request.getUpdate().getJobId(), datafeedId, ActionListener.wrap(deleteTimingStatsAndUpdateConsumer, listener::onFailure));
-        } else {
-            updateConsumer.accept(null);
-        }
-    }
-
-    /*
-     * This is a check against changing the datafeed's jobId and that job
-     * already having a datafeed.
-     * The job the updated datafeed refers to should have no datafeed or
-     * if it does have a datafeed it must be the one we are updating
-     */
-    private void checkJobDoesNotHaveADifferentDatafeed(String jobId, String datafeedId, ActionListener<Boolean> listener) {
-        datafeedConfigProvider.findDatafeedsForJobIds(Collections.singletonList(jobId), ActionListener.wrap(
-                datafeedIds -> {
-                    if (datafeedIds.isEmpty()) {
-                        // Ok the job does not have a datafeed
-                        listener.onResponse(Boolean.TRUE);
-                    } else if (datafeedIds.size() == 1 && datafeedIds.contains(datafeedId)) {
-                        // Ok the job has the datafeed being updated
-                        listener.onResponse(Boolean.TRUE);
-                    } else {
-                        listener.onFailure(ExceptionsHelper.conflictStatusException("A datafeed [" + datafeedIds.iterator().next()
-                                + "] already exists for job [" + jobId + "]"));
-                    }
-                },
-                listener::onFailure
-        ));
+        datafeedConfigProvider.updateDatefeedConfig(
+            request.getUpdate().getId(),
+            request.getUpdate(),
+            headers,
+            jobConfigProvider::validateDatafeedJob,
+            ActionListener.wrap(
+                updatedConfig -> listener.onResponse(new PutDatafeedAction.Response(updatedConfig)),
+                listener::onFailure));
     }
 
     @Override

+ 13 - 10
x-pack/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yml

@@ -203,9 +203,6 @@ setup:
 
 ---
 "Test update datafeed to point to different job":
-  - skip:
-      features: "warnings"
-
   - do:
       ml.put_datafeed:
         datafeed_id: test-datafeed-1
@@ -215,19 +212,25 @@ setup:
             "indexes":["index-foo"],
             "scroll_size": 2000
           }
+  - match: { job_id: "datafeeds-crud-1" }
+
+  - do:
+      ml.update_datafeed:
+        datafeed_id: test-datafeed-1
+        body:  >
+          {
+            "job_id": "datafeeds-crud-1"
+          }
+  - match: { job_id: "datafeeds-crud-1" }
 
   - do:
-      warnings:
-        - The ability to update a datafeed's job_id is deprecated.
+      catch: /Datafeed's job_id cannot be changed/
       ml.update_datafeed:
         datafeed_id: test-datafeed-1
         body:  >
           {
             "job_id": "datafeeds-crud-2"
           }
-  - match: { datafeed_id: "test-datafeed-1" }
-  - match: { job_id: "datafeeds-crud-2" }
-  - match: { indices: ["index-foo"] }
 
 ---
 "Test update datafeed with missing id":
@@ -252,7 +255,7 @@ setup:
           }
 
   - do:
-      catch: /resource_not_found_exception/
+      catch: /Datafeed's job_id cannot be changed/
       ml.update_datafeed:
         datafeed_id: test-datafeed-1
         body:  >
@@ -281,7 +284,7 @@ setup:
           }
 
   - do:
-      catch: /A datafeed \[test-datafeed-2\] already exists for job \[datafeeds-crud-2\]/
+      catch: /Datafeed's job_id cannot be changed/
       ml.update_datafeed:
         datafeed_id: test-datafeed-1
         body:  >