Browse Source

Extract reindexing logic from transport action (#45024)

This commit extracts the reindexing logic from the transport action so
that it can be incorporated into the persistent reindex work without
requiring the usage of the client.
Tim Brooks 6 years ago
parent
commit
f57d67c56c
15 changed files with 531 additions and 445 deletions
  1. 10 8
      modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java
  2. 3 3
      modules/reindex/src/main/java/org/elasticsearch/index/reindex/AsyncDeleteByQueryAction.java
  3. 121 0
      modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexValidator.java
  4. 368 0
      modules/reindex/src/main/java/org/elasticsearch/index/reindex/Reindexer.java
  5. 1 3
      modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java
  6. 8 403
      modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java
  7. 5 5
      modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java
  8. 1 1
      modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java
  9. 2 2
      modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteBuildRestClientTests.java
  10. 2 2
      modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteWhitelistTests.java
  11. 2 2
      modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexMetadataTests.java
  12. 4 4
      modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexRestClientSslTests.java
  13. 2 10
      modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexScriptTests.java
  14. 1 1
      modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexSourceTargetValidationTests.java
  15. 1 1
      modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWithScriptTests.java

+ 10 - 8
modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java

@@ -36,6 +36,7 @@ import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.TransportAction;
 import org.elasticsearch.client.ParentTaskAssigningClient;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -90,8 +91,9 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
     protected final BulkByScrollTask task;
     protected final WorkerBulkByScrollTaskState worker;
     protected final ThreadPool threadPool;
+    protected final ScriptService scriptService;
+    protected final ReindexSslConfig sslConfig;
 
-    protected final Action mainAction;
     /**
      * The request for this action. Named mainRequest because we create lots of <code>request</code> variables all representing child
      * requests of this mainRequest.
@@ -114,12 +116,13 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
     private final BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> scriptApplier;
     private int lastBatchSize;
 
-    public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, boolean needsSourceDocumentVersions,
-                                           boolean needsSourceDocumentSeqNoAndPrimaryTerm, Logger logger, ParentTaskAssigningClient client,
-                                           ThreadPool threadPool, Action mainAction, Request mainRequest, 
-                                           ActionListener<BulkByScrollResponse> listener) {
-
+    AbstractAsyncBulkByScrollAction(BulkByScrollTask task, boolean needsSourceDocumentVersions,
+                                    boolean needsSourceDocumentSeqNoAndPrimaryTerm, Logger logger, ParentTaskAssigningClient client,
+                                    ThreadPool threadPool, Request mainRequest, ActionListener<BulkByScrollResponse> listener,
+                                    @Nullable ScriptService scriptService, @Nullable ReindexSslConfig sslConfig) {
         this.task = task;
+        this.scriptService = scriptService;
+        this.sslConfig = sslConfig;
         if (!task.isWorker()) {
             throw new IllegalArgumentException("Given task [" + task.getId() + "] must have a child worker");
         }
@@ -128,7 +131,6 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
         this.logger = logger;
         this.client = client;
         this.threadPool = threadPool;
-        this.mainAction = mainAction;
         this.mainRequest = mainRequest;
         this.listener = listener;
         BackoffPolicy backoffPolicy = buildBackoffPolicy();
@@ -158,7 +160,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
         // The default script applier executes a no-op
         return (request, searchHit) -> request;
     }
-    
+
     /**
      * Build the {@link RequestWrapper} for a single search hit. This shouldn't handle
      * metadata or scripting. That will be handled by copyMetadata and

+ 3 - 3
modules/reindex/src/main/java/org/elasticsearch/index/reindex/AsyncDeleteByQueryAction.java

@@ -32,9 +32,9 @@ import org.elasticsearch.threadpool.ThreadPool;
 public class AsyncDeleteByQueryAction extends AbstractAsyncBulkByScrollAction<DeleteByQueryRequest, TransportDeleteByQueryAction> {
 
     public AsyncDeleteByQueryAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
-                                    ThreadPool threadPool, TransportDeleteByQueryAction action, DeleteByQueryRequest request,
-                                    ScriptService scriptService, ActionListener<BulkByScrollResponse> listener) {
-        super(task, false, true, logger, client, threadPool, action, request, listener);
+                                    ThreadPool threadPool, DeleteByQueryRequest request, ScriptService scriptService,
+                                    ActionListener<BulkByScrollResponse> listener) {
+        super(task, false, true, logger, client, threadPool, request, listener, scriptService, null);
     }
 
     @Override

+ 121 - 0
modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexValidator.java

@@ -0,0 +1,121 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.reindex;
+
+import org.apache.lucene.util.automaton.Automata;
+import org.apache.lucene.util.automaton.Automaton;
+import org.apache.lucene.util.automaton.CharacterRunAutomaton;
+import org.apache.lucene.util.automaton.MinimizationOperations;
+import org.apache.lucene.util.automaton.Operations;
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.support.AutoCreateIndex;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.regex.Regex;
+import org.elasticsearch.common.settings.Settings;
+
+import java.util.List;
+
+class ReindexValidator {
+
+    private final CharacterRunAutomaton remoteWhitelist;
+    private final ClusterService clusterService;
+    private final IndexNameExpressionResolver resolver;
+    private final AutoCreateIndex autoCreateIndex;
+
+    ReindexValidator(Settings settings, ClusterService clusterService, IndexNameExpressionResolver resolver,
+                     AutoCreateIndex autoCreateIndex) {
+        this.remoteWhitelist = buildRemoteWhitelist(TransportReindexAction.REMOTE_CLUSTER_WHITELIST.get(settings));
+        this.clusterService = clusterService;
+        this.resolver = resolver;
+        this.autoCreateIndex = autoCreateIndex;
+    }
+
+    void initialValidation(ReindexRequest request) {
+        checkRemoteWhitelist(remoteWhitelist, request.getRemoteInfo());
+        ClusterState state = clusterService.state();
+        validateAgainstAliases(request.getSearchRequest(), request.getDestination(), request.getRemoteInfo(), resolver, autoCreateIndex,
+            state);
+    }
+
+    static void checkRemoteWhitelist(CharacterRunAutomaton whitelist, RemoteInfo remoteInfo) {
+        if (remoteInfo == null) {
+            return;
+        }
+        String check = remoteInfo.getHost() + ':' + remoteInfo.getPort();
+        if (whitelist.run(check)) {
+            return;
+        }
+        String whiteListKey = TransportReindexAction.REMOTE_CLUSTER_WHITELIST.getKey();
+        throw new IllegalArgumentException('[' + check + "] not whitelisted in " + whiteListKey);
+    }
+
+    /**
+     * Build the {@link CharacterRunAutomaton} that represents the reindex-from-remote whitelist and make sure that it doesn't whitelist
+     * the world.
+     */
+    static CharacterRunAutomaton buildRemoteWhitelist(List<String> whitelist) {
+        if (whitelist.isEmpty()) {
+            return new CharacterRunAutomaton(Automata.makeEmpty());
+        }
+        Automaton automaton = Regex.simpleMatchToAutomaton(whitelist.toArray(Strings.EMPTY_ARRAY));
+        automaton = MinimizationOperations.minimize(automaton, Operations.DEFAULT_MAX_DETERMINIZED_STATES);
+        if (Operations.isTotal(automaton)) {
+            throw new IllegalArgumentException("Refusing to start because whitelist " + whitelist + " accepts all addresses. "
+                + "This would allow users to reindex-from-remote any URL they like effectively having Elasticsearch make HTTP GETs "
+                + "for them.");
+        }
+        return new CharacterRunAutomaton(automaton);
+    }
+
+    /**
+     * Throws an ActionRequestValidationException if the request tries to index
+     * back into the same index or into an index that points to two indexes.
+     * This cannot be done during request validation because the cluster state
+     * isn't available then. Package private for testing.
+     */
+    static void validateAgainstAliases(SearchRequest source, IndexRequest destination, RemoteInfo remoteInfo,
+                                       IndexNameExpressionResolver indexNameExpressionResolver, AutoCreateIndex autoCreateIndex,
+                                       ClusterState clusterState) {
+        if (remoteInfo != null) {
+            return;
+        }
+        String target = destination.index();
+        if (false == autoCreateIndex.shouldAutoCreate(target, clusterState)) {
+            /*
+             * If we're going to autocreate the index we don't need to resolve
+             * it. This is the same sort of dance that TransportIndexRequest
+             * uses to decide to autocreate the index.
+             */
+            target = indexNameExpressionResolver.concreteWriteIndex(clusterState, destination).getName();
+        }
+        for (String sourceIndex : indexNameExpressionResolver.concreteIndexNames(clusterState, source)) {
+            if (sourceIndex.equals(target)) {
+                ActionRequestValidationException e = new ActionRequestValidationException();
+                e.addValidationError("reindex cannot write into an index its reading from [" + target + ']');
+                throw e;
+            }
+        }
+    }
+}

