ソースを参照

Changed ReindexRequest to use Writeable.Reader (#32401)

-- This is a pre-stage for adding the reindex API to the REST high-level-client
-- Follows the pattern set in #26315
Sohaib Iftikhar 7 年 前
コミット
4fa92cbf49

+ 2 - 2
modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java

@@ -27,13 +27,13 @@ import org.elasticsearch.client.ParentTaskAssigningClient;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
-import java.util.function.Supplier;
 
 public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteByQueryRequest, BulkByScrollResponse> {
 
@@ -46,7 +46,7 @@ public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteB
     public TransportDeleteByQueryAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters, Client client,
                                         TransportService transportService, ScriptService scriptService, ClusterService clusterService) {
         super(settings, DeleteByQueryAction.NAME, transportService, actionFilters,
-            (Supplier<DeleteByQueryRequest>) DeleteByQueryRequest::new);
+            (Writeable.Reader<DeleteByQueryRequest>) DeleteByQueryRequest::new);
         this.threadPool = threadPool;
         this.client = client;
         this.scriptService = scriptService;

+ 2 - 1
modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java

@@ -39,6 +39,7 @@ import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
 import org.elasticsearch.client.RestClientBuilder;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.xcontent.DeprecationHandler;
 import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
 import org.elasticsearch.action.index.IndexRequest;
@@ -104,7 +105,7 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
     public TransportReindexAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters,
             IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, ScriptService scriptService,
             AutoCreateIndex autoCreateIndex, Client client, TransportService transportService) {
-        super(settings, ReindexAction.NAME, transportService, actionFilters, ReindexRequest::new);
+        super(settings, ReindexAction.NAME, transportService, actionFilters, (Writeable.Reader<ReindexRequest>)ReindexRequest::new);
         this.threadPool = threadPool;
         this.clusterService = clusterService;
         this.scriptService = scriptService;

+ 2 - 2
modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java

@@ -29,6 +29,7 @@ import org.elasticsearch.client.ParentTaskAssigningClient;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.mapper.IdFieldMapper;
@@ -43,7 +44,6 @@ import org.elasticsearch.transport.TransportService;
 
 import java.util.Map;
 import java.util.function.BiFunction;
-import java.util.function.Supplier;
 
 public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateByQueryRequest, BulkByScrollResponse> {
 
@@ -56,7 +56,7 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
     public TransportUpdateByQueryAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters, Client client,
                                         TransportService transportService, ScriptService scriptService, ClusterService clusterService) {
         super(settings, UpdateByQueryAction.NAME, transportService, actionFilters,
-            (Supplier<UpdateByQueryRequest>) UpdateByQueryRequest::new);
+            (Writeable.Reader<UpdateByQueryRequest>) UpdateByQueryRequest::new);
         this.threadPool = threadPool;
         this.client = client;
         this.scriptService = scriptService;

+ 15 - 20
modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java

@@ -67,19 +67,17 @@ public class RoundTripTests extends ESTestCase {
                 new RemoteInfo(randomAlphaOfLength(5), randomAlphaOfLength(5), port, null,
                     query, username, password, headers, socketTimeout, connectTimeout));
         }
-        ReindexRequest tripped = new ReindexRequest();
-        roundTrip(reindex, tripped);
+        ReindexRequest tripped = new ReindexRequest(toInputByteStream(reindex));
         assertRequestEquals(reindex, tripped);
 
         // Try slices=auto with a version that doesn't support it, which should fail
         reindex.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
-        Exception e = expectThrows(IllegalArgumentException.class, () -> roundTrip(Version.V_6_0_0_alpha1, reindex, null));
+        Exception e = expectThrows(IllegalArgumentException.class, () -> toInputByteStream(Version.V_6_0_0_alpha1, reindex));
         assertEquals("Slices set as \"auto\" are not supported before version [6.1.0]. Found version [6.0.0-alpha1]", e.getMessage());
 
         // Try regular slices with a version that doesn't support slices=auto, which should succeed
-        tripped = new ReindexRequest();
         reindex.setSlices(between(1, Integer.MAX_VALUE));
-        roundTrip(Version.V_6_0_0_alpha1, reindex, tripped);
+        tripped = new ReindexRequest(toInputByteStream(reindex));
         assertRequestEquals(Version.V_6_0_0_alpha1, reindex, tripped);
     }
 
@@ -89,20 +87,18 @@ public class RoundTripTests extends ESTestCase {
         if (randomBoolean()) {
             update.setPipeline(randomAlphaOfLength(5));
         }
-        UpdateByQueryRequest tripped = new UpdateByQueryRequest();
-        roundTrip(update, tripped);
+        UpdateByQueryRequest tripped = new UpdateByQueryRequest(toInputByteStream(update));
         assertRequestEquals(update, tripped);
         assertEquals(update.getPipeline(), tripped.getPipeline());
 
         // Try slices=auto with a version that doesn't support it, which should fail
         update.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
-        Exception e = expectThrows(IllegalArgumentException.class, () -> roundTrip(Version.V_6_0_0_alpha1, update, null));
+        Exception e = expectThrows(IllegalArgumentException.class, () -> toInputByteStream(Version.V_6_0_0_alpha1, update));
         assertEquals("Slices set as \"auto\" are not supported before version [6.1.0]. Found version [6.0.0-alpha1]", e.getMessage());
 
         // Try regular slices with a version that doesn't support slices=auto, which should succeed
-        tripped = new UpdateByQueryRequest();
         update.setSlices(between(1, Integer.MAX_VALUE));
-        roundTrip(Version.V_6_0_0_alpha1, update, tripped);
+        tripped = new UpdateByQueryRequest(toInputByteStream(update));
         assertRequestEquals(update, tripped);
         assertEquals(update.getPipeline(), tripped.getPipeline());
     }
