浏览代码

Consolidate some reindex utility classes (#22666)

Everything that extended `AbstractAsyncBulkByScrollAction` also
extended `AbstractAsyncBulkIndexByScrollAction` so this removes
`AbstractAsyncBulkIndexByScrollAction`, merging it into
`AbstractAsyncBulkByScrollAction`.
Nik Everett 8 年之前
父节点
当前提交
ee5f8c4522

+ 505 - 2
modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java

@@ -21,6 +21,7 @@ package org.elasticsearch.index.reindex;
 
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
 import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
@@ -30,27 +31,50 @@ import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.bulk.Retry;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.ParentTaskAssigningClient;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+import org.elasticsearch.index.VersionType;
+import org.elasticsearch.index.mapper.IdFieldMapper;
+import org.elasticsearch.index.mapper.IndexFieldMapper;
+import org.elasticsearch.index.mapper.ParentFieldMapper;
+import org.elasticsearch.index.mapper.RoutingFieldMapper;
+import org.elasticsearch.index.mapper.SourceFieldMapper;
+import org.elasticsearch.index.mapper.TypeFieldMapper;
+import org.elasticsearch.index.mapper.VersionFieldMapper;
 import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
+import org.elasticsearch.script.CompiledScript;
+import org.elasticsearch.script.ExecutableScript;
+import org.elasticsearch.script.Script;
+import org.elasticsearch.script.ScriptContext;
+import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.search.sort.SortBuilder;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
 
 import static java.lang.Math.max;
 import static java.lang.Math.min;
 import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
 import static java.util.Collections.unmodifiableList;
 import static org.elasticsearch.action.bulk.BackoffPolicy.exponentialBackoff;
 import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
@@ -66,6 +90,9 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
     protected final Logger logger;
     protected final WorkingBulkByScrollTask task;
     protected final ThreadPool threadPool;
+    protected final ScriptService scriptService;
+    protected final ClusterState clusterState;
+
     /**
      * The request for this action. Named mainRequest because we create lots of <code>request</code> variables all representing child
      * requests of this mainRequest.
@@ -80,17 +107,28 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
     private final Retry bulkRetry;
     private final ScrollableHitSource scrollSource;
 
+    /**
+     * This BiFunction is used to apply various changes depending of the Reindex action and  the search hit,
+     * from copying search hit metadata (parent, routing, etc) to potentially transforming the
+     * {@link RequestWrapper} completely.
+     */
+    private final BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> scriptApplier;
+
     public AbstractAsyncBulkByScrollAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
-                                           ThreadPool threadPool, Request mainRequest, ActionListener<BulkIndexByScrollResponse> listener) {
+            ThreadPool threadPool, Request mainRequest, ScriptService scriptService, ClusterState clusterState,
+            ActionListener<BulkIndexByScrollResponse> listener) {
         this.task = task;
         this.logger = logger;
         this.client = client;
         this.threadPool = threadPool;
+        this.scriptService = scriptService;
+        this.clusterState = clusterState;
         this.mainRequest = mainRequest;
         this.listener = listener;
         BackoffPolicy backoffPolicy = buildBackoffPolicy();
         bulkRetry = Retry.on(EsRejectedExecutionException.class).policy(BackoffPolicy.wrap(backoffPolicy, task::countBulkRetry));
         scrollSource = buildScrollableResultSource(backoffPolicy);
+        scriptApplier = Objects.requireNonNull(buildScriptApplier(), "script applier must not be null");
         /*
          * Default to sorting by doc. We can't do this in the request itself because it is normal to *add* to the sorts rather than replace
          * them and if we add _doc as the first sort by default then sorts will never work.... So we add it here, only if there isn't
@@ -103,12 +141,71 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
         mainRequest.getSearchRequest().source().version(needsSourceDocumentVersions());
     }
 
+    /**
+     * Build the {@link BiFunction} to apply to all {@link RequestWrapper}.
+     */
+    protected BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> buildScriptApplier() {
+        // The default script applier executes a no-op
+        return (request, searchHit) -> request;
+    }
+
     /**
      * Does this operation need the versions of the source documents?
      */
     protected abstract boolean needsSourceDocumentVersions();
 
-    protected abstract BulkRequest buildBulk(Iterable<? extends ScrollableHitSource.Hit> docs);
+    /**
+     * Build the {@link RequestWrapper} for a single search hit. This shouldn't handle
+     * metadata or scripting. That will be handled by copyMetadata and
+     * apply functions that can be overridden.
+     */
+    protected abstract RequestWrapper<?> buildRequest(ScrollableHitSource.Hit doc);
+
+    /**
+     * Copies the metadata from a hit to the request.
+     */
+    protected RequestWrapper<?> copyMetadata(RequestWrapper<?> request, ScrollableHitSource.Hit doc) {
+        request.setParent(doc.getParent());
+        copyRouting(request, doc.getRouting());
+        return request;
+    }
+
+    /**
+     * Copy the routing from a search hit to the request.
+     */
+    protected void copyRouting(RequestWrapper<?> request, String routing) {
+        request.setRouting(routing);
+    }
+
+    /**
+     * Used to accept or ignore a search hit. Ignored search hits will be excluded
+     * from the bulk request. It is also where we fail on invalid search hits, like
+     * when the document has no source but it's required.
+     */
+    protected boolean accept(ScrollableHitSource.Hit doc) {
+        if (doc.getSource() == null) {
+            /*
+             * Either the document didn't store _source or we didn't fetch it for some reason. Since we don't allow the user to
+             * change the "fields" part of the search request it is unlikely that we got here because we didn't fetch _source.
+             * Thus the error message assumes that it wasn't stored.
+             */
+            throw new IllegalArgumentException("[" + doc.getIndex() + "][" + doc.getType() + "][" + doc.getId() + "] didn't store _source");
+        }
+        return true;
+    }
+
+    private BulkRequest buildBulk(Iterable<? extends ScrollableHitSource.Hit> docs) {
+        BulkRequest bulkRequest = new BulkRequest();
+        for (ScrollableHitSource.Hit doc : docs) {
+            if (accept(doc)) {
+                RequestWrapper<?> request = scriptApplier.apply(copyMetadata(buildRequest(doc), doc), doc);
+                if (request != null) {
+                    bulkRequest.add(request.self());
+                }
+            }
+        }
+        return bulkRequest;
+    }
 
     protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffPolicy) {
         return new ClientScrollableHitSource(logger, backoffPolicy, threadPool, task::countSearchRetry, this::finishHim, client,
@@ -397,4 +494,410 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
     void setScroll(String scroll) {
         scrollSource.setScroll(scroll);
     }
+
+    /**
+     * Wrapper for the {@link DocWriteRequest} that are used in this action class.
+     */
+    interface RequestWrapper<Self extends DocWriteRequest<Self>> {
+
+        void setIndex(String index);
+
+        String getIndex();
+
+        void setType(String type);
+
+        String getType();
+
+        void setId(String id);
+
+        String getId();
+
+        void setVersion(long version);
+
+        long getVersion();
+
+        void setVersionType(VersionType versionType);
+
+        void setParent(String parent);
+
+        String getParent();
+
+        void setRouting(String routing);
+
+        String getRouting();
+
+        void setSource(Map<String, Object> source);
+
+        Map<String, Object> getSource();
+
+        Self self();
+    }
+
+    /**
+     * {@link RequestWrapper} for {@link IndexRequest}
+     */
+    public static class IndexRequestWrapper implements RequestWrapper<IndexRequest> {
+
+        private final IndexRequest request;
+
+        IndexRequestWrapper(IndexRequest request) {
+            this.request = Objects.requireNonNull(request, "Wrapped IndexRequest can not be null");
+        }
+
+        @Override
+        public void setIndex(String index) {
+            request.index(index);
+        }
+
+        @Override
+        public String getIndex() {
+            return request.index();
+        }
+
+        @Override
+        public void setType(String type) {
+            request.type(type);
+        }
+
+        @Override
+        public String getType() {
+            return request.type();
+        }
+
+        @Override
+        public void setId(String id) {
+            request.id(id);
+        }
+
+        @Override
+        public String getId() {
+            return request.id();
+        }
+
+        @Override
+        public void setVersion(long version) {
+            request.version(version);
+        }
+
+        @Override
+        public long getVersion() {
+            return request.version();
+        }
+
+        @Override
+        public void setVersionType(VersionType versionType) {
+            request.versionType(versionType);
+        }
+
+        @Override
+        public void setParent(String parent) {
+            request.parent(parent);
+        }
+
+        @Override
+        public String getParent() {
+            return request.parent();
+        }
+
+        @Override
+        public void setRouting(String routing) {
+            request.routing(routing);
+        }
+
+        @Override
+        public String getRouting() {
+            return request.routing();
+        }
+
+        @Override
+        public Map<String, Object> getSource() {
+            return request.sourceAsMap();
+        }
+
+        @Override
+        public void setSource(Map<String, Object> source) {
+            request.source(source);
+        }
+
+        @Override
+        public IndexRequest self() {
+            return request;
+        }
+    }
+
+    /**
+     * Wraps a {@link IndexRequest} in a {@link RequestWrapper}
+     */
+    static RequestWrapper<IndexRequest> wrap(IndexRequest request) {
+        return new IndexRequestWrapper(request);
+    }
+
+    /**
+     * {@link RequestWrapper} for {@link DeleteRequest}
+     */
+    public static class DeleteRequestWrapper implements RequestWrapper<DeleteRequest> {
+
+        private final DeleteRequest request;
+
+        DeleteRequestWrapper(DeleteRequest request) {
+            this.request = Objects.requireNonNull(request, "Wrapped DeleteRequest can not be null");
+        }
+
+        @Override
+        public void setIndex(String index) {
+            request.index(index);
+        }
+
+        @Override
+        public String getIndex() {
+            return request.index();
+        }
+
+        @Override
+        public void setType(String type) {
+            request.type(type);
+        }
+
+        @Override
+        public String getType() {
+            return request.type();
+        }
+
+        @Override
+        public void setId(String id) {
+            request.id(id);
+        }
+
+        @Override
+        public String getId() {
+            return request.id();
+        }
+
+        @Override
+        public void setVersion(long version) {
+            request.version(version);
+        }
+
+        @Override
+        public long getVersion() {
+            return request.version();
+        }
+
+        @Override
+        public void setVersionType(VersionType versionType) {
+            request.versionType(versionType);
+        }
+
+        @Override
+        public void setParent(String parent) {
+            request.parent(parent);
+        }
+
+        @Override
+        public String getParent() {
+            return request.parent();
+        }
+
+        @Override
+        public void setRouting(String routing) {
+            request.routing(routing);
+        }
+
+        @Override
+        public String getRouting() {
+            return request.routing();
+        }
+
+        @Override
+        public Map<String, Object> getSource() {
+            throw new UnsupportedOperationException("unable to get source from action request [" + request.getClass() + "]");
+        }
+
+        @Override
+        public void setSource(Map<String, Object> source) {
+            throw new UnsupportedOperationException("unable to set [source] on action request [" + request.getClass() + "]");
+        }
+
+        @Override
+        public DeleteRequest self() {
+            return request;
+        }
+    }
+
+    /**
+     * Wraps a {@link DeleteRequest} in a {@link RequestWrapper}
+     */
+    static RequestWrapper<DeleteRequest> wrap(DeleteRequest request) {
+        return new DeleteRequestWrapper(request);
+    }
+
+    /**
+     * Apply a {@link Script} to a {@link RequestWrapper}
+     */
+    public abstract class ScriptApplier implements BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> {
+
+        private final WorkingBulkByScrollTask task;
+        private final ScriptService scriptService;
+        private final Script script;
+        private final Map<String, Object> params;
+
+        private ExecutableScript executable;
+        private Map<String, Object> context;
+
+        public ScriptApplier(WorkingBulkByScrollTask task, ScriptService scriptService, Script script,
+                             Map<String, Object> params) {
+            this.task = task;
+            this.scriptService = scriptService;
+            this.script = script;
+            this.params = params;
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public RequestWrapper<?> apply(RequestWrapper<?> request, ScrollableHitSource.Hit doc) {
+            if (script == null) {
+                return request;
+            }
+            if (executable == null) {
+                CompiledScript compiled = scriptService.compile(script, ScriptContext.Standard.UPDATE, emptyMap());
+                executable = scriptService.executable(compiled, params);
+            }
+            if (context == null) {
+                context = new HashMap<>();
+            } else {
+                context.clear();
+            }
+
+            context.put(IndexFieldMapper.NAME, doc.getIndex());
+            context.put(TypeFieldMapper.NAME, doc.getType());
+            context.put(IdFieldMapper.NAME, doc.getId());
+            Long oldVersion = doc.getVersion();
+            context.put(VersionFieldMapper.NAME, oldVersion);
+            String oldParent = doc.getParent();
+            context.put(ParentFieldMapper.NAME, oldParent);
+            String oldRouting = doc.getRouting();
+            context.put(RoutingFieldMapper.NAME, oldRouting);
+            context.put(SourceFieldMapper.NAME, request.getSource());
+
+            OpType oldOpType = OpType.INDEX;
+            context.put("op", oldOpType.toString());
+
+            executable.setNextVar("ctx", context);
+            executable.run();
+
+            Map<String, Object> resultCtx = (Map<String, Object>) executable.unwrap(context);
+            String newOp = (String) resultCtx.remove("op");
+            if (newOp == null) {
+                throw new IllegalArgumentException("Script cleared operation type");
+            }
+
+            /*
+             * It'd be lovely to only set the source if we know its been modified
+             * but it isn't worth keeping two copies of it around just to check!
+             */
+            request.setSource((Map<String, Object>) resultCtx.remove(SourceFieldMapper.NAME));
+
+            Object newValue = resultCtx.remove(IndexFieldMapper.NAME);
+            if (false == doc.getIndex().equals(newValue)) {
+                scriptChangedIndex(request, newValue);
+            }
+            newValue = resultCtx.remove(TypeFieldMapper.NAME);
+            if (false == doc.getType().equals(newValue)) {
+                scriptChangedType(request, newValue);
+            }
+            newValue = resultCtx.remove(IdFieldMapper.NAME);
+            if (false == doc.getId().equals(newValue)) {
+                scriptChangedId(request, newValue);
+            }
+            newValue = resultCtx.remove(VersionFieldMapper.NAME);
+            if (false == Objects.equals(oldVersion, newValue)) {
+                scriptChangedVersion(request, newValue);
+            }
+            newValue = resultCtx.remove(ParentFieldMapper.NAME);
+            if (false == Objects.equals(oldParent, newValue)) {
+                scriptChangedParent(request, newValue);
+            }
+            /*
+             * Its important that routing comes after parent in case you want to
+             * change them both.
+             */
+            newValue = resultCtx.remove(RoutingFieldMapper.NAME);
+            if (false == Objects.equals(oldRouting, newValue)) {
+                scriptChangedRouting(request, newValue);
+            }
+
+            OpType newOpType = OpType.fromString(newOp);
+            if (newOpType != oldOpType) {
+                return scriptChangedOpType(request, oldOpType, newOpType);
+            }
+
+            if (false == resultCtx.isEmpty()) {
+                throw new IllegalArgumentException("Invalid fields added to context [" + String.join(",", resultCtx.keySet()) + ']');
+            }
+            return request;
+        }
+
+        protected RequestWrapper<?> scriptChangedOpType(RequestWrapper<?> request, OpType oldOpType, OpType newOpType) {
+            switch (newOpType) {
+            case NOOP:
+                task.countNoop();
+                return null;
+            case DELETE:
+                RequestWrapper<DeleteRequest> delete = wrap(new DeleteRequest(request.getIndex(), request.getType(), request.getId()));
+                delete.setVersion(request.getVersion());
+                delete.setVersionType(VersionType.INTERNAL);
+                delete.setParent(request.getParent());
+                delete.setRouting(request.getRouting());
+                return delete;
+            default:
+                throw new IllegalArgumentException("Unsupported operation type change from [" + oldOpType + "] to [" + newOpType + "]");
+            }
+        }
+
+        protected abstract void scriptChangedIndex(RequestWrapper<?> request, Object to);
+
+        protected abstract void scriptChangedType(RequestWrapper<?> request, Object to);
+
+        protected abstract void scriptChangedId(RequestWrapper<?> request, Object to);
+
+        protected abstract void scriptChangedVersion(RequestWrapper<?> request, Object to);
+
+        protected abstract void scriptChangedRouting(RequestWrapper<?> request, Object to);
+
+        protected abstract void scriptChangedParent(RequestWrapper<?> request, Object to);
+
+    }
+
+    public enum OpType {
+
+        NOOP("noop"),
+        INDEX("index"),
+        DELETE("delete");
+
+        private final String id;
+
+        OpType(String id) {
+            this.id = id;
+        }
+
+        public static OpType fromString(String opType) {
+            String lowerOpType = opType.toLowerCase(Locale.ROOT);
+            switch (lowerOpType) {
+                case "noop":
+                    return OpType.NOOP;
+                case "index":
+                    return OpType.INDEX;
+                case "delete":
+                    return OpType.DELETE;
+                default:
+                    throw new IllegalArgumentException("Operation type [" + lowerOpType + "] not allowed, only " +
+                            Arrays.toString(values()) + " are allowed");
+            }
+        }
+
+        @Override
+        public String toString() {
+            return id.toLowerCase(Locale.ROOT);
+        }
+    }
 }

+ 0 - 548
modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkIndexByScrollAction.java

@@ -1,548 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.index.reindex;
-
-import org.apache.logging.log4j.Logger;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.DocWriteRequest;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.ParentTaskAssigningClient;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.index.VersionType;
-import org.elasticsearch.index.mapper.IdFieldMapper;
-import org.elasticsearch.index.mapper.IndexFieldMapper;
-import org.elasticsearch.index.mapper.ParentFieldMapper;
-import org.elasticsearch.index.mapper.RoutingFieldMapper;
-import org.elasticsearch.index.mapper.SourceFieldMapper;
-import org.elasticsearch.index.mapper.TypeFieldMapper;
-import org.elasticsearch.index.mapper.VersionFieldMapper;
-import org.elasticsearch.script.CompiledScript;
-import org.elasticsearch.script.ExecutableScript;
-import org.elasticsearch.script.Script;
-import org.elasticsearch.script.ScriptContext;
-import org.elasticsearch.script.ScriptService;
-import org.elasticsearch.threadpool.ThreadPool;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Objects;
-import java.util.function.BiFunction;
-
-import static java.util.Collections.emptyMap;
-
-/**
- * Abstract base for scrolling across a search and executing bulk indexes on all
- * results.
- */
-public abstract class AbstractAsyncBulkIndexByScrollAction<Request extends AbstractBulkByScrollRequest<Request>>
-        extends AbstractAsyncBulkByScrollAction<Request> {
-
-    protected final ScriptService scriptService;
-    protected final ClusterState clusterState;
-
-    /**
-     * This BiFunction is used to apply various changes depending of the Reindex action and  the search hit,
-     * from copying search hit metadata (parent, routing, etc) to potentially transforming the
-     * {@link RequestWrapper} completely.
-     */
-    private final BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> scriptApplier;
-
-    public AbstractAsyncBulkIndexByScrollAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
-                                                ThreadPool threadPool, Request mainRequest,
-                                                ActionListener<BulkIndexByScrollResponse> listener,
-                                                ScriptService scriptService, ClusterState clusterState) {
-        super(task, logger, client, threadPool, mainRequest, listener);
-        this.scriptService = scriptService;
-        this.clusterState = clusterState;
-        this.scriptApplier = Objects.requireNonNull(buildScriptApplier(), "script applier must not be null");
-    }
-
-    /**
-     * Build the {@link BiFunction} to apply to all {@link RequestWrapper}.
-     */
-    protected BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> buildScriptApplier() {
-        // The default script applier executes a no-op
-        return (request, searchHit) -> request;
-    }
-
-    @Override
-    protected BulkRequest buildBulk(Iterable<? extends ScrollableHitSource.Hit> docs) {
-        BulkRequest bulkRequest = new BulkRequest();
-        for (ScrollableHitSource.Hit doc : docs) {
-            if (accept(doc)) {
-                RequestWrapper<?> request = scriptApplier.apply(copyMetadata(buildRequest(doc), doc), doc);
-                if (request != null) {
-                    bulkRequest.add(request.self());
-                }
-            }
-        }
-        return bulkRequest;
-    }
-
-    /**
-     * Used to accept or ignore a search hit. Ignored search hits will be excluded
-     * from the bulk request. It is also where we fail on invalid search hits, like
-     * when the document has no source but it's required.
-     */
-    protected boolean accept(ScrollableHitSource.Hit doc) {
-        if (doc.getSource() == null) {
-            /*
-             * Either the document didn't store _source or we didn't fetch it for some reason. Since we don't allow the user to
-             * change the "fields" part of the search request it is unlikely that we got here because we didn't fetch _source.
-             * Thus the error message assumes that it wasn't stored.
-             */
-            throw new IllegalArgumentException("[" + doc.getIndex() + "][" + doc.getType() + "][" + doc.getId() + "] didn't store _source");
-        }
-        return true;
-    }
-
-    /**
-     * Build the {@link RequestWrapper} for a single search hit. This shouldn't handle
-     * metadata or scripting. That will be handled by copyMetadata and
-     * apply functions that can be overridden.
-     */
-    protected abstract RequestWrapper<?> buildRequest(ScrollableHitSource.Hit doc);
-
-    /**
-     * Copies the metadata from a hit to the request.
-     */
-    protected RequestWrapper<?> copyMetadata(RequestWrapper<?> request, ScrollableHitSource.Hit doc) {
-        request.setParent(doc.getParent());
-        copyRouting(request, doc.getRouting());
-        return request;
-    }
-
-    /**
-     * Copy the routing from a search hit to the request.
-     */
-    protected void copyRouting(RequestWrapper<?> request, String routing) {
-        request.setRouting(routing);
-    }
-
-    /**
-     * Wrapper for the {@link DocWriteRequest} that are used in this action class.
-     */
-    interface RequestWrapper<Self extends DocWriteRequest<Self>> {
-
-        void setIndex(String index);
-
-        String getIndex();
-
-        void setType(String type);
-
-        String getType();
-
-        void setId(String id);
-
-        String getId();
-
-        void setVersion(long version);
-
-        long getVersion();
-
-        void setVersionType(VersionType versionType);
-
-        void setParent(String parent);
-
-        String getParent();
-
-        void setRouting(String routing);
-
-        String getRouting();
-
-        void setSource(Map<String, Object> source);
-
-        Map<String, Object> getSource();
-
-        Self self();
-    }
-
-    /**
-     * {@link RequestWrapper} for {@link IndexRequest}
-     */
-    public static class IndexRequestWrapper implements RequestWrapper<IndexRequest> {
-
-        private final IndexRequest request;
-
-        IndexRequestWrapper(IndexRequest request) {
-            this.request = Objects.requireNonNull(request, "Wrapped IndexRequest can not be null");
-        }
-
-        @Override
-        public void setIndex(String index) {
-            request.index(index);
-        }
-
-        @Override
-        public String getIndex() {
-            return request.index();
-        }
-
-        @Override
-        public void setType(String type) {
-            request.type(type);
-        }
-
-        @Override
-        public String getType() {
-            return request.type();
-        }
-
-        @Override
-        public void setId(String id) {
-            request.id(id);
-        }
-
-        @Override
-        public String getId() {
-            return request.id();
-        }
-
-        @Override
-        public void setVersion(long version) {
-            request.version(version);
-        }
-
-        @Override
-        public long getVersion() {
-            return request.version();
-        }
-
-        @Override
-        public void setVersionType(VersionType versionType) {
-            request.versionType(versionType);
-        }
-
-        @Override
-        public void setParent(String parent) {
-            request.parent(parent);
-        }
-
-        @Override
-        public String getParent() {
-            return request.parent();
-        }
-
-        @Override
-        public void setRouting(String routing) {
-            request.routing(routing);
-        }
-
-        @Override
-        public String getRouting() {
-            return request.routing();
-        }
-
-        @Override
-        public Map<String, Object> getSource() {
-            return request.sourceAsMap();
-        }
-
-        @Override
-        public void setSource(Map<String, Object> source) {
-            request.source(source);
-        }
-
-        @Override
-        public IndexRequest self() {
-            return request;
-        }
-    }
-
-    /**
-     * Wraps a {@link IndexRequest} in a {@link RequestWrapper}
-     */
-    static RequestWrapper<IndexRequest> wrap(IndexRequest request) {
-        return new IndexRequestWrapper(request);
-    }
-
-    /**
-     * {@link RequestWrapper} for {@link DeleteRequest}
-     */
-    public static class DeleteRequestWrapper implements RequestWrapper<DeleteRequest> {
-
-        private final DeleteRequest request;
-
-        DeleteRequestWrapper(DeleteRequest request) {
-            this.request = Objects.requireNonNull(request, "Wrapped DeleteRequest can not be null");
-        }
-
-        @Override
-        public void setIndex(String index) {
-            request.index(index);
-        }
-
-        @Override
-        public String getIndex() {
-            return request.index();
-        }
-
-        @Override
-        public void setType(String type) {
-            request.type(type);
-        }
-
-        @Override
-        public String getType() {
-            return request.type();
-        }
-
-        @Override
-        public void setId(String id) {
-            request.id(id);
-        }
-
-        @Override
-        public String getId() {
-            return request.id();
-        }
-
-        @Override
-        public void setVersion(long version) {
-            request.version(version);
-        }
-
-        @Override
-        public long getVersion() {
-            return request.version();
-        }
-
-        @Override
-        public void setVersionType(VersionType versionType) {
-            request.versionType(versionType);
-        }
-
-        @Override
-        public void setParent(String parent) {
-            request.parent(parent);
-        }
-
-        @Override
-        public String getParent() {
-            return request.parent();
-        }
-
-        @Override
-        public void setRouting(String routing) {
-            request.routing(routing);
-        }
-
-        @Override
-        public String getRouting() {
-            return request.routing();
-        }
-
-        @Override
-        public Map<String, Object> getSource() {
-            throw new UnsupportedOperationException("unable to get source from action request [" + request.getClass() + "]");
-        }
-
-        @Override
-        public void setSource(Map<String, Object> source) {
-            throw new UnsupportedOperationException("unable to set [source] on action request [" + request.getClass() + "]");
-        }
-
-        @Override
-        public DeleteRequest self() {
-            return request;
-        }
-    }
-
-    /**
-     * Wraps a {@link DeleteRequest} in a {@link RequestWrapper}
-     */
-    static RequestWrapper<DeleteRequest> wrap(DeleteRequest request) {
-        return new DeleteRequestWrapper(request);
-    }
-
-    /**
-     * Apply a {@link Script} to a {@link RequestWrapper}
-     */
-    public abstract class ScriptApplier implements BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> {
-
-        private final WorkingBulkByScrollTask task;
-        private final ScriptService scriptService;
-        private final Script script;
-        private final Map<String, Object> params;
-
-        private ExecutableScript executable;
-        private Map<String, Object> context;
-
-        public ScriptApplier(WorkingBulkByScrollTask task, ScriptService scriptService, Script script,
-                             Map<String, Object> params) {
-            this.task = task;
-            this.scriptService = scriptService;
-            this.script = script;
-            this.params = params;
-        }
-
-        @Override
-        @SuppressWarnings("unchecked")
-        public RequestWrapper<?> apply(RequestWrapper<?> request, ScrollableHitSource.Hit doc) {
-            if (script == null) {
-                return request;
-            }
-            if (executable == null) {
-                CompiledScript compiled = scriptService.compile(script, ScriptContext.Standard.UPDATE, emptyMap());
-                executable = scriptService.executable(compiled, params);
-            }
-            if (context == null) {
-                context = new HashMap<>();
-            } else {
-                context.clear();
-            }
-
-            context.put(IndexFieldMapper.NAME, doc.getIndex());
-            context.put(TypeFieldMapper.NAME, doc.getType());
-            context.put(IdFieldMapper.NAME, doc.getId());
-            Long oldVersion = doc.getVersion();
-            context.put(VersionFieldMapper.NAME, oldVersion);
-            String oldParent = doc.getParent();
-            context.put(ParentFieldMapper.NAME, oldParent);
-            String oldRouting = doc.getRouting();
-            context.put(RoutingFieldMapper.NAME, oldRouting);
-            context.put(SourceFieldMapper.NAME, request.getSource());
-
-            OpType oldOpType = OpType.INDEX;
-            context.put("op", oldOpType.toString());
-
-            executable.setNextVar("ctx", context);
-            executable.run();
-
-            Map<String, Object> resultCtx = (Map<String, Object>) executable.unwrap(context);
-            String newOp = (String) resultCtx.remove("op");
-            if (newOp == null) {
-                throw new IllegalArgumentException("Script cleared operation type");
-            }
-
-            /*
-             * It'd be lovely to only set the source if we know its been modified
-             * but it isn't worth keeping two copies of it around just to check!
-             */
-            request.setSource((Map<String, Object>) resultCtx.remove(SourceFieldMapper.NAME));
-
-            Object newValue = resultCtx.remove(IndexFieldMapper.NAME);
-            if (false == doc.getIndex().equals(newValue)) {
-                scriptChangedIndex(request, newValue);
-            }
-            newValue = resultCtx.remove(TypeFieldMapper.NAME);
-            if (false == doc.getType().equals(newValue)) {
-                scriptChangedType(request, newValue);
-            }
-            newValue = resultCtx.remove(IdFieldMapper.NAME);
-            if (false == doc.getId().equals(newValue)) {
-                scriptChangedId(request, newValue);
-            }
-            newValue = resultCtx.remove(VersionFieldMapper.NAME);
-            if (false == Objects.equals(oldVersion, newValue)) {
-                scriptChangedVersion(request, newValue);
-            }
-            newValue = resultCtx.remove(ParentFieldMapper.NAME);
-            if (false == Objects.equals(oldParent, newValue)) {
-                scriptChangedParent(request, newValue);
-            }
-            /*
-             * Its important that routing comes after parent in case you want to
-             * change them both.
-             */
-            newValue = resultCtx.remove(RoutingFieldMapper.NAME);
-            if (false == Objects.equals(oldRouting, newValue)) {
-                scriptChangedRouting(request, newValue);
-            }
-
-            OpType newOpType = OpType.fromString(newOp);
-            if (newOpType != oldOpType) {
-                return scriptChangedOpType(request, oldOpType, newOpType);
-            }
-
-            if (false == resultCtx.isEmpty()) {
-                throw new IllegalArgumentException("Invalid fields added to context [" + String.join(",", resultCtx.keySet()) + ']');
-            }
-            return request;
-        }
-
-        protected RequestWrapper<?> scriptChangedOpType(RequestWrapper<?> request, OpType oldOpType, OpType newOpType) {
-            switch (newOpType) {
-            case NOOP:
-                task.countNoop();
-                return null;
-            case DELETE:
-                RequestWrapper<DeleteRequest> delete = wrap(new DeleteRequest(request.getIndex(), request.getType(), request.getId()));
-                delete.setVersion(request.getVersion());
-                delete.setVersionType(VersionType.INTERNAL);
-                delete.setParent(request.getParent());
-                delete.setRouting(request.getRouting());
-                return delete;
-            default:
-                throw new IllegalArgumentException("Unsupported operation type change from [" + oldOpType + "] to [" + newOpType + "]");
-            }
-        }
-
-        protected abstract void scriptChangedIndex(RequestWrapper<?> request, Object to);
-
-        protected abstract void scriptChangedType(RequestWrapper<?> request, Object to);
-
-        protected abstract void scriptChangedId(RequestWrapper<?> request, Object to);
-
-        protected abstract void scriptChangedVersion(RequestWrapper<?> request, Object to);
-
-        protected abstract void scriptChangedRouting(RequestWrapper<?> request, Object to);
-
-        protected abstract void scriptChangedParent(RequestWrapper<?> request, Object to);
-
-    }
-
-    public enum OpType {
-
-        NOOP("noop"),
-        INDEX("index"),
-        DELETE("delete");
-
-        private final String id;
-
-        OpType(String id) {
-            this.id = id;
-        }
-
-        public static OpType fromString(String opType) {
-            String lowerOpType = opType.toLowerCase(Locale.ROOT);
-            switch (lowerOpType) {
-                case "noop":
-                    return OpType.NOOP;
-                case "index":
-                    return OpType.INDEX;
-                case "delete":
-                    return OpType.DELETE;
-                default:
-                    throw new IllegalArgumentException("Operation type [" + lowerOpType + "] not allowed, only " +
-                            Arrays.toString(values()) + " are allowed");
-            }
-        }
-
-        @Override
-        public String toString() {
-            return id.toLowerCase(Locale.ROOT);
-        }
-    }
-}

+ 7 - 9
modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java

@@ -59,8 +59,8 @@ public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteB
         } else {
             ClusterState state = clusterService.state();
             ParentTaskAssigningClient client = new ParentTaskAssigningClient(this.client, clusterService.localNode(), task);
-            new AsyncDeleteBySearchAction((WorkingBulkByScrollTask) task, logger, client, threadPool, request, listener, scriptService,
-                    state).start();
+            new AsyncDeleteBySearchAction((WorkingBulkByScrollTask) task, logger, client, threadPool, request, scriptService, state,
+                    listener).start();
         }
     }
 