+ 368 - 0
modules/reindex/src/main/java/org/elasticsearch/index/reindex/Reindexer.java

@@ -0,0 +1,368 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.reindex;
+
+import org.apache.http.Header;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.reactor.IOReactorConfig;
+import org.apache.http.message.BasicHeader;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.bulk.BackoffPolicy;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.ParentTaskAssigningClient;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.lucene.uid.Versions;
+import org.elasticsearch.common.xcontent.DeprecationHandler;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.VersionType;
+import org.elasticsearch.index.mapper.VersionFieldMapper;
+import org.elasticsearch.index.reindex.remote.RemoteScrollableHitSource;
+import org.elasticsearch.script.Script;
+import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.synchronizedList;
+import static java.util.Objects.requireNonNull;
+import static org.elasticsearch.index.VersionType.INTERNAL;
+
+public class Reindexer {
+
+    private static final Logger logger = LogManager.getLogger(Reindexer.class);
+
+    private final ClusterService clusterService;
+    private final Client client;
+    private final ThreadPool threadPool;
+    private final ScriptService scriptService;
+    private final ReindexSslConfig reindexSslConfig;
+
+    Reindexer(ClusterService clusterService, Client client, ThreadPool threadPool, ScriptService scriptService,
+              ReindexSslConfig reindexSslConfig) {
+        this.clusterService = clusterService;
+        this.client = client;
+        this.threadPool = threadPool;
+        this.scriptService = scriptService;
+        this.reindexSslConfig = reindexSslConfig;
+    }
+
+    public void execute(BulkByScrollTask task, ReindexRequest request, ActionListener<BulkByScrollResponse> listener) {
+        BulkByScrollParallelizationHelper.startSlicedAction(request, task, ReindexAction.INSTANCE, listener, client,
+            clusterService.localNode(),
+            () -> {
+                ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(), task);
+                AsyncIndexBySearchAction searchAction = new AsyncIndexBySearchAction(task, logger, assigningClient, threadPool,
+                    scriptService, reindexSslConfig, request, listener);
+                searchAction.start();
+            }
+        );
+
+    }
+
+    /**
+     * Build the {@link RestClient} used for reindexing from remote clusters.
+     * @param remoteInfo connection information for the remote cluster
+     * @param sslConfig configuration for potential outgoing HTTPS connections
+     * @param taskId the id of the current task. This is added to the thread name for easier tracking
+     * @param threadCollector a list in which we collect all the threads created by the client
+     */
+    static RestClient buildRestClient(RemoteInfo remoteInfo, ReindexSslConfig sslConfig, long taskId, List<Thread> threadCollector) {
+        Header[] clientHeaders = new Header[remoteInfo.getHeaders().size()];
+        int i = 0;
+        for (Map.Entry<String, String> header : remoteInfo.getHeaders().entrySet()) {
+            clientHeaders[i++] = new BasicHeader(header.getKey(), header.getValue());
+        }
+        final RestClientBuilder builder =
+            RestClient.builder(new HttpHost(remoteInfo.getHost(), remoteInfo.getPort(), remoteInfo.getScheme()))
+                .setDefaultHeaders(clientHeaders)
+                .setRequestConfigCallback(c -> {
+                    c.setConnectTimeout(Math.toIntExact(remoteInfo.getConnectTimeout().millis()));
+                    c.setSocketTimeout(Math.toIntExact(remoteInfo.getSocketTimeout().millis()));
+                    return c;
+                })
+                .setHttpClientConfigCallback(c -> {
+                    // Enable basic auth if it is configured
+                    if (remoteInfo.getUsername() != null) {
+                        UsernamePasswordCredentials creds = new UsernamePasswordCredentials(remoteInfo.getUsername(),
+                            remoteInfo.getPassword());
+                        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+                        credentialsProvider.setCredentials(AuthScope.ANY, creds);
+                        c.setDefaultCredentialsProvider(credentialsProvider);
+                    }
+                    // Stick the task id in the thread name so we can track down tasks from stack traces
+                    AtomicInteger threads = new AtomicInteger();
+                    c.setThreadFactory(r -> {
+                        String name = "es-client-" + taskId + "-" + threads.getAndIncrement();
+                        Thread t = new Thread(r, name);
+                        threadCollector.add(t);
+                        return t;
+                    });
+                    // Limit ourselves to one reactor thread because for now the search process is single threaded.
+                    c.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build());
+                    c.setSSLStrategy(sslConfig.getStrategy());
+                    return c;
+                });
+        if (Strings.hasLength(remoteInfo.getPathPrefix()) && "/".equals(remoteInfo.getPathPrefix()) == false) {
+            builder.setPathPrefix(remoteInfo.getPathPrefix());
+        }
+        return builder.build();
+    }
+
+    /**
+     * Simple implementation of reindex using scrolling and bulk. There are tons
+     * of optimizations that can be done on certain types of reindex requests
+     * but this makes no attempt to do any of them so it can be as simple
+     * possible.
+     */
+    static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<ReindexRequest, TransportReindexAction> {
+
+        /**
+         * List of threads created by this process. Usually actions don't create threads in Elasticsearch. Instead they use the builtin
+         * {@link ThreadPool}s. But reindex-from-remote uses Elasticsearch's {@link RestClient} which doesn't use the
+         * {@linkplain ThreadPool}s because it uses httpasyncclient. It'd be a ton of trouble to work around creating those threads. So
+         * instead we let it create threads but we watch them carefully and assert that they are dead when the process is over.
+         */
+        private List<Thread> createdThreads = emptyList();
+
+        AsyncIndexBySearchAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
+                                 ThreadPool threadPool, ScriptService scriptService, ReindexSslConfig sslConfig, ReindexRequest request,
+                                 ActionListener<BulkByScrollResponse> listener) {
+            super(task,
+                /*
+                 * We only need the source version if we're going to use it when write and we only do that when the destination request uses
+                 * external versioning.
+                 */
+                request.getDestination().versionType() != VersionType.INTERNAL,
+                false, logger, client, threadPool, request, listener, scriptService, sslConfig);
+        }
+
+        @Override
+        protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffPolicy) {
+            if (mainRequest.getRemoteInfo() != null) {
+                RemoteInfo remoteInfo = mainRequest.getRemoteInfo();
+                createdThreads = synchronizedList(new ArrayList<>());
+                assert sslConfig != null : "Reindex ssl config must be set";
+                RestClient restClient = buildRestClient(remoteInfo, sslConfig, task.getId(), createdThreads);
+                return new RemoteScrollableHitSource(logger, backoffPolicy, threadPool, worker::countSearchRetry,
+                    this::onScrollResponse, this::finishHim,
+                    restClient, remoteInfo.getQuery(), mainRequest.getSearchRequest());
+            }
+            return super.buildScrollableResultSource(backoffPolicy);
+        }
+
+        @Override
+        protected void finishHim(Exception failure, List<BulkItemResponse.Failure> indexingFailures,
+                                 List<ScrollableHitSource.SearchFailure> searchFailures, boolean timedOut) {
+            super.finishHim(failure, indexingFailures, searchFailures, timedOut);
+            // A little extra paranoia so we log something if we leave any threads running
+            for (Thread thread : createdThreads) {
+                if (thread.isAlive()) {
+                    assert false: "Failed to properly stop client thread [" + thread.getName() + "]";
+                    logger.error("Failed to properly stop client thread [{}]", thread.getName());
+                }
+            }
+        }
+
+        @Override
+        public BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> buildScriptApplier() {
+            Script script = mainRequest.getScript();
+            if (script != null) {
+                assert scriptService != null : "Script service must be set";
+                return new Reindexer.AsyncIndexBySearchAction.ReindexScriptApplier(worker, scriptService, script, script.getParams());
+            }
+            return super.buildScriptApplier();
+        }
+
+        @Override
+        protected RequestWrapper<IndexRequest> buildRequest(ScrollableHitSource.Hit doc) {
+            IndexRequest index = new IndexRequest();
+
+            // Copy the index from the request so we always write where it asked to write
+            index.index(mainRequest.getDestination().index());
+
+            // If the request override's type then the user wants all documents in that type. Otherwise keep the doc's type.
+            if (mainRequest.getDestination().type() == null) {
+                index.type(doc.getType());
+            } else {
+                index.type(mainRequest.getDestination().type());
+            }
+
+            /*
+             * Internal versioning can just use what we copied from the destination request. Otherwise we assume we're using external
+             * versioning and use the doc's version.
+             */
+            index.versionType(mainRequest.getDestination().versionType());
+            if (index.versionType() == INTERNAL) {
+                assert doc.getVersion() == -1 : "fetched version when we didn't have to";
+                index.version(mainRequest.getDestination().version());
+            } else {
+                index.version(doc.getVersion());
+            }
+
+            // id and source always come from the found doc. Scripts can change them but they operate on the index request.
+            index.id(doc.getId());
+
+            // the source xcontent type and destination could be different
+            final XContentType sourceXContentType = doc.getXContentType();
+            final XContentType mainRequestXContentType = mainRequest.getDestination().getContentType();
+            if (mainRequestXContentType != null && doc.getXContentType() != mainRequestXContentType) {
+                // we need to convert
+                try (InputStream stream = doc.getSource().streamInput();
+                     XContentParser parser = sourceXContentType.xContent()
+                         .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, stream);
+                     XContentBuilder builder = XContentBuilder.builder(mainRequestXContentType.xContent())) {
+                    parser.nextToken();
+                    builder.copyCurrentStructure(parser);
+                    index.source(BytesReference.bytes(builder), builder.contentType());
+                } catch (IOException e) {
+                    throw new UncheckedIOException("failed to convert hit from " + sourceXContentType + " to "
+                        + mainRequestXContentType, e);
+                }
+            } else {
+                index.source(doc.getSource(), doc.getXContentType());
+            }
+
+            /*
+             * The rest of the index request just has to be copied from the template. It may be changed later from scripts or the superclass
+             * here on out operates on the index request rather than the template.
+             */
+            index.routing(mainRequest.getDestination().routing());
+            index.setPipeline(mainRequest.getDestination().getPipeline());
+            // OpType is synthesized from version so it is handled when we copy version above.
+
+            return wrap(index);
+        }
+
+        /**
+         * Override the simple copy behavior to allow more fine grained control.
+         */
+        @Override
+        protected void copyRouting(RequestWrapper<?> request, String routing) {
+            String routingSpec = mainRequest.getDestination().routing();
+            if (routingSpec == null) {
+                super.copyRouting(request, routing);
+                return;
+            }
+            if (routingSpec.startsWith("=")) {
+                super.copyRouting(request, mainRequest.getDestination().routing().substring(1));
+                return;
+            }
+            switch (routingSpec) {
+                case "keep":
+                    super.copyRouting(request, routing);
+                    break;
+                case "discard":
+                    super.copyRouting(request, null);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unsupported routing command");
+            }
+        }
+
+        class ReindexScriptApplier extends ScriptApplier {
+
+            ReindexScriptApplier(WorkerBulkByScrollTaskState taskWorker, ScriptService scriptService, Script script,
+                                 Map<String, Object> params) {
+                super(taskWorker, scriptService, script, params);
+            }
+
+            /*
+             * Methods below here handle script updating the index request. They try
+             * to be pretty liberal with regards to types because script are often
+             * dynamically typed.
+             */
+
+            @Override
+            protected void scriptChangedIndex(RequestWrapper<?> request, Object to) {
+                requireNonNull(to, "Can't reindex without a destination index!");
+                request.setIndex(to.toString());
+            }
+
+            @Override
+            protected void scriptChangedType(RequestWrapper<?> request, Object to) {
+                requireNonNull(to, "Can't reindex without a destination type!");
+                request.setType(to.toString());
+            }
+
+            @Override
+            protected void scriptChangedId(RequestWrapper<?> request, Object to) {
+                request.setId(Objects.toString(to, null));
+            }
+
+            @Override
+            protected void scriptChangedVersion(RequestWrapper<?> request, Object to) {
+                if (to == null) {
+                    request.setVersion(Versions.MATCH_ANY);
+                    request.setVersionType(INTERNAL);
+                } else {
+                    request.setVersion(asLong(to, VersionFieldMapper.NAME));
+                }
+            }
+
+            @Override
+            protected void scriptChangedRouting(RequestWrapper<?> request, Object to) {
+                request.setRouting(Objects.toString(to, null));
+            }
+
+            private long asLong(Object from, String name) {
+                /*
+                 * Stuffing a number into the map will have converted it to
+                 * some Number.
+                 * */
+                Number fromNumber;
+                try {
+                    fromNumber = (Number) from;
+                } catch (ClassCastException e) {
+                    throw new IllegalArgumentException(name + " may only be set to an int or a long but was [" + from + "]", e);
+                }
+                long l = fromNumber.longValue();
+                // Check that we didn't round when we fetched the value.
+                if (fromNumber.doubleValue() != l) {
+                    throw new IllegalArgumentException(name + " may only be set to an int or a long but was [" + from + "]");
+                }
+                return l;
+            }
+        }
+    }
+}

