Browse Source

Using the credentials of the user who calls reindex data stream (#117938) (#118149)

Keith Massey 10 months ago
parent
commit
ca99157d33

+ 3 - 1
x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java

@@ -20,6 +20,7 @@ import org.elasticsearch.persistent.PersistentTasksService;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.ReindexDataStreamRequest;
 import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.ReindexDataStreamResponse;
 import org.elasticsearch.xpack.migrate.task.ReindexDataStreamTask;
@@ -72,7 +73,8 @@ public class ReindexDataStreamTransportAction extends HandledTransportAction<Rei
             sourceDataStreamName,
             transportService.getThreadPool().absoluteTimeInMillis(),
             totalIndices,
-            totalIndicesToBeUpgraded
+            totalIndicesToBeUpgraded,
+            ClientHelper.getPersistableSafeSecurityHeaders(transportService.getThreadPool().getThreadContext(), clusterService.state())
         );
         String persistentTaskId = getPersistentTaskId(sourceDataStreamName);
         persistentTasksService.sendStartRequest(

+ 40 - 0
x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ExecuteWithHeadersClient.java

@@ -0,0 +1,40 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.migrate.task;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.client.internal.support.AbstractClient;
+import org.elasticsearch.xpack.core.ClientHelper;
+
+import java.util.Map;
+
+public class ExecuteWithHeadersClient extends AbstractClient {
+
+    private final Client client;
+    private final Map<String, String> headers;
+
+    public ExecuteWithHeadersClient(Client client, Map<String, String> headers) {
+        super(client.settings(), client.threadPool());
+        this.client = client;
+        this.headers = headers;
+    }
+
+    @Override
+    protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
+        ActionType<Response> action,
+        Request request,
+        ActionListener<Response> listener
+    ) {
+        ClientHelper.executeWithHeadersAsync(headers, null, client, action, request, listener);
+    }
+
+}

+ 2 - 1
x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamPersistentTaskExecutor.java

@@ -66,7 +66,8 @@ public class ReindexDataStreamPersistentTaskExecutor extends PersistentTasksExec
         GetDataStreamAction.Request request = new GetDataStreamAction.Request(TimeValue.MAX_VALUE, new String[] { sourceDataStream });
         assert task instanceof ReindexDataStreamTask;
         final ReindexDataStreamTask reindexDataStreamTask = (ReindexDataStreamTask) task;
