Przeglądaj źródła

[ML] Include authorization info in responses when creating jobs (#87950)

https://github.com/elastic/elasticsearch/pull/87884 added
authorization information to the datafeed and data frame
analytics job configs returned by listing them, but not to
the ones returned from creating or updating them. For
consistency it's best that the same fields are present in
both places.
David Roberts 3 lat temu
rodzic
commit
ced9e5ca7c

+ 14 - 10
x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java

@@ -13,6 +13,7 @@ import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.delete.DeleteResponse;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.xpack.core.ClientHelper;
@@ -76,18 +77,19 @@ public class DatafeedConfigProviderIT extends MlSingleNodeTestCase {
     public void testCrud() throws InterruptedException {
         String datafeedId = "df1";
 
-        AtomicReference<IndexResponse> indexResponseHolder = new AtomicReference<>();
+        AtomicReference<Tuple<DatafeedConfig, IndexResponse>> responseHolder = new AtomicReference<>();
         AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
 
         // Create datafeed config
         DatafeedConfig.Builder config = createDatafeedConfig(datafeedId, "j1");
         blockingCall(
             actionListener -> datafeedConfigProvider.putDatafeedConfig(config.build(), createSecurityHeader(), actionListener),
-            indexResponseHolder,
+            responseHolder,
             exceptionHolder
         );
         assertNull(exceptionHolder.get());
-        assertEquals(RestStatus.CREATED, indexResponseHolder.get().status());
+        assertEquals(RestStatus.CREATED, responseHolder.get().v2().status());
+        assertThat(responseHolder.get().v1().getHeaders(), not(anEmptyMap()));
 
         // Read datafeed config
         AtomicReference<DatafeedConfig.Builder> configBuilderHolder = new AtomicReference<>();
@@ -162,27 +164,27 @@ public class DatafeedConfigProviderIT extends MlSingleNodeTestCase {
     public void testMultipleCreateAndDeletes() throws InterruptedException {
         String datafeedId = "df2";
 
-        AtomicReference<IndexResponse> indexResponseHolder = new AtomicReference<>();
+        AtomicReference<Tuple<DatafeedConfig, IndexResponse>> responseHolder = new AtomicReference<>();
         AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
 
         // Create datafeed config
         DatafeedConfig.Builder config = createDatafeedConfig(datafeedId, "j1");
         blockingCall(
             actionListener -> datafeedConfigProvider.putDatafeedConfig(config.build(), Collections.emptyMap(), actionListener),
-            indexResponseHolder,
+            responseHolder,
             exceptionHolder
         );
         assertNull(exceptionHolder.get());
-        assertEquals(RestStatus.CREATED, indexResponseHolder.get().status());
+        assertEquals(RestStatus.CREATED, responseHolder.get().v2().status());
 
         // cannot create another with the same id
-        indexResponseHolder.set(null);
+        responseHolder.set(null);
         blockingCall(
             actionListener -> datafeedConfigProvider.putDatafeedConfig(config.build(), Collections.emptyMap(), actionListener),
-            indexResponseHolder,
+            responseHolder,
             exceptionHolder
         );
-        assertNull(indexResponseHolder.get());
+        assertNull(responseHolder.get());
         assertThat(exceptionHolder.get(), instanceOf(ResourceAlreadyExistsException.class));
         assertEquals("A datafeed with id [df2] already exists", exceptionHolder.get().getMessage());
 
@@ -567,7 +569,9 @@ public class DatafeedConfigProviderIT extends MlSingleNodeTestCase {
     private DatafeedConfig putDatafeedConfig(DatafeedConfig.Builder builder, Map<String, String> headers) throws Exception {
         builder.setHeaders(headers);
         DatafeedConfig config = builder.build();
-        this.<IndexResponse>blockingCall(actionListener -> datafeedConfigProvider.putDatafeedConfig(config, headers, actionListener));
+        this.<Tuple<DatafeedConfig, IndexResponse>>blockingCall(
+            actionListener -> datafeedConfigProvider.putDatafeedConfig(config, headers, actionListener)
+        );
         return config;
     }
 }

+ 12 - 10
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDataFrameAnalyticsAction.java

@@ -188,7 +188,7 @@ public class TransportPutDataFrameAnalyticsAction extends TransportMasterNodeAct
                 threadPool.getThreadContext().getHeaders(),
                 masterNodeTimeout,
                 ActionListener.wrap(
-                    unused -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(preparedForPutConfig)),
+                    finalConfig -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(finalConfig)),
                     listener::onFailure
                 )
             );
@@ -208,7 +208,7 @@ public class TransportPutDataFrameAnalyticsAction extends TransportMasterNodeAct
                 threadPool.getThreadContext().getHeaders(),
                 masterNodeTimeout,
                 ActionListener.wrap(
-                    unused -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(memoryCappedConfig)),
+                    finalConfig -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(finalConfig)),
                     listener::onFailure
                 )
             );