+ 1 - 3
modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java

@@ -24,7 +24,6 @@ import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.HandledTransportAction;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.ParentTaskAssigningClient;
-import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -58,10 +57,9 @@ public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteB
         BulkByScrollParallelizationHelper.startSlicedAction(request, bulkByScrollTask, DeleteByQueryAction.INSTANCE, listener, client,
             clusterService.localNode(),
             () -> {
-                ClusterState state = clusterService.state();
                 ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(),
                     bulkByScrollTask);
-                new AsyncDeleteByQueryAction(bulkByScrollTask, logger, assigningClient, threadPool, this, request, scriptService,
+                new AsyncDeleteByQueryAction(bulkByScrollTask, logger, assigningClient, threadPool, request, scriptService,
                     listener).start();
             }
         );

+ 8 - 403
modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java

@@ -19,443 +19,48 @@
 
 package org.elasticsearch.index.reindex;
 
-import org.apache.http.Header;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.impl.nio.reactor.IOReactorConfig;
-import org.apache.http.message.BasicHeader;
-import org.apache.logging.log4j.Logger;
-import org.apache.lucene.util.automaton.Automata;
-import org.apache.lucene.util.automaton.Automaton;
-import org.apache.lucene.util.automaton.CharacterRunAutomaton;
-import org.apache.lucene.util.automaton.MinimizationOperations;
-import org.apache.lucene.util.automaton.Operations;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.ActionRequestValidationException;
-import org.elasticsearch.action.bulk.BackoffPolicy;
-import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.AutoCreateIndex;
 import org.elasticsearch.action.support.HandledTransportAction;
 import org.elasticsearch.client.Client;
