Browse Source

Properly serialize remote query in ReindexRequest (#43457)

This commit modifies the RemoteInfo to clarify that a search query
must always be serialized as JSON. Additionally, it adds an assertion
to ensure that this is the case. This fixes #43406.

Additionally, this PR implements AbstractXContentTestCase for the
reindex request. This is related to #43456.
Tim Brooks 6 years ago
parent
commit
00334cc9b7

+ 7 - 2
modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteBuildRestClientTests.java

@@ -22,9 +22,11 @@ package org.elasticsearch.index.reindex;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClientBuilderTestCase;
 import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.TestEnvironment;
+import org.elasticsearch.index.query.MatchAllQueryBuilder;
 import org.elasticsearch.watcher.ResourceWatcherService;
 
 import java.util.ArrayList;
@@ -38,9 +40,12 @@ import static org.hamcrest.Matchers.hasSize;
 import static org.mockito.Mockito.mock;
 
 public class ReindexFromRemoteBuildRestClientTests extends RestClientBuilderTestCase {
+
+    private final BytesReference matchAll = new BytesArray(new MatchAllQueryBuilder().toString());
+
     public void testBuildRestClient() throws Exception {
         for(final String path: new String[]{"", null, "/", "path"}) {
-            RemoteInfo remoteInfo = new RemoteInfo("https", "localhost", 9200, path, new BytesArray("ignored"), null, null, emptyMap(),
+            RemoteInfo remoteInfo = new RemoteInfo("https", "localhost", 9200, path, matchAll, null, null, emptyMap(),
                 RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
             long taskId = randomLong();
             List<Thread> threads = synchronizedList(new ArrayList<>());
@@ -64,7 +69,7 @@ public class ReindexFromRemoteBuildRestClientTests extends RestClientBuilderTest
         for (int i = 0; i < numHeaders; ++i) {
             headers.put("header" + i, Integer.toString(i));
         }
-        RemoteInfo remoteInfo = new RemoteInfo("https", "localhost", 9200, null, new BytesArray("ignored"), null, null,
+        RemoteInfo remoteInfo = new RemoteInfo("https", "localhost", 9200, null, matchAll, null, null,
             headers, RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
         long taskId = randomLong();
         List<Thread> threads = synchronizedList(new ArrayList<>());

+ 6 - 2
modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteWhitelistTests.java

@@ -20,6 +20,7 @@
 package org.elasticsearch.index.reindex;
 
 import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.test.ESTestCase;
 
 import java.net.UnknownHostException;
@@ -37,6 +38,9 @@ import static org.elasticsearch.index.reindex.TransportReindexAction.checkRemote
  * Tests the reindex-from-remote whitelist of remotes.
  */
 public class ReindexFromRemoteWhitelistTests extends ESTestCase {
+
+    private final BytesReference query = new BytesArray("{ \"foo\" : \"bar\" }");
+
     public void testLocalRequestWithoutWhitelist() {
         checkRemoteWhitelist(buildRemoteWhitelist(emptyList()), null);
     }
@@ -49,7 +53,7 @@ public class ReindexFromRemoteWhitelistTests extends ESTestCase {
      * Build a {@link RemoteInfo}, defaulting values that we don't care about in this test to values that don't hurt anything.
      */
     private RemoteInfo newRemoteInfo(String host, int port) {
-        return new RemoteInfo(randomAlphaOfLength(5), host, port, null, new BytesArray("test"), null, null, emptyMap(),
+        return new RemoteInfo(randomAlphaOfLength(5), host, port, null, query, null, null, emptyMap(),
                 RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
     }
 
@@ -63,7 +67,7 @@ public class ReindexFromRemoteWhitelistTests extends ESTestCase {
 
     public void testWhitelistedByPrefix() {
         checkRemoteWhitelist(buildRemoteWhitelist(singletonList("*.example.com:9200")),
-                new RemoteInfo(randomAlphaOfLength(5), "es.example.com", 9200, null, new BytesArray("test"), null, null, emptyMap(),
+                new RemoteInfo(randomAlphaOfLength(5), "es.example.com", 9200, null, query, null, null, emptyMap(),
                         RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT));
         checkRemoteWhitelist(buildRemoteWhitelist(singletonList("*.example.com:9200")),
                 newRemoteInfo("6e134134a1.us-east-1.aws.example.com", 9200));

+ 3 - 2
modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexRestClientSslTests.java

@@ -199,8 +199,9 @@ public class ReindexRestClientSslTests extends ESTestCase {
     }
 
     private RemoteInfo getRemoteInfo() {
-        return new RemoteInfo("https", server.getAddress().getHostName(), server.getAddress().getPort(), "/", new BytesArray("test"),
-            "user", "password", Collections.emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
+        return new RemoteInfo("https", server.getAddress().getHostName(), server.getAddress().getPort(), "/",
+            new BytesArray("{\"match_all\":{}}"), "user", "password", Collections.emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT,
+            RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
     }
 
     @SuppressForbidden(reason = "use http server")

+ 5 - 2
modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexSourceTargetValidationTests.java

@@ -32,6 +32,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.test.ESTestCase;
@@ -61,6 +62,8 @@ public class ReindexSourceTargetValidationTests extends ESTestCase {
     private static final AutoCreateIndex AUTO_CREATE_INDEX = new AutoCreateIndex(Settings.EMPTY,
             new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), INDEX_NAME_EXPRESSION_RESOLVER);
 
+    private final BytesReference query = new BytesArray("{ \"foo\" : \"bar\" }");
+
     public void testObviousCases() {
         fails("target", "target");
         fails("target", "foo", "bar", "target", "baz");
@@ -106,10 +109,10 @@ public class ReindexSourceTargetValidationTests extends ESTestCase {
 
     public void testRemoteInfoSkipsValidation() {
         // The index doesn't have to exist
-        succeeds(new RemoteInfo(randomAlphaOfLength(5), "test", 9200, null, new BytesArray("test"), null, null, emptyMap(),
+        succeeds(new RemoteInfo(randomAlphaOfLength(5), "test", 9200, null, query, null, null, emptyMap(),
                 RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT), "does_not_exist", "target");
         // And it doesn't matter if they are the same index. They are considered to be different because the remote one is, well, remote.
-        succeeds(new RemoteInfo(randomAlphaOfLength(5), "test", 9200, null, new BytesArray("test"), null, null, emptyMap(),
+        succeeds(new RemoteInfo(randomAlphaOfLength(5), "test", 9200, null, query, null, null, emptyMap(),
                 RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT), "target", "target");
     }
 

+ 1 - 1
modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java

@@ -51,7 +51,7 @@ public class RoundTripTests extends ESTestCase {
         reindex.getDestination().index("test");
         if (randomBoolean()) {
             int port = between(1, Integer.MAX_VALUE);
-            BytesReference query = new BytesArray(randomAlphaOfLength(5));
+            BytesReference query = new BytesArray("{\"match_all\":{}}");
             String username = randomBoolean() ? randomAlphaOfLength(5) : null;
             String password = username != null && randomBoolean() ? randomAlphaOfLength(5) : null;
             int headersCount = randomBoolean() ? 0 : between(1, 10);

+ 8 - 8
modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteInfoTests.java

@@ -19,28 +19,28 @@
 
 package org.elasticsearch.index.reindex.remote;
 
-import org.elasticsearch.index.reindex.RemoteInfo;
 import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.index.reindex.RemoteInfo;
 import org.elasticsearch.test.ESTestCase;
 
 import static java.util.Collections.emptyMap;
 
 public class RemoteInfoTests extends ESTestCase {
     private RemoteInfo newRemoteInfo(String scheme, String prefixPath, String username, String password) {
-        return new RemoteInfo(scheme, "testhost", 12344, prefixPath, new BytesArray("testquery"), username, password, emptyMap(),
-                RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
+        return new RemoteInfo(scheme, "testhost", 12344, prefixPath,new BytesArray("{ \"foo\" : \"bar\" }"), username, password,
+            emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
     }
 
     public void testToString() {
-        assertEquals("host=testhost port=12344 query=testquery",
+        assertEquals("host=testhost port=12344 query={ \"foo\" : \"bar\" }",
             newRemoteInfo("http", null, null, null).toString());
-        assertEquals("host=testhost port=12344 query=testquery username=testuser",
+        assertEquals("host=testhost port=12344 query={ \"foo\" : \"bar\" } username=testuser",
             newRemoteInfo("http", null, "testuser", null).toString());
-        assertEquals("host=testhost port=12344 query=testquery username=testuser password=<<>>",
+        assertEquals("host=testhost port=12344 query={ \"foo\" : \"bar\" } username=testuser password=<<>>",
             newRemoteInfo("http", null, "testuser", "testpass").toString());
-        assertEquals("scheme=https host=testhost port=12344 query=testquery username=testuser password=<<>>",
+        assertEquals("scheme=https host=testhost port=12344 query={ \"foo\" : \"bar\" } username=testuser password=<<>>",
             newRemoteInfo("https", null, "testuser", "testpass").toString());
-        assertEquals("scheme=https host=testhost port=12344 pathPrefix=prxy query=testquery username=testuser password=<<>>",
+        assertEquals("scheme=https host=testhost port=12344 pathPrefix=prxy query={ \"foo\" : \"bar\" } username=testuser password=<<>>",
             newRemoteInfo("https", "prxy", "testuser", "testpass").toString());
     }
 }

+ 2 - 19
server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java

@@ -33,12 +33,10 @@ import org.elasticsearch.common.logging.DeprecationLogger;
 import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.ObjectParser;
-import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
-import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.query.QueryBuilder;
@@ -58,7 +56,6 @@ import static java.util.Objects.requireNonNull;
 import static org.elasticsearch.action.ValidateActions.addValidationError;
 import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
 import static org.elasticsearch.index.VersionType.INTERNAL;
-import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
 
 /**
  * Request to reindex some documents from one index to another. This implements CompositeIndicesRequest but in a misleading way. Rather than
@@ -303,7 +300,7 @@ public class ReindexRequest extends AbstractBulkIndexByScrollRequest<ReindexRequ
             builder.startObject("source");
             if (remoteInfo != null) {
                 builder.field("remote", remoteInfo);
-                builder.rawField("query", remoteInfo.getQuery().streamInput(), builder.contentType());
+                builder.rawField("query", remoteInfo.getQuery().streamInput(), RemoteInfo.QUERY_CONTENT_TYPE.type());
             }
             builder.array("index", getSearchRequest().indices());
             getSearchRequest().source().innerToXContent(builder, params);
@@ -448,7 +445,7 @@ public class ReindexRequest extends AbstractBulkIndexByScrollRequest<ReindexRequ
             throw new IllegalArgumentException(
                 "Unsupported fields in [remote]: [" + Strings.collectionToCommaDelimitedString(remote.keySet()) + "]");
         }
-        return new RemoteInfo(scheme, host, port, pathPrefix, queryForRemote(source),
+        return new RemoteInfo(scheme, host, port, pathPrefix, RemoteInfo.queryForRemote(source),
             username, password, headers, socketTimeout, connectTimeout);
     }
 
@@ -487,20 +484,6 @@ public class ReindexRequest extends AbstractBulkIndexByScrollRequest<ReindexRequ
         return string == null ? defaultValue : parseTimeValue(string, name);
     }
 
-    private static BytesReference queryForRemote(Map<String, Object> source) throws IOException {
-        XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint();
-        Object query = source.remove("query");
-        if (query == null) {
-            return BytesReference.bytes(matchAllQuery().toXContent(builder, ToXContent.EMPTY_PARAMS));
-        }
-        if (!(query instanceof Map)) {
-            throw new IllegalArgumentException("Expected [query] to be an object but was [" + query + "]");
-        }
-        @SuppressWarnings("unchecked")
-        Map<String, Object> map = (Map<String, Object>) query;
-        return BytesReference.bytes(builder.map(map));
-    }
-
     static void setMaxDocsValidateIdentical(AbstractBulkByScrollRequest<?> request, int maxDocs) {
         if (request.getMaxDocs() != AbstractBulkByScrollRequest.MAX_DOCS_ALL_MATCHES && request.getMaxDocs() != maxDocs) {
             throw new IllegalArgumentException("[max_docs] set to two different values [" + request.getMaxDocs() + "]" +

+ 57 - 0
server/src/main/java/org/elasticsearch/index/reindex/RemoteInfo.java

@@ -25,16 +25,24 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.DeprecationHandler;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.json.JsonXContent;
 
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 
 import static java.util.Collections.unmodifiableMap;
 import static java.util.Objects.requireNonNull;
 import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
+import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
 
 public class RemoteInfo implements Writeable, ToXContentObject {
     /**
@@ -46,6 +54,8 @@ public class RemoteInfo implements Writeable, ToXContentObject {
      */
     public static final TimeValue DEFAULT_CONNECT_TIMEOUT = timeValueSeconds(30);
 
+    public static final XContent QUERY_CONTENT_TYPE = JsonXContent.jsonXContent;
+
     private final String scheme;
     private final String host;
     private final int port;
@@ -65,6 +75,7 @@ public class RemoteInfo implements Writeable, ToXContentObject {
 
     public RemoteInfo(String scheme, String host, int port, String pathPrefix, BytesReference query, String username, String password,
                       Map<String, String> headers, TimeValue socketTimeout, TimeValue connectTimeout) {
+        assert isQueryJson(query) : "Query does not appear to be JSON";
         this.scheme = requireNonNull(scheme, "[scheme] must be specified to reindex from a remote cluster");
         this.host = requireNonNull(host, "[host] must be specified to reindex from a remote cluster");
         this.port = port;
@@ -205,4 +216,50 @@ public class RemoteInfo implements Writeable, ToXContentObject {
         builder.endObject();
         return builder;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        RemoteInfo that = (RemoteInfo) o;
+        return port == that.port &&
+            Objects.equals(scheme, that.scheme) &&
+            Objects.equals(host, that.host) &&
+            Objects.equals(pathPrefix, that.pathPrefix) &&
+            Objects.equals(query, that.query) &&
+            Objects.equals(username, that.username) &&
+            Objects.equals(password, that.password) &&
+            Objects.equals(headers, that.headers) &&
+            Objects.equals(socketTimeout, that.socketTimeout) &&
+            Objects.equals(connectTimeout, that.connectTimeout);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(scheme, host, port, pathPrefix, query, username, password, headers, socketTimeout, connectTimeout);
+    }
+
+    static BytesReference queryForRemote(Map<String, Object> source) throws IOException {
+        XContentBuilder builder = XContentBuilder.builder(QUERY_CONTENT_TYPE).prettyPrint();
+        Object query = source.remove("query");
+        if (query == null) {
+            return BytesReference.bytes(matchAllQuery().toXContent(builder, ToXContent.EMPTY_PARAMS));
+        }
+        if (!(query instanceof Map)) {
+            throw new IllegalArgumentException("Expected [query] to be an object but was [" + query + "]");
+        }
+        @SuppressWarnings("unchecked")
+        Map<String, Object> map = (Map<String, Object>) query;
+        return BytesReference.bytes(builder.map(map));
+    }
+
+    private static boolean isQueryJson(BytesReference bytesReference) {
+        try (XContentParser parser = QUERY_CONTENT_TYPE.createParser(NamedXContentRegistry.EMPTY,
+            DeprecationHandler.THROW_UNSUPPORTED_OPERATION, bytesReference.streamInput())) {
+            Map<String, Object> query = parser.map();
+            return true;
+        } catch (IOException e) {
+            throw new AssertionError("Could not parse JSON", e);
+        }
+    }
 }

+ 6 - 1
server/src/test/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequestTestCase.java

@@ -21,7 +21,9 @@ package org.elasticsearch.index.reindex;
 
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.support.ActiveShardCount;
+import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.test.AbstractXContentTestCase;
 import org.elasticsearch.test.ESTestCase;
 
 import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
@@ -29,9 +31,12 @@ import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
 /**
  * Shared superclass for testing reindex and friends. In particular it makes sure to test the slice features.
  */
-public abstract class AbstractBulkByScrollRequestTestCase<R extends AbstractBulkByScrollRequest<R>> extends ESTestCase {
+public abstract class AbstractBulkByScrollRequestTestCase<R extends AbstractBulkByScrollRequest<R> & ToXContent>
+    extends AbstractXContentTestCase<R> {
+
     public void testForSlice() {
         R original = newRequest();
+        extraRandomizationForSlice(original);
         original.setAbortOnVersionConflict(randomBoolean());
         original.setRefresh(randomBoolean());
         original.setTimeout(parseTimeValue(randomPositiveTimeValue(), "timeout"));

+ 27 - 0
server/src/test/java/org/elasticsearch/index/reindex/DeleteByQueryRequestTests.java

@@ -22,8 +22,11 @@ package org.elasticsearch.index.reindex;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.index.query.QueryBuilders;
 
+import java.io.IOException;
+
 import static org.apache.lucene.util.TestUtil.randomSimpleString;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.is;
@@ -100,4 +103,28 @@ public class DeleteByQueryRequestTests extends AbstractBulkByScrollRequestTestCa
 
         assertThat(e, is(nullValue()));
     }
+
+    // TODO: Implement standard to/from x-content parsing tests
+
+    @Override
+    protected DeleteByQueryRequest createTestInstance() {
+        return newRequest();
+    }
+
+    @Override
+    protected DeleteByQueryRequest doParseInstance(XContentParser parser) throws IOException {
+        XContentParser.Token token;
+        while ((token = parser.nextToken()) != null) {
+        }
+        return newRequest();
+    }
+
+    @Override
+    protected boolean supportsUnknownFields() {
+        return false;
+    }
+
+    @Override
+    protected void assertEqualInstances(DeleteByQueryRequest expectedInstance, DeleteByQueryRequest newInstance) {
+    }
 }

+ 102 - 9
server/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java

@@ -22,12 +22,19 @@ package org.elasticsearch.index.reindex;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
+import org.elasticsearch.index.query.TermQueryBuilder;
+import org.elasticsearch.search.SearchModule;
 import org.elasticsearch.search.slice.SliceBuilder;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -41,12 +48,101 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
  */
 public class ReindexRequestTests extends AbstractBulkByScrollRequestTestCase<ReindexRequest> {
 
+    private final BytesReference matchAll = new BytesArray("{ \"foo\" : \"bar\" }");
+
+    @Override
+    protected NamedWriteableRegistry writableRegistry() {
+        SearchModule searchModule = new SearchModule(Settings.EMPTY, Collections.emptyList());
+        return new NamedWriteableRegistry(searchModule.getNamedWriteables());
+    }
+
+    @Override
+    protected NamedXContentRegistry xContentRegistry() {
+        SearchModule searchModule = new SearchModule(Settings.EMPTY, Collections.emptyList());
+        return new NamedXContentRegistry(searchModule.getNamedXContents());
+    }
+
+    @Override
+    protected boolean enableWarningsCheck() {
+        // There sometimes will be a warning about specifying types in reindex requests being deprecated.
+        return false;
+    }
+
+    @Override
+    protected ReindexRequest createTestInstance() {
+        ReindexRequest reindexRequest = new ReindexRequest();
+        reindexRequest.setSourceIndices("source");
+        reindexRequest.setDestIndex("destination");
+
+        if (randomBoolean()) {
+            try (XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint()) {
+                BytesReference query = BytesReference.bytes(matchAllQuery().toXContent(builder, ToXContent.EMPTY_PARAMS));
+                reindexRequest.setRemoteInfo(new RemoteInfo(randomAlphaOfLength(5), randomAlphaOfLength(5), between(1, Integer.MAX_VALUE),
+                    null, query, "user", "pass", emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT));
+            } catch (IOException e) {
+                throw new AssertionError(e);
+            }
+        }
+
+        if (randomBoolean()) {
+            reindexRequest.setSourceBatchSize(randomInt(100));
+        }
+        if (randomBoolean()) {
+            reindexRequest.setDestDocType("type");
+        }
+        if (randomBoolean()) {
+            reindexRequest.setDestOpType("create");
+        }
+        if (randomBoolean()) {
+            reindexRequest.setDestPipeline("my_pipeline");
+        }
+        if (randomBoolean()) {
+            reindexRequest.setDestRouting("=cat");
+        }
+        if (randomBoolean()) {
+            reindexRequest.setMaxDocs(randomIntBetween(100, 1000));
+        }
+        if (randomBoolean()) {
+            reindexRequest.setAbortOnVersionConflict(false);
+        }
+
+        if (reindexRequest.getRemoteInfo() == null && randomBoolean()) {
+            reindexRequest.setSourceQuery(new TermQueryBuilder("foo", "fooval"));
+        }
+
+        return reindexRequest;
+    }
+
+    @Override
+    protected ReindexRequest doParseInstance(XContentParser parser) throws IOException {
+        return ReindexRequest.fromXContent(parser);
+    }
+
+    @Override
+    protected boolean supportsUnknownFields() {
+        return false;
+    }
+
+    @Override
+    protected void assertEqualInstances(ReindexRequest expectedInstance, ReindexRequest newInstance) {
+        assertNotSame(newInstance, expectedInstance);
+        assertArrayEquals(expectedInstance.getSearchRequest().indices(), newInstance.getSearchRequest().indices());
+        assertEquals(expectedInstance.getSearchRequest(), newInstance.getSearchRequest());
+        assertEquals(expectedInstance.getMaxDocs(), newInstance.getMaxDocs());
+        assertEquals(expectedInstance.getSlices(), newInstance.getSlices());
+        assertEquals(expectedInstance.isAbortOnVersionConflict(), newInstance.isAbortOnVersionConflict());
+        assertEquals(expectedInstance.getRemoteInfo(), newInstance.getRemoteInfo());
+        assertEquals(expectedInstance.getDestination().getPipeline(), newInstance.getDestination().getPipeline());
+        assertEquals(expectedInstance.getDestination().routing(), newInstance.getDestination().routing());
+        assertEquals(expectedInstance.getDestination().opType(), newInstance.getDestination().opType());
+        assertEquals(expectedInstance.getDestination().type(), newInstance.getDestination().type());
+    }
+
     public void testReindexFromRemoteDoesNotSupportSearchQuery() {
         ReindexRequest reindex = newRequest();
         reindex.setRemoteInfo(
                 new RemoteInfo(randomAlphaOfLength(5), randomAlphaOfLength(5), between(1, Integer.MAX_VALUE), null,
-                    new BytesArray("real_query"), null, null, emptyMap(),
-                    RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT));
+                    matchAll, null, null, emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT));
         reindex.getSearchRequest().source().query(matchAllQuery()); // Unsupported place to put query
         ActionRequestValidationException e = reindex.validate();
         assertEquals("Validation Failed: 1: reindex from remote sources should use RemoteInfo's query instead of source's query;",
@@ -57,8 +153,7 @@ public class ReindexRequestTests extends AbstractBulkByScrollRequestTestCase<Rei
         ReindexRequest reindex = newRequest();
         reindex.setRemoteInfo(
                 new RemoteInfo(randomAlphaOfLength(5), randomAlphaOfLength(5), between(1, Integer.MAX_VALUE), null,
-                    new BytesArray("real_query"), null, null, emptyMap(),
-                    RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT));
+                    matchAll, null, null, emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT));
         reindex.setSlices(between(2, Integer.MAX_VALUE));
         ActionRequestValidationException e = reindex.validate();
         assertEquals(
@@ -80,10 +175,9 @@ public class ReindexRequestTests extends AbstractBulkByScrollRequestTestCase<Rei
             original.setScript(mockScript(randomAlphaOfLength(5)));
         }
         if (randomBoolean()) {
-            original.setRemoteInfo(new RemoteInfo(randomAlphaOfLength(5), randomAlphaOfLength(5), between(1, 10000),
-                    null, new BytesArray(randomAlphaOfLength(5)), null, null, emptyMap(),
-                    parseTimeValue(randomPositiveTimeValue(), "socket_timeout"),
-                    parseTimeValue(randomPositiveTimeValue(), "connect_timeout")));
+            original.setRemoteInfo(new RemoteInfo(randomAlphaOfLength(5), randomAlphaOfLength(5), between(1, 10000), null, matchAll, null,
+                null, emptyMap(), parseTimeValue(randomPositiveTimeValue(), "socket_timeout"),
+                parseTimeValue(randomPositiveTimeValue(), "connect_timeout")));
         }
     }
 
@@ -230,5 +324,4 @@ public class ReindexRequestTests extends AbstractBulkByScrollRequestTestCase<Rei
 
         return ReindexRequest.buildRemoteInfo(source);
     }
-
 }

+ 27 - 0
server/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryRequestTests.java

@@ -20,6 +20,9 @@
 package org.elasticsearch.index.reindex;
 
 import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.common.xcontent.XContentParser;
+
+import java.io.IOException;
 
 import static org.apache.lucene.util.TestUtil.randomSimpleString;
 
@@ -77,4 +80,28 @@ public class UpdateByQueryRequestTests extends AbstractBulkByScrollRequestTestCa
         assertEquals(original.getScript(), forSliced.getScript());
         assertEquals(original.getPipeline(), forSliced.getPipeline());
     }
+
+    // TODO: Implement standard to/from x-content parsing tests
+
+    @Override
+    protected UpdateByQueryRequest createTestInstance() {
+        return newRequest();
+    }
+
+    @Override
+    protected UpdateByQueryRequest doParseInstance(XContentParser parser) throws IOException {
+        XContentParser.Token token;
+        while ((token = parser.nextToken()) != null) {
+        }
+        return newRequest();
+    }
+
+    @Override
+    protected boolean supportsUnknownFields() {
+        return false;
+    }
+
+    @Override
+    protected void assertEqualInstances(UpdateByQueryRequest expectedInstance, UpdateByQueryRequest newInstance) {
+    }
 }