@@ -238,10 +238,18 @@ public class TransportPutDataFrameAnalyticsAction extends TransportMasterNodeAct
         TimeValue masterNodeTimeout,
         ActionListener<DataFrameAnalyticsConfig> listener
     ) {
+        ActionListener<DataFrameAnalyticsConfig> auditingListener = ActionListener.wrap(finalConfig -> {
+            auditor.info(
+                finalConfig.getId(),
+                Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_CREATED, finalConfig.getAnalysis().getWriteableName())
+            );
+            listener.onResponse(finalConfig);
+        }, listener::onFailure);
+
         ClusterState clusterState = clusterService.state();
         if (clusterState == null) {
             logger.warn("Cannot update doc mapping because clusterState == null");
-            configProvider.put(config, headers, masterNodeTimeout, listener);
+            configProvider.put(config, headers, masterNodeTimeout, auditingListener);
             return;
         }
         ElasticsearchMappings.addDocMappingIfMissing(
@@ -250,13 +258,7 @@ public class TransportPutDataFrameAnalyticsAction extends TransportMasterNodeAct
             client,
             clusterState,
             masterNodeTimeout,
-            ActionListener.wrap(unused -> configProvider.put(config, headers, masterNodeTimeout, ActionListener.wrap(indexResponse -> {
-                auditor.info(
-                    config.getId(),
-                    Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_CREATED, config.getAnalysis().getWriteableName())
-                );
-                listener.onResponse(config);
-            }, listener::onFailure)), listener::onFailure)
+            ActionListener.wrap(unused -> configProvider.put(config, headers, masterNodeTimeout, auditingListener), listener::onFailure)
         );
     }
 

+ 13 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutJobAction.java

@@ -31,6 +31,8 @@ import org.elasticsearch.xpack.core.ml.MachineLearningField;
 import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
 import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
 import org.elasticsearch.xpack.core.ml.action.PutJobAction;
+import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.security.SecurityContext;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
 import org.elasticsearch.xpack.ml.job.JobManager;
@@ -98,8 +100,17 @@ public class TransportPutJobAction extends TransportMasterNodeAction<PutJobActio
                 licenseState,
                 securityContext,
                 threadPool,