-import org.elasticsearch.client.ParentTaskAssigningClient;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestClientBuilder;
-import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.io.stream.Writeable;
-import org.elasticsearch.common.lucene.uid.Versions;
-import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.DeprecationHandler;
-import org.elasticsearch.common.xcontent.NamedXContentRegistry;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentParser;
-import org.elasticsearch.common.xcontent.XContentType;
-import org.elasticsearch.index.VersionType;
-import org.elasticsearch.index.mapper.VersionFieldMapper;
-import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
-import org.elasticsearch.index.reindex.remote.RemoteScrollableHitSource;
-import org.elasticsearch.script.Script;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.UncheckedIOException;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.BiFunction;
 import java.util.function.Function;
 
 import static java.util.Collections.emptyList;
-import static java.util.Collections.synchronizedList;
-import static java.util.Objects.requireNonNull;
-import static org.elasticsearch.index.VersionType.INTERNAL;
 
 public class TransportReindexAction extends HandledTransportAction<ReindexRequest, BulkByScrollResponse> {
     public static final Setting<List<String>> REMOTE_CLUSTER_WHITELIST =
             Setting.listSetting("reindex.remote.whitelist", emptyList(), Function.identity(), Property.NodeScope);
 
-    private final ThreadPool threadPool;
-    private final ClusterService clusterService;
-    private final ScriptService scriptService;
-    private final AutoCreateIndex autoCreateIndex;
-    private final Client client;
-    private final CharacterRunAutomaton remoteWhitelist;
-    private final IndexNameExpressionResolver indexNameExpressionResolver;
-
-    private final ReindexSslConfig sslConfig;
+    private final ReindexValidator reindexValidator;
+    private final Reindexer reindexer;
 
     @Inject
     public TransportReindexAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters,
             IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, ScriptService scriptService,
             AutoCreateIndex autoCreateIndex, Client client, TransportService transportService, ReindexSslConfig sslConfig) {
-        super(ReindexAction.NAME, transportService, actionFilters, (Writeable.Reader<ReindexRequest>)ReindexRequest::new);
-        this.threadPool = threadPool;
-        this.clusterService = clusterService;
-        this.scriptService = scriptService;
-        this.autoCreateIndex = autoCreateIndex;
-        this.client = client;
-        remoteWhitelist = buildRemoteWhitelist(REMOTE_CLUSTER_WHITELIST.get(settings));
-        this.indexNameExpressionResolver = indexNameExpressionResolver;
-        this.sslConfig = sslConfig;
+        super(ReindexAction.NAME, transportService, actionFilters, ReindexRequest::new);
+        this.reindexValidator = new ReindexValidator(settings, clusterService, indexNameExpressionResolver, autoCreateIndex);
+        this.reindexer = new Reindexer(clusterService, client, threadPool, scriptService, sslConfig);
+
     }
 
     @Override
     protected void doExecute(Task task, ReindexRequest request, ActionListener<BulkByScrollResponse> listener) {
-        checkRemoteWhitelist(remoteWhitelist, request.getRemoteInfo());
-        ClusterState state = clusterService.state();
-        validateAgainstAliases(request.getSearchRequest(), request.getDestination(), request.getRemoteInfo(),
-            indexNameExpressionResolver, autoCreateIndex, state);
-
+        reindexValidator.initialValidation(request);
         BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task;
-
-        BulkByScrollParallelizationHelper.startSlicedAction(request, bulkByScrollTask, ReindexAction.INSTANCE, listener, client,
-            clusterService.localNode(),
-            () -> {
-                ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(),
-                    bulkByScrollTask);
-                new AsyncIndexBySearchAction(bulkByScrollTask, logger, assigningClient, threadPool, this, request, state,
-                    listener).start();
-            }
-        );
-    }
-
-    static void checkRemoteWhitelist(CharacterRunAutomaton whitelist, RemoteInfo remoteInfo) {
-        if (remoteInfo == null) {
-            return;
-        }
-        String check = remoteInfo.getHost() + ':' + remoteInfo.getPort();
-        if (whitelist.run(check)) {
-            return;
-        }
-        throw new IllegalArgumentException('[' + check + "] not whitelisted in " + REMOTE_CLUSTER_WHITELIST.getKey());
-    }
-
-    /**
-     * Build the {@link CharacterRunAutomaton} that represents the reindex-from-remote whitelist and make sure that it doesn't whitelist
-     * the world.
-     */
-    static CharacterRunAutomaton buildRemoteWhitelist(List<String> whitelist) {
-        if (whitelist.isEmpty()) {
-            return new CharacterRunAutomaton(Automata.makeEmpty());
-        }
-        Automaton automaton = Regex.simpleMatchToAutomaton(whitelist.toArray(Strings.EMPTY_ARRAY));
-        automaton = MinimizationOperations.minimize(automaton, Operations.DEFAULT_MAX_DETERMINIZED_STATES);
-        if (Operations.isTotal(automaton)) {
-            throw new IllegalArgumentException("Refusing to start because whitelist " + whitelist + " accepts all addresses. "
-                    + "This would allow users to reindex-from-remote any URL they like effectively having Elasticsearch make HTTP GETs "
-                    + "for them.");
-        }
-        return new CharacterRunAutomaton(automaton);
-    }
-
-    /**
-     * Throws an ActionRequestValidationException if the request tries to index
-     * back into the same index or into an index that points to two indexes.
-     * This cannot be done during request validation because the cluster state
-     * isn't available then. Package private for testing.
-     */
-    static void validateAgainstAliases(SearchRequest source, IndexRequest destination, RemoteInfo remoteInfo,
-                                         IndexNameExpressionResolver indexNameExpressionResolver, AutoCreateIndex autoCreateIndex,
-                                         ClusterState clusterState) {
-        if (remoteInfo != null) {
-            return;
-        }
-        String target = destination.index();
-        if (false == autoCreateIndex.shouldAutoCreate(target, clusterState)) {
-            /*
-             * If we're going to autocreate the index we don't need to resolve
-             * it. This is the same sort of dance that TransportIndexRequest
-             * uses to decide to autocreate the index.
-             */
-            target = indexNameExpressionResolver.concreteWriteIndex(clusterState, destination).getName();
-        }
-        for (String sourceIndex : indexNameExpressionResolver.concreteIndexNames(clusterState, source)) {
-            if (sourceIndex.equals(target)) {
-                ActionRequestValidationException e = new ActionRequestValidationException();
-                e.addValidationError("reindex cannot write into an index its reading from [" + target + ']');
-                throw e;
-            }
-        }
-    }
-
-    /**
-     * Build the {@link RestClient} used for reindexing from remote clusters.
-     * @param remoteInfo connection information for the remote cluster
-     * @param sslConfig configuration for potential outgoing HTTPS connections
-     * @param taskId the id of the current task. This is added to the thread name for easier tracking
-     * @param threadCollector a list in which we collect all the threads created by the client
-     */
-    static RestClient buildRestClient(RemoteInfo remoteInfo, ReindexSslConfig sslConfig, long taskId, List<Thread> threadCollector) {
-        Header[] clientHeaders = new Header[remoteInfo.getHeaders().size()];
-        int i = 0;
-        for (Map.Entry<String, String> header : remoteInfo.getHeaders().entrySet()) {
-            clientHeaders[i++] = new BasicHeader(header.getKey(), header.getValue());
-        }
-        final RestClientBuilder builder =
-            RestClient.builder(new HttpHost(remoteInfo.getHost(), remoteInfo.getPort(), remoteInfo.getScheme()))
-            .setDefaultHeaders(clientHeaders)
-            .setRequestConfigCallback(c -> {
-                c.setConnectTimeout(Math.toIntExact(remoteInfo.getConnectTimeout().millis()));
-                c.setSocketTimeout(Math.toIntExact(remoteInfo.getSocketTimeout().millis()));
-                return c;
-            })
-            .setHttpClientConfigCallback(c -> {
-                // Enable basic auth if it is configured
-                if (remoteInfo.getUsername() != null) {
-                    UsernamePasswordCredentials creds = new UsernamePasswordCredentials(remoteInfo.getUsername(),
-                        remoteInfo.getPassword());
-                    CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
-                    credentialsProvider.setCredentials(AuthScope.ANY, creds);
-                    c.setDefaultCredentialsProvider(credentialsProvider);
-                }
-                // Stick the task id in the thread name so we can track down tasks from stack traces
-                AtomicInteger threads = new AtomicInteger();
-                c.setThreadFactory(r -> {
-                    String name = "es-client-" + taskId + "-" + threads.getAndIncrement();
-                    Thread t = new Thread(r, name);
-                    threadCollector.add(t);
-                    return t;
-                });
-                // Limit ourselves to one reactor thread because for now the search process is single threaded.
-                c.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build());
-                c.setSSLStrategy(sslConfig.getStrategy());
-                return c;
-            });
-        if (Strings.hasLength(remoteInfo.getPathPrefix()) && "/".equals(remoteInfo.getPathPrefix()) == false) {
-            builder.setPathPrefix(remoteInfo.getPathPrefix());
-        }
-        return builder.build();
-    }
-
-    /**
-     * Simple implementation of reindex using scrolling and bulk. There are tons
-     * of optimizations that can be done on certain types of reindex requests
-     * but this makes no attempt to do any of them so it can be as simple
-     * possible.
-     */
-    static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<ReindexRequest, TransportReindexAction> {
-        /**
-         * List of threads created by this process. Usually actions don't create threads in Elasticsearch. Instead they use the builtin
-         * {@link ThreadPool}s. But reindex-from-remote uses Elasticsearch's {@link RestClient} which doesn't use the
-         * {@linkplain ThreadPool}s because it uses httpasyncclient. It'd be a ton of trouble to work around creating those threads. So
-         * instead we let it create threads but we watch them carefully and assert that they are dead when the process is over.
-         */
-        private List<Thread> createdThreads = emptyList();
-
-        AsyncIndexBySearchAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
-                ThreadPool threadPool, TransportReindexAction action, ReindexRequest request, ClusterState clusterState,
-                ActionListener<BulkByScrollResponse> listener) {
-            super(task,
-                /*
-                 * We only need the source version if we're going to use it when write and we only do that when the destination request uses
-                 * external versioning.
-                 */
-                request.getDestination().versionType() != VersionType.INTERNAL,
-                false, logger, client, threadPool, action, request, listener);
-        }
-
-        @Override
-        protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffPolicy) {
-            if (mainRequest.getRemoteInfo() != null) {
-                RemoteInfo remoteInfo = mainRequest.getRemoteInfo();
-                createdThreads = synchronizedList(new ArrayList<>());
-                RestClient restClient = buildRestClient(remoteInfo, mainAction.sslConfig, task.getId(), createdThreads);
-                return new RemoteScrollableHitSource(logger, backoffPolicy, threadPool, worker::countSearchRetry,
-                    this::onScrollResponse, this::finishHim,
-                    restClient, remoteInfo.getQuery(), mainRequest.getSearchRequest());
-            }
-            return super.buildScrollableResultSource(backoffPolicy);
-        }
-
-        @Override
-        protected void finishHim(Exception failure, List<Failure> indexingFailures, List<SearchFailure> searchFailures, boolean timedOut) {
-            super.finishHim(failure, indexingFailures, searchFailures, timedOut);
-            // A little extra paranoia so we log something if we leave any threads running
-            for (Thread thread : createdThreads) {
-                if (thread.isAlive()) {
-                    assert false: "Failed to properly stop client thread [" + thread.getName() + "]";
-                    logger.error("Failed to properly stop client thread [{}]", thread.getName());
-                }
-            }
-        }
-
-        @Override
-        public BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> buildScriptApplier() {
-            Script script = mainRequest.getScript();
-            if (script != null) {
-                return new ReindexScriptApplier(worker, mainAction.scriptService, script, script.getParams());
-            }
-            return super.buildScriptApplier();
-        }
-
-        @Override
-        protected RequestWrapper<IndexRequest> buildRequest(ScrollableHitSource.Hit doc) {
-            IndexRequest index = new IndexRequest();
-
-            // Copy the index from the request so we always write where it asked to write
-            index.index(mainRequest.getDestination().index());
-
-            // If the request override's type then the user wants all documents in that type. Otherwise keep the doc's type.
-            if (mainRequest.getDestination().type() == null) {
-                index.type(doc.getType());
-            } else {
-                index.type(mainRequest.getDestination().type());
-            }
-
-            /*
-             * Internal versioning can just use what we copied from the destination request. Otherwise we assume we're using external
-             * versioning and use the doc's version.
-             */
-            index.versionType(mainRequest.getDestination().versionType());
-            if (index.versionType() == INTERNAL) {
-                assert doc.getVersion() == -1 : "fetched version when we didn't have to";
-                index.version(mainRequest.getDestination().version());
-            } else {
-                index.version(doc.getVersion());
-            }
-
-            // id and source always come from the found doc. Scripts can change them but they operate on the index request.
-            index.id(doc.getId());
-
-            // the source xcontent type and destination could be different
-            final XContentType sourceXContentType = doc.getXContentType();
-            final XContentType mainRequestXContentType = mainRequest.getDestination().getContentType();
-            if (mainRequestXContentType != null && doc.getXContentType() != mainRequestXContentType) {
-                // we need to convert
-                try (InputStream stream = doc.getSource().streamInput();
-                     XContentParser parser = sourceXContentType.xContent()
-                         .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, stream);
-                     XContentBuilder builder = XContentBuilder.builder(mainRequestXContentType.xContent())) {
-                    parser.nextToken();
-                    builder.copyCurrentStructure(parser);
-                    index.source(BytesReference.bytes(builder), builder.contentType());
-                } catch (IOException e) {
-                    throw new UncheckedIOException("failed to convert hit from " + sourceXContentType + " to "
-                        + mainRequestXContentType, e);
-                }
-            } else {
-                index.source(doc.getSource(), doc.getXContentType());
-            }
-
-            /*
-             * The rest of the index request just has to be copied from the template. It may be changed later from scripts or the superclass
-             * here on out operates on the index request rather than the template.
-             */
-            index.routing(mainRequest.getDestination().routing());
-            index.setPipeline(mainRequest.getDestination().getPipeline());
-            // OpType is synthesized from version so it is handled when we copy version above.
-
-            return wrap(index);
-        }
-
-        /**
-         * Override the simple copy behavior to allow more fine grained control.
-         */
-        @Override
-        protected void copyRouting(RequestWrapper<?> request, String routing) {
-            String routingSpec = mainRequest.getDestination().routing();
-            if (routingSpec == null) {
-                super.copyRouting(request, routing);
-                return;
-            }
-            if (routingSpec.startsWith("=")) {
-                super.copyRouting(request, mainRequest.getDestination().routing().substring(1));
-                return;
-            }
-            switch (routingSpec) {
-            case "keep":
-                super.copyRouting(request, routing);
-                break;
-            case "discard":
-                super.copyRouting(request, null);
-                break;
-            default:
-                throw new IllegalArgumentException("Unsupported routing command");
-            }
-        }
-
-        class ReindexScriptApplier extends ScriptApplier {
-
-            ReindexScriptApplier(WorkerBulkByScrollTaskState taskWorker, ScriptService scriptService, Script script,
-                                 Map<String, Object> params) {
-                super(taskWorker, scriptService, script, params);
-            }
-
-            /*
-             * Methods below here handle script updating the index request. They try
-             * to be pretty liberal with regards to types because script are often
-             * dynamically typed.
-             */
-
-            @Override
-            protected void scriptChangedIndex(RequestWrapper<?> request, Object to) {
-                requireNonNull(to, "Can't reindex without a destination index!");
-                request.setIndex(to.toString());
-            }
-
-            @Override
-            protected void scriptChangedType(RequestWrapper<?> request, Object to) {
-                requireNonNull(to, "Can't reindex without a destination type!");
-                request.setType(to.toString());
-            }
-
-            @Override
-            protected void scriptChangedId(RequestWrapper<?> request, Object to) {
-                request.setId(Objects.toString(to, null));
-            }
-
-            @Override
-            protected void scriptChangedVersion(RequestWrapper<?> request, Object to) {
-                if (to == null) {
-                    request.setVersion(Versions.MATCH_ANY);
-                    request.setVersionType(INTERNAL);
-                } else {
-                    request.setVersion(asLong(to, VersionFieldMapper.NAME));
-                }
-            }
-
-            @Override
-            protected void scriptChangedRouting(RequestWrapper<?> request, Object to) {
-                request.setRouting(Objects.toString(to, null));
-            }
-
-            private long asLong(Object from, String name) {
-                /*
-                 * Stuffing a number into the map will have converted it to
-                 * some Number.
-                 * */
-                Number fromNumber;
-                try {
-                    fromNumber = (Number) from;
-                } catch (ClassCastException e) {
-                    throw new IllegalArgumentException(name + " may only be set to an int or a long but was [" + from + "]", e);
-                }
-                long l = fromNumber.longValue();
-                // Check that we didn't round when we fetched the value.
-                if (fromNumber.doubleValue() != l) {
-                    throw new IllegalArgumentException(name + " may only be set to an int or a long but was [" + from + "]");
-                }
-                return l;
-            }
-        }
+        reindexer.execute(bulkByScrollTask, request, listener);
     }
 }