@@ -110,19 +106,17 @@ public class RoundTripTests extends ESTestCase {
     public void testDeleteByQueryRequest() throws IOException {
         DeleteByQueryRequest delete = new DeleteByQueryRequest(new SearchRequest());
         randomRequest(delete);
-        DeleteByQueryRequest tripped = new DeleteByQueryRequest();
-        roundTrip(delete, tripped);
+        DeleteByQueryRequest tripped = new DeleteByQueryRequest(toInputByteStream(delete));
         assertRequestEquals(delete, tripped);
 
         // Try slices=auto with a version that doesn't support it, which should fail
         delete.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
-        Exception e = expectThrows(IllegalArgumentException.class, () -> roundTrip(Version.V_6_0_0_alpha1, delete, null));
+        Exception e = expectThrows(IllegalArgumentException.class, () -> toInputByteStream(Version.V_6_0_0_alpha1, delete));
         assertEquals("Slices set as \"auto\" are not supported before version [6.1.0]. Found version [6.0.0-alpha1]", e.getMessage());
 
         // Try regular slices with a version that doesn't support slices=auto, which should succeed
-        tripped = new DeleteByQueryRequest();
         delete.setSlices(between(1, Integer.MAX_VALUE));
-        roundTrip(Version.V_6_0_0_alpha1, delete, tripped);
+        tripped = new DeleteByQueryRequest(toInputByteStream(delete));
         assertRequestEquals(delete, tripped);
     }
 
@@ -198,23 +192,24 @@ public class RoundTripTests extends ESTestCase {
             request.setTaskId(new TaskId(randomAlphaOfLength(5), randomLong()));
         }
         RethrottleRequest tripped = new RethrottleRequest();
-        roundTrip(request, tripped);
+        // We use readFrom here because Rethrottle does not support the Writeable.Reader interface
+        tripped.readFrom(toInputByteStream(request));
         assertEquals(request.getRequestsPerSecond(), tripped.getRequestsPerSecond(), 0.00001);
         assertArrayEquals(request.getActions(), tripped.getActions());
         assertEquals(request.getTaskId(), tripped.getTaskId());
     }
 
-    private void roundTrip(Streamable example, Streamable empty) throws IOException {
-        roundTrip(Version.CURRENT, example, empty);
+    private StreamInput toInputByteStream(Streamable example) throws IOException {
+        return toInputByteStream(Version.CURRENT, example);
     }
 
-    private void roundTrip(Version version, Streamable example, Streamable empty) throws IOException {
+    private StreamInput toInputByteStream(Version version, Streamable example) throws IOException {
         BytesStreamOutput out = new BytesStreamOutput();
         out.setVersion(version);
         example.writeTo(out);
         StreamInput in = out.bytes().streamInput();
         in.setVersion(version);
-        empty.readFrom(in);
+        return in;
     }
 
     private Script randomScript() {

+ 7 - 0
server/src/main/java/org/elasticsearch/index/reindex/DeleteByQueryRequest.java

@@ -23,8 +23,11 @@ import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.IndicesRequest;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.tasks.TaskId;
 
+import java.io.IOException;
+
 import static org.elasticsearch.action.ValidateActions.addValidationError;
 
 /**
@@ -53,6 +56,10 @@ public class DeleteByQueryRequest extends AbstractBulkByScrollRequest<DeleteByQu
         this(search, true);
     }
 
+    public DeleteByQueryRequest(StreamInput in) throws IOException {
+        super.readFrom(in);
+    }
+
     private DeleteByQueryRequest(SearchRequest search, boolean setDefaults) {
         super(search, setDefaults);
         // Delete-By-Query does not require the source

+ 8 - 4
server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java

@@ -59,6 +59,13 @@ public class ReindexRequest extends AbstractBulkIndexByScrollRequest<ReindexRequ
         this.destination = destination;
     }
 
+    public ReindexRequest(StreamInput in) throws IOException {
+        super.readFrom(in);
+        destination = new IndexRequest();
+        destination.readFrom(in);
+        remoteInfo = in.readOptionalWriteable(RemoteInfo::new);
+    }
+
     @Override
     protected ReindexRequest self() {
         return this;
@@ -135,10 +142,7 @@ public class ReindexRequest extends AbstractBulkIndexByScrollRequest<ReindexRequ
 
     @Override
     public void readFrom(StreamInput in) throws IOException {
-        super.readFrom(in);
-        destination = new IndexRequest();
-        destination.readFrom(in);
-        remoteInfo = in.readOptionalWriteable(RemoteInfo::new);
+        throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
     }
 
     @Override

+ 6 - 2
server/src/main/java/org/elasticsearch/index/reindex/UpdateByQueryRequest.java

@@ -47,6 +47,11 @@ public class UpdateByQueryRequest extends AbstractBulkIndexByScrollRequest<Updat
         this(search, true);
     }
 
+    public UpdateByQueryRequest(StreamInput in) throws IOException {
+        super.readFrom(in);
+        pipeline = in.readOptionalString();
+    }
+
     private UpdateByQueryRequest(SearchRequest search, boolean setDefaults) {
         super(search, setDefaults);
     }
@@ -108,8 +113,7 @@ public class UpdateByQueryRequest extends AbstractBulkIndexByScrollRequest<Updat
 
     @Override
     public void readFrom(StreamInput in) throws IOException {
-        super.readFrom(in);
-        pipeline = in.readOptionalString();
+        throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
     }
 
     @Override