@@ -72,12 +72,11 @@ public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteB
     /**
      * Implementation of delete-by-query using scrolling and bulk.
      */
-    static class AsyncDeleteBySearchAction extends AbstractAsyncBulkIndexByScrollAction<DeleteByQueryRequest> {
-
+    static class AsyncDeleteBySearchAction extends AbstractAsyncBulkByScrollAction<DeleteByQueryRequest> {
         public AsyncDeleteBySearchAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
-                ThreadPool threadPool, DeleteByQueryRequest request, ActionListener<BulkIndexByScrollResponse> listener,
-                ScriptService scriptService, ClusterState clusterState) {
-            super(task, logger, client, threadPool, request, listener, scriptService, clusterState);
+                ThreadPool threadPool, DeleteByQueryRequest request, ScriptService scriptService, ClusterState clusterState,
+                ActionListener<BulkIndexByScrollResponse> listener) {
+            super(task, logger, client, threadPool, request, scriptService, clusterState, listener);
         }
 
         @Override
@@ -107,8 +106,7 @@ public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteB
         }
 
         /**
-         * Overrides the parent {@link AbstractAsyncBulkIndexByScrollAction#copyMetadata(RequestWrapper, ScrollableHitSource.Hit)}
-         * method that is much more Update/Reindex oriented and so also copies things like timestamp/ttl which we
+         * Overrides the parent's implementation is much more Update/Reindex oriented and so also copies things like timestamp/ttl which we
          * don't care for a deletion.
          */
         @Override