+ 5 - 5
modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java

@@ -70,7 +70,7 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
                 ClusterState state = clusterService.state();
                 ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(),
                     bulkByScrollTask);
-                new AsyncIndexBySearchAction(bulkByScrollTask, logger, assigningClient, threadPool, this, request, state,
+                new AsyncIndexBySearchAction(bulkByScrollTask, logger, assigningClient, threadPool, scriptService, request, state,
                     listener).start();
             }
         );
@@ -82,19 +82,19 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
     static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<UpdateByQueryRequest, TransportUpdateByQueryAction> {
 
         AsyncIndexBySearchAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
-                ThreadPool threadPool, TransportUpdateByQueryAction action, UpdateByQueryRequest request, ClusterState clusterState,
-                ActionListener<BulkByScrollResponse> listener) {
+                                 ThreadPool threadPool, ScriptService scriptService, UpdateByQueryRequest request,
+                                 ClusterState clusterState, ActionListener<BulkByScrollResponse> listener) {
             super(task,
                 // use sequence number powered optimistic concurrency control
                 false, true,
-                logger, client, threadPool, action, request, listener);
+                logger, client, threadPool, request, listener, scriptService, null);
         }
 
         @Override
         public BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> buildScriptApplier() {
             Script script = mainRequest.getScript();
             if (script != null) {
-                return new UpdateByQueryScriptApplier(worker, mainAction.scriptService, script, script.getParams());
+                return new UpdateByQueryScriptApplier(worker, scriptService, script, script.getParams());
             }
             return super.buildScriptApplier();
         }