-        client.execute(GetDataStreamAction.INSTANCE, request, ActionListener.wrap(response -> {
+        ExecuteWithHeadersClient reindexClient = new ExecuteWithHeadersClient(client, params.headers());
+        reindexClient.execute(GetDataStreamAction.INSTANCE, request, ActionListener.wrap(response -> {
             List<GetDataStreamAction.Response.DataStreamInfo> dataStreamInfos = response.getDataStreams();
             if (dataStreamInfos.size() == 1) {
                 List<Index> indices = dataStreamInfos.get(0).getDataStream().getIndices();

+ 42 - 8
x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTaskParams.java

@@ -9,41 +9,65 @@ package org.elasticsearch.xpack.migrate.task;
 
 import org.elasticsearch.TransportVersion;
 import org.elasticsearch.TransportVersions;
+import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.persistent.PersistentTaskParams;
 import org.elasticsearch.xcontent.ConstructingObjectParser;
+import org.elasticsearch.xcontent.ObjectParser;
 import org.elasticsearch.xcontent.ParseField;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentParser;
 
 import java.io.IOException;
+import java.util.Map;
 
 import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
 
-public record ReindexDataStreamTaskParams(String sourceDataStream, long startTime, int totalIndices, int totalIndicesToBeUpgraded)
-    implements
-        PersistentTaskParams {
+public record ReindexDataStreamTaskParams(
+    String sourceDataStream,
+    long startTime,
+    int totalIndices,
+    int totalIndicesToBeUpgraded,
+    Map<String, String> headers
+) implements PersistentTaskParams {
+
+    private static final String API_CONTEXT = Metadata.XContentContext.API.toString();
 
     public static final String NAME = ReindexDataStreamTask.TASK_NAME;
     private static final String SOURCE_DATA_STREAM_FIELD = "source_data_stream";
     private static final String START_TIME_FIELD = "start_time";
     private static final String TOTAL_INDICES_FIELD = "total_indices";
     private static final String TOTAL_INDICES_TO_BE_UPGRADED_FIELD = "total_indices_to_be_upgraded";
+    private static final String HEADERS_FIELD = "headers";
+    @SuppressWarnings("unchecked")
     private static final ConstructingObjectParser<ReindexDataStreamTaskParams, Void> PARSER = new ConstructingObjectParser<>(
         NAME,
         true,
-        args -> new ReindexDataStreamTaskParams((String) args[0], (long) args[1], (int) args[2], (int) args[3])
+        args -> new ReindexDataStreamTaskParams(
+            (String) args[0],
+            (long) args[1],
+            (int) args[2],
+            (int) args[3],
+            args[4] == null ? Map.of() : (Map<String, String>) args[4]
+        )
     );
     static {
         PARSER.declareString(constructorArg(), new ParseField(SOURCE_DATA_STREAM_FIELD));
         PARSER.declareLong(constructorArg(), new ParseField(START_TIME_FIELD));
         PARSER.declareInt(constructorArg(), new ParseField(TOTAL_INDICES_FIELD));
         PARSER.declareInt(constructorArg(), new ParseField(TOTAL_INDICES_TO_BE_UPGRADED_FIELD));
+        PARSER.declareField(
+            ConstructingObjectParser.optionalConstructorArg(),
+            XContentParser::mapStrings,
+            new ParseField(HEADERS_FIELD),
+            ObjectParser.ValueType.OBJECT
+        );
     }
 
+    @SuppressWarnings("unchecked")
     public ReindexDataStreamTaskParams(StreamInput in) throws IOException {
-        this(in.readString(), in.readLong(), in.readInt(), in.readInt());
+        this(in.readString(), in.readLong(), in.readInt(), in.readInt(), (Map<String, String>) in.readGenericValue());
     }
 
     @Override
@@ -62,16 +86,22 @@ public record ReindexDataStreamTaskParams(String sourceDataStream, long startTim
         out.writeLong(startTime);
         out.writeInt(totalIndices);
         out.writeInt(totalIndicesToBeUpgraded);
+        out.writeGenericValue(headers);
     }
 
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
-        return builder.startObject()
+        builder.startObject()
             .field(SOURCE_DATA_STREAM_FIELD, sourceDataStream)
             .field(START_TIME_FIELD, startTime)
             .field(TOTAL_INDICES_FIELD, totalIndices)
-            .field(TOTAL_INDICES_TO_BE_UPGRADED_FIELD, totalIndicesToBeUpgraded)
-            .endObject();
+            .field(TOTAL_INDICES_TO_BE_UPGRADED_FIELD, totalIndicesToBeUpgraded);
+        if (API_CONTEXT.equals(params.param(Metadata.CONTEXT_MODE_PARAM, API_CONTEXT)) == false) {
+            // This makes sure that we don't return the headers to an api request, like _cluster/state
+            builder.stringStringMap(HEADERS_FIELD, headers);
+        }
+        builder.endObject();
+        return builder;
     }
 
     public String getSourceDataStream() {
@@ -81,4 +111,8 @@ public record ReindexDataStreamTaskParams(String sourceDataStream, long startTim
     public static ReindexDataStreamTaskParams fromXContent(XContentParser parser) {
         return PARSER.apply(parser, null);
     }
+
+    public Map<String, String> getHeaders() {
+        return headers;
+    }
 }

+ 76 - 3
x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/task/ReindexDataStreamTaskParamsTests.java

@@ -7,11 +7,14 @@
 
 package org.elasticsearch.xpack.migrate.task;
 
+import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.test.AbstractXContentSerializingTestCase;
+import org.elasticsearch.xcontent.ToXContent;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentParser;
+import org.elasticsearch.xcontent.XContentType;
 import org.elasticsearch.xcontent.json.JsonXContent;
 
 import java.io.IOException;
@@ -29,7 +32,26 @@ public class ReindexDataStreamTaskParamsTests extends AbstractXContentSerializin
 
     @Override
     protected ReindexDataStreamTaskParams createTestInstance() {
-        return new ReindexDataStreamTaskParams(randomAlphaOfLength(50), randomLong(), randomNonNegativeInt(), randomNonNegativeInt());
+        return createTestInstance(randomBoolean());
+    }
+
+    @Override
+    protected ReindexDataStreamTaskParams createXContextTestInstance(XContentType xContentType) {
+        /*
+         * Since we filter out headers from xcontent in some cases, we can't use them in the standard xcontent round trip testing.
+         * Headers are covered in testToXContentContextMode
+         */
+        return createTestInstance(false);
+    }
+
+    private ReindexDataStreamTaskParams createTestInstance(boolean withHeaders) {
+        return new ReindexDataStreamTaskParams(
+            randomAlphaOfLength(50),
+            randomLong(),
+            randomNonNegativeInt(),
+            randomNonNegativeInt(),
+            getTestHeaders(withHeaders)
+        );
     }
 
     @Override
@@ -38,14 +60,16 @@ public class ReindexDataStreamTaskParamsTests extends AbstractXContentSerializin
         long startTime = instance.startTime();
         int totalIndices = instance.totalIndices();
         int totalIndicesToBeUpgraded = instance.totalIndicesToBeUpgraded();
-        switch (randomIntBetween(0, 3)) {
+        Map<String, String> headers = instance.headers();
+        switch (randomIntBetween(0, 4)) {
             case 0 -> sourceDataStream = randomAlphaOfLength(50);
             case 1 -> startTime = randomLong();
             case 2 -> totalIndices = totalIndices + 1;
             case 3 -> totalIndices = totalIndicesToBeUpgraded + 1;
+            case 4 -> headers = headers.isEmpty() ? getTestHeaders(true) : getTestHeaders();
             default -> throw new UnsupportedOperationException();
         }
-        return new ReindexDataStreamTaskParams(sourceDataStream, startTime, totalIndices, totalIndicesToBeUpgraded);
+        return new ReindexDataStreamTaskParams(sourceDataStream, startTime, totalIndices, totalIndicesToBeUpgraded, headers);
     }
 
     @Override
@@ -53,6 +77,18 @@ public class ReindexDataStreamTaskParamsTests extends AbstractXContentSerializin
         return ReindexDataStreamTaskParams.fromXContent(parser);
     }
 
+    private Map<String, String> getTestHeaders() {
+        return getTestHeaders(randomBoolean());
+    }
+
+    private Map<String, String> getTestHeaders(boolean nonEmpty) {
+        if (nonEmpty) {
+            return Map.of(randomAlphaOfLength(20), randomAlphaOfLength(30));
+        } else {
+            return Map.of();
+        }
+    }
+
     public void testToXContent() throws IOException {
         ReindexDataStreamTaskParams params = createTestInstance();
         try (XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent)) {
@@ -65,4 +101,41 @@ public class ReindexDataStreamTaskParamsTests extends AbstractXContentSerializin
             }
         }
     }
+
+    public void testToXContentContextMode() throws IOException {
+        ReindexDataStreamTaskParams params = createTestInstance(true);
+
+        // We do not expect to get headers if the "content_mode" is "api"
+        try (XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent)) {
+            builder.humanReadable(true);
+            ToXContent.Params xContentParams = new ToXContent.MapParams(
+                Map.of(Metadata.CONTEXT_MODE_PARAM, Metadata.XContentContext.API.toString())
+            );
+            params.toXContent(builder, xContentParams);
+            try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) {
+                Map<String, Object> parserMap = parser.map();
+                assertThat(parserMap.get("source_data_stream"), equalTo(params.sourceDataStream()));
+                assertThat(((Number) parserMap.get("start_time")).longValue(), equalTo(params.startTime()));
+                assertThat(parserMap.containsKey("headers"), equalTo(false));
+            }
+        }
+
+        // We do expect to get headers if the "content_mode" is anything but "api"
+        try (XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent)) {
+            builder.humanReadable(true);
+            ToXContent.Params xContentParams = new ToXContent.MapParams(
+                Map.of(
+                    Metadata.CONTEXT_MODE_PARAM,
+                    randomFrom(Metadata.XContentContext.GATEWAY.toString(), Metadata.XContentContext.SNAPSHOT.toString())
+                )
+            );
+            params.toXContent(builder, xContentParams);
+            try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) {
+                Map<String, Object> parserMap = parser.map();
+                assertThat(parserMap.get("source_data_stream"), equalTo(params.sourceDataStream()));
+                assertThat(((Number) parserMap.get("start_time")).longValue(), equalTo(params.startTime()));
+                assertThat(parserMap.get("headers"), equalTo(params.getHeaders()));
+            }
+        }
+    }
 }