+ 6 - 6
modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java

@@ -113,8 +113,8 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
             validateAgainstAliases(request.getSearchRequest(), request.getDestination(), request.getRemoteInfo(),
                     indexNameExpressionResolver, autoCreateIndex, state);
             ParentTaskAssigningClient client = new ParentTaskAssigningClient(this.client, clusterService.localNode(), task);
-            new AsyncIndexBySearchAction((WorkingBulkByScrollTask) task, logger, client, threadPool, request, listener, scriptService,
-                    state).start();
+            new AsyncIndexBySearchAction((WorkingBulkByScrollTask) task, logger, client, threadPool, request, scriptService, state,
+                    listener).start();
         }
     }
 
@@ -230,7 +230,7 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
      * but this makes no attempt to do any of them so it can be as simple
      * possible.
      */
-    static class AsyncIndexBySearchAction extends AbstractAsyncBulkIndexByScrollAction<ReindexRequest> {
+    static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<ReindexRequest> {
         /**
          * List of threads created by this process. Usually actions don't create threads in Elasticsearch. Instead they use the builtin
          * {@link ThreadPool}s. But reindex-from-remote uses Elasticsearch's {@link RestClient} which doesn't use the
@@ -240,9 +240,9 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
         private List<Thread> createdThreads = emptyList();
 
         public AsyncIndexBySearchAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
-                ThreadPool threadPool, ReindexRequest request, ActionListener<BulkIndexByScrollResponse> listener,
-                ScriptService scriptService, ClusterState clusterState) {
-            super(task, logger, client, threadPool, request, listener, scriptService, clusterState);
+                ThreadPool threadPool, ReindexRequest request, ScriptService scriptService, ClusterState clusterState,
+                ActionListener<BulkIndexByScrollResponse> listener) {
+            super(task, logger, client, threadPool, request, scriptService, clusterState, listener);
         }
 
         @Override

+ 6 - 7
modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java

@@ -70,8 +70,8 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
         } else {
             ClusterState state = clusterService.state();
             ParentTaskAssigningClient client = new ParentTaskAssigningClient(this.client, clusterService.localNode(), task);
-            new AsyncIndexBySearchAction((WorkingBulkByScrollTask) task, logger, client, threadPool, request, listener, scriptService,
-                    state).start();
+            new AsyncIndexBySearchAction((WorkingBulkByScrollTask) task, logger, client, threadPool, request, scriptService, state,
+                    listener).start();
         }
     }
 
@@ -83,12 +83,11 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
     /**
      * Simple implementation of update-by-query using scrolling and bulk.
      */
-    static class AsyncIndexBySearchAction extends AbstractAsyncBulkIndexByScrollAction<UpdateByQueryRequest> {
-
+    static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<UpdateByQueryRequest> {
         public AsyncIndexBySearchAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
-                ThreadPool threadPool, UpdateByQueryRequest request, ActionListener<BulkIndexByScrollResponse> listener,
-                ScriptService scriptService, ClusterState clusterState) {
-            super(task, logger, client, threadPool, request, listener, scriptService, clusterState);
+                ThreadPool threadPool, UpdateByQueryRequest request, ScriptService scriptService, ClusterState clusterState,
+                ActionListener<BulkIndexByScrollResponse> listener) {
+            super(task, logger, client, threadPool, request, scriptService, clusterState, listener);
         }
 
         @Override

+ 3 - 3
modules/reindex/src/test/java/org/elasticsearch/index/reindex/AbstractAsyncBulkIndexbyScrollActionMetadataTestCase.java → modules/reindex/src/test/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollActionMetadataTestCase.java

@@ -19,14 +19,14 @@
 
 package org.elasticsearch.index.reindex;
 
-public abstract class AbstractAsyncBulkIndexbyScrollActionMetadataTestCase<
+public abstract class AbstractAsyncBulkByScrollActionMetadataTestCase<
                 Request extends AbstractBulkIndexByScrollRequest<Request>,
                 Response extends BulkIndexByScrollResponse>
-        extends AbstractAsyncBulkIndexByScrollActionTestCase<Request, Response> {
+        extends AbstractAsyncBulkByScrollActionTestCase<Request, Response> {
 
     protected ScrollableHitSource.BasicHit doc() {
         return new ScrollableHitSource.BasicHit("index", "type", "id", 0);
     }
 
-    protected abstract AbstractAsyncBulkIndexByScrollAction<Request> action();
+    protected abstract AbstractAsyncBulkByScrollAction<Request> action();
 }

+ 7 - 7
modules/reindex/src/test/java/org/elasticsearch/index/reindex/AbstractAsyncBulkIndexByScrollActionScriptTestCase.java → modules/reindex/src/test/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollActionScriptTestCase.java

@@ -22,8 +22,8 @@ package org.elasticsearch.index.reindex;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.index.reindex.AbstractAsyncBulkIndexByScrollAction.OpType;
-import org.elasticsearch.index.reindex.AbstractAsyncBulkIndexByScrollAction.RequestWrapper;
+import org.elasticsearch.index.reindex.AbstractAsyncBulkByScrollAction.OpType;
+import org.elasticsearch.index.reindex.AbstractAsyncBulkByScrollAction.RequestWrapper;
 import org.elasticsearch.script.CompiledScript;
 import org.elasticsearch.script.ExecutableScript;
 import org.elasticsearch.script.Script;
@@ -40,10 +40,10 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-public abstract class AbstractAsyncBulkIndexByScrollActionScriptTestCase<
+public abstract class AbstractAsyncBulkByScrollActionScriptTestCase<
                 Request extends AbstractBulkIndexByScrollRequest<Request>,
                 Response extends BulkIndexByScrollResponse>
-        extends AbstractAsyncBulkIndexByScrollActionTestCase<Request, Response> {
+        extends AbstractAsyncBulkByScrollActionTestCase<Request, Response> {
 
     private static final Script EMPTY_SCRIPT = new Script("");
 
@@ -62,8 +62,8 @@ public abstract class AbstractAsyncBulkIndexByScrollActionScriptTestCase<
 
         when(scriptService.executable(any(CompiledScript.class), Matchers.<Map<String, Object>>any()))
                 .thenReturn(executableScript);
-        AbstractAsyncBulkIndexByScrollAction<Request> action = action(scriptService, request().setScript(EMPTY_SCRIPT));
-        RequestWrapper<?> result = action.buildScriptApplier().apply(AbstractAsyncBulkIndexByScrollAction.wrap(index), doc);
+        AbstractAsyncBulkByScrollAction<Request> action = action(scriptService, request().setScript(EMPTY_SCRIPT));
+        RequestWrapper<?> result = action.buildScriptApplier().apply(AbstractAsyncBulkByScrollAction.wrap(index), doc);
         return (result != null) ? (T) result.self() : null;
     }
 
@@ -104,5 +104,5 @@ public abstract class AbstractAsyncBulkIndexByScrollActionScriptTestCase<
         assertThat(e.getMessage(), equalTo("Operation type [unknown] not allowed, only [noop, index, delete] are allowed"));
     }
 
-    protected abstract AbstractAsyncBulkIndexByScrollAction<Request> action(ScriptService scriptService, Request request);
+    protected abstract AbstractAsyncBulkByScrollAction<Request> action(ScriptService scriptService, Request request);
 }

+ 1 - 1
modules/reindex/src/test/java/org/elasticsearch/index/reindex/AbstractAsyncBulkIndexByScrollActionTestCase.java → modules/reindex/src/test/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollActionTestCase.java

@@ -27,7 +27,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.junit.After;
 import org.junit.Before;
 
-public abstract class AbstractAsyncBulkIndexByScrollActionTestCase<
+public abstract class AbstractAsyncBulkByScrollActionTestCase<
                 Request extends AbstractBulkIndexByScrollRequest<Request>,
                 Response extends BulkIndexByScrollResponse>
         extends ESTestCase {

+ 38 - 37
modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java

@@ -55,6 +55,7 @@ import org.elasticsearch.client.Client;
 import org.elasticsearch.client.FilterClient;
 import org.elasticsearch.client.ParentTaskAssigningClient;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.text.Text;
 import org.elasticsearch.common.unit.TimeValue;
@@ -170,7 +171,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
 
     public void testStartRetriesOnRejectionAndSucceeds() throws Exception {
         client.searchesToReject = randomIntBetween(0, testRequest.getMaxRetries() - 1);
-        DummyAbstractAsyncBulkByScrollAction action = new DummyActionWithoutBackoff();
+        DummyAsyncBulkByScrollAction action = new DummyActionWithoutBackoff();
         action.start();
         assertBusy(() -> assertEquals(client.searchesToReject + 1, client.searchAttempts.get()));
         if (listener.isDone()) {
@@ -183,7 +184,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
 
     public void testStartRetriesOnRejectionButFailsOnTooManyRejections() throws Exception {
         client.searchesToReject = testRequest.getMaxRetries() + randomIntBetween(1, 100);
-        DummyAbstractAsyncBulkByScrollAction action = new DummyActionWithoutBackoff();
+        DummyAsyncBulkByScrollAction action = new DummyActionWithoutBackoff();
         action.start();
         assertBusy(() -> assertEquals(testRequest.getMaxRetries() + 1, client.searchAttempts.get()));
         assertBusy(() -> assertTrue(listener.isDone()));
@@ -195,7 +196,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
 
     public void testStartNextScrollRetriesOnRejectionAndSucceeds() throws Exception {
         client.scrollsToReject = randomIntBetween(0, testRequest.getMaxRetries() - 1);
-        DummyAbstractAsyncBulkByScrollAction action = new DummyActionWithoutBackoff();
+        DummyAsyncBulkByScrollAction action = new DummyActionWithoutBackoff();
         action.setScroll(scrollId());
         action.startNextScroll(timeValueNanos(System.nanoTime()), 0);
         assertBusy(() -> assertEquals(client.scrollsToReject + 1, client.scrollAttempts.get()));
@@ -209,7 +210,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
 
     public void testStartNextScrollRetriesOnRejectionButFailsOnTooManyRejections() throws Exception {
         client.scrollsToReject = testRequest.getMaxRetries() + randomIntBetween(1, 100);
-        DummyAbstractAsyncBulkByScrollAction action = new DummyActionWithoutBackoff();
+        DummyAsyncBulkByScrollAction action = new DummyActionWithoutBackoff();
         action.setScroll(scrollId());
         action.startNextScroll(timeValueNanos(System.nanoTime()), 0);
         assertBusy(() -> assertEquals(testRequest.getMaxRetries() + 1, client.scrollAttempts.get()));
@@ -226,7 +227,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
 
         long total = randomIntBetween(0, Integer.MAX_VALUE);
         ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), total, emptyList(), null);
-        simulateScrollResponse(new DummyAbstractAsyncBulkByScrollAction(), timeValueSeconds(0), 0, response);
+        simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueSeconds(0), 0, response);
         assertEquals(total, testTask.getStatus().getTotal());
     }
 
@@ -238,7 +239,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
         for (int batches = 1; batches < maxBatches; batches++) {
             Hit hit = new ScrollableHitSource.BasicHit("index", "type", "id", 0);
             ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 1, singletonList(hit), null);
-            DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction();
+            DummyAsyncBulkByScrollAction action = new DummyAsyncBulkByScrollAction();
             simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 0, response);
 
             // Use assert busy because the update happens on another thread
@@ -291,7 +292,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
                     opType,
                     new IndexResponse(shardId, "type", "id" + i, randomInt(20), randomInt(), createdResponse));
             }
-            new DummyAbstractAsyncBulkByScrollAction().onBulkResponse(timeValueNanos(System.nanoTime()), new BulkResponse(responses, 0));
+            new DummyAsyncBulkByScrollAction().onBulkResponse(timeValueNanos(System.nanoTime()), new BulkResponse(responses, 0));
             assertEquals(versionConflicts, testTask.getStatus().getVersionConflicts());
             assertEquals(updated, testTask.getStatus().getUpdated());
             assertEquals(created, testTask.getStatus().getCreated());
@@ -316,7 +317,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
             }
         });
         ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 0, emptyList(), null);
