|
@@ -30,11 +30,6 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
|
|
import org.elasticsearch.core.Nullable;
|
|
|
import org.elasticsearch.core.TimeValue;
|
|
|
import org.elasticsearch.index.VersionType;
|
|
|
-import org.elasticsearch.index.mapper.IdFieldMapper;
|
|
|
-import org.elasticsearch.index.mapper.IndexFieldMapper;
|
|
|
-import org.elasticsearch.index.mapper.RoutingFieldMapper;
|
|
|
-import org.elasticsearch.index.mapper.SourceFieldMapper;
|
|
|
-import org.elasticsearch.index.mapper.VersionFieldMapper;
|
|
|
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
|
|
|
import org.elasticsearch.index.reindex.BulkByScrollResponse;
|
|
|
import org.elasticsearch.index.reindex.BulkByScrollTask;
|
|
@@ -42,6 +37,8 @@ import org.elasticsearch.index.reindex.ClientScrollableHitSource;
|
|
|
import org.elasticsearch.index.reindex.ScrollableHitSource;
|
|
|
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
|
|
|
import org.elasticsearch.index.reindex.WorkerBulkByScrollTaskState;
|
|
|
+import org.elasticsearch.script.CtxMap;
|
|
|
+import org.elasticsearch.script.Metadata;
|
|
|
import org.elasticsearch.script.Script;
|
|
|
import org.elasticsearch.script.ScriptService;
|
|
|
import org.elasticsearch.search.Scroll;
|
|
@@ -50,13 +47,10 @@ import org.elasticsearch.search.sort.SortBuilder;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
-import java.util.Arrays;
|
|
|
import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
-import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
-import java.util.Locale;
|
|
|
import java.util.Map;
|
|
|
import java.util.Objects;
|
|
|
import java.util.Set;
|
|
@@ -64,6 +58,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
import java.util.function.BiFunction;
|
|
|
+import java.util.function.LongSupplier;
|
|
|
|
|
|
import static java.lang.Math.max;
|
|
|
import static java.lang.Math.min;
|
|
@@ -819,147 +814,73 @@ public abstract class AbstractAsyncBulkByScrollAction<
|
|
|
/**
|
|
|
* Apply a {@link Script} to a {@link RequestWrapper}
|
|
|
*/
|
|
|
- public abstract static class ScriptApplier implements BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> {
|
|
|
+ public abstract static class ScriptApplier<T extends Metadata>
|
|
|
+ implements
|
|
|
+ BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> {
|
|
|
+
|
|
|
+ // "index" is the default operation
|
|
|
+ protected static final String INDEX = "index";
|
|
|
|
|
|
private final WorkerBulkByScrollTaskState taskWorker;
|
|
|
protected final ScriptService scriptService;
|
|
|
protected final Script script;
|
|
|
protected final Map<String, Object> params;
|
|
|
+ protected final LongSupplier nowInMillisSupplier;
|
|
|
|
|
|
public ScriptApplier(
|
|
|
WorkerBulkByScrollTaskState taskWorker,
|
|
|
ScriptService scriptService,
|
|
|
Script script,
|
|
|
- Map<String, Object> params
|
|
|
+ Map<String, Object> params,
|
|
|
+ LongSupplier nowInMillisSupplier
|
|
|
) {
|
|
|
this.taskWorker = taskWorker;
|
|
|
this.scriptService = scriptService;
|
|
|
this.script = script;
|
|
|
this.params = params;
|
|
|
+ this.nowInMillisSupplier = nowInMillisSupplier;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
public RequestWrapper<?> apply(RequestWrapper<?> request, ScrollableHitSource.Hit doc) {
|
|
|
if (script == null) {
|
|
|
return request;
|
|
|
}
|
|
|
|
|
|
- Map<String, Object> context = new HashMap<>();
|
|
|
- context.put(IndexFieldMapper.NAME, doc.getIndex());
|
|
|
- context.put(IdFieldMapper.NAME, doc.getId());
|
|
|
- Long oldVersion = doc.getVersion();
|
|
|
- context.put(VersionFieldMapper.NAME, oldVersion);
|
|
|
- String oldRouting = doc.getRouting();
|
|
|
- context.put(RoutingFieldMapper.NAME, oldRouting);
|
|
|
- context.put(SourceFieldMapper.NAME, request.getSource());
|
|
|
-
|
|
|
- OpType oldOpType = OpType.INDEX;
|
|
|
- context.put("op", oldOpType.toString());
|
|
|
+ CtxMap<T> ctxMap = execute(doc, request.getSource());
|
|
|
|
|
|
- execute(context);
|
|
|
+ T metadata = ctxMap.getMetadata();
|
|
|
|
|
|
- String newOp = (String) context.remove("op");
|
|
|
- if (newOp == null) {
|
|
|
- throw new IllegalArgumentException("Script cleared operation type");
|
|
|
- }
|
|
|
+ request.setSource(ctxMap.getSource());
|
|
|
|
|
|
- /*
|
|
|
- * It'd be lovely to only set the source if we know its been modified
|
|
|
- * but it isn't worth keeping two copies of it around just to check!
|
|
|
- */
|
|
|
- request.setSource((Map<String, Object>) context.remove(SourceFieldMapper.NAME));
|
|
|
+ updateRequest(request, metadata);
|
|
|
|
|
|
- Object newValue = context.remove(IndexFieldMapper.NAME);
|
|
|
- if (false == doc.getIndex().equals(newValue)) {
|
|
|
- scriptChangedIndex(request, newValue);
|
|
|
- }
|
|
|
- newValue = context.remove(IdFieldMapper.NAME);
|
|
|
- if (false == doc.getId().equals(newValue)) {
|
|
|
- scriptChangedId(request, newValue);
|
|
|
- }
|
|
|
- newValue = context.remove(VersionFieldMapper.NAME);
|
|
|
- if (false == Objects.equals(oldVersion, newValue)) {
|
|
|
- scriptChangedVersion(request, newValue);
|
|
|
- }
|
|
|
- /*
|
|
|
- * Its important that routing comes after parent in case you want to
|
|
|
- * change them both.
|
|
|
- */
|
|
|
- newValue = context.remove(RoutingFieldMapper.NAME);
|
|
|
- if (false == Objects.equals(oldRouting, newValue)) {
|
|
|
- scriptChangedRouting(request, newValue);
|
|
|
- }
|
|
|
+ return requestFromOp(request, metadata.getOp());
|
|
|
+ }
|
|
|
|
|
|
- OpType newOpType = OpType.fromString(newOp);
|
|
|
- if (newOpType != oldOpType) {
|
|
|
- return scriptChangedOpType(request, oldOpType, newOpType);
|
|
|
- }
|
|
|
+ protected abstract CtxMap<T> execute(ScrollableHitSource.Hit doc, Map<String, Object> source);
|
|
|
|
|
|
- if (false == context.isEmpty()) {
|
|
|
- throw new IllegalArgumentException("Invalid fields added to context [" + String.join(",", context.keySet()) + ']');
|
|
|
- }
|
|
|
- return request;
|
|
|
- }
|
|
|
+ protected abstract void updateRequest(RequestWrapper<?> request, T metadata);
|
|
|
|
|
|
- protected RequestWrapper<?> scriptChangedOpType(RequestWrapper<?> request, OpType oldOpType, OpType newOpType) {
|
|
|
- switch (newOpType) {
|
|
|
- case NOOP -> {
|
|
|
+ protected RequestWrapper<?> requestFromOp(RequestWrapper<?> request, String op) {
|
|
|
+ switch (op) {
|
|
|
+ case "noop" -> {
|
|
|
taskWorker.countNoop();
|
|
|
return null;
|
|
|
}
|
|
|
- case DELETE -> {
|
|
|
+ case "delete" -> {
|
|
|
RequestWrapper<DeleteRequest> delete = wrap(new DeleteRequest(request.getIndex(), request.getId()));
|
|
|
delete.setVersion(request.getVersion());
|
|
|
delete.setVersionType(VersionType.INTERNAL);
|
|
|
delete.setRouting(request.getRouting());
|
|
|
return delete;
|
|
|
}
|
|
|
- default -> throw new IllegalArgumentException(
|
|
|
- "Unsupported operation type change from [" + oldOpType + "] to [" + newOpType + "]"
|
|
|
- );
|
|
|
+ case INDEX -> {
|
|
|
+ return request;
|
|
|
+ }
|
|
|
+ default -> throw new IllegalArgumentException("Unsupported operation type change from [" + INDEX + "] to [" + op + "]");
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- protected abstract void scriptChangedIndex(RequestWrapper<?> request, Object to);
|
|
|
-
|
|
|
- protected abstract void scriptChangedId(RequestWrapper<?> request, Object to);
|
|
|
-
|
|
|
- protected abstract void scriptChangedVersion(RequestWrapper<?> request, Object to);
|
|
|
-
|
|
|
- protected abstract void scriptChangedRouting(RequestWrapper<?> request, Object to);
|
|
|
-
|
|
|
- protected abstract void execute(Map<String, Object> ctx);
|
|
|
- }
|
|
|
-
|
|
|
- public enum OpType {
|
|
|
-
|
|
|
- NOOP("noop"),
|
|
|
- INDEX("index"),
|
|
|
- DELETE("delete");
|
|
|
-
|
|
|
- private final String id;
|
|
|
-
|
|
|
- OpType(String id) {
|
|
|
- this.id = id;
|
|
|
- }
|
|
|
-
|
|
|
- public static OpType fromString(String opType) {
|
|
|
- String lowerOpType = opType.toLowerCase(Locale.ROOT);
|
|
|
- return switch (lowerOpType) {
|
|
|
- case "noop" -> OpType.NOOP;
|
|
|
- case "index" -> OpType.INDEX;
|
|
|
- case "delete" -> OpType.DELETE;
|
|
|
- default -> throw new IllegalArgumentException(
|
|
|
- "Operation type [" + lowerOpType + "] not allowed, only " + Arrays.toString(values()) + " are allowed"
|
|
|
- );
|
|
|
- };
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public String toString() {
|
|
|
- return id.toLowerCase(Locale.ROOT);
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
static class ScrollConsumableHitsResponse {
|