+ 1 - 1
modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java

@@ -733,7 +733,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
         extends AbstractAsyncBulkByScrollAction<DummyAbstractBulkByScrollRequest, DummyTransportAsyncBulkByScrollAction> {
         DummyAsyncBulkByScrollAction() {
             super(testTask, randomBoolean(), randomBoolean(), AsyncBulkByScrollActionTests.this.logger,
-                new ParentTaskAssigningClient(client, localNode, testTask), client.threadPool(), null, testRequest, listener);
+                new ParentTaskAssigningClient(client, localNode, testTask), client.threadPool(), testRequest, listener, null, null);
         }
 
         @Override

+ 2 - 2
modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteBuildRestClientTests.java

@@ -49,7 +49,7 @@ public class ReindexFromRemoteBuildRestClientTests extends RestClientBuilderTest
                 RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
             long taskId = randomLong();
             List<Thread> threads = synchronizedList(new ArrayList<>());
-            RestClient client = TransportReindexAction.buildRestClient(remoteInfo, sslConfig(), taskId, threads);
+            RestClient client = Reindexer.buildRestClient(remoteInfo, sslConfig(), taskId, threads);
             try {
                 assertBusy(() -> assertThat(threads, hasSize(2)));
                 int i = 0;
@@ -73,7 +73,7 @@ public class ReindexFromRemoteBuildRestClientTests extends RestClientBuilderTest
             headers, RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
         long taskId = randomLong();
         List<Thread> threads = synchronizedList(new ArrayList<>());
-        RestClient client = TransportReindexAction.buildRestClient(remoteInfo, sslConfig(), taskId, threads);
+        RestClient client = Reindexer.buildRestClient(remoteInfo, sslConfig(), taskId, threads);
         try {
             assertHeaders(client, headers);
         } finally {

+ 2 - 2
modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteWhitelistTests.java

@@ -31,8 +31,8 @@ import java.util.List;
 import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonList;
-import static org.elasticsearch.index.reindex.TransportReindexAction.buildRemoteWhitelist;
-import static org.elasticsearch.index.reindex.TransportReindexAction.checkRemoteWhitelist;
+import static org.elasticsearch.index.reindex.ReindexValidator.buildRemoteWhitelist;
+import static org.elasticsearch.index.reindex.ReindexValidator.checkRemoteWhitelist;
 
 /**
  * Tests the reindex-from-remote whitelist of remotes.

+ 2 - 2
modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexMetadataTests.java

@@ -74,10 +74,10 @@ public class ReindexMetadataTests extends AbstractAsyncBulkByScrollActionMetadat
         return new ReindexRequest();
     }
 
-    private class TestAction extends TransportReindexAction.AsyncIndexBySearchAction {
+    private class TestAction extends Reindexer.AsyncIndexBySearchAction {
         TestAction() {
             super(ReindexMetadataTests.this.task, ReindexMetadataTests.this.logger, null, ReindexMetadataTests.this.threadPool,
-                null, request(), null, listener());
+                null, null, request(), listener());
         }
 
         public ReindexRequest mainRequest() {

+ 4 - 4
modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexRestClientSslTests.java

@@ -124,7 +124,7 @@ public class ReindexRestClientSslTests extends ESTestCase {
             .build();
         final Environment environment = TestEnvironment.newEnvironment(settings);
         final ReindexSslConfig ssl = new ReindexSslConfig(settings, environment, mock(ResourceWatcherService.class));
-        try (RestClient client = TransportReindexAction.buildRestClient(getRemoteInfo(), ssl, 1L, threads)) {
+        try (RestClient client = Reindexer.buildRestClient(getRemoteInfo(), ssl, 1L, threads)) {
             expectThrows(SSLHandshakeException.class, () -> client.performRequest(new Request("GET", "/")));
         }
     }
@@ -139,7 +139,7 @@ public class ReindexRestClientSslTests extends ESTestCase {
             .build();
         final Environment environment = TestEnvironment.newEnvironment(settings);
         final ReindexSslConfig ssl = new ReindexSslConfig(settings, environment, mock(ResourceWatcherService.class));
-        try (RestClient client = TransportReindexAction.buildRestClient(getRemoteInfo(), ssl, 1L, threads)) {
+        try (RestClient client = Reindexer.buildRestClient(getRemoteInfo(), ssl, 1L, threads)) {
             final Response response = client.performRequest(new Request("GET", "/"));
             assertThat(response.getStatusLine().getStatusCode(), Matchers.is(200));
         }
@@ -155,7 +155,7 @@ public class ReindexRestClientSslTests extends ESTestCase {
             .build();
         final Environment environment = TestEnvironment.newEnvironment(settings);
         final ReindexSslConfig ssl = new ReindexSslConfig(settings, environment, mock(ResourceWatcherService.class));
-        try (RestClient client = TransportReindexAction.buildRestClient(getRemoteInfo(), ssl, 1L, threads)) {
+        try (RestClient client = Reindexer.buildRestClient(getRemoteInfo(), ssl, 1L, threads)) {
             final Response response = client.performRequest(new Request("GET", "/"));
             assertThat(response.getStatusLine().getStatusCode(), Matchers.is(200));
         }
@@ -185,7 +185,7 @@ public class ReindexRestClientSslTests extends ESTestCase {
         };
         final Environment environment = TestEnvironment.newEnvironment(settings);
         final ReindexSslConfig ssl = new ReindexSslConfig(settings, environment, mock(ResourceWatcherService.class));
-        try (RestClient client = TransportReindexAction.buildRestClient(getRemoteInfo(), ssl, 1L, threads)) {
+        try (RestClient client = Reindexer.buildRestClient(getRemoteInfo(), ssl, 1L, threads)) {
             final Response response = client.performRequest(new Request("GET", "/"));
             assertThat(response.getStatusLine().getStatusCode(), Matchers.is(200));
             final Certificate[] certs = clientCertificates.get();

+ 2 - 10
modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexScriptTests.java

@@ -20,14 +20,10 @@
 package org.elasticsearch.index.reindex;
 
 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.common.lucene.uid.Versions;
-import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.script.ScriptService;
-import org.elasticsearch.transport.TransportService;
 import org.mockito.Mockito;
 
-import java.util.Collections;
 import java.util.Map;
 
 import static org.hamcrest.Matchers.containsString;
@@ -107,12 +103,8 @@ public class ReindexScriptTests extends AbstractAsyncBulkByScrollActionScriptTes
     }
 
     @Override
-    protected TransportReindexAction.AsyncIndexBySearchAction action(ScriptService scriptService, ReindexRequest request) {
-        TransportService transportService = Mockito.mock(TransportService.class);
+    protected Reindexer.AsyncIndexBySearchAction action(ScriptService scriptService, ReindexRequest request) {
         ReindexSslConfig sslConfig = Mockito.mock(ReindexSslConfig.class);
-        TransportReindexAction transportAction = new TransportReindexAction(Settings.EMPTY, threadPool,
-            new ActionFilters(Collections.emptySet()), null, null, scriptService, null, null, transportService, sslConfig);
-        return new TransportReindexAction.AsyncIndexBySearchAction(task, logger, null, threadPool, transportAction, request,
-            null, listener());
+        return new Reindexer.AsyncIndexBySearchAction(task, logger, null, threadPool, scriptService, sslConfig, request, listener());
     }
 }

+ 1 - 1
modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexSourceTargetValidationTests.java

@@ -126,7 +126,7 @@ public class ReindexSourceTargetValidationTests extends ESTestCase {
     }
 
     private void succeeds(RemoteInfo remoteInfo, String target, String... sources) {
-        TransportReindexAction.validateAgainstAliases(new SearchRequest(sources), new IndexRequest(target), remoteInfo,
+        ReindexValidator.validateAgainstAliases(new SearchRequest(sources), new IndexRequest(target), remoteInfo,
                 INDEX_NAME_EXPRESSION_RESOLVER, AUTO_CREATE_INDEX, STATE);
     }
 

+ 1 - 1
modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWithScriptTests.java

@@ -61,7 +61,7 @@ public class UpdateByQueryWithScriptTests
         TransportService transportService = mock(TransportService.class);
         TransportUpdateByQueryAction transportAction = new TransportUpdateByQueryAction(threadPool,
             new ActionFilters(Collections.emptySet()), null, transportService, scriptService, null);
-        return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, threadPool, transportAction, request,
+        return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, threadPool, scriptService, request,
                 ClusterState.EMPTY_STATE, listener());
     }
 }