-        simulateScrollResponse(new DummyAbstractAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 10, response);
+        simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 10, response);
         ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get());
         assertThat(e.getMessage(), equalTo("EsRejectedExecutionException[test]"));
         assertThat(client.scrollsCleared, contains(scrollId));
@@ -333,7 +334,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
         SearchFailure shardFailure = new SearchFailure(new RuntimeException("test"));
         ScrollableHitSource.Response scrollResponse = new ScrollableHitSource.Response(false, singletonList(shardFailure), 0,
                 emptyList(), null);
-        simulateScrollResponse(new DummyAbstractAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 0, scrollResponse);
+        simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 0, scrollResponse);
         BulkIndexByScrollResponse response = listener.get();
         assertThat(response.getBulkFailures(), empty());
         assertThat(response.getSearchFailures(), contains(shardFailure));
@@ -347,7 +348,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
      */
     public void testSearchTimeoutsAbortRequest() throws Exception {
         ScrollableHitSource.Response scrollResponse = new ScrollableHitSource.Response(true, emptyList(), 0, emptyList(), null);
-        simulateScrollResponse(new DummyAbstractAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 0, scrollResponse);
+        simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 0, scrollResponse);
         BulkIndexByScrollResponse response = listener.get();
         assertThat(response.getBulkFailures(), empty());
         assertThat(response.getSearchFailures(), empty());