-                ActionListener.wrap(
-                    createdDatafeed -> listener.onResponse(jobCreated),
+                ActionListener.wrap(createdDatafeed -> {
+                    // We might need to add the authorization info to the embedded datafeed config in the response
+                    if (createdDatafeed.getResponse().getHeaders().isEmpty()) {
+                        listener.onResponse(jobCreated);
+                    } else {
+                        Job.Builder finalJobBuilder = new Job.Builder(jobCreated.getResponse()).setDatafeed(
+                            new DatafeedConfig.Builder(createdDatafeed.getResponse())
+                        );
+                        listener.onResponse(new PutJobAction.Response(finalJobBuilder.build()));
+                    }
+                },
                     failed -> jobManager.deleteJob(
                         new DeleteJobAction.Request(request.getJobBuilder().getId()),
                         state,

+ 1 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java

@@ -297,10 +297,7 @@ public final class DatafeedManager {
         CheckedConsumer<Boolean, Exception> mappingsUpdated = ok -> datafeedConfigProvider.putDatafeedConfig(
             request.getDatafeed(),
             headers,
-            ActionListener.wrap(
-                indexResponse -> listener.onResponse(new PutDatafeedAction.Response(request.getDatafeed())),
-                listener::onFailure
-            )
+            ActionListener.wrap(response -> listener.onResponse(new PutDatafeedAction.Response(response.v1())), listener::onFailure)
         );
 
         CheckedConsumer<Boolean, Exception> validationOk = ok -> {

+ 29 - 15
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java

@@ -31,6 +31,7 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.engine.VersionConflictEngineException;
 import org.elasticsearch.index.query.BoolQueryBuilder;
@@ -104,38 +105,51 @@ public class DatafeedConfigProvider {
      * the config will not be overwritten.
      *
      * @param config The datafeed configuration
-     * @param listener Index response listener
+     * @param listener Listener that returns config augmented with security headers and index response
      */
-    public void putDatafeedConfig(DatafeedConfig config, Map<String, String> headers, ActionListener<IndexResponse> listener) {
+    public void putDatafeedConfig(
+        DatafeedConfig config,
+        Map<String, String> headers,
+        ActionListener<Tuple<DatafeedConfig, IndexResponse>> listener
+    ) {
 
-        if (headers.isEmpty() == false) {
+        DatafeedConfig finalConfig;
+        if (headers.isEmpty()) {
+            finalConfig = config;
+        } else {
             // Filter any values in headers that aren't security fields
-            config = new DatafeedConfig.Builder(config).setHeaders(
+            finalConfig = new DatafeedConfig.Builder(config).setHeaders(
                 ClientHelper.getPersistableSafeSecurityHeaders(headers, clusterService.state())
             ).build();
         }
 
-        final String datafeedId = config.getId();
+        final String datafeedId = finalConfig.getId();
 
         try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
-            XContentBuilder source = config.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS));
+            XContentBuilder source = finalConfig.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS));
 
             IndexRequest indexRequest = new IndexRequest(MlConfigIndex.indexName()).id(DatafeedConfig.documentId(datafeedId))
                 .source(source)
                 .opType(DocWriteRequest.OpType.CREATE)
                 .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
 
-            executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap(listener::onResponse, e -> {
-                if (ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException) {
-                    // the dafafeed already exists
-                    listener.onFailure(ExceptionsHelper.datafeedAlreadyExists(datafeedId));
-                } else {
-                    listener.onFailure(e);
-                }
-            }));
+            executeAsyncWithOrigin(
+                client,
+                ML_ORIGIN,
+                IndexAction.INSTANCE,
+                indexRequest,
+                ActionListener.wrap(r -> listener.onResponse(Tuple.tuple(finalConfig, r)), e -> {
+                    if (ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException) {
+                        // the datafeed already exists
+                        listener.onFailure(ExceptionsHelper.datafeedAlreadyExists(datafeedId));
+                    } else {
+                        listener.onFailure(e);
+                    }
+                })
+            );
 
         } catch (IOException e) {
-            listener.onFailure(new ElasticsearchParseException("Failed to serialise datafeed config with id [" + config.getId() + "]", e));
+            listener.onFailure(new ElasticsearchParseException("Failed to serialise datafeed config with id [" + datafeedId + "]", e));
         }
     }
 

+ 2 - 0
x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml

@@ -64,6 +64,7 @@ setup:
   - match: { analyzed_fields: {"includes" : ["obj1.*", "obj2.*" ], "excludes": [] } }
   - is_true: create_time
   - is_true: version
+  - is_true: authorization.roles
 
   - do:
       ml.get_data_frame_analytics:
@@ -2058,6 +2059,7 @@ setup:
   - match: { model_memory_limit: "30mb" }
   - match: { allow_lazy_start: true }
   - match: { max_num_threads: 2 }
+  - is_true: authorization.roles
 
 ---
 "Test update given missing analytics":

+ 2 - 0
x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/datafeeds_crud.yml

@@ -118,6 +118,7 @@ setup:
   - match: { scroll_size: 1000 }
   - is_true: query.match_all
   - match: { chunking_config: { mode: "auto" }}
+  - is_true: authorization.roles
 
 ---
 "Test put datafeed whose id is already taken":
@@ -210,6 +211,7 @@ setup:
   - match: { frequency: "2m" }
   - match: { query_delay: "0s" }
   - is_false: max_empty_searches
+  - is_true: authorization.roles
 
 ---
 "Test update datafeed to point to different job":

+ 1 - 0
x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/jobs_crud.yml

@@ -1238,6 +1238,7 @@
   - is_true: datafeed_config
   - match: { datafeed_config.job_id: "jobs-crud-put-with-datafeed" }
   - match: { datafeed_config.datafeed_id: "jobs-crud-put-with-datafeed" }
+  - is_true: datafeed_config.authorization.roles
   - is_true: create_time
 
   - do: