
Reduce allocations when persisting cluster state (#61159)

Today we allocate a new `byte[]` for each document written to the
cluster state. Some of these documents may be quite large. We need a
buffer that's at least as large as the largest document, but there's no
need to use a fresh buffer for each document.

With this commit we re-use the same `byte[]` across documents, allocating
a fresh one only when we need a larger buffer, and using the size needed
for one round of persistence as a hint for the size of the next round's buffer.
Author: David Turner
Commit: d9060ae4ee
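Conceptually the change boils down to the pattern in the sketch below. This is a standalone illustration, not code from the commit: the real implementation uses the new `RecyclingBytesStreamOutput` and `DocumentBuffer` classes added further down, and spills overflow into `BigArrays` pages instead of reallocating a plain array; the class and field names here are invented for the sketch.

    import java.nio.charset.StandardCharsets;
    import java.util.List;

    // Sketch of the reuse-and-grow pattern: one buffer serves every document in a
    // persistence round, grows only when a document does not fit, and the size
    // actually used becomes the starting size for the next round.
    class DocumentBufferSketch {
        private int lastRoundSize = 0; // hint carried between rounds

        void persistRound(List<String> documents) {
            byte[] buffer = new byte[lastRoundSize];
            int maxUsed = 0;
            for (String doc : documents) {
                final byte[] bytes = doc.getBytes(StandardCharsets.UTF_8);
                if (bytes.length > buffer.length) {
                    buffer = new byte[bytes.length]; // allocate afresh only when we need a larger one
                }
                System.arraycopy(bytes, 0, buffer, 0, bytes.length);
                maxUsed = Math.max(maxUsed, bytes.length);
                // ... hand (buffer, bytes.length) to the consumer, e.g. a stored field ...
            }
            lastRoundSize = maxUsed; // remember how much this round needed
        }
    }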

+ 1 - 1
server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java

@@ -205,7 +205,7 @@ public class JoinHelper {
         }
 
         void logWarnWithTimestamp() {
-            logger.info(() -> new ParameterizedMessage("last failed join attempt was {} ago, failed to join {} with {}",
+            logger.warn(() -> new ParameterizedMessage("last failed join attempt was {} ago, failed to join {} with {}",
                             TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - timestamp)),
                             destination,
                             joinRequest),

+ 148 - 0
server/src/main/java/org/elasticsearch/common/bytes/RecyclingBytesStreamOutput.java

@@ -0,0 +1,148 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.bytes;
+
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefIterator;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.io.stream.BytesStream;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.ByteArray;
+import org.elasticsearch.core.internal.io.IOUtils;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * An in-memory {@link StreamOutput} which first fills the given {@code byte[]} and then allocates more space from the given
+ * {@link BigArrays} if needed. The idea is that you can use this for passing data to an API that requires a single {@code byte[]} (or a
+ * {@link org.apache.lucene.util.BytesRef}) which you'd prefer to re-use if possible, avoiding excessive allocations, but which may not
+ * always be large enough.
+ */
+public class RecyclingBytesStreamOutput extends BytesStream {
+
+    private final byte[] buffer;
+    private final BigArrays bigArrays;
+
+    private int position;
+
+    @Nullable // if buffer is large enough
+    private ByteArray overflow;
+
+    public RecyclingBytesStreamOutput(byte[] buffer, BigArrays bigArrays) {
+        this.buffer = Objects.requireNonNull(buffer);
+        this.bigArrays = Objects.requireNonNull(bigArrays);
+    }
+
+    @Override
+    public void writeByte(byte b) {
+        if (position < buffer.length) {
+            buffer[position++] = b;
+        } else {
+            ensureCapacity(position + 1);
+            overflow.set(position++ - buffer.length, b);
+        }
+    }
+
+    private void ensureCapacity(int size) {
+        final int overflowSize = size - buffer.length;
+        assert overflowSize > 0 : "no need to ensureCapacity(" + size + ") with buffer of size [" + buffer.length + "]";
+        assert position >= buffer.length
+                : "no need to ensureCapacity(" + size + ") with buffer of size [" + buffer.length + "] at position [" + position + "]";
+        if (overflow == null) {
+            overflow = bigArrays.newByteArray(overflowSize, false);
+        } else if (overflowSize > overflow.size()) {
+            overflow = bigArrays.resize(overflow, overflowSize);
+        }
+        assert overflow.size() >= overflowSize;
+    }
+
+    @Override
+    public void writeBytes(byte[] b, int offset, int length) {
+        if (position < buffer.length) {
+            final int lengthForBuffer = Math.min(length, buffer.length - position);
+            System.arraycopy(b, offset, buffer, position, lengthForBuffer);
+            position += lengthForBuffer;
+            offset += lengthForBuffer;
+            length -= lengthForBuffer;
+        }
+
+        if (length > 0) {
+            ensureCapacity(position + length);
+            overflow.set(position - buffer.length, b, offset, length);
+            position += length;
+        }
+    }
+
+    @Override
+    public void flush() {
+    }
+
+    @Override
+    public void close() throws IOException {
+        IOUtils.close(overflow);
+    }
+
+    @Override
+    public void reset() throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Return the written bytes in a {@link BytesRef}, avoiding allocating a new {@code byte[]} if the original buffer was already large
+     * enough. If we allocate a new (larger) buffer here then callers should typically re-use it for subsequent streams.
+     */
+    public BytesRef toBytesRef() {
+        if (position <= buffer.length) {
+            assert overflow == null;
+            return new BytesRef(buffer, 0, position);
+        }
+
+        final byte[] newBuffer = new byte[position];
+        System.arraycopy(buffer, 0, newBuffer, 0, buffer.length);
+        int copyPos = buffer.length;
+        final BytesRefIterator iterator = new PagedBytesReference(overflow, position - buffer.length).iterator();
+        BytesRef bytesRef;
+        try {
+            while ((bytesRef = iterator.next()) != null) {
+                assert copyPos + bytesRef.length <= position;
+                System.arraycopy(bytesRef.bytes, bytesRef.offset, newBuffer, copyPos, bytesRef.length);
+                copyPos += bytesRef.length;
+            }
+        } catch (IOException e) {
+            throw new AssertionError("impossible", e);
+        }
+
+        return new BytesRef(newBuffer, 0, position);
+    }
+
+    @Override
+    public BytesReference bytes() {
+        if (position <= buffer.length) {
+            assert overflow == null;
+            return new BytesArray(buffer, 0, position);
+        } else {
+            return CompositeBytesReference.of(
+                    new BytesArray(buffer, 0, buffer.length),
+                    new PagedBytesReference(overflow, position - buffer.length));
+        }
+    }
+}
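A minimal usage sketch of the new class, assuming the `BigArrays.NON_RECYCLING_INSTANCE` used elsewhere in this commit's tests; it is not part of the commit itself, and the class name is made up for the example.

    import org.apache.lucene.util.BytesRef;
    import org.elasticsearch.common.bytes.RecyclingBytesStreamOutput;
    import org.elasticsearch.common.util.BigArrays;

    import java.nio.charset.StandardCharsets;

    public class RecyclingBytesStreamOutputExample {
        public static void main(String[] args) throws Exception {
            byte[] buffer = new byte[16]; // deliberately tiny so the second write overflows

            // Fits in the buffer: toBytesRef() hands back the same array, no allocation.
            try (RecyclingBytesStreamOutput out = new RecyclingBytesStreamOutput(buffer, BigArrays.NON_RECYCLING_INSTANCE)) {
                byte[] doc = "small".getBytes(StandardCharsets.UTF_8);
                out.writeBytes(doc, 0, doc.length);
                BytesRef ref = out.toBytesRef();
                assert ref.bytes == buffer && ref.length == doc.length;
            }

            // Does not fit: the overflow goes to BigArrays and toBytesRef() returns a new,
            // larger byte[]; a caller would keep that array as the buffer for the next round.
            try (RecyclingBytesStreamOutput out = new RecyclingBytesStreamOutput(buffer, BigArrays.NON_RECYCLING_INSTANCE)) {
                byte[] doc = "a document rather longer than sixteen bytes".getBytes(StandardCharsets.UTF_8);
                out.writeBytes(doc, 0, doc.length);
                BytesRef ref = out.toBytesRef();
                assert ref.bytes != buffer && ref.length == doc.length;
                buffer = ref.bytes; // recycle the larger buffer for subsequent streams
            }
        }
    }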

+ 153 - 114
server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java

@@ -46,6 +46,7 @@ import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.SimpleFSDirectory;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefIterator;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ClusterState;
@@ -53,14 +54,19 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.common.CheckedConsumer;
 import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
+import org.elasticsearch.common.bytes.PagedBytesReference;
+import org.elasticsearch.common.bytes.RecyclingBytesStreamOutput;
+import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.ByteArray;
+import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.ToXContent;
@@ -73,7 +79,6 @@ import org.elasticsearch.env.NodeMetadata;
 import org.elasticsearch.index.Index;
 
 import java.io.Closeable;
-import java.io.FilterOutputStream;
 import java.io.IOError;
 import java.io.IOException;
 import java.nio.file.Files;
@@ -452,29 +457,6 @@ public class PersistedClusterStateService {
         FORMAT_PARAMS = new ToXContent.MapParams(params);
     }
 
-    /**
-     * A {@link Document} with a stored field containing serialized metadata written to a {@link ReleasableBytesStreamOutput} which must be
-     * released when no longer needed.
-     */
-    private static class ReleasableDocument implements Releasable {
-        private final Document document;
-        private final Releasable releasable;
-
-        ReleasableDocument(Document document, Releasable releasable) {
-            this.document = document;
-            this.releasable = releasable;
-        }
-
-        Document getDocument() {
-            return document;
-        }
-
-        @Override
-        public void close() {
-            releasable.close();
-        }
-    }
-
     /**
      * Encapsulates a single {@link IndexWriter} with its {@link Directory} for ease of closing, and a {@link Logger}. There is one of these
      * for each data path.
@@ -547,6 +529,10 @@ public class PersistedClusterStateService {
         boolean fullStateWritten = false;
         private final AtomicBoolean closed = new AtomicBoolean();
 
+        // The size of the document buffer that was used for the last write operation, used as a hint for allocating the buffer for the
+        // next one.
+        private int documentBufferUsed;
+
         private Writer(List<MetadataIndexWriter> metadataIndexWriters, String nodeId, BigArrays bigArrays,
                        LongSupplier relativeTimeMillisSupplier, Supplier<TimeValue> slowWriteLoggingThresholdSupplier) {
             this.metadataIndexWriters = metadataIndexWriters;
@@ -650,56 +636,60 @@ public class PersistedClusterStateService {
             logger.trace("currentTerm [{}] matches previous currentTerm, writing changes only",
                 metadata.coordinationMetadata().term());
 
-            final boolean updateGlobalMeta = Metadata.isGlobalStateEquals(previouslyWrittenMetadata, metadata) == false;
-            if (updateGlobalMeta) {
-                try (ReleasableDocument globalMetadataDocument = makeGlobalMetadataDocument(metadata)) {
+            try (DocumentBuffer documentBuffer = allocateBuffer()) {
+
+                final boolean updateGlobalMeta = Metadata.isGlobalStateEquals(previouslyWrittenMetadata, metadata) == false;
+                if (updateGlobalMeta) {
+                    final Document globalMetadataDocument = makeGlobalMetadataDocument(metadata, documentBuffer);
                     for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
-                        metadataIndexWriter.updateGlobalMetadata(globalMetadataDocument.getDocument());
+                        metadataIndexWriter.updateGlobalMetadata(globalMetadataDocument);
                     }
                 }
-            }
 
-            final Map<String, Long> indexMetadataVersionByUUID = new HashMap<>(previouslyWrittenMetadata.indices().size());
-            for (ObjectCursor<IndexMetadata> cursor : previouslyWrittenMetadata.indices().values()) {
-                final IndexMetadata indexMetadata = cursor.value;
-                final Long previousValue = indexMetadataVersionByUUID.putIfAbsent(indexMetadata.getIndexUUID(), indexMetadata.getVersion());
-                assert previousValue == null : indexMetadata.getIndexUUID() + " already mapped to " + previousValue;
-            }
+                final Map<String, Long> indexMetadataVersionByUUID = new HashMap<>(previouslyWrittenMetadata.indices().size());
+                for (ObjectCursor<IndexMetadata> cursor : previouslyWrittenMetadata.indices().values()) {
+                    final IndexMetadata indexMetadata = cursor.value;
+                    final Long previousValue
+                            = indexMetadataVersionByUUID.putIfAbsent(indexMetadata.getIndexUUID(), indexMetadata.getVersion());
+                    assert previousValue == null : indexMetadata.getIndexUUID() + " already mapped to " + previousValue;
+                }
 
-            int numIndicesUpdated = 0;
-            int numIndicesUnchanged = 0;
-            for (ObjectCursor<IndexMetadata> cursor : metadata.indices().values()) {
-                final IndexMetadata indexMetadata = cursor.value;
-                final Long previousVersion = indexMetadataVersionByUUID.get(indexMetadata.getIndexUUID());
-                if (previousVersion == null || indexMetadata.getVersion() != previousVersion) {
-                    logger.trace("updating metadata for [{}], changing version from [{}] to [{}]",
-                        indexMetadata.getIndex(), previousVersion, indexMetadata.getVersion());
-                    numIndicesUpdated++;
-                    try (ReleasableDocument indexMetadataDocument = makeIndexMetadataDocument(indexMetadata)) {
+                int numIndicesUpdated = 0;
+                int numIndicesUnchanged = 0;
+                for (ObjectCursor<IndexMetadata> cursor : metadata.indices().values()) {
+                    final IndexMetadata indexMetadata = cursor.value;
+                    final Long previousVersion = indexMetadataVersionByUUID.get(indexMetadata.getIndexUUID());
+                    if (previousVersion == null || indexMetadata.getVersion() != previousVersion) {
+                        logger.trace("updating metadata for [{}], changing version from [{}] to [{}]",
+                                indexMetadata.getIndex(), previousVersion, indexMetadata.getVersion());
+                        numIndicesUpdated++;
+                        final Document indexMetadataDocument = makeIndexMetadataDocument(indexMetadata, documentBuffer);
                         for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
-                            metadataIndexWriter.updateIndexMetadataDocument(indexMetadataDocument.getDocument(), indexMetadata.getIndex());
+                            metadataIndexWriter.updateIndexMetadataDocument(indexMetadataDocument, indexMetadata.getIndex());
                         }
+                    } else {
+                        numIndicesUnchanged++;
+                        logger.trace("no action required for [{}]", indexMetadata.getIndex());
                     }
-                } else {
-                    numIndicesUnchanged++;
-                    logger.trace("no action required for [{}]", indexMetadata.getIndex());
+                    indexMetadataVersionByUUID.remove(indexMetadata.getIndexUUID());
                 }
-                indexMetadataVersionByUUID.remove(indexMetadata.getIndexUUID());
-            }
 
-            for (String removedIndexUUID : indexMetadataVersionByUUID.keySet()) {
+                documentBufferUsed = documentBuffer.getMaxUsed();
+
+                for (String removedIndexUUID : indexMetadataVersionByUUID.keySet()) {
+                    for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
+                        metadataIndexWriter.deleteIndexMetadata(removedIndexUUID);
+                    }
+                }
+
+                // Flush, to try and expose a failure (e.g. out of disk space) before committing, because we can handle a failure here more
+                // gracefully than one that occurs during the commit process.
                 for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
-                    metadataIndexWriter.deleteIndexMetadata(removedIndexUUID);
+                    metadataIndexWriter.flush();
                 }
-            }
 
-            // Flush, to try and expose a failure (e.g. out of disk space) before committing, because we can handle a failure here more
-            // gracefully than one that occurs during the commit process.
-            for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
-                metadataIndexWriter.flush();
+                return new WriterStats(updateGlobalMeta, numIndicesUpdated, numIndicesUnchanged);
             }
-
-            return new WriterStats(updateGlobalMeta, numIndicesUpdated, numIndicesUnchanged);
         }
 
         /**
@@ -716,28 +706,39 @@ public class PersistedClusterStateService {
          * Add documents for the metadata of the given cluster state, assuming that there are currently no documents.
          */
         private WriterStats addMetadata(Metadata metadata) throws IOException {
-            try (ReleasableDocument globalMetadataDocument = makeGlobalMetadataDocument(metadata)) {
+            try (DocumentBuffer documentBuffer = allocateBuffer()) {
+
+                final Document globalMetadataDocument = makeGlobalMetadataDocument(metadata, documentBuffer);
                 for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
-                    metadataIndexWriter.updateGlobalMetadata(globalMetadataDocument.getDocument());
+                    metadataIndexWriter.updateGlobalMetadata(globalMetadataDocument);
                 }
-            }
 
-            for (ObjectCursor<IndexMetadata> cursor : metadata.indices().values()) {
-                final IndexMetadata indexMetadata = cursor.value;
-                try (ReleasableDocument indexMetadataDocument = makeIndexMetadataDocument(indexMetadata)) {
+                for (ObjectCursor<IndexMetadata> cursor : metadata.indices().values()) {
+                    final IndexMetadata indexMetadata = cursor.value;
+                    final Document indexMetadataDocument = makeIndexMetadataDocument(indexMetadata, documentBuffer);
                     for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
-                        metadataIndexWriter.updateIndexMetadataDocument(indexMetadataDocument.getDocument(), indexMetadata.getIndex());
+                        metadataIndexWriter.updateIndexMetadataDocument(indexMetadataDocument, indexMetadata.getIndex());
                     }
                 }
-            }
 
-            // Flush, to try and expose a failure (e.g. out of disk space) before committing, because we can handle a failure here more
-            // gracefully than one that occurs during the commit process.
-            for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
-                metadataIndexWriter.flush();
+                documentBufferUsed = documentBuffer.getMaxUsed();
+
+                // Flush, to try and expose a failure (e.g. out of disk space) before committing, because we can handle a failure here more
+                // gracefully than one that occurs during the commit process.
+                for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
+                    metadataIndexWriter.flush();
+                }
+
+                return new WriterStats(true, metadata.indices().size(), 0);
             }
+        }
 
-            return new WriterStats(true, metadata.indices().size(), 0);
+        private DocumentBuffer allocateBuffer() {
+            // heuristics for picking the initial buffer size based on the buffer we needed last time: try and fit within a single page,
+            // but if we needed more than a single page last time then allow a bit more space to try and avoid needing to grow the buffer
+            // later on.
+            final int extraSpace = documentBufferUsed <= PageCacheRecycler.PAGE_SIZE_IN_BYTES ? 0 : PageCacheRecycler.PAGE_SIZE_IN_BYTES;
+            return new DocumentBuffer(documentBufferUsed + extraSpace, bigArrays);
         }
 
         public void writeIncrementalTermUpdateAndCommit(long currentTerm, long lastAcceptedVersion) throws IOException {
@@ -802,59 +803,97 @@ public class PersistedClusterStateService {
             }
         }
 
-        private ReleasableDocument makeIndexMetadataDocument(IndexMetadata indexMetadata) throws IOException {
-            final ReleasableDocument indexMetadataDocument = makeDocument(INDEX_TYPE_NAME, indexMetadata);
-            boolean success = false;
-            try {
-                final String indexUUID = indexMetadata.getIndexUUID();
-                assert indexUUID.equals(IndexMetadata.INDEX_UUID_NA_VALUE) == false;
-                indexMetadataDocument.getDocument().add(new StringField(INDEX_UUID_FIELD_NAME, indexUUID, Field.Store.NO));
-                success = true;
-                return indexMetadataDocument;
-            } finally {
-                if (success == false) {
-                    IOUtils.closeWhileHandlingException(indexMetadataDocument);
-                }
-            }
+        private Document makeIndexMetadataDocument(IndexMetadata indexMetadata, DocumentBuffer documentBuffer) throws IOException {
+            final Document indexMetadataDocument = makeDocument(INDEX_TYPE_NAME, indexMetadata, documentBuffer);
+            final String indexUUID = indexMetadata.getIndexUUID();
+            assert indexUUID.equals(IndexMetadata.INDEX_UUID_NA_VALUE) == false;
+            indexMetadataDocument.add(new StringField(INDEX_UUID_FIELD_NAME, indexUUID, Field.Store.NO));
+            return indexMetadataDocument;
         }
 
-        private ReleasableDocument makeGlobalMetadataDocument(Metadata metadata) throws IOException {
-            return makeDocument(GLOBAL_TYPE_NAME, metadata);
+        private Document makeGlobalMetadataDocument(Metadata metadata, DocumentBuffer documentBuffer) throws IOException {
+            return makeDocument(GLOBAL_TYPE_NAME, metadata, documentBuffer);
         }
 
-        private ReleasableDocument makeDocument(String typeName, ToXContent metadata) throws IOException {
+        private Document makeDocument(String typeName, ToXContent metadata, DocumentBuffer documentBuffer) throws IOException {
             final Document document = new Document();
             document.add(new StringField(TYPE_FIELD_NAME, typeName, Field.Store.NO));
 
-            boolean success = false;
-            final ReleasableBytesStreamOutput releasableBytesStreamOutput = new ReleasableBytesStreamOutput(bigArrays);
-            try {
-                final FilterOutputStream outputStream = new FilterOutputStream(releasableBytesStreamOutput) {
-
-                    @Override
-                    public void write(byte[] b, int off, int len) throws IOException {
-                        out.write(b, off, len);
-                    }
-
-                    @Override
-                    public void close() {
-                        // closing the XContentBuilder should not release the bytes yet
-                    }
-                };
-                try (XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.SMILE, outputStream)) {
+            try (RecyclingBytesStreamOutput streamOutput = documentBuffer.streamOutput()) {
+                try (XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.SMILE,
+                        Streams.flushOnCloseStream(streamOutput))) {
                     xContentBuilder.startObject();
                     metadata.toXContent(xContentBuilder, FORMAT_PARAMS);
                     xContentBuilder.endObject();
                 }
-                document.add(new StoredField(DATA_FIELD_NAME, releasableBytesStreamOutput.bytes().toBytesRef()));
-                final ReleasableDocument releasableDocument = new ReleasableDocument(document, releasableBytesStreamOutput);
-                success = true;
-                return releasableDocument;
-            } finally {
-                if (success == false) {
-                    IOUtils.closeWhileHandlingException(releasableBytesStreamOutput);
+                document.add(new StoredField(DATA_FIELD_NAME, streamOutput.toBytesRef()));
+            }
+
+            return document;
+        }
+    }
+
+    /**
+     * Holds the current buffer, keeping track of new allocations as it grows.
+     */
+    private static class DocumentBuffer implements Releasable {
+        private final BigArrays bigArrays;
+
+        @Nullable // if the initial page doesn't need releasing
+        private final Releasable releasable;
+        private byte[] buffer;
+        private int maxUsed;
+
+        DocumentBuffer(int size, BigArrays bigArrays) {
+            if (size <= PageCacheRecycler.PAGE_SIZE_IN_BYTES) {
+                final ByteArray byteArray = bigArrays.newByteArray(PageCacheRecycler.PAGE_SIZE_IN_BYTES);
+                final BytesRefIterator iterator = new PagedBytesReference(byteArray, Math.toIntExact(byteArray.size())).iterator();
+                final BytesRef firstPage;
+                try {
+                    firstPage = iterator.next();
+                    assert iterator.next() == null : "should be one page";
+                } catch (IOException e) {
+                    throw new AssertionError("impossible", e);
                 }
+
+                // we require that we have the whole page to ourselves
+                assert firstPage.offset == 0 : firstPage.offset;
+                assert firstPage.bytes.length == PageCacheRecycler.PAGE_SIZE_IN_BYTES : firstPage.bytes.length;
+                buffer = firstPage.bytes;
+                releasable = byteArray;
+            } else {
+                buffer = new byte[size];
+                releasable = null;
             }
+            this.bigArrays = bigArrays;
+            maxUsed = 0;
+        }
+
+        RecyclingBytesStreamOutput streamOutput() {
+            return new RecyclingBytesStreamOutput(buffer, bigArrays) {
+                @Override
+                public BytesRef toBytesRef() {
+                    final BytesRef bytesRef = super.toBytesRef();
+                    maxUsed = Math.max(maxUsed, bytesRef.length);
+                    if (buffer != bytesRef.bytes) {
+                        assert bytesRef.length > buffer.length;
+                        logger.trace("growing document buffer from [{}] to [{}]", buffer.length, maxUsed);
+                        buffer = bytesRef.bytes;
+                    }
+                    assert maxUsed <= buffer.length;
+                    return bytesRef;
+                }
+            };
+        }
+
+        int getMaxUsed() {
+            return maxUsed;
+        }
+
+        @Override
+        public void close() {
+            Releasables.close(releasable);
         }
     }
 }
+
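The `allocateBuffer()` heuristic above can be read in isolation as the small sketch below; this is a hypothetical standalone form, with `pageSize` standing in for `PageCacheRecycler.PAGE_SIZE_IN_BYTES`.

    // Standalone form of the allocateBuffer() sizing heuristic.
    static int initialBufferSize(int documentBufferUsed, int pageSize) {
        // If the previous round fitted within one recycled page, ask for exactly that
        // much; otherwise add one page of headroom so the buffer is less likely to
        // need to grow part-way through the next round.
        final int extraSpace = documentBufferUsed <= pageSize ? 0 : pageSize;
        return documentBufferUsed + extraSpace;
    }

With, say, 16 KiB pages, a round that used 10 KiB requests 10 KiB next time (which `DocumentBuffer` serves from a single recycled page), while a round that used 40 KiB requests 56 KiB up front.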

+ 80 - 0
server/src/test/java/org/elasticsearch/common/bytes/RecyclingBytesStreamOutputTests.java

@@ -0,0 +1,80 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.bytes;
+
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.MockBigArrays;
+import org.elasticsearch.common.util.MockPageCacheRecycler;
+import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.sameInstance;
+
+public class RecyclingBytesStreamOutputTests extends ESTestCase {
+
+    public void testReturnsWrittenBytesAndRecyclesBufferIfPossible() throws IOException {
+
+        final byte[] source = randomUnicodeOfLength(scaledRandomIntBetween(0, 20000)).getBytes(StandardCharsets.UTF_8);
+        final byte[] buffer = new byte[scaledRandomIntBetween(0, 20000)];
+
+        final MockBigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
+        try (RecyclingBytesStreamOutput output = new RecyclingBytesStreamOutput(buffer, bigArrays)) {
+            int position = 0;
+            while (position < source.length) {
+                if (randomBoolean()) {
+                    output.writeByte(source[position++]);
+                } else {
+                    final int length = randomIntBetween(1, source.length - position);
+                    final int sliceStart = randomIntBetween(0, position);
+                    final int sliceEnd = randomIntBetween(position + length, source.length);
+                    final byte[] slice = new byte[sliceEnd - sliceStart];
+                    System.arraycopy(source, sliceStart, slice, 0, slice.length);
+                    output.writeBytes(slice, position - sliceStart, length);
+                    position += length;
+                }
+            }
+
+            final BytesRef bytesRef;
+
+            if (randomBoolean()) {
+                bytesRef = output.toBytesRef();
+                assertThat(bytesRef.offset, equalTo(0));
+
+                if (source.length <= buffer.length) {
+                    assertThat("should have re-used the same buffer", bytesRef.bytes, sameInstance(buffer));
+                } else {
+                    assertThat("new buffer should be the right size", bytesRef.bytes.length, equalTo(source.length));
+                }
+            } else {
+                bytesRef = output.bytes().toBytesRef();
+            }
+
+            assertThat(bytesRef.length, equalTo(source.length));
+            final byte[] trimmed = new byte[source.length];
+            System.arraycopy(bytesRef.bytes, bytesRef.offset, trimmed, 0, bytesRef.length);
+            assertArrayEquals(source, trimmed);
+        }
+    }
+}

+ 18 - 7
server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java

@@ -38,11 +38,14 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.MockBigArrays;
+import org.elasticsearch.common.util.MockPageCacheRecycler;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.env.TestEnvironment;
+import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.TestThreadPool;
@@ -71,9 +74,11 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase {
     private ClusterName clusterName;
     private Settings settings;
     private DiscoveryNode localNode;
+    private BigArrays bigArrays;
 
     @Override
     public void setUp() throws Exception {
+        bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
         nodeEnvironment = newNodeEnvironment();
         localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Collections.emptyMap(),
             Sets.newHashSet(DiscoveryNodeRole.MASTER_ROLE), Version.CURRENT);
@@ -89,7 +94,7 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase {
     }
 
     private CoordinationState.PersistedState newGatewayPersistedState() {
-        final MockGatewayMetaState gateway = new MockGatewayMetaState(localNode);
+        final MockGatewayMetaState gateway = new MockGatewayMetaState(localNode, bigArrays);
         gateway.start(settings, nodeEnvironment, xContentRegistry());
         final CoordinationState.PersistedState persistedState = gateway.getPersistedState();
         assertThat(persistedState, instanceOf(GatewayMetaState.LucenePersistedState.class));
@@ -298,7 +303,7 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase {
     public void testStatePersistedOnLoad() throws IOException {
         // open LucenePersistedState to make sure that cluster state is written out to each data path
         final PersistedClusterStateService persistedClusterStateService =
-            new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
+            new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), getBigArrays(),
                 new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L);
         final ClusterState state = createClusterState(randomNonNegativeLong(),
             Metadata.builder().clusterUUID(randomAlphaOfLength(10)).build());
@@ -316,7 +321,7 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase {
                 .put(Environment.PATH_DATA_SETTING.getKey(), path.toString()).build();
             try (NodeEnvironment nodeEnvironment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings))) {
                 final PersistedClusterStateService newPersistedClusterStateService =
-                    new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
+                    new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), getBigArrays(),
                         new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L);
                 final PersistedClusterStateService.OnDiskState onDiskState = newPersistedClusterStateService.loadBestOnDiskState();
                 assertFalse(onDiskState.empty());
@@ -340,7 +345,7 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase {
                 .put(nonMasterNode())
                 .put(Node.NODE_NAME_SETTING.getKey(), "test")
                 .build();
-            final MockGatewayMetaState gateway = new MockGatewayMetaState(localNode);
+            final MockGatewayMetaState gateway = new MockGatewayMetaState(localNode, bigArrays);
             cleanup.add(gateway);
             final TransportService transportService = mock(TransportService.class);
             TestThreadPool threadPool = new TestThreadPool("testMarkAcceptedConfigAsCommittedOnDataOnlyNode");
@@ -350,7 +355,7 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase {
             when(clusterService.getClusterSettings()).thenReturn(
                 new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
             final PersistedClusterStateService persistedClusterStateService =
-                new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
+                new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), getBigArrays(),
                     new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L);
             gateway.start(settings, transportService, clusterService,
                 new MetaStateService(nodeEnvironment, xContentRegistry()), null, null, persistedClusterStateService);
@@ -437,7 +442,7 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase {
         final AtomicReference<Double> ioExceptionRate = new AtomicReference<>(0.01d);
         final List<MockDirectoryWrapper> list = new ArrayList<>();
         final PersistedClusterStateService persistedClusterStateService =
-            new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
+            new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), getBigArrays(),
                 new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L) {
                 @Override
                 Directory createDirectory(Path path) {
@@ -514,7 +519,7 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase {
                 .put(Environment.PATH_DATA_SETTING.getKey(), path.toString()).build();
             try (NodeEnvironment nodeEnvironment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings))) {
                 final PersistedClusterStateService newPersistedClusterStateService =
-                    new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
+                    new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), getBigArrays(),
                         new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L);
                 final PersistedClusterStateService.OnDiskState onDiskState = newPersistedClusterStateService.loadBestOnDiskState();
                 assertFalse(onDiskState.empty());
@@ -527,4 +532,10 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase {
         }
     }
 
+    private static BigArrays getBigArrays() {
+        return usually()
+                ? BigArrays.NON_RECYCLING_INSTANCE
+                : new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
+    }
+
 }

+ 10 - 13
server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java

@@ -77,10 +77,7 @@ import static org.hamcrest.Matchers.nullValue;
 public class PersistedClusterStateServiceTests extends ESTestCase {
 
     private PersistedClusterStateService newPersistedClusterStateService(NodeEnvironment nodeEnvironment) {
-        return new PersistedClusterStateService(nodeEnvironment, xContentRegistry(),
-            usually()
-                ? BigArrays.NON_RECYCLING_INSTANCE
-                : new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()),
+        return new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), getBigArrays(),
             new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
             () -> 0L);
     }
@@ -356,7 +353,7 @@ public class PersistedClusterStateServiceTests extends ESTestCase {
 
         try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) {
             final PersistedClusterStateService persistedClusterStateService
-                = new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
+                = new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), getBigArrays(),
                 new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L) {
                 @Override
                 Directory createDirectory(Path path) throws IOException {
@@ -394,7 +391,7 @@ public class PersistedClusterStateServiceTests extends ESTestCase {
 
         try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) {
             final PersistedClusterStateService persistedClusterStateService
-                = new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
+                = new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), getBigArrays(),
                 new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L) {
                 @Override
                 Directory createDirectory(Path path) throws IOException {
@@ -440,7 +437,7 @@ public class PersistedClusterStateServiceTests extends ESTestCase {
 
         try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) {
             final PersistedClusterStateService persistedClusterStateService
-                = new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
+                = new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), getBigArrays(),
                 new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L) {
                 @Override
                 Directory createDirectory(Path path) throws IOException {
@@ -798,12 +795,7 @@ public class PersistedClusterStateServiceTests extends ESTestCase {
         final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
         try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) {
             PersistedClusterStateService persistedClusterStateService = new PersistedClusterStateService(nodeEnvironment,
-                xContentRegistry(),
-                usually()
-                    ? BigArrays.NON_RECYCLING_INSTANCE
-                    : new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()),
-                clusterSettings,
-                () -> currentTime.getAndAdd(writeDurationMillis.get()));
+                    xContentRegistry(), getBigArrays(), clusterSettings, () -> currentTime.getAndAdd(writeDurationMillis.get()));
 
             try (Writer writer = persistedClusterStateService.createWriter()) {
                 assertExpectedLogs(1L, null, clusterState, writer, new MockLogAppender.SeenEventExpectation(
@@ -921,5 +913,10 @@ public class PersistedClusterStateServiceTests extends ESTestCase {
         return ClusterState.builder(ClusterName.DEFAULT).version(version).metadata(metadata).build();
     }
 
+    private static BigArrays getBigArrays() {
+        return usually()
+                ? BigArrays.NON_RECYCLING_INSTANCE
+                : new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
+    }
 
 }

+ 11 - 4
test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java

@@ -57,6 +57,8 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.MockBigArrays;
+import org.elasticsearch.common.util.MockPageCacheRecycler;
 import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
 import org.elasticsearch.discovery.DiscoveryModule;
 import org.elasticsearch.discovery.SeedHostsProvider;
@@ -65,6 +67,7 @@ import org.elasticsearch.gateway.ClusterStateUpdaters;
 import org.elasticsearch.gateway.GatewayService;
 import org.elasticsearch.gateway.MockGatewayMetaState;
 import org.elasticsearch.gateway.PersistedClusterStateService;
+import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
 import org.elasticsearch.monitor.NodeHealthService;
 import org.elasticsearch.monitor.StatusInfo;
 import org.elasticsearch.test.ESTestCase;
@@ -257,7 +260,8 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
         private final Map<Long, ClusterState> committedStatesByVersion = new HashMap<>();
         private final LinearizabilityChecker linearizabilityChecker = new LinearizabilityChecker();
         private final History history = new History();
-        private NodeHealthService nodeHealthService;
+        private final BigArrays bigArrays;
+        private final NodeHealthService nodeHealthService;
 
         private final Function<DiscoveryNode, MockPersistedState> defaultPersistedStateSupplier = MockPersistedState::new;
 
@@ -274,6 +278,9 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
 
         Cluster(int initialNodeCount, boolean allNodesMasterEligible, Settings nodeSettings, NodeHealthService nodeHealthService) {
             this.nodeHealthService = nodeHealthService;
+            bigArrays = usually()
+                    ? BigArrays.NON_RECYCLING_INSTANCE
+                    : new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
             deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY);
 
             assertThat(initialNodeCount, greaterThan(0));
@@ -738,7 +745,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
                     if (rarely()) {
                         nodeEnvironment = newNodeEnvironment();
                         nodeEnvironments.add(nodeEnvironment);
-                        final MockGatewayMetaState gatewayMetaState = new MockGatewayMetaState(localNode);
+                        final MockGatewayMetaState gatewayMetaState = new MockGatewayMetaState(localNode, bigArrays);
                         gatewayMetaState.start(Settings.EMPTY, nodeEnvironment, xContentRegistry());
                         delegate = gatewayMetaState.getPersistedState();
                     } else {
@@ -762,7 +769,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
                         final long updatedTerm = adaptCurrentTerm.apply(oldState.getCurrentTerm());
                         if (updatedMetadata != oldState.getLastAcceptedState().metadata() || updatedTerm != oldState.getCurrentTerm()) {
                             try (PersistedClusterStateService.Writer writer =
-                                     new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
+                                     new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), bigArrays,
                                          new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
                                          deterministicTaskQueue::getCurrentTimeMillis)
                                          .createWriter()) {
@@ -770,7 +777,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
                                     ClusterState.builder(oldState.getLastAcceptedState()).metadata(updatedMetadata).build());
                             }
                         }
-                        final MockGatewayMetaState gatewayMetaState = new MockGatewayMetaState(newLocalNode);
+                        final MockGatewayMetaState gatewayMetaState = new MockGatewayMetaState(newLocalNode, bigArrays);
                         gatewayMetaState.start(Settings.EMPTY, nodeEnvironment, xContentRegistry());
                         delegate = gatewayMetaState.getPersistedState();
                     } else {

+ 4 - 2
test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java

@@ -48,9 +48,11 @@ import static org.mockito.Mockito.when;
  */
 public class MockGatewayMetaState extends GatewayMetaState {
     private final DiscoveryNode localNode;
+    private final BigArrays bigArrays;
 
-    public MockGatewayMetaState(DiscoveryNode localNode) {
+    public MockGatewayMetaState(DiscoveryNode localNode, BigArrays bigArrays) {
         this.localNode = localNode;
+        this.bigArrays = bigArrays;
     }
 
     @Override
@@ -80,7 +82,7 @@ public class MockGatewayMetaState extends GatewayMetaState {
             throw new AssertionError(e);
         }
         start(settings, transportService, clusterService, metaStateService,
-            null, null, new PersistedClusterStateService(nodeEnvironment, xContentRegistry, BigArrays.NON_RECYCLING_INSTANCE,
+            null, null, new PersistedClusterStateService(nodeEnvironment, xContentRegistry, bigArrays,
                 new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L));
     }
 }