@@ -361,7 +362,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
      */
     public void testBulkFailuresAbortRequest() throws Exception {
         Failure failure = new Failure("index", "type", "id", new RuntimeException("test"));
-        DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction();
+        DummyAsyncBulkByScrollAction action = new DummyAsyncBulkByScrollAction();
         BulkResponse bulkResponse = new BulkResponse(new BulkItemResponse[]
             {new BulkItemResponse(0, DocWriteRequest.OpType.CREATE, failure)}, randomLong());
         action.onBulkResponse(timeValueNanos(System.nanoTime()), bulkResponse);
@@ -374,14 +375,15 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
     /**
      * Mimicks script failures or general wrongness by implementers.
      */
-    public void testListenerReceiveBuildBulkExceptions() throws Exception {
-        DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction() {
+    public void testBuildRequestThrowsException() throws Exception {
+        DummyAsyncBulkByScrollAction action = new DummyAsyncBulkByScrollAction() {
             @Override
-            protected BulkRequest buildBulk(Iterable<? extends ScrollableHitSource.Hit> docs) {
+            protected AbstractAsyncBulkByScrollAction.RequestWrapper<?> buildRequest(Hit doc) {
                 throw new RuntimeException("surprise");
             }
         };
-        Hit hit = new ScrollableHitSource.BasicHit("index", "type", "id", 0);
+        ScrollableHitSource.BasicHit hit = new ScrollableHitSource.BasicHit("index", "type", "id", 0);
+        hit.setSource(new BytesArray("{}"));
         ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 1, singletonList(hit), null);
         simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 0, response);
         ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get());
@@ -426,7 +428,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
             }
         });
 
