Browse Source

Bump reindex-from-remote's buffer to 200mb

It was 10mb and that was causing trouble when folks reindex-from-remoted
with large documents.

We also improve the error reporting so it tells folks to use a smaller
batch size if they hit a buffer size exception. Finally, adds some docs
to reindex-from-remote mentioning the buffer and giving an example of
lowering the size.

Closes #21185
Nik Everett 9 years ago
parent
commit
a612e5988e

+ 12 - 5
client/rest/src/main/java/org/elasticsearch/client/HeapBufferedAsyncResponseConsumer.java

@@ -46,7 +46,7 @@ public class HeapBufferedAsyncResponseConsumer extends AbstractAsyncResponseCons
     //default buffer limit is 10MB
     public static final int DEFAULT_BUFFER_LIMIT = 10 * 1024 * 1024;
 
-    private final int bufferLimit;
+    private final int bufferLimitBytes;
     private volatile HttpResponse response;
     private volatile SimpleInputBuffer buf;
 
@@ -54,7 +54,7 @@ public class HeapBufferedAsyncResponseConsumer extends AbstractAsyncResponseCons
      * Creates a new instance of this consumer with a buffer limit of {@link #DEFAULT_BUFFER_LIMIT}
      */
     public HeapBufferedAsyncResponseConsumer() {
-        this.bufferLimit = DEFAULT_BUFFER_LIMIT;
+        this.bufferLimitBytes = DEFAULT_BUFFER_LIMIT;
     }
 
     /**
@@ -64,7 +64,14 @@ public class HeapBufferedAsyncResponseConsumer extends AbstractAsyncResponseCons
         if (bufferLimit <= 0) {
             throw new IllegalArgumentException("bufferLimit must be greater than 0");
         }
-        this.bufferLimit = bufferLimit;
+        this.bufferLimitBytes = bufferLimit;
+    }
+
+    /**
+     * Get the limit of the buffer.
+     */
+    public int getBufferLimit() {
+        return bufferLimitBytes;
     }
 
     @Override
@@ -75,9 +82,9 @@ public class HeapBufferedAsyncResponseConsumer extends AbstractAsyncResponseCons
     @Override
     protected void onEntityEnclosed(HttpEntity entity, ContentType contentType) throws IOException {
         long len = entity.getContentLength();
-        if (len > bufferLimit) {
+        if (len > bufferLimitBytes) {
             throw new ContentTooLongException("entity content is too long [" + len +
-                    "] for the configured buffer limit [" + bufferLimit + "]");
+                    "] for the configured buffer limit [" + bufferLimitBytes + "]");
         }
         if (len < 0) {
             len = 4096;

+ 36 - 0
docs/reference/docs/reindex.asciidoc

@@ -421,6 +421,42 @@ version.
 To enable queries sent to older versions of Elasticsearch the `query` parameter
 is sent directly to the remote host without validation or modification.
 
+Reindexing from a remote server uses an on-heap buffer that defaults to a
+maximum size of 200mb. If the remote index includes very large documents you'll
+need to use a smaller batch size. The example below sets the batch size `10`
+which is very, very small.
+
+[source,js]
+--------------------------------------------------
+POST _reindex
+{
+  "source": {
+    "remote": {
+      "host": "http://otherhost:9200",
+      "username": "user",
+      "password": "pass"
+    },
+    "index": "source",
+    "size": 10,
+    "query": {
+      "match": {
+        "test": "data"
+      }
+    }
+  },
+  "dest": {
+    "index": "dest"
+  }
+}
+--------------------------------------------------
+// CONSOLE
+// TEST[setup:host]
+// TEST[s/^/PUT source\n/]
+// TEST[s/otherhost:9200",/\${host}"/]
+// TEST[s/"username": "user",//]
+// TEST[s/"password": "pass"//]
+
+
 [float]
 === URL Parameters
 

+ 13 - 1
modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.index.reindex.remote;
 
+import org.apache.http.ContentTooLongException;
 import org.apache.http.HttpEntity;
 import org.apache.http.util.EntityUtils;
 import org.apache.logging.log4j.Logger;
@@ -29,6 +30,7 @@ import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.client.HeapBufferedAsyncResponseConsumer;
 import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.client.ResponseListener;
 import org.elasticsearch.client.RestClient;
@@ -37,6 +39,8 @@ import org.elasticsearch.common.ParseFieldMatcher;
 import org.elasticsearch.common.ParseFieldMatcherSupplier;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
@@ -67,6 +71,10 @@ import static org.elasticsearch.index.reindex.remote.RemoteResponseParsers.MAIN_
 import static org.elasticsearch.index.reindex.remote.RemoteResponseParsers.RESPONSE_PARSER;
 
 public class RemoteScrollableHitSource extends ScrollableHitSource {
+    /**
+     * The maximum size of the remote response to buffer. 200mb because bulks beyond 40mb tend to be slow anyway but 200mb is simply huge.
+     */
+    private static final ByteSizeValue BUFFER_LIMIT = new ByteSizeValue(200, ByteSizeUnit.MB);
     private final RestClient client;
     private final BytesReference query;
     private final SearchRequest searchRequest;
@@ -142,7 +150,8 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
 
             @Override
             protected void doRun() throws Exception {
-                client.performRequestAsync(method, uri, params, entity, new ResponseListener() {
+                HeapBufferedAsyncResponseConsumer consumer = new HeapBufferedAsyncResponseConsumer(BUFFER_LIMIT.bytesAsInt());
+                client.performRequestAsync(method, uri, params, entity, consumer, new ResponseListener() {
                     @Override
                     public void onSuccess(org.elasticsearch.client.Response response) {
                         // Restore the thread context to get the precious headers
@@ -184,6 +193,9 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
                             }
                             e = wrapExceptionToPreserveStatus(re.getResponse().getStatusLine().getStatusCode(),
                                     re.getResponse().getEntity(), re);
+                        } else if (e instanceof ContentTooLongException) {
+                            e = new IllegalArgumentException(
+                                    "Remote responded with a chunk that was too large. Use a smaller batch size.", e);
                         }
                         fail.accept(e);
                     }

+ 42 - 1
modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.index.reindex.remote;
 
+import org.apache.http.ContentTooLongException;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpEntityEnclosingRequest;
 import org.apache.http.HttpHost;
@@ -39,10 +40,13 @@ import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.client.HeapBufferedAsyncResponseConsumer;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.common.ParsingException;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.io.Streams;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.index.reindex.ScrollableHitSource.Response;
@@ -76,7 +80,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class RemoteScrollableHitSourceTests extends ESTestCase {
-    private final String FAKE_SCROLL_ID = "DnF1ZXJ5VGhlbkZldGNoBQAAAfakescroll";
+    private static final String FAKE_SCROLL_ID = "DnF1ZXJ5VGhlbkZldGNoBQAAAfakescroll";
     private int retries;
     private ThreadPool threadPool;
     private SearchRequest searchRequest;
@@ -429,6 +433,39 @@ public class RemoteScrollableHitSourceTests extends ESTestCase {
         assertEquals(badEntityException, wrapped.getSuppressed()[0]);
     }
 
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public void testTooLargeResponse() throws Exception {
+        ContentTooLongException tooLong = new ContentTooLongException("too long!");
+        CloseableHttpAsyncClient httpClient = mock(CloseableHttpAsyncClient.class);
+        when(httpClient.<HttpResponse>execute(any(HttpAsyncRequestProducer.class), any(HttpAsyncResponseConsumer.class),
+                any(FutureCallback.class))).then(new Answer<Future<HttpResponse>>() {
+                    @Override
+                    public Future<HttpResponse> answer(InvocationOnMock invocationOnMock) throws Throwable {
+                        HeapBufferedAsyncResponseConsumer consumer = (HeapBufferedAsyncResponseConsumer) invocationOnMock.getArguments()[1];
+                        FutureCallback callback = (FutureCallback) invocationOnMock.getArguments()[2];
+
+                        assertEquals(new ByteSizeValue(200, ByteSizeUnit.MB).bytesAsInt(), consumer.getBufferLimit());
+                        callback.failed(tooLong);
+                        return null;
+                    }
+                });
+        RemoteScrollableHitSource source = sourceWithMockedClient(true, httpClient);
+
+        AtomicBoolean called = new AtomicBoolean();
+        Consumer<Response> checkResponse = r -> called.set(true);
+        Throwable e = expectThrows(RuntimeException.class,
+                () -> source.doStartNextScroll(FAKE_SCROLL_ID, timeValueMillis(0), checkResponse));
+        // Unwrap the some artifacts from the test
+        while (e.getMessage().equals("failed")) {
+            e = e.getCause();
+        }
+        // This next exception is what the user sees
+        assertEquals("Remote responded with a chunk that was too large. Use a smaller batch size.", e.getMessage());
+        // And that exception is reported as being caused by the underlying exception returned by the client
+        assertSame(tooLong, e.getCause());
+        assertFalse(called.get());
+    }
+
     private RemoteScrollableHitSource sourceWithMockedRemoteCall(String... paths) throws Exception {
         return sourceWithMockedRemoteCall(true, paths);
     }
@@ -482,7 +519,11 @@ public class RemoteScrollableHitSourceTests extends ESTestCase {
                 return null;
             }
         });
+        return sourceWithMockedClient(mockRemoteVersion, httpClient);
+    }
 
+    private RemoteScrollableHitSource sourceWithMockedClient(boolean mockRemoteVersion, CloseableHttpAsyncClient httpClient)
+            throws Exception {
         HttpAsyncClientBuilder clientBuilder = mock(HttpAsyncClientBuilder.class);
         when(clientBuilder.build()).thenReturn(httpClient);