
update to the new lzf compression (0.7)

kimchy 14 years ago
parent
commit
ae0eed937b
26 changed files with 767 additions and 450 deletions
  1. + 2 - 1    modules/elasticsearch/src/main/java/org/elasticsearch/action/get/GetResponse.java
  2. + 1 - 1    modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/CompressedString.java
  3. + 159 - 0  modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/BufferRecycler.java
  4. + 116 - 79 modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/ChunkEncoder.java
  5. + 5 - 19   modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZF.java
  6. + 28 - 41  modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFChunk.java
  7. + 51 - 52  modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFDecoder.java
  8. + 9 - 84   modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFEncoder.java
  9. + 75 - 29  modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFInputStream.java
  10. + 62 - 50 modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFOutputStream.java
  11. + 27 - 0  modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/package-info.java
  12. + 1 - 1   modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/CachedStreamInput.java
  13. + 1 - 1   modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/CachedStreamOutput.java
  14. + 74 - 33 modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/LZFStreamInput.java
  15. + 78 - 39 modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/LZFStreamOutput.java
  16. + 2 - 2   modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java
  17. + 3 - 3   modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java
  18. + 4 - 3   modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/xcontent/SourceFieldMapper.java
  19. + 2 - 2   modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/xcontent/XContentDocumentMapper.java
  20. + 3 - 0   modules/elasticsearch/src/main/java/org/elasticsearch/index/store/support/AbstractStore.java
  21. + 2 - 2   modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/support/RestXContentBuilder.java
  22. + 2 - 1   modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java
  23. + 2 - 2   modules/elasticsearch/src/main/java/org/elasticsearch/search/lookup/SourceLookup.java
  24. + 5 - 5   modules/elasticsearch/src/test/java/org/elasticsearch/index/mapper/xcontent/source/CompressSourceMappingTests.java
  25. + 49 - 0  modules/elasticsearch/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java
  26. + 4 - 0   modules/elasticsearch/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java

+ 2 - 1
modules/elasticsearch/src/main/java/org/elasticsearch/action/get/GetResponse.java

@@ -23,6 +23,7 @@ import org.elasticsearch.ElasticSearchParseException;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.common.Unicode;
 import org.elasticsearch.common.collect.ImmutableMap;
+import org.elasticsearch.common.compress.lzf.LZF;
 import org.elasticsearch.common.compress.lzf.LZFDecoder;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -156,7 +157,7 @@ public class GetResponse implements ActionResponse, Streamable, Iterable<GetFiel
         if (source == null) {
             return null;
         }
-        if (LZFDecoder.isCompressed(source)) {
+        if (LZF.isCompressed(source)) {
             try {
                 this.source = LZFDecoder.decode(source);
             } catch (IOException e) {

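The call-site change is mechanical: the magic-byte check moves from LZFDecoder to the new LZF entry point, while decoding itself stays with LZFDecoder. A minimal sketch of the resulting pattern (the helper class and method name here are hypothetical, not part of the commit):

import org.elasticsearch.common.compress.lzf.LZF;
import org.elasticsearch.common.compress.lzf.LZFDecoder;

import java.io.IOException;

public class SourceBytes {
    // Hypothetical helper mirroring the GetResponse change: check the 'Z','V'
    // magic bytes via LZF.isCompressed, and only then decode.
    public static byte[] uncompressed(byte[] source) throws IOException {
        if (LZF.isCompressed(source)) {       // was LZFDecoder.isCompressed(source)
            return LZFDecoder.decode(source); // expands all chunks into one byte[]
        }
        return source;                        // already plain bytes
    }
}
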
+ 1 - 1
modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/CompressedString.java

@@ -42,7 +42,7 @@ public class CompressedString implements Streamable {
 
     public CompressedString(String str) throws IOException {
         UnicodeUtil.UTF8Result result = Unicode.unsafeFromStringAsUtf8(str);
-        this.bytes = LZFEncoder.encodeWithCache(result.result, result.length);
+        this.bytes = LZFEncoder.encode(result.result, result.length);
     }
 
     public byte[] compressed() {

+ 159 - 0
modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/BufferRecycler.java

@@ -0,0 +1,159 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.compress.lzf;
+
+import java.lang.ref.SoftReference;
+
+/**
+ * Simple helper class to encapsulate details of basic buffer
+ * recycling scheme, which helps a lot (as per profiling) for
+ * smaller encoding cases.
+ *
+ * @author tatu
+ */
+public class BufferRecycler {
+    private final static int MIN_ENCODING_BUFFER = 4000;
+
+    private final static int MIN_OUTPUT_BUFFER = 8000;
+
+    /**
+     * This <code>ThreadLocal</code> contains a {@link java.lang.ref.SoftReference}
+     * to a {@link BufferRecycler} used to provide a low-cost
+     * buffer recycling for buffers we need for encoding, decoding.
+     */
+    final protected static ThreadLocal<SoftReference<BufferRecycler>> _recyclerRef
+            = new ThreadLocal<SoftReference<BufferRecycler>>();
+
+
+    private byte[] _inputBuffer;
+    private byte[] _outputBuffer;
+
+    private byte[] _decodingBuffer;
+    private byte[] _encodingBuffer;
+
+    private int[] _encodingHash;
+
+
+    /**
+     * Accessor to get thread-local recycler instance
+     */
+    public static BufferRecycler instance() {
+        SoftReference<BufferRecycler> ref = _recyclerRef.get();
+        BufferRecycler br = (ref == null) ? null : ref.get();
+        if (br == null) {
+            br = new BufferRecycler();
+            _recyclerRef.set(new SoftReference<BufferRecycler>(br));
+        }
+        return br;
+    }
+
+    /*
+    ///////////////////////////////////////////////////////////////////////
+    // Buffers for encoding (output)
+    ///////////////////////////////////////////////////////////////////////
+     */
+
+    public byte[] allocEncodingBuffer(int minSize) {
+        byte[] buf = _encodingBuffer;
+        if (buf == null || buf.length < minSize) {
+            buf = new byte[Math.max(minSize, MIN_ENCODING_BUFFER)];
+        } else {
+            _encodingBuffer = null;
+        }
+        return buf;
+    }
+
+    public void releaseEncodeBuffer(byte[] buffer) {
+        if (_encodingBuffer == null || buffer.length > _encodingBuffer.length) {
+            _encodingBuffer = buffer;
+        }
+    }
+
+    public byte[] allocOutputBuffer(int minSize) {
+        byte[] buf = _outputBuffer;
+        if (buf == null || buf.length < minSize) {
+            buf = new byte[Math.max(minSize, MIN_OUTPUT_BUFFER)];
+        } else {
+            _outputBuffer = null;
+        }
+        return buf;
+    }
+
+    public void releaseOutputBuffer(byte[] buffer) {
+        if (_outputBuffer == null || buffer.length > _outputBuffer.length) {
+            _outputBuffer = buffer;
+        }
+    }
+
+    public int[] allocEncodingHash(int suggestedSize) {
+        int[] buf = _encodingHash;
+        if (buf == null || buf.length < suggestedSize) {
+            buf = new int[suggestedSize];
+        } else {
+            _encodingHash = null;
+        }
+        return buf;
+    }
+
+    public void releaseEncodingHash(int[] buffer) {
+        if (_encodingHash == null || buffer.length > _encodingHash.length) {
+            _encodingHash = buffer;
+        }
+    }
+
+    /*
+    ///////////////////////////////////////////////////////////////////////
+    // Buffers for decoding (input)
+    ///////////////////////////////////////////////////////////////////////
+     */
+
+    public byte[] allocInputBuffer(int minSize) {
+        byte[] buf = _inputBuffer;
+        if (buf == null || buf.length < minSize) {
+            buf = new byte[Math.max(minSize, MIN_OUTPUT_BUFFER)];
+        } else {
+            _inputBuffer = null;
+        }
+        return buf;
+    }
+
+    public void releaseInputBuffer(byte[] buffer) {
+        if (_inputBuffer == null || buffer.length > _inputBuffer.length) {
+            _inputBuffer = buffer;
+        }
+    }
+
+    public byte[] allocDecodeBuffer(int size) {
+        byte[] buf = _decodingBuffer;
+        if (buf == null || buf.length < size) {
+            buf = new byte[size];
+        } else {
+            _decodingBuffer = null;
+        }
+        return buf;
+    }
+
+    public void releaseDecodeBuffer(byte[] buffer) {
+        if (_decodingBuffer == null || buffer.length > _decodingBuffer.length) {
+            _decodingBuffer = buffer;
+        }
+    }
+
+}

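BufferRecycler keeps one instance per thread behind a SoftReference and hands out byte[] and int[] working buffers that callers return when done, so repeated small encodes stop allocating fresh buffers. A rough sketch of the alloc/release discipline the encoder and streams below follow (the class name is made up for illustration):

import org.elasticsearch.common.compress.lzf.BufferRecycler;

public class RecyclerUse {
    public static void main(String[] args) {
        BufferRecycler recycler = BufferRecycler.instance(); // per-thread, SoftReference-backed
        byte[] buf = recycler.allocOutputBuffer(16384);      // at least 16384 bytes, never below the 8000 floor
        try {
            // ... fill buf and write it somewhere ...
        } finally {
            recycler.releaseOutputBuffer(buf);               // kept for the next allocation on this thread
        }
    }
}
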
+ 116 - 79
modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/ChunkEncoder.java

@@ -1,22 +1,3 @@
-/*
- * Licensed to Elastic Search and Shay Banon under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. Elastic Search licenses this
- * file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
 /* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
  * file except in compliance with the License. You may obtain a copy of the License at
  *
@@ -53,16 +34,23 @@ public class ChunkEncoder {
     private static final int MAX_OFF = 1 << 13; // 8k
     private static final int MAX_REF = (1 << 8) + (1 << 3); // 264
 
-    // // Encoding tables
+    // // Encoding tables etc
+
+    private final BufferRecycler _recycler;
+
+    private int[] _hashTable;
+
+    private final int _hashModulo;
 
     /**
      * Buffer in which encoded content is stored during processing
      */
-    private final byte[] _encodeBuffer;
-
-    private final int[] _hashTable;
+    private byte[] _encodeBuffer;
 
-    private final int _hashModulo;
+    /**
+     * Small buffer passed to LZFChunk, needed for writing chunk header
+     */
+    private byte[] _headerBuffer;
 
     /**
      * @param totalLength Total encoded length; used for calculating size
@@ -71,49 +59,87 @@ public class ChunkEncoder {
     public ChunkEncoder(int totalLength) {
         int largestChunkLen = Math.max(totalLength, LZFChunk.MAX_CHUNK_LEN);
 
-        int hashLen = calcHashLen(largestChunkLen);
-        _hashTable = new int[hashLen];
-        _hashModulo = hashLen - 1;
+        int suggestedHashLen = calcHashLen(largestChunkLen);
+        _recycler = BufferRecycler.instance();
+        _hashTable = _recycler.allocEncodingHash(suggestedHashLen);
+        _hashModulo = _hashTable.length - 1;
         // Ok, then, what's the worst case output buffer length?
         // length indicator for each 32 literals, so:
         int bufferLen = largestChunkLen + ((largestChunkLen + 31) >> 5);
-        _encodeBuffer = new byte[bufferLen];
+        _encodeBuffer = _recycler.allocEncodingBuffer(bufferLen);
+    }
+
+    /*
+    ///////////////////////////////////////////////////////////////////////
+    // Public API
+    ///////////////////////////////////////////////////////////////////////
+     */
+
+    /**
+     * Method to close once encoder is no longer in use. Note: after calling
+     * this method, further calls to {@link #encodeChunk} will fail
+     */
+    public void close() {
+        byte[] buf = _encodeBuffer;
+        if (buf != null) {
+            _encodeBuffer = null;
+            _recycler.releaseEncodeBuffer(buf);
+        }
+        int[] ibuf = _hashTable;
+        if (ibuf != null) {
+            _hashTable = null;
+            _recycler.releaseEncodingHash(ibuf);
+        }
     }
 
     /**
      * Method for compressing (or not) individual chunks
      */
-    public int encodeChunk(OutputStream os, byte[] data, int offset, int len) throws IOException {
+    public LZFChunk encodeChunk(byte[] data, int offset, int len) {
         if (len >= MIN_BLOCK_TO_COMPRESS) {
             /* If we have non-trivial block, and can compress it by at least
              * 2 bytes (since header is 2 bytes longer), let's compress:
              */
             int compLen = tryCompress(data, offset, offset + len, _encodeBuffer, 0);
             if (compLen < (len - 2)) { // nah; just return uncompressed
-                return LZFChunk.createCompressed(os, len, _encodeBuffer, 0, compLen);
+                return LZFChunk.createCompressed(len, _encodeBuffer, 0, compLen);
             }
         }
         // Otherwise leave uncompressed:
-        return LZFChunk.createNonCompressed(os, data, offset, len);
+        return LZFChunk.createNonCompressed(data, offset, len);
     }
 
     /**
-     * Method for compressing (or not) individual chunks
+     * Method for encoding individual chunk, writing it to given output stream.
      */
-    public LZFChunk encodeChunk(byte[] data, int offset, int len) {
+    public void encodeAndWriteChunk(byte[] data, int offset, int len, OutputStream out)
+            throws IOException {
+        byte[] headerBuf = _headerBuffer;
+        if (headerBuf == null) {
+            _headerBuffer = headerBuf = new byte[LZFChunk.MAX_HEADER_LEN];
+        }
         if (len >= MIN_BLOCK_TO_COMPRESS) {
             /* If we have non-trivial block, and can compress it by at least
              * 2 bytes (since header is 2 bytes longer), let's compress:
              */
             int compLen = tryCompress(data, offset, offset + len, _encodeBuffer, 0);
             if (compLen < (len - 2)) { // nah; just return uncompressed
-                return LZFChunk.createCompressed(len, _encodeBuffer, 0, compLen);
+                LZFChunk.writeCompressedHeader(len, compLen, out, headerBuf);
+                out.write(_encodeBuffer, 0, compLen);
+                return;
             }
         }
         // Otherwise leave uncompressed:
-        return LZFChunk.createNonCompressed(data, offset, len);
+        LZFChunk.writeNonCompressedHeader(len, out, headerBuf);
+        out.write(data, offset, len);
     }
 
+    /*
+    ///////////////////////////////////////////////////////////////////////
+    // Internal methods
+    ///////////////////////////////////////////////////////////////////////
+     */
+
     private static int calcHashLen(int chunkSize) {
         // in general try get hash table size of 2x input size
         chunkSize += chunkSize;
@@ -133,12 +159,13 @@ public class ChunkEncoder {
         return (in[inPos] << 8) + (in[inPos + 1] & 255);
     }
 
+    /*
     private static int next(int v, byte[] in, int inPos) {
         return (v << 8) + (in[inPos + 2] & 255);
     }
+*/
 
-
-    private int hash(int h) {
+    private final int hash(int h) {
         // or 184117; but this seems to give better hashing?
         return ((h * 57321) >> 9) & _hashModulo;
         // original lzf-c.c used this:
@@ -147,65 +174,75 @@ public class ChunkEncoder {
     }
 
     private int tryCompress(byte[] in, int inPos, int inEnd, byte[] out, int outPos) {
-        int literals = 0;
-        outPos++;
+        final int[] hashTable = _hashTable;
+        ++outPos;
         int hash = first(in, 0);
+        int literals = 0;
         inEnd -= 4;
         final int firstPos = inPos; // so that we won't have back references across block boundary
+
         while (inPos < inEnd) {
             byte p2 = in[inPos + 2];
             // next
             hash = (hash << 8) + (p2 & 255);
             int off = hash(hash);
-            int ref = _hashTable[off];
-            _hashTable[off] = inPos;
-            if (ref < inPos
-                    && ref >= firstPos
-                    && (off = inPos - ref - 1) < MAX_OFF
-                    && in[ref + 2] == p2
-                    && in[ref + 1] == (byte) (hash >> 8)
-                    && in[ref] == (byte) (hash >> 16)) {
-                // match
-                int maxLen = inEnd - inPos + 2;
-                if (maxLen > MAX_REF) {
-                    maxLen = MAX_REF;
-                }
-                if (literals == 0) {
-                    outPos--;
-                } else {
-                    out[outPos - literals - 1] = (byte) (literals - 1);
-                    literals = 0;
-                }
-                int len = 3;
-                while (len < maxLen && in[ref + len] == in[inPos + len]) {
-                    len++;
-                }
-                len -= 2;
-                if (len < 7) {
-                    out[outPos++] = (byte) ((off >> 8) + (len << 5));
-                } else {
-                    out[outPos++] = (byte) ((off >> 8) + (7 << 5));
-                    out[outPos++] = (byte) (len - 7);
-                }
-                out[outPos++] = (byte) off;
-                outPos++;
-                inPos += len;
-                hash = first(in, inPos);
-                hash = next(hash, in, inPos);
-                _hashTable[hash(hash)] = inPos++;
-                hash = next(hash, in, inPos);
-                _hashTable[hash(hash)] = inPos++;
-            } else {
+            int ref = hashTable[off];
+            hashTable[off] = inPos;
+
+            // First expected common case: no back-ref (for whatever reason)
+            if (ref >= inPos // can't refer forward (i.e. leftovers)
+                    || ref < firstPos // or to previous block
+                    || (off = inPos - ref - 1) >= MAX_OFF
+                    || in[ref + 2] != p2 // must match hash
+                    || in[ref + 1] != (byte) (hash >> 8)
+                    || in[ref] != (byte) (hash >> 16)) {
                 out[outPos++] = in[inPos++];
                 literals++;
                 if (literals == LZFChunk.MAX_LITERAL) {
-                    out[outPos - literals - 1] = (byte) (literals - 1);
+                    out[outPos - 33] = (byte) 31; // i.e. out[outPos - literals - 1] = (byte) (MAX_LITERAL - 1);
                     literals = 0;
                     outPos++;
                 }
+                continue;
+            }
+            // match
+            int maxLen = inEnd - inPos + 2;
+            if (maxLen > MAX_REF) {
+                maxLen = MAX_REF;
             }
+            if (literals == 0) {
+                outPos--;
+            } else {
+                out[outPos - literals - 1] = (byte) (literals - 1);
+                literals = 0;
+            }
+            int len = 3;
+            while (len < maxLen && in[ref + len] == in[inPos + len]) {
+                len++;
+            }
+            len -= 2;
+            if (len < 7) {
+                out[outPos++] = (byte) ((off >> 8) + (len << 5));
+            } else {
+                out[outPos++] = (byte) ((off >> 8) + (7 << 5));
+                out[outPos++] = (byte) (len - 7);
+            }
+            out[outPos++] = (byte) off;
+            outPos++;
+            inPos += len;
+            hash = first(in, inPos);
+            hash = (hash << 8) + (in[inPos + 2] & 255);
+            hashTable[hash(hash)] = inPos++;
+            hash = (hash << 8) + (in[inPos + 2] & 255); // hash = next(hash, in, inPos);
+            hashTable[hash(hash)] = inPos++;
         }
         inEnd += 4;
+        // try offlining the tail
+        return tryCompressTail(in, inPos, inEnd, out, outPos, literals);
+    }
+
+    private int tryCompressTail(byte[] in, int inPos, int inEnd, byte[] out, int outPos,
+                                int literals) {
         while (inPos < inEnd) {
             out[outPos++] = in[inPos++];
             literals++;

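ChunkEncoder now borrows its hash table and encode buffer from BufferRecycler, so it must be close()d when done, and the new encodeAndWriteChunk() writes the chunk header through a small reusable header buffer instead of materializing an LZFChunk per block. A hedged sketch of driving it directly (class and method names are illustrative only):

import org.elasticsearch.common.compress.lzf.ChunkEncoder;
import org.elasticsearch.common.compress.lzf.LZFChunk;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class EncodeOnce {
    public static byte[] compress(byte[] data) throws IOException {
        ChunkEncoder enc = new ChunkEncoder(data.length);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            int offset = 0, left = data.length;
            while (left > 0) {
                int chunk = Math.min(LZFChunk.MAX_CHUNK_LEN, left);
                enc.encodeAndWriteChunk(data, offset, chunk, out); // header + compressed (or plain) block
                offset += chunk;
                left -= chunk;
            }
        } finally {
            enc.close(); // hands the recycled buffers back to the thread-local BufferRecycler
        }
        return out.toByteArray();
    }
}
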
+ 5 - 19
modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZF.java

@@ -1,22 +1,3 @@
-/*
- * Licensed to Elastic Search and Shay Banon under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. Elastic Search licenses this
- * file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
 /* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
  * file except in compliance with the License. You may obtain a copy of the License at
  *
@@ -43,6 +24,11 @@ import java.io.IOException;
  * @author tatu@ning.com
  */
 public class LZF {
+
+    public static boolean isCompressed(final byte[] buffer) {
+        return buffer.length >= 2 && buffer[0] == LZFChunk.BYTE_Z && buffer[1] == LZFChunk.BYTE_V;
+    }
+
     public final static String SUFFIX = ".lzf";
 
     void process(String[] args) throws IOException {

+ 28 - 41
modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFChunk.java

@@ -1,22 +1,3 @@
-/*
- * Licensed to Elastic Search and Shay Banon under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. Elastic Search licenses this
- * file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
 /* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
  * file except in compliance with the License. You may obtain a copy of the License at
  *
@@ -48,6 +29,12 @@ public class LZFChunk {
     // Chunk length is limited by 2-byte length indicator, to 64k
     public static final int MAX_CHUNK_LEN = 0xFFFF;
 
+    /**
+     * Header can be either 7 bytes (compressed) or 5 bytes (uncompressed)
+     * long
+     */
+    public static final int MAX_HEADER_LEN = 7;
+
     public final static byte BYTE_Z = 'Z';
     public final static byte BYTE_V = 'V';
 
@@ -55,25 +42,13 @@ public class LZFChunk {
     public final static int BLOCK_TYPE_COMPRESSED = 1;
 
 
-    final byte[] _data;
-    LZFChunk _next;
+    protected final byte[] _data;
+    protected LZFChunk _next;
 
     private LZFChunk(byte[] data) {
         _data = data;
     }
 
-    public static int createCompressed(OutputStream os, int origLen, byte[] encData, int encPtr, int encLen) throws IOException {
-        os.write(BYTE_Z);
-        os.write(BYTE_V);
-        os.write(BLOCK_TYPE_COMPRESSED);
-        os.write(encLen >> 8);
-        os.write(encLen);
-        os.write((origLen >> 8));
-        os.write(origLen);
-        os.write(encData, encPtr, encLen);
-        return encLen + 7;
-    }
-
     /**
      * Factory method for constructing compressed chunk
      */
@@ -90,14 +65,16 @@ public class LZFChunk {
         return new LZFChunk(result);
     }
 
-    public static int createNonCompressed(OutputStream os, byte[] plainData, int ptr, int len) throws IOException {
-        os.write(BYTE_Z);
-        os.write(BYTE_V);
-        os.write(BLOCK_TYPE_NON_COMPRESSED);
-        os.write(len >> 8);
-        os.write(len);
-        os.write(plainData, ptr, len);
-        return len + 5;
+    public static void writeCompressedHeader(int origLen, int encLen, OutputStream out, byte[] headerBuffer)
+            throws IOException {
+        headerBuffer[0] = BYTE_Z;
+        headerBuffer[1] = BYTE_V;
+        headerBuffer[2] = BLOCK_TYPE_COMPRESSED;
+        headerBuffer[3] = (byte) (encLen >> 8);
+        headerBuffer[4] = (byte) encLen;
+        headerBuffer[5] = (byte) (origLen >> 8);
+        headerBuffer[6] = (byte) origLen;
+        out.write(headerBuffer, 0, 7);
     }
 
     /**
@@ -114,6 +91,16 @@ public class LZFChunk {
         return new LZFChunk(result);
     }
 
+    public static void writeNonCompressedHeader(int len, OutputStream out, byte[] headerBuffer)
+            throws IOException {
+        headerBuffer[0] = BYTE_Z;
+        headerBuffer[1] = BYTE_V;
+        headerBuffer[2] = BLOCK_TYPE_NON_COMPRESSED;
+        headerBuffer[3] = (byte) (len >> 8);
+        headerBuffer[4] = (byte) len;
+        out.write(headerBuffer, 0, 5);
+    }
+
     public void setNext(LZFChunk next) {
         _next = next;
     }

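The chunk header layout is unchanged: the 'Z' 'V' signature, a block type byte, then big-endian lengths, giving 5 bytes for non-compressed chunks and 7 bytes (original length added) for compressed ones; MAX_HEADER_LEN sizes the scratch buffer for either case. A small illustration of the new header-writing API (the values are made up):

import org.elasticsearch.common.compress.lzf.LZFChunk;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class HeaderDemo {
    public static void main(String[] args) throws IOException {
        byte[] scratch = new byte[LZFChunk.MAX_HEADER_LEN];    // 7 bytes covers both variants
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        LZFChunk.writeNonCompressedHeader(100, out, scratch);  // 'Z' 'V' 0, 2-byte length
        LZFChunk.writeCompressedHeader(100, 60, out, scratch); // 'Z' 'V' 1, 2-byte encoded len, 2-byte original len
        System.out.println(out.size());                        // 5 + 7 = 12 header bytes written
    }
}
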
+ 51 - 52
modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFDecoder.java

@@ -1,22 +1,3 @@
-/*
- * Licensed to Elastic Search and Shay Banon under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. Elastic Search licenses this
- * file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
 /* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
  * file except in compliance with the License. You may obtain a copy of the License at
  *
@@ -44,14 +25,9 @@ public class LZFDecoder {
     private final static int HEADER_BYTES = 5;
 
     // static methods, no need to instantiate
-
     private LZFDecoder() {
     }
 
-    public static boolean isCompressed(final byte[] buffer) {
-        return buffer.length >= 2 && buffer[0] == LZFChunk.BYTE_Z && buffer[1] == LZFChunk.BYTE_V;
-    }
-
     public static byte[] decode(final byte[] sourceBuffer) throws IOException {
         byte[] result = new byte[calculateUncompressedSize(sourceBuffer)];
         decode(sourceBuffer, result);
@@ -153,16 +129,15 @@ public class LZFDecoder {
         }
         inPtr += 2;
         int type = inputBuffer[inPtr++];
-        int len = uint16(inputBuffer, inPtr);
+        int compLen = uint16(inputBuffer, inPtr);
         inPtr += 2;
         if (type == LZFChunk.BLOCK_TYPE_NON_COMPRESSED) { // uncompressed
-            is.read(outputBuffer, 0, len);
-            bytesInOutput = len;
+            readFully(is, false, outputBuffer, 0, compLen);
+            bytesInOutput = compLen;
         } else { // compressed
-            is.read(inputBuffer, inPtr, 2);
-            int uncompLen = uint16(inputBuffer, inPtr);
-            is.read(inputBuffer, 0, len);
-            decompressChunk(inputBuffer, 0, outputBuffer, 0, uncompLen);
+            readFully(is, true, inputBuffer, 0, 2 + compLen); // first 2 bytes are uncompressed length
+            int uncompLen = uint16(inputBuffer, 0);
+            decompressChunk(inputBuffer, 2, outputBuffer, 0, uncompLen);
             bytesInOutput = uncompLen;
         }
         return bytesInOutput;
@@ -180,29 +155,38 @@ public class LZFDecoder {
                 do {
                     out[outPos++] = in[inPos];
                 } while (inPos++ < ctrl);
-            } else {
-                // back reference
-                int len = ctrl >> 5;
-                ctrl = -((ctrl & 0x1f) << 8) - 1;
-                if (len == 7) {
-                    len += in[inPos++] & 255;
-                }
-                ctrl -= in[inPos++] & 255;
-                len += outPos + 2;
+                continue;
+            }
+            // back reference
+            int len = ctrl >> 5;
+            ctrl = -((ctrl & 0x1f) << 8) - 1;
+            if (len == 7) {
+                len += in[inPos++] & 255;
+            }
+            ctrl -= in[inPos++] & 255;
+            len += outPos + 2;
+            out[outPos] = out[outPos++ + ctrl];
+            out[outPos] = out[outPos++ + ctrl];
+
+            /* Odd: after extensive profiling, looks like magic number
+             * for unrolling is 4: with 8 performance is worse (even a
+             * bit less than with no unrolling).
+             */
+            final int end = len - 3;
+            while (outPos < end) {
                 out[outPos] = out[outPos++ + ctrl];
                 out[outPos] = out[outPos++ + ctrl];
-                while (outPos < len - 8) {
-                    out[outPos] = out[outPos++ + ctrl];
-                    out[outPos] = out[outPos++ + ctrl];
-                    out[outPos] = out[outPos++ + ctrl];
-                    out[outPos] = out[outPos++ + ctrl];
-                    out[outPos] = out[outPos++ + ctrl];
-                    out[outPos] = out[outPos++ + ctrl];
-                    out[outPos] = out[outPos++ + ctrl];
-                    out[outPos] = out[outPos++ + ctrl];
-                }
-                while (outPos < len) {
+                out[outPos] = out[outPos++ + ctrl];
+                out[outPos] = out[outPos++ + ctrl];
+            }
+            // and, interestingly, unlooping works here too:
+            if (outPos < len) { // max 3 bytes to copy
+                out[outPos] = out[outPos++ + ctrl];
+                if (outPos < len) {
                     out[outPos] = out[outPos++ + ctrl];
+                    if (outPos < len) {
+                        out[outPos] = out[outPos++ + ctrl];
+                    }
                 }
             }
         } while (outPos < outEnd);
@@ -212,7 +196,22 @@ public class LZFDecoder {
             throw new IOException("Corrupt data: overrun in decompress, input offset " + inPos + ", output offset " + outPos);
     }
 
-    private static int uint16(byte[] data, int ptr) {
+    private final static int uint16(byte[] data, int ptr) {
         return ((data[ptr] & 0xFF) << 8) + (data[ptr + 1] & 0xFF);
     }
+
+    private final static void readFully(InputStream is, boolean compressed,
+                                        byte[] outputBuffer, int offset, int len) throws IOException {
+        int left = len;
+        while (left > 0) {
+            int count = is.read(outputBuffer, offset, left);
+            if (count < 0) { // EOF not allowed here
+                throw new IOException("EOF in " + len + " byte ("
+                        + (compressed ? "" : "un") + "compressed) block: could only read "
+                        + (len - left) + " bytes");
+            }
+            offset += count;
+            left -= count;
+        }
+    }
 }

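decompressChunk() now reads the compressed block through readFully(), so short reads from the underlying InputStream no longer corrupt a chunk; it returns the number of decoded bytes, or -1 at end of stream. A hedged sketch of the chunk-at-a-time loop that LZFInputStream builds on (class and method names are hypothetical):

import org.elasticsearch.common.compress.lzf.LZFChunk;
import org.elasticsearch.common.compress.lzf.LZFDecoder;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class ChunkByChunk {
    public static byte[] decodeStream(InputStream in) throws IOException {
        byte[] inputBuffer = new byte[LZFChunk.MAX_CHUNK_LEN];
        byte[] outputBuffer = new byte[LZFChunk.MAX_CHUNK_LEN];
        ByteArrayOutputStream result = new ByteArrayOutputStream();
        int len;
        while ((len = LZFDecoder.decompressChunk(in, inputBuffer, outputBuffer)) >= 0) {
            result.write(outputBuffer, 0, len); // one decoded 'ZV' chunk per iteration
        }
        return result.toByteArray();
    }
}
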
+ 9 - 84
modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFEncoder.java

@@ -1,22 +1,3 @@
-/*
- * Licensed to Elastic Search and Shay Banon under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. Elastic Search licenses this
- * file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
 /* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
  * file except in compliance with the License. You may obtain a copy of the License at
  *
@@ -30,10 +11,7 @@
 
 package org.elasticsearch.common.compress.lzf;
 
-import org.elasticsearch.common.thread.ThreadLocals;
-
 import java.io.IOException;
-import java.io.OutputStream;
 
 /**
  * Encoder that handles splitting of input into chunks to encode,
@@ -44,7 +22,6 @@ import java.io.OutputStream;
  */
 public class LZFEncoder {
     // Static methods only, no point in instantiating
-
     private LZFEncoder() {
     }
 
@@ -52,74 +29,22 @@ public class LZFEncoder {
         return encode(data, data.length);
     }
 
-
-    public static void encode(OutputStream os, byte[] data, int length) throws IOException {
-        int left = length;
-        ChunkEncoder enc = new ChunkEncoder(left);
-        int chunkLen = Math.min(LZFChunk.MAX_CHUNK_LEN, left);
-        enc.encodeChunk(os, data, 0, chunkLen);
-        left -= chunkLen;
-        // shortcut: if it all fit in, no need to coalesce:
-        if (left < 1) {
-            return;
-        }
-        int inputOffset = chunkLen;
-
-        do {
-            chunkLen = Math.min(left, LZFChunk.MAX_CHUNK_LEN);
-            enc.encodeChunk(os, data, inputOffset, chunkLen);
-            inputOffset += chunkLen;
-            left -= chunkLen;
-        } while (left > 0);
-    }
-
-    public static final ThreadLocal<ThreadLocals.CleanableValue<ChunkEncoder>> cachedEncoder = new ThreadLocal<ThreadLocals.CleanableValue<ChunkEncoder>>() {
-        @Override protected ThreadLocals.CleanableValue<ChunkEncoder> initialValue() {
-            return new ThreadLocals.CleanableValue<ChunkEncoder>(new ChunkEncoder(LZFChunk.MAX_CHUNK_LEN));
-        }
-    };
-
-    public static byte[] encodeWithCache(byte[] data, int length) throws IOException {
-        int left = length;
-        ChunkEncoder enc = cachedEncoder.get().get();
-        int chunkLen = Math.min(LZFChunk.MAX_CHUNK_LEN, left);
-        LZFChunk first = enc.encodeChunk(data, 0, chunkLen);
-        left -= chunkLen;
-        // shortcut: if it all fit in, no need to coalesce:
-        if (left < 1) {
-            return first.getData();
-        }
-        // otherwise need to get other chunks:
-        int resultBytes = first.length();
-        int inputOffset = chunkLen;
-        LZFChunk last = first;
-
-        do {
-            chunkLen = Math.min(left, LZFChunk.MAX_CHUNK_LEN);
-            LZFChunk chunk = enc.encodeChunk(data, inputOffset, chunkLen);
-            inputOffset += chunkLen;
-            left -= chunkLen;
-            resultBytes += chunk.length();
-            last.setNext(chunk);
-            last = chunk;
-        } while (left > 0);
-        // and then coalesce returns into single contiguous byte array
-        byte[] result = new byte[resultBytes];
-        int ptr = 0;
-        for (; first != null; first = first.next()) {
-            ptr = first.copyTo(result, ptr);
-        }
-        return result;
-    }
-
     /**
      * Method for compressing given input data using LZF encoding and
      * block structure (compatible with lzf command line utility).
      * Result consists of a sequence of chunks.
      */
     public static byte[] encode(byte[] data, int length) throws IOException {
+        ChunkEncoder enc = new ChunkEncoder(length);
+        byte[] result = encode(enc, data, length);
+        // important: may be able to reuse buffers
+        enc.close();
+        return result;
+    }
+
+    public static byte[] encode(ChunkEncoder enc, byte[] data, int length)
+            throws IOException {
         int left = length;
-        ChunkEncoder enc = new ChunkEncoder(left);
         int chunkLen = Math.min(LZFChunk.MAX_CHUNK_LEN, left);
         LZFChunk first = enc.encodeChunk(data, 0, chunkLen);
         left -= chunkLen;

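encodeWithCache() and its thread-local ChunkEncoder are gone; callers that encode many payloads can instead reuse one ChunkEncoder through the new encode(ChunkEncoder, byte[], int) overload and close it afterwards. A sketch of that pattern (the batch helper is invented for illustration):

import org.elasticsearch.common.compress.lzf.ChunkEncoder;
import org.elasticsearch.common.compress.lzf.LZFChunk;
import org.elasticsearch.common.compress.lzf.LZFEncoder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class BatchEncode {
    public static List<byte[]> encodeAll(List<byte[]> payloads) throws IOException {
        ChunkEncoder enc = new ChunkEncoder(LZFChunk.MAX_CHUNK_LEN);
        List<byte[]> result = new ArrayList<byte[]>();
        try {
            for (byte[] p : payloads) {
                result.add(LZFEncoder.encode(enc, p, p.length)); // reuses enc's hash table and buffer
            }
        } finally {
            enc.close(); // returns the buffers to the per-thread BufferRecycler
        }
        return result;
    }
}
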
+ 75 - 29
modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFInputStream.java

@@ -4,16 +4,26 @@ import java.io.IOException;
 import java.io.InputStream;
 
 public class LZFInputStream extends InputStream {
-    public static final int EOF_FLAG = -1;
+    private final BufferRecycler _recycler;
 
-    /* stream to be decompressed */
-    private final InputStream inputStream;
+    /**
+     * stream to be decompressed
+     */
+    protected final InputStream inputStream;
+
+    /**
+     * Flag that indicates whether we force full reads (reading of as many
+     * bytes as requested), or 'optimal' reads (up to as many as available,
+     * but at least one). Default is false, meaning that 'optimal' read
+     * is used.
+     */
+    protected boolean cfgFullReads = false;
 
-    /* the current buffer of compressed bytes */
-    private final byte[] compressedBytes = new byte[LZFChunk.MAX_CHUNK_LEN];
+    /* the current buffer of compressed bytes (from which to decode) */
+    private byte[] _inputBuffer;
 
-    /* the buffer of uncompressed bytes from which */
-    private final byte[] uncompressedBytes = new byte[LZFChunk.MAX_CHUNK_LEN];
+    /* the buffer of uncompressed bytes from which content is read */
+    private byte[] _decodedBytes;
 
     /* The current position (next char to output) in the uncompressed bytes buffer. */
     private int bufferPosition = 0;
@@ -22,49 +32,85 @@ public class LZFInputStream extends InputStream {
     private int bufferLength = 0;
 
     public LZFInputStream(final InputStream inputStream) throws IOException {
+        this(inputStream, false);
+    }
+
+    /**
+     * @param inputStream Underlying input stream to use
+     * @param fullReads   Whether {@link #read(byte[])} should try to read exactly
+     *                    as many bytes as requested (true); or just however many happen to be
+     *                    available (false)
+     */
+    public LZFInputStream(final InputStream in, boolean fullReads) throws IOException {
         super();
-        this.inputStream = inputStream;
+        _recycler = BufferRecycler.instance();
+        inputStream = in;
+        cfgFullReads = fullReads;
+
+        _inputBuffer = _recycler.allocInputBuffer(LZFChunk.MAX_CHUNK_LEN);
+        _decodedBytes = _recycler.allocDecodeBuffer(LZFChunk.MAX_CHUNK_LEN);
     }
 
     @Override
     public int read() throws IOException {
-        int returnValue = EOF_FLAG;
         readyBuffer();
         if (bufferPosition < bufferLength) {
-            returnValue = (uncompressedBytes[bufferPosition++] & 255);
+            return _decodedBytes[bufferPosition++] & 255;
         }
-        return returnValue;
+        return -1;
     }
 
     public int read(final byte[] buffer) throws IOException {
-        return (read(buffer, 0, buffer.length));
-
+        return read(buffer, 0, buffer.length);
     }
 
-    public int read(final byte[] buffer, final int offset, final int length) throws IOException {
-        // FIXED HERE: handle 0 length cases
-        if (length == 0) {
+    public int read(final byte[] buffer, int offset, int length) throws IOException {
+        if (length < 1) {
             return 0;
         }
-        int outputPos = offset;
         readyBuffer();
-        if (bufferLength == -1) {
+        if (bufferLength < 0) {
             return -1;
         }
+        // First let's read however much data we happen to have...
+        int chunkLength = Math.min(bufferLength - bufferPosition, length);
+        System.arraycopy(_decodedBytes, bufferPosition, buffer, offset, chunkLength);
+        bufferPosition += chunkLength;
 
-        // FIXED HERE: fixed to use length
-        while (outputPos < length && bufferPosition < bufferLength) {
-            int chunkLength = Math.min(bufferLength - bufferPosition, length - outputPos);
-            System.arraycopy(uncompressedBytes, bufferPosition, buffer, outputPos, chunkLength);
-            outputPos += chunkLength;
-            bufferPosition += chunkLength;
-            readyBuffer();
+        if (chunkLength == length || !cfgFullReads) {
+            return chunkLength;
         }
-        return outputPos - offset;
+        // Need more data, then
+        int totalRead = chunkLength;
+        do {
+            offset += chunkLength;
+            readyBuffer();
+            if (bufferLength == -1) {
+                break;
+            }
+            chunkLength = Math.min(bufferLength - bufferPosition, (length - totalRead));
+            System.arraycopy(_decodedBytes, bufferPosition, buffer, offset, chunkLength);
+            bufferPosition += chunkLength;
+            totalRead += chunkLength;
+        } while (totalRead < length);
+
+        return totalRead;
     }
 
     public void close() throws IOException {
+        bufferPosition = bufferLength = 0;
+        byte[] buf = _inputBuffer;
+        if (buf != null) {
+            _inputBuffer = null;
+            _recycler.releaseInputBuffer(buf);
+        }
+        buf = _decodedBytes;
+        if (buf != null) {
+            _decodedBytes = null;
+            _recycler.releaseDecodeBuffer(buf);
+        }
         inputStream.close();
+
     }
 
     /**
@@ -72,10 +118,10 @@ public class LZFInputStream extends InputStream {
      *
      * @throws IOException
      */
-    private void readyBuffer() throws IOException {
+    private final void readyBuffer() throws IOException {
         if (bufferPosition >= bufferLength) {
-            bufferLength = LZFDecoder.decompressChunk(inputStream, compressedBytes, uncompressedBytes);
+            bufferLength = LZFDecoder.decompressChunk(inputStream, _inputBuffer, _decodedBytes);
             bufferPosition = 0;
         }
     }
-}
+}

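LZFInputStream now recycles its two working buffers and gains a fullReads flag: with the default false, read(byte[], int, int) returns whatever the current decoded chunk can supply; with true it keeps decoding chunks until the request is filled or the stream ends. A hedged round-trip sketch (payload and class name are made up):

import org.elasticsearch.common.compress.lzf.LZFEncoder;
import org.elasticsearch.common.compress.lzf.LZFInputStream;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class ReadBack {
    public static void main(String[] args) throws IOException {
        byte[] packed = LZFEncoder.encode("abc abc abc abc abc".getBytes("UTF-8"));
        LZFInputStream in = new LZFInputStream(new ByteArrayInputStream(packed), true); // fullReads
        ByteArrayOutputStream plain = new ByteArrayOutputStream();
        byte[] buf = new byte[8];
        int n;
        while ((n = in.read(buf)) != -1) {
            plain.write(buf, 0, n);
        }
        in.close(); // also returns the recycled buffers
        System.out.println(plain.toString("UTF-8"));
    }
}
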
+ 62 - 50
modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFOutputStream.java

@@ -1,80 +1,86 @@
-/*
- * Licensed to Elastic Search and Shay Banon under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. Elastic Search licenses this
- * file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
 package org.elasticsearch.common.compress.lzf;
 
 import java.io.IOException;
 import java.io.OutputStream;
 
+/**
+ * @author jon hartlaub
+ * @author tatu
+ */
 public class LZFOutputStream extends OutputStream {
     private static int OUTPUT_BUFFER_SIZE = LZFChunk.MAX_CHUNK_LEN;
-    private static int BYTE_MASK = 0xff;
 
-    private final OutputStream outputStream;
+    private final ChunkEncoder _encoder;
+    private final BufferRecycler _recycler;
 
-    private final byte[] outputBuffer = new byte[OUTPUT_BUFFER_SIZE];
-    private final ChunkEncoder encoder = new ChunkEncoder(OUTPUT_BUFFER_SIZE);
-    private int position = 0;
+    protected final OutputStream _outputStream;
+    protected byte[] _outputBuffer;
+    protected int _position = 0;
 
     public LZFOutputStream(final OutputStream outputStream) {
-        this.outputStream = outputStream;
+        _encoder = new ChunkEncoder(OUTPUT_BUFFER_SIZE);
+        _recycler = BufferRecycler.instance();
+        _outputStream = outputStream;
+        _outputBuffer = _recycler.allocOutputBuffer(OUTPUT_BUFFER_SIZE);
     }
 
     @Override
     public void write(final int singleByte) throws IOException {
-        if (position >= outputBuffer.length) {
+        if (_position >= _outputBuffer.length) {
             writeCompressedBlock();
         }
-        outputBuffer[position++] = (byte) (singleByte & BYTE_MASK);
+        _outputBuffer[_position++] = (byte) singleByte;
     }
 
     @Override
-    public void write(final byte[] buffer, final int offset, final int length) throws IOException {
-        int inputCursor = offset;
-        int remainingBytes = length;
-        while (remainingBytes > 0) {
-            if (position >= outputBuffer.length) {
-                writeCompressedBlock();
-            }
-            int chunkLength = (remainingBytes > (outputBuffer.length - position)) ? outputBuffer.length - position : remainingBytes;
-            System.arraycopy(buffer, inputCursor, outputBuffer, position, chunkLength);
-            position += chunkLength;
-            remainingBytes -= chunkLength;
-            inputCursor += chunkLength;
+    public void write(final byte[] buffer, int offset, int length) throws IOException {
+        final int BUFFER_LEN = _outputBuffer.length;
+
+        // simple case first: buffering only (for trivially short writes)
+        int free = BUFFER_LEN - _position;
+        if (free >= length) {
+            System.arraycopy(buffer, offset, _outputBuffer, _position, length);
+            _position += length;
+            return;
+        }
+        // otherwise, copy whatever we can, flush
+        System.arraycopy(buffer, offset, _outputBuffer, _position, free);
+        offset += free;
+        length -= free;
+        _position += free;
+        writeCompressedBlock();
+
+        // then write intermediate full block, if any, without copying:
+        while (length >= BUFFER_LEN) {
+            _encoder.encodeAndWriteChunk(buffer, offset, BUFFER_LEN, _outputStream);
+            offset += BUFFER_LEN;
+            length -= BUFFER_LEN;
+        }
+
+        // and finally, copy leftovers in buffer, if any
+        if (length > 0) {
+            System.arraycopy(buffer, offset, _outputBuffer, 0, length);
         }
+        _position = length;
     }
 
     @Override
     public void flush() throws IOException {
-        try {
+        if (_position > 0) {
             writeCompressedBlock();
-        } finally {
-            outputStream.flush();
         }
+        _outputStream.flush();
     }
 
     @Override
     public void close() throws IOException {
-        try {
-            flush();
-        } finally {
-            outputStream.close();
+        flush();
+        _outputStream.close();
+        _encoder.close();
+        byte[] buf = _outputBuffer;
+        if (buf != null) {
+            _outputBuffer = null;
+            _recycler.releaseOutputBuffer(buf);
         }
     }
 
@@ -82,9 +88,15 @@ public class LZFOutputStream extends OutputStream {
      * Compress and write the current block to the OutputStream
      */
     private void writeCompressedBlock() throws IOException {
-        if (position > 0) {
-            encoder.encodeChunk(outputStream, outputBuffer, 0, position);
-            position = 0;
-        }
+        int left = _position;
+        _position = 0;
+        int offset = 0;
+
+        do {
+            int chunkLen = Math.min(LZFChunk.MAX_CHUNK_LEN, left);
+            _encoder.encodeAndWriteChunk(_outputBuffer, 0, chunkLen, _outputStream);
+            offset += chunkLen;
+            left -= chunkLen;
+        } while (left > 0);
     }
 }

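LZFOutputStream buffers short writes, but a write larger than the 64k chunk buffer is now compressed straight from the caller's array via encodeAndWriteChunk() instead of being copied piecewise; flush() only encodes when something is buffered, and close() releases both the encoder and the recycled output buffer. A rough usage sketch (sizes and class name are invented):

import org.elasticsearch.common.compress.lzf.LZF;
import org.elasticsearch.common.compress.lzf.LZFDecoder;
import org.elasticsearch.common.compress.lzf.LZFOutputStream;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;

public class WriteOut {
    public static void main(String[] args) throws IOException {
        byte[] big = new byte[200000];               // > MAX_CHUNK_LEN, so full blocks are encoded directly
        Arrays.fill(big, (byte) 'x');
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        LZFOutputStream out = new LZFOutputStream(sink);
        out.write(big);
        out.close();                                 // flushes the tail block and releases buffers
        byte[] packed = sink.toByteArray();
        System.out.println(LZF.isCompressed(packed) + " " + LZFDecoder.decode(packed).length);
    }
}
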
+ 27 - 0
modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/package-info.java

@@ -0,0 +1,27 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Copy of LZF code from ning compress based on 0.7 version.
+ *
+ * Changes:
+ *
+ * 1.
+ */
+package org.elasticsearch.common.compress.lzf;

+ 1 - 1
modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/CachedStreamInput.java

@@ -41,7 +41,7 @@ public class CachedStreamInput {
     private static final ThreadLocal<ThreadLocals.CleanableValue<Entry>> cache = new ThreadLocal<ThreadLocals.CleanableValue<Entry>>() {
         @Override protected ThreadLocals.CleanableValue<Entry> initialValue() {
             HandlesStreamInput handles = new HandlesStreamInput();
-            LZFStreamInput lzf = new LZFStreamInput();
+            LZFStreamInput lzf = new LZFStreamInput(null, true);
             return new ThreadLocals.CleanableValue<Entry>(new Entry(handles, lzf));
         }
     };

+ 1 - 1
modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/CachedStreamOutput.java

@@ -44,7 +44,7 @@ public class CachedStreamOutput {
         @Override protected ThreadLocals.CleanableValue<Entry> initialValue() {
             BytesStreamOutput bytes = new BytesStreamOutput();
             HandlesStreamOutput handles = new HandlesStreamOutput(bytes);
-            LZFStreamOutput lzf = new LZFStreamOutput(bytes);
+            LZFStreamOutput lzf = new LZFStreamOutput(bytes, true);
             return new ThreadLocals.CleanableValue<Entry>(new Entry(bytes, handles, lzf));
         }
     };

+ 74 - 33
modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/LZFStreamInput.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.common.io.stream;
 
+import org.elasticsearch.common.compress.lzf.BufferRecycler;
 import org.elasticsearch.common.compress.lzf.LZFChunk;
 import org.elasticsearch.common.compress.lzf.LZFDecoder;
 
@@ -29,13 +30,26 @@ import java.io.IOException;
  * @author kimchy (shay.banon)
  */
 public class LZFStreamInput extends StreamInput {
-    public static final int EOF_FLAG = -1;
+    private final BufferRecycler _recycler;
 
-    /* the current buffer of compressed bytes */
-    private final byte[] compressedBytes = new byte[LZFChunk.MAX_CHUNK_LEN];
+    /**
+     * stream to be decompressed
+     */
+    protected StreamInput inputStream;
+
+    /**
+     * Flag that indicates whether we force full reads (reading of as many
+     * bytes as requested), or 'optimal' reads (up to as many as available,
+     * but at least one). Default is false, meaning that 'optimal' read
+     * is used.
+     */
+    protected boolean cfgFullReads = true; // ES: ALWAYS TRUE since we need to throw EOF when doing readBytes
 
-    /* the buffer of uncompressed bytes from which */
-    private final byte[] uncompressedBytes = new byte[LZFChunk.MAX_CHUNK_LEN];
+    /* the current buffer of compressed bytes (from which to decode) */
+    private byte[] _inputBuffer;
+
+    /* the buffer of uncompressed bytes from which content is read */
+    private byte[] _decodedBytes;
 
     /* The current position (next char to output) in the uncompressed bytes buffer. */
     private int bufferPosition = 0;
@@ -43,50 +57,64 @@ public class LZFStreamInput extends StreamInput {
     /* Length of the current uncompressed bytes buffer */
     private int bufferLength = 0;
 
-    private StreamInput in;
+    // ES: added to support never closing just resetting
+    private final boolean neverClose;
 
-    public LZFStreamInput() {
-    }
+    public LZFStreamInput(StreamInput in, boolean neverClose) {
+        super();
+        this.neverClose = neverClose;
+        _recycler = BufferRecycler.instance();
+        inputStream = in;
 
-    public LZFStreamInput(StreamInput in) throws IOException {
-        this.in = in;
-        // we need to read the first buffer here, since it might be a VOID message, and we need to at least read the LZF header
-        readyBuffer();
+        _inputBuffer = _recycler.allocInputBuffer(LZFChunk.MAX_CHUNK_LEN);
+        _decodedBytes = _recycler.allocDecodeBuffer(LZFChunk.MAX_CHUNK_LEN);
     }
 
     @Override public int read() throws IOException {
-        int returnValue = EOF_FLAG;
         readyBuffer();
         if (bufferPosition < bufferLength) {
-            returnValue = (uncompressedBytes[bufferPosition++] & 255);
+            return _decodedBytes[bufferPosition++] & 255;
         }
-        return returnValue;
+        return -1;
     }
 
-    @Override public int read(byte[] b, int off, int len) throws IOException {
-        if (len == 0) {
+    @Override public int read(byte[] buffer, int offset, int length) throws IOException {
+        if (length < 1) {
             return 0;
         }
-        int outputPos = off;
         readyBuffer();
-        if (bufferLength == -1) {
+        if (bufferLength < 0) {
             return -1;
         }
+        // First let's read however much data we happen to have...
+        int chunkLength = Math.min(bufferLength - bufferPosition, length);
+        System.arraycopy(_decodedBytes, bufferPosition, buffer, offset, chunkLength);
+        bufferPosition += chunkLength;
 
-        while (outputPos < len && bufferPosition < bufferLength) {
-            int chunkLength = Math.min(bufferLength - bufferPosition, len - outputPos);
-            System.arraycopy(uncompressedBytes, bufferPosition, b, outputPos, chunkLength);
-            outputPos += chunkLength;
-            bufferPosition += chunkLength;
-            readyBuffer();
+        if (chunkLength == length || !cfgFullReads) {
+            return chunkLength;
         }
-        return outputPos - off;
+        // Need more data, then
+        int totalRead = chunkLength;
+        do {
+            offset += chunkLength;
+            readyBuffer();
+            if (bufferLength == -1) {
+                break;
+            }
+            chunkLength = Math.min(bufferLength - bufferPosition, (length - totalRead));
+            System.arraycopy(_decodedBytes, bufferPosition, buffer, offset, chunkLength);
+            bufferPosition += chunkLength;
+            totalRead += chunkLength;
+        } while (totalRead < length);
+
+        return totalRead;
     }
 
     @Override public byte readByte() throws IOException {
         readyBuffer();
         if (bufferPosition < bufferLength) {
-            return (uncompressedBytes[bufferPosition++]);
+            return _decodedBytes[bufferPosition++];
         }
         throw new EOFException();
     }
@@ -101,15 +129,13 @@ public class LZFStreamInput extends StreamInput {
     @Override public void reset() throws IOException {
         this.bufferPosition = 0;
         this.bufferLength = 0;
-        in.reset();
+        inputStream.reset();
     }
 
     public void reset(StreamInput in) throws IOException {
-        this.in = in;
+        this.inputStream = in;
         this.bufferPosition = 0;
         this.bufferLength = 0;
-        // we need to read the first buffer here, since it might be a VOID message, and we need to at least read the LZF header
-        readyBuffer();
     }
 
     /**
@@ -120,7 +146,22 @@ public class LZFStreamInput extends StreamInput {
     }
 
     @Override public void close() throws IOException {
-        in.close();
+        if (neverClose) {
+            reset();
+            return;
+        }
+        bufferPosition = bufferLength = 0;
+        byte[] buf = _inputBuffer;
+        if (buf != null) {
+            _inputBuffer = null;
+            _recycler.releaseInputBuffer(buf);
+        }
+        buf = _decodedBytes;
+        if (buf != null) {
+            _decodedBytes = null;
+            _recycler.releaseDecodeBuffer(buf);
+        }
+        inputStream.close();
     }
 
     /**
@@ -130,7 +171,7 @@ public class LZFStreamInput extends StreamInput {
      */
     private void readyBuffer() throws IOException {
         if (bufferPosition >= bufferLength) {
-            bufferLength = LZFDecoder.decompressChunk(in, compressedBytes, uncompressedBytes);
+            bufferLength = LZFDecoder.decompressChunk(inputStream, _inputBuffer, _decodedBytes);
             bufferPosition = 0;
         }
     }
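The rewritten input stream decodes one LZF chunk at a time into a buffer obtained from BufferRecycler and, when full reads are configured (cfgFullReads), keeps pulling chunks until the caller's request is satisfied or EOF is hit. A minimal sketch of that full-read contract, written against plain java.io; the readFully helper below is illustrative only and is not part of this change:

    import java.io.IOException;
    import java.io.InputStream;

    // Keep reading until `len` bytes are copied or EOF is reached; this is the
    // guarantee the cfgFullReads flag adds on top of a chunked read(byte[], int, int).
    static int readFully(InputStream in, byte[] buf, int off, int len) throws IOException {
        if (len == 0) {
            return 0;
        }
        int total = 0;
        while (total < len) {
            int n = in.read(buf, off + total, len - total);
            if (n < 0) {
                break; // EOF before the requested length was filled
            }
            total += n;
        }
        return total == 0 ? -1 : total; // -1 mirrors the stream's end-of-input convention
    }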

+ 78 - 39
modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/LZFStreamOutput.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.common.io.stream;
 
+import org.elasticsearch.common.compress.lzf.BufferRecycler;
 import org.elasticsearch.common.compress.lzf.ChunkEncoder;
 import org.elasticsearch.common.compress.lzf.LZFChunk;
 
@@ -29,83 +30,121 @@ import java.io.IOException;
  */
 public class LZFStreamOutput extends StreamOutput {
 
-    private StreamOutput out;
+    private static final int OUTPUT_BUFFER_SIZE = LZFChunk.MAX_CHUNK_LEN;
 
-    private final byte[] outputBuffer = new byte[LZFChunk.MAX_CHUNK_LEN];
-    private final ChunkEncoder encoder = new ChunkEncoder(LZFChunk.MAX_CHUNK_LEN);
+    private final ChunkEncoder _encoder;
+    private final BufferRecycler _recycler;
 
-    private int position = 0;
+    protected StreamOutput _outputStream;
+    protected byte[] _outputBuffer;
+    protected int _position = 0;
 
-    public LZFStreamOutput(StreamOutput out) {
-        this.out = out;
+    private final boolean neverClose;
+
+    public LZFStreamOutput(StreamOutput out, boolean neverClose) {
+        this.neverClose = neverClose;
+        _encoder = new ChunkEncoder(OUTPUT_BUFFER_SIZE);
+        _recycler = BufferRecycler.instance();
+        _outputStream = out;
+        _outputBuffer = _recycler.allocOutputBuffer(OUTPUT_BUFFER_SIZE);
     }
 
     @Override public void write(final int singleByte) throws IOException {
-        if (position >= outputBuffer.length) {
+        if (_position >= _outputBuffer.length) {
             writeCompressedBlock();
         }
-        outputBuffer[position++] = (byte) (singleByte & 0xff);
+        _outputBuffer[_position++] = (byte) singleByte;
     }
 
     @Override public void writeByte(byte b) throws IOException {
-        if (position >= outputBuffer.length) {
+        if (_position >= _outputBuffer.length) {
             writeCompressedBlock();
         }
-        outputBuffer[position++] = b;
+        _outputBuffer[_position++] = b;
     }
 
-    @Override public void writeBytes(byte[] b, int offset, int length) throws IOException {
-        int inputCursor = offset;
-        int remainingBytes = length;
-        while (remainingBytes > 0) {
-            if (position >= outputBuffer.length) {
-                writeCompressedBlock();
-            }
-            int chunkLength = (remainingBytes > (outputBuffer.length - position)) ? outputBuffer.length - position : remainingBytes;
-            System.arraycopy(b, inputCursor, outputBuffer, position, chunkLength);
-            position += chunkLength;
-            remainingBytes -= chunkLength;
-            inputCursor += chunkLength;
+    @Override public void writeBytes(byte[] buffer, int offset, int length) throws IOException {
+        final int BUFFER_LEN = _outputBuffer.length;
+
+        // simple case first: buffering only (for trivially short writes)
+        int free = BUFFER_LEN - _position;
+        if (free >= length) {
+            System.arraycopy(buffer, offset, _outputBuffer, _position, length);
+            _position += length;
+            return;
         }
+        // otherwise, copy whatever we can, flush
+        System.arraycopy(buffer, offset, _outputBuffer, _position, free);
+        offset += free;
+        length -= free;
+        _position += free;
+        writeCompressedBlock();
+
+        // then write intermediate full block, if any, without copying:
+        while (length >= BUFFER_LEN) {
+            _encoder.encodeAndWriteChunk(buffer, offset, BUFFER_LEN, _outputStream);
+            offset += BUFFER_LEN;
+            length -= BUFFER_LEN;
+        }
+
+        // and finally, copy leftovers in buffer, if any
+        if (length > 0) {
+            System.arraycopy(buffer, offset, _outputBuffer, 0, length);
+        }
+        _position = length;
     }
 
-    @Override public void flush() throws IOException {
-        try {
+    @Override
+    public void flush() throws IOException {
+        if (_position > 0) {
             writeCompressedBlock();
-        } finally {
-            out.flush();
         }
+        _outputStream.flush();
     }
 
-    @Override public void close() throws IOException {
-        try {
-            flush();
-        } finally {
-            out.close();
+    @Override
+    public void close() throws IOException {
+        flush();
+        if (neverClose) {
+            reset();
+            return;
+        }
+        _outputStream.close();
+        _encoder.close();
+        byte[] buf = _outputBuffer;
+        if (buf != null) {
+            _outputBuffer = null;
+            _recycler.releaseOutputBuffer(buf);
         }
     }
 
     @Override public void reset() throws IOException {
-        this.position = 0;
-        out.reset();
+        _position = 0;
+        _outputStream.reset();
     }
 
     public void reset(StreamOutput out) throws IOException {
-        this.out = out;
+        this._outputStream = out;
         reset();
     }
 
     public StreamOutput wrappedOut() {
-        return this.out;
+        return this._outputStream;
     }
 
     /**
      * Compress and write the current block to the OutputStream
      */
     private void writeCompressedBlock() throws IOException {
-        if (position > 0) {
-            encoder.encodeChunk(out, outputBuffer, 0, position);
-            position = 0;
-        }
+        int left = _position;
+        _position = 0;
+        int offset = 0;
+
+        do {
+            int chunkLen = Math.min(LZFChunk.MAX_CHUNK_LEN, left);
+            _encoder.encodeAndWriteChunk(_outputBuffer, offset, chunkLen, _outputStream);
+            offset += chunkLen;
+            left -= chunkLen;
+        } while (left > 0);
     }
 }
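Typical use of the rewritten output stream: writes are buffered up to one chunk, full chunks are encoded straight out of the caller's array without an extra copy, and close() either releases the recycled buffers or, when constructed with neverClose, only resets the stream for reuse. A minimal usage sketch (BytesStreamOutput as the sink and the UTF-8 payload are illustrative assumptions, taken to run inside a method that declares throws IOException):

    byte[] payload = "{\"field1\":\"value1\"}".getBytes("UTF-8");
    BytesStreamOutput sink = new BytesStreamOutput();
    LZFStreamOutput lzf = new LZFStreamOutput(sink, false /* neverClose */);
    lzf.writeBytes(payload, 0, payload.length);
    lzf.close(); // flushes the final chunk and hands the output buffer back to BufferRecycler

The neverClose variant exists so pooled stream wrappers can be flushed and reused per request without tearing down their recycled buffers each time.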

+ 2 - 2
modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobStoreGateway.java

@@ -26,7 +26,7 @@ import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.blobstore.*;
 import org.elasticsearch.common.collect.ImmutableMap;
 import org.elasticsearch.common.collect.Lists;
-import org.elasticsearch.common.compress.lzf.LZFDecoder;
+import org.elasticsearch.common.compress.lzf.LZF;
 import org.elasticsearch.common.compress.lzf.LZFEncoder;
 import org.elasticsearch.common.io.stream.BytesStreamInput;
 import org.elasticsearch.common.io.stream.CachedStreamInput;
@@ -206,7 +206,7 @@ public abstract class BlobStoreGateway extends SharedStorageGateway {
     private MetaData readMetaData(byte[] data) throws IOException {
         XContentParser parser = null;
         try {
-            if (LZFDecoder.isCompressed(data)) {
+            if (LZF.isCompressed(data)) {
                 BytesStreamInput siBytes = new BytesStreamInput(data);
                 LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
                 parser = XContentFactory.xContent(XContentType.JSON).createParser(siLzf);
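The detection-plus-decode idiom introduced here (LZF.isCompressed followed by a cached LZFStreamInput) recurs in the gateway, mapper, REST and source-lookup changes below. Condensed, it reads roughly as follows; this is a sketch assembled from the lines in this diff, and the uncompressed branch (createParser over the raw bytes) is assumed to match the untouched surrounding code:

    XContentParser parser;
    if (LZF.isCompressed(data)) {
        BytesStreamInput siBytes = new BytesStreamInput(data);
        LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
        parser = XContentFactory.xContent(XContentType.JSON).createParser(siLzf);
    } else {
        parser = XContentFactory.xContent(XContentType.JSON).createParser(data);
    }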

+ 3 - 3
modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java

@@ -31,7 +31,7 @@ import org.elasticsearch.cluster.routing.MutableShardRouting;
 import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.common.collect.Sets;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
-import org.elasticsearch.common.compress.lzf.LZFDecoder;
+import org.elasticsearch.common.compress.lzf.LZF;
 import org.elasticsearch.common.compress.lzf.LZFOutputStream;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.inject.Module;
@@ -390,7 +390,7 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
     private LocalGatewayMetaState readMetaState(byte[] data) throws IOException {
         XContentParser parser = null;
         try {
-            if (LZFDecoder.isCompressed(data)) {
+            if (LZF.isCompressed(data)) {
                 BytesStreamInput siBytes = new BytesStreamInput(data);
                 LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
                 parser = XContentFactory.xContent(XContentType.JSON).createParser(siLzf);
@@ -408,7 +408,7 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
     private LocalGatewayStartedShards readStartedShards(byte[] data) throws IOException {
         XContentParser parser = null;
         try {
-            if (LZFDecoder.isCompressed(data)) {
+            if (LZF.isCompressed(data)) {
                 BytesStreamInput siBytes = new BytesStreamInput(data);
                 LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
                 parser = XContentFactory.xContent(XContentType.JSON).createParser(siLzf);

+ 4 - 3
modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/xcontent/SourceFieldMapper.java

@@ -21,6 +21,7 @@ package org.elasticsearch.index.mapper.xcontent;
 
 import org.apache.lucene.document.*;
 import org.elasticsearch.ElasticSearchParseException;
+import org.elasticsearch.common.compress.lzf.LZF;
 import org.elasticsearch.common.compress.lzf.LZFDecoder;
 import org.elasticsearch.common.compress.lzf.LZFEncoder;
 import org.elasticsearch.common.lucene.Lucene;
@@ -116,9 +117,9 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements or
             return null;
         }
         byte[] data = context.source();
-        if (compress != null && compress && !LZFDecoder.isCompressed(data)) {
+        if (compress != null && compress && !LZF.isCompressed(data)) {
             if (compressThreshold == -1 || data.length > compressThreshold) {
-                data = LZFEncoder.encodeWithCache(data, data.length);
+                data = LZFEncoder.encode(data, data.length);
                 context.source(data);
             }
         }
@@ -139,7 +140,7 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements or
         if (value == null) {
             return value;
         }
-        if (LZFDecoder.isCompressed(value)) {
+        if (LZF.isCompressed(value)) {
             try {
                 return LZFDecoder.decode(value);
             } catch (IOException e) {
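The write and read sides touched in this mapper pair up as a simple round trip: compress the source once it crosses the threshold, then detect and decode it on the way back out. A condensed sketch over the API surface used in this file (assumed to live in a method that declares throws IOException; the threshold handling mirrors the lines above):

    byte[] stored = data;
    if (!LZF.isCompressed(stored) && (compressThreshold == -1 || stored.length > compressThreshold)) {
        stored = LZFEncoder.encode(stored, stored.length);
    }
    // ... later, when the stored value is surfaced again:
    byte[] original = LZF.isCompressed(stored) ? LZFDecoder.decode(stored) : stored;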

+ 2 - 2
modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/xcontent/XContentDocumentMapper.java

@@ -26,7 +26,7 @@ import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Preconditions;
 import org.elasticsearch.common.collect.ImmutableMap;
 import org.elasticsearch.common.compress.CompressedString;
-import org.elasticsearch.common.compress.lzf.LZFDecoder;
+import org.elasticsearch.common.compress.lzf.LZF;
 import org.elasticsearch.common.io.stream.BytesStreamInput;
 import org.elasticsearch.common.io.stream.CachedStreamInput;
 import org.elasticsearch.common.io.stream.LZFStreamInput;
@@ -384,7 +384,7 @@ public class XContentDocumentMapper implements DocumentMapper, ToXContent {
         XContentParser parser = source.parser();
         try {
             if (parser == null) {
-                if (LZFDecoder.isCompressed(source.source())) {
+                if (LZF.isCompressed(source.source())) {
                     BytesStreamInput siBytes = new BytesStreamInput(source.source());
                     LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
                     XContentType contentType = XContentFactory.xContentType(siLzf);

+ 3 - 0
modules/elasticsearch/src/main/java/org/elasticsearch/index/store/support/AbstractStore.java

@@ -138,6 +138,9 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
         try {
             indexInput.readInt(); // version
             return indexInput.readStringStringMap();
+        } catch (Exception e) {
+            // failed to load checksums, ignore and return an empty map
+            return new HashMap<String, String>();
         } finally {
             indexInput.close();
         }

+ 2 - 2
modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/support/RestXContentBuilder.java

@@ -19,7 +19,7 @@
 
 package org.elasticsearch.rest.action.support;
 
-import org.elasticsearch.common.compress.lzf.LZFDecoder;
+import org.elasticsearch.common.compress.lzf.LZF;
 import org.elasticsearch.common.io.stream.BytesStreamInput;
 import org.elasticsearch.common.io.stream.CachedStreamInput;
 import org.elasticsearch.common.io.stream.LZFStreamInput;
@@ -61,7 +61,7 @@ public class RestXContentBuilder {
     }
 
     public static void restDocumentSource(byte[] source, XContentBuilder builder, ToXContent.Params params) throws IOException {
-        if (LZFDecoder.isCompressed(source)) {
+        if (LZF.isCompressed(source)) {
             BytesStreamInput siBytes = new BytesStreamInput(source);
             LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
             XContentType contentType = XContentFactory.xContentType(siLzf);

+ 2 - 1
modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java

@@ -25,6 +25,7 @@ import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.Unicode;
 import org.elasticsearch.common.collect.ImmutableMap;
+import org.elasticsearch.common.compress.lzf.LZF;
 import org.elasticsearch.common.compress.lzf.LZFDecoder;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -152,7 +153,7 @@ public class InternalSearchHit implements SearchHit {
         if (source == null) {
             return null;
         }
-        if (LZFDecoder.isCompressed(source)) {
+        if (LZF.isCompressed(source)) {
             try {
                 this.source = LZFDecoder.decode(source);
             } catch (IOException e) {

+ 2 - 2
modules/elasticsearch/src/main/java/org/elasticsearch/search/lookup/SourceLookup.java

@@ -24,7 +24,7 @@ import org.apache.lucene.document.Fieldable;
 import org.apache.lucene.index.IndexReader;
 import org.elasticsearch.ElasticSearchParseException;
 import org.elasticsearch.common.collect.Lists;
-import org.elasticsearch.common.compress.lzf.LZFDecoder;
+import org.elasticsearch.common.compress.lzf.LZF;
 import org.elasticsearch.common.io.stream.BytesStreamInput;
 import org.elasticsearch.common.io.stream.CachedStreamInput;
 import org.elasticsearch.common.io.stream.LZFStreamInput;
@@ -65,7 +65,7 @@ public class SourceLookup implements Map {
             Document doc = reader.document(docId, SourceFieldSelector.INSTANCE);
             Fieldable sourceField = doc.getFieldable(SourceFieldMapper.NAME);
             byte[] source = sourceField.getBinaryValue();
-            if (LZFDecoder.isCompressed(source)) {
+            if (LZF.isCompressed(source)) {
                 BytesStreamInput siBytes = new BytesStreamInput(source);
                 LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
                 XContentType contentType = XContentFactory.xContentType(siLzf);

+ 5 - 5
modules/elasticsearch/src/test/java/org/elasticsearch/index/mapper/xcontent/source/CompressSourceMappingTests.java

@@ -19,7 +19,7 @@
 
 package org.elasticsearch.index.mapper.xcontent.source;
 
-import org.elasticsearch.common.compress.lzf.LZFDecoder;
+import org.elasticsearch.common.compress.lzf.LZF;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.xcontent.MapperTests;
@@ -46,7 +46,7 @@ public class CompressSourceMappingTests {
                 .field("field2", "value2")
                 .endObject().copiedBytes());
 
-        assertThat(LZFDecoder.isCompressed(doc.doc().getBinaryValue("_source")), equalTo(false));
+        assertThat(LZF.isCompressed(doc.doc().getBinaryValue("_source")), equalTo(false));
     }
 
     @Test public void testCompressEnabled() throws Exception {
@@ -61,7 +61,7 @@ public class CompressSourceMappingTests {
                 .field("field2", "value2")
                 .endObject().copiedBytes());
 
-        assertThat(LZFDecoder.isCompressed(doc.doc().getBinaryValue("_source")), equalTo(true));
+        assertThat(LZF.isCompressed(doc.doc().getBinaryValue("_source")), equalTo(true));
     }
 
     @Test public void testCompressThreshold() throws Exception {
@@ -75,7 +75,7 @@ public class CompressSourceMappingTests {
                 .field("field1", "value1")
                 .endObject().copiedBytes());
 
-        assertThat(LZFDecoder.isCompressed(doc.doc().getBinaryValue("_source")), equalTo(false));
+        assertThat(LZF.isCompressed(doc.doc().getBinaryValue("_source")), equalTo(false));
 
         doc = documentMapper.parse("type", "1", XContentFactory.jsonBuilder().startObject()
                 .field("field1", "value1")
@@ -85,6 +85,6 @@ public class CompressSourceMappingTests {
                 .field("field2", "value2 xxxxxxxxxxxxxx yyyyyyyyyyyyyyyyyyy zzzzzzzzzzzzzzzzz")
                 .endObject().copiedBytes());
 
-        assertThat(LZFDecoder.isCompressed(doc.doc().getBinaryValue("_source")), equalTo(true));
+        assertThat(LZF.isCompressed(doc.doc().getBinaryValue("_source")), equalTo(true));
     }
 }

+ 49 - 0
modules/elasticsearch/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java

@@ -23,6 +23,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Streamable;
+import org.elasticsearch.common.io.stream.VoidStreamable;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.testng.annotations.AfterMethod;
@@ -116,6 +117,54 @@ public abstract class AbstractSimpleTransportTests {
         serviceA.removeHandler("sayHello");
     }
 
+    @Test public void testVoidMessageCompressed() {
+        serviceA.registerHandler("sayHello", new BaseTransportRequestHandler<VoidStreamable>() {
+            @Override public VoidStreamable newInstance() {
+                return VoidStreamable.INSTANCE;
+            }
+
+            @Override public String executor() {
+                return ThreadPool.Names.CACHED;
+            }
+
+            @Override public void messageReceived(VoidStreamable request, TransportChannel channel) {
+                try {
+                    channel.sendResponse(VoidStreamable.INSTANCE, TransportResponseOptions.options().withCompress(true));
+                } catch (IOException e) {
+                    e.printStackTrace();
+                    assertThat(e.getMessage(), false, equalTo(true));
+                }
+            }
+        });
+
+        TransportFuture<VoidStreamable> res = serviceB.submitRequest(serviceANode, "sayHello",
+                VoidStreamable.INSTANCE, TransportRequestOptions.options().withCompress(true), new BaseTransportResponseHandler<VoidStreamable>() {
+                    @Override public VoidStreamable newInstance() {
+                        return VoidStreamable.INSTANCE;
+                    }
+
+                    @Override public String executor() {
+                        return ThreadPool.Names.CACHED;
+                    }
+
+                    @Override public void handleResponse(VoidStreamable response) {
+                    }
+
+                    @Override public void handleException(TransportException exp) {
+                        exp.printStackTrace();
+                        assertThat("got exception instead of a response: " + exp.getMessage(), false, equalTo(true));
+                    }
+                });
+
+        try {
+            VoidStreamable message = res.get();
+            assertThat(message, notNullValue());
+        } catch (Exception e) {
+            assertThat(e.getMessage(), false, equalTo(true));
+        }
+
+        serviceA.removeHandler("sayHello");
+    }
 
     @Test public void testHelloWorldCompressed() {
         serviceA.registerHandler("sayHello", new BaseTransportRequestHandler<StringMessage>() {

+ 4 - 0
modules/elasticsearch/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java

@@ -39,6 +39,10 @@ public class SimpleNettyTransportTests extends AbstractSimpleTransportTests {
         serviceBNode = new DiscoveryNode("B", serviceB.boundAddress().publishAddress());
     }
 
+    @Override public void testVoidMessageCompressed() {
+        super.testVoidMessageCompressed();
+    }
+
     @Test public void testConnectException() {
         try {
             serviceA.connectToNode(new DiscoveryNode("C", new InetSocketTransportAddress("localhost", 9876)));