-        DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction();
+        DummyAsyncBulkByScrollAction action = new DummyAsyncBulkByScrollAction();
         action.setScroll(scrollId());
 
         // Set the base for the scroll to wait - this is added to the figure we calculate below
@@ -478,7 +480,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
          * deal with it. We just wait for it to happen.
          */
         CountDownLatch successLatch = new CountDownLatch(1);
-        DummyAbstractAsyncBulkByScrollAction action = new DummyActionWithoutBackoff() {
+        DummyAsyncBulkByScrollAction action = new DummyActionWithoutBackoff() {
             @Override
             void startNextScroll(TimeValue lastBatchStartTime, int lastBatchSize) {
                 successLatch.countDown();
@@ -504,7 +506,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
      * The default retry time matches what we say it is in the javadoc for the request.
      */
     public void testDefaultRetryTimes() {
-        Iterator<TimeValue> policy = new DummyAbstractAsyncBulkByScrollAction().buildBackoffPolicy().iterator();
+        Iterator<TimeValue> policy = new DummyAsyncBulkByScrollAction().buildBackoffPolicy().iterator();
         long millis = 0;
         while (policy.hasNext()) {
             millis += policy.next().millis();
@@ -537,7 +539,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
         if (refresh != null) {
             testRequest.setRefresh(refresh);
         }
-        DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction();
+        DummyAsyncBulkByScrollAction action = new DummyAsyncBulkByScrollAction();
         if (addDestinationIndexes) {
             action.addDestinationIndices(singleton("foo"));
         }
@@ -550,34 +552,34 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
     }
 
     public void testCancelBeforeInitialSearch() throws Exception {
-        cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.start());
+        cancelTaskCase((DummyAsyncBulkByScrollAction action) -> action.start());
     }
 
     public void testCancelBeforeScrollResponse() throws Exception {
         // We bail so early we don't need to pass in a half way valid response.
-        cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 1,
+        cancelTaskCase((DummyAsyncBulkByScrollAction action) -> simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 1,
                 null));
     }
 
     public void testCancelBeforeSendBulkRequest() throws Exception {
         // We bail so early we don't need to pass in a half way valid request.
-        cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.sendBulkRequest(timeValueNanos(System.nanoTime()), null));
+        cancelTaskCase((DummyAsyncBulkByScrollAction action) -> action.sendBulkRequest(timeValueNanos(System.nanoTime()), null));
     }
 
     public void testCancelBeforeOnBulkResponse() throws Exception {
         // We bail so early we don't need to pass in a half way valid response.
-        cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) ->
+        cancelTaskCase((DummyAsyncBulkByScrollAction action) ->
                 action.onBulkResponse(timeValueNanos(System.nanoTime()), new BulkResponse(new BulkItemResponse[0], 0)));
     }
 
     public void testCancelBeforeStartNextScroll() throws Exception {
-        cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.startNextScroll(timeValueNanos(System.nanoTime()), 0));
+        cancelTaskCase((DummyAsyncBulkByScrollAction action) -> action.startNextScroll(timeValueNanos(System.nanoTime()), 0));
     }
 
     public void testCancelBeforeRefreshAndFinish() throws Exception {
         // Refresh or not doesn't matter - we don't try to refresh.
         testRequest.setRefresh(usually());
-        cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.refreshAndFinish(emptyList(), emptyList(), false));
+        cancelTaskCase((DummyAsyncBulkByScrollAction action) -> action.refreshAndFinish(emptyList(), emptyList(), false));
         assertNull("No refresh was attempted", client.lastRefreshRequest.get());
     }
 
