|
@@ -43,6 +43,7 @@ import org.elasticsearch.cluster.service.ClusterService;
|
|
|
import org.elasticsearch.common.io.stream.StreamInput;
|
|
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
|
|
import org.elasticsearch.common.io.stream.Streamable;
|
|
|
+import org.elasticsearch.common.io.stream.Writeable;
|
|
|
import org.elasticsearch.tasks.Task;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
|
import org.elasticsearch.transport.NodeShouldNotConnectException;
|
|
@@ -61,7 +62,6 @@ import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.concurrent.atomic.AtomicReferenceArray;
|
|
|
-import java.util.function.Supplier;
|
|
|
|
|
|
/**
|
|
|
* Abstraction for transporting aggregated shard-level operations in a single request (NodeRequest) per-node
|
|
@@ -90,7 +90,7 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
|
|
|
TransportService transportService,
|
|
|
ActionFilters actionFilters,
|
|
|
IndexNameExpressionResolver indexNameExpressionResolver,
|
|
|
- Supplier<Request> request,
|
|
|
+ Writeable.Reader<Request> request,
|
|
|
String executor) {
|
|
|
this(actionName, clusterService, transportService, actionFilters, indexNameExpressionResolver, request, executor, true);
|
|
|
}
|
|
@@ -101,10 +101,10 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
|
|
|
TransportService transportService,
|
|
|
ActionFilters actionFilters,
|
|
|
IndexNameExpressionResolver indexNameExpressionResolver,
|
|
|
- Supplier<Request> request,
|
|
|
+ Writeable.Reader<Request> request,
|
|
|
String executor,
|
|
|
boolean canTripCircuitBreaker) {
|
|
|
- super(actionName, canTripCircuitBreaker, transportService, request, actionFilters);
|
|
|
+ super(actionName, canTripCircuitBreaker, transportService, actionFilters, request);
|
|
|
|
|
|
this.clusterService = clusterService;
|
|
|
this.transportService = transportService;
|
|
@@ -314,9 +314,7 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
|
|
|
transportService.sendRequest(node, transportNodeBroadcastAction, nodeRequest, new TransportResponseHandler<NodeResponse>() {
|
|
|
@Override
|
|
|
public NodeResponse read(StreamInput in) throws IOException {
|
|
|
- NodeResponse nodeResponse = new NodeResponse();
|
|
|
- nodeResponse.readFrom(in);
|
|
|
- return nodeResponse;
|
|
|
+ return new NodeResponse(in);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -505,7 +503,16 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
|
|
|
protected List<BroadcastShardOperationFailedException> exceptions;
|
|
|
protected List<ShardOperationResult> results;
|
|
|
|
|
|
- NodeResponse() {
|
|
|
+ NodeResponse(StreamInput in) throws IOException {
|
|
|
+ super(in);
|
|
|
+ nodeId = in.readString();
|
|
|
+ totalShards = in.readVInt();
|
|
|
+ results = in.readList((stream) -> stream.readBoolean() ? readShardResult(stream) : null);
|
|
|
+ if (in.readBoolean()) {
|
|
|
+ exceptions = in.readList(BroadcastShardOperationFailedException::new);
|
|
|
+ } else {
|
|
|
+ exceptions = null;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
NodeResponse(String nodeId,
|
|
@@ -536,15 +543,7 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
|
|
|
|
|
|
@Override
|
|
|
public void readFrom(StreamInput in) throws IOException {
|
|
|
- super.readFrom(in);
|
|
|
- nodeId = in.readString();
|
|
|
- totalShards = in.readVInt();
|
|
|
- results = in.readList((stream) -> stream.readBoolean() ? readShardResult(stream) : null);
|
|
|
- if (in.readBoolean()) {
|
|
|
- exceptions = in.readList(BroadcastShardOperationFailedException::new);
|
|
|
- } else {
|
|
|
- exceptions = null;
|
|
|
- }
|
|
|
+ throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
|
|
|
}
|
|
|
|
|
|
@Override
|