@@ -610,7 +612,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
         });
 
         // Send the scroll response which will trigger the custom thread pool above, canceling the request before running the response
-        DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction();
+        DummyAsyncBulkByScrollAction action = new DummyAsyncBulkByScrollAction();
         boolean previousScrollSet = usually();
         if (previousScrollSet) {
             action.setScroll(scrollId());
@@ -629,8 +631,8 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
         }
     }
 
-    private void cancelTaskCase(Consumer<DummyAbstractAsyncBulkByScrollAction> testMe) throws Exception {
-        DummyAbstractAsyncBulkByScrollAction action = new DummyAbstractAsyncBulkByScrollAction();
+    private void cancelTaskCase(Consumer<DummyAsyncBulkByScrollAction> testMe) throws Exception {
+        DummyAsyncBulkByScrollAction action = new DummyAsyncBulkByScrollAction();
         boolean previousScrollSet = usually();
         if (previousScrollSet) {
             action.setScroll(scrollId());
@@ -648,17 +650,16 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
     /**
      * Simulate a scroll response by setting the scroll id and firing the onScrollResponse method.
      */
-    private void simulateScrollResponse(DummyAbstractAsyncBulkByScrollAction action, TimeValue lastBatchTime, int lastBatchSize,
+    private void simulateScrollResponse(DummyAsyncBulkByScrollAction action, TimeValue lastBatchTime, int lastBatchSize,
             ScrollableHitSource.Response response) {
         action.setScroll(scrollId());
         action.onScrollResponse(lastBatchTime, lastBatchSize, response);
     }
 
-    private class DummyAbstractAsyncBulkByScrollAction
-            extends AbstractAsyncBulkByScrollAction<DummyAbstractBulkByScrollRequest> {
-        public DummyAbstractAsyncBulkByScrollAction() {
+    private class DummyAsyncBulkByScrollAction extends AbstractAsyncBulkByScrollAction<DummyAbstractBulkByScrollRequest> {
+        public DummyAsyncBulkByScrollAction() {
             super(testTask, AsyncBulkByScrollActionTests.this.logger, new ParentTaskAssigningClient(client, localNode, testTask),
-                    client.threadPool(), testRequest, listener);
+                    client.threadPool(), testRequest, null, null, listener);
         }
 
         @Override
@@ -667,15 +668,15 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
         }
 
         @Override
-        protected BulkRequest buildBulk(Iterable<? extends ScrollableHitSource.Hit> docs) {
-            return new BulkRequest();
+        protected AbstractAsyncBulkByScrollAction.RequestWrapper<?> buildRequest(Hit doc) {
+            throw new UnsupportedOperationException("Use another override to test this.");
         }
     }
 
     /**
-     * An extension to {@linkplain DummyAbstractAsyncBulkByScrollAction} that uses a 0 delaying backoff policy.
+     * An extension to {@linkplain DummyAsyncBulkByScrollAction} that uses a 0 delaying backoff policy.
      */
-    private class DummyActionWithoutBackoff extends DummyAbstractAsyncBulkByScrollAction {
+    private class DummyActionWithoutBackoff extends DummyAsyncBulkByScrollAction {
         @Override
         BackoffPolicy buildBackoffPolicy() {
             // Force a backoff time of 0 to prevent sleeping

+ 7 - 7
modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexMetadataTests.java

@@ -25,10 +25,10 @@ import org.elasticsearch.action.search.SearchRequest;
 /**
  * Index-by-search test for ttl, timestamp, and routing.
  */
-public class ReindexMetadataTests extends AbstractAsyncBulkIndexbyScrollActionMetadataTestCase<ReindexRequest, BulkIndexByScrollResponse> {
+public class ReindexMetadataTests extends AbstractAsyncBulkByScrollActionMetadataTestCase<ReindexRequest, BulkIndexByScrollResponse> {
     public void testRoutingCopiedByDefault() throws Exception {
         IndexRequest index = new IndexRequest();
-        action().copyMetadata(AbstractAsyncBulkIndexByScrollAction.wrap(index), doc().setRouting("foo"));
+        action().copyMetadata(AbstractAsyncBulkByScrollAction.wrap(index), doc().setRouting("foo"));
         assertEquals("foo", index.routing());
     }
 
@@ -36,7 +36,7 @@ public class ReindexMetadataTests extends AbstractAsyncBulkIndexbyScrollActionMe
         TransportReindexAction.AsyncIndexBySearchAction action = action();
         action.mainRequest.getDestination().routing("keep");
         IndexRequest index = new IndexRequest();
-        action.copyMetadata(AbstractAsyncBulkIndexByScrollAction.wrap(index), doc().setRouting("foo"));
+        action.copyMetadata(AbstractAsyncBulkByScrollAction.wrap(index), doc().setRouting("foo"));
         assertEquals("foo", index.routing());
     }
 
@@ -44,7 +44,7 @@ public class ReindexMetadataTests extends AbstractAsyncBulkIndexbyScrollActionMe
         TransportReindexAction.AsyncIndexBySearchAction action = action();
         action.mainRequest.getDestination().routing("discard");
         IndexRequest index = new IndexRequest();
-        action.copyMetadata(AbstractAsyncBulkIndexByScrollAction.wrap(index), doc().setRouting("foo"));
+        action.copyMetadata(AbstractAsyncBulkByScrollAction.wrap(index), doc().setRouting("foo"));
         assertEquals(null, index.routing());
     }
 
@@ -52,7 +52,7 @@ public class ReindexMetadataTests extends AbstractAsyncBulkIndexbyScrollActionMe
         TransportReindexAction.AsyncIndexBySearchAction action = action();
         action.mainRequest.getDestination().routing("=cat");
         IndexRequest index = new IndexRequest();
-        action.copyMetadata(AbstractAsyncBulkIndexByScrollAction.wrap(index), doc().setRouting("foo"));
+        action.copyMetadata(AbstractAsyncBulkByScrollAction.wrap(index), doc().setRouting("foo"));
         assertEquals("cat", index.routing());
     }
 
@@ -60,13 +60,13 @@ public class ReindexMetadataTests extends AbstractAsyncBulkIndexbyScrollActionMe
         TransportReindexAction.AsyncIndexBySearchAction action = action();
         action.mainRequest.getDestination().routing("==]");
         IndexRequest index = new IndexRequest();
-        action.copyMetadata(AbstractAsyncBulkIndexByScrollAction.wrap(index), doc().setRouting("foo"));
+        action.copyMetadata(AbstractAsyncBulkByScrollAction.wrap(index), doc().setRouting("foo"));
         assertEquals("=]", index.routing());
     }
 
     @Override
     protected TransportReindexAction.AsyncIndexBySearchAction action() {
-        return new TransportReindexAction.AsyncIndexBySearchAction(task, logger, null, threadPool, request(), listener(), null, null);
+        return new TransportReindexAction.AsyncIndexBySearchAction(task, logger, null, threadPool, request(), null, null, listener());
     }
 
     @Override

+ 4 - 4
modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexScriptTests.java

@@ -31,7 +31,7 @@ import static org.hamcrest.Matchers.containsString;
 /**
  * Tests index-by-search with a script modifying the documents.
  */
-public class ReindexScriptTests extends AbstractAsyncBulkIndexByScrollActionScriptTestCase<ReindexRequest, BulkIndexByScrollResponse> {
+public class ReindexScriptTests extends AbstractAsyncBulkByScrollActionScriptTestCase<ReindexRequest, BulkIndexByScrollResponse> {
 
     public void testSetIndex() throws Exception {
         Object dest = randomFrom(new Object[] {234, 234L, "pancake"});
@@ -109,8 +109,8 @@ public class ReindexScriptTests extends AbstractAsyncBulkIndexByScrollActionScri
     }
 
     @Override
-    protected AbstractAsyncBulkIndexByScrollAction<ReindexRequest> action(ScriptService scriptService, ReindexRequest request) {
-        return new TransportReindexAction.AsyncIndexBySearchAction(task, logger, null, threadPool, request, listener(), scriptService,
-                null);
+    protected TransportReindexAction.AsyncIndexBySearchAction action(ScriptService scriptService, ReindexRequest request) {
+        return new TransportReindexAction.AsyncIndexBySearchAction(task, logger, null, threadPool, request, scriptService, null,
+                listener());
     }
 }

+ 3 - 3
modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryMetadataTests.java

@@ -23,16 +23,16 @@ import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchRequest;
 
 public class UpdateByQueryMetadataTests
-        extends AbstractAsyncBulkIndexbyScrollActionMetadataTestCase<UpdateByQueryRequest, BulkIndexByScrollResponse> {
+        extends AbstractAsyncBulkByScrollActionMetadataTestCase<UpdateByQueryRequest, BulkIndexByScrollResponse> {
     public void testRoutingIsCopied() throws Exception {
         IndexRequest index = new IndexRequest();
-        action().copyMetadata(AbstractAsyncBulkIndexByScrollAction.wrap(index), doc().setRouting("foo"));
+        action().copyMetadata(AbstractAsyncBulkByScrollAction.wrap(index), doc().setRouting("foo"));
         assertEquals("foo", index.routing());
     }
 
     @Override
     protected TransportUpdateByQueryAction.AsyncIndexBySearchAction action() {
-        return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, threadPool, request(), listener(), null, null);
+        return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, threadPool, request(), null, null, listener());
     }
 
     @Override

+ 4 - 4
modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryWithScriptTests.java

@@ -28,7 +28,7 @@ import java.util.Map;
 import static org.hamcrest.Matchers.containsString;
 
 public class UpdateByQueryWithScriptTests
-        extends AbstractAsyncBulkIndexByScrollActionScriptTestCase<UpdateByQueryRequest, BulkIndexByScrollResponse> {
+        extends AbstractAsyncBulkByScrollActionScriptTestCase<UpdateByQueryRequest, BulkIndexByScrollResponse> {
 
     public void testModifyingCtxNotAllowed() {
         /*
@@ -53,8 +53,8 @@ public class UpdateByQueryWithScriptTests
     }
 
     @Override
-    protected AbstractAsyncBulkIndexByScrollAction<UpdateByQueryRequest> action(ScriptService scriptService, UpdateByQueryRequest request) {
-        return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, threadPool, request, listener(),
-                scriptService, null);
+    protected TransportUpdateByQueryAction.AsyncIndexBySearchAction action(ScriptService scriptService, UpdateByQueryRequest request) {
+        return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, threadPool, request, scriptService, null,
+                listener());
     }
 }