
Dry up TransportDecompressor Implementations (#98351)

Lots of code can be shared across Lz4 and Deflate here, especially
the bits around acquiring and releasing pages.
Armin Braun · 2 years ago · parent commit bb1dad8f5c
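
The page handling the message refers to follows a single pattern, now hosted in the shared base class (see the TransportDecompressor.java hunks below): keep a deque of recycler pages, allocate a new page only when the tail is full, and write into the tail's free region tracked by pageOffset/pageLength. As a minimal sketch of that pattern, here is how a subclass could copy an arbitrary byte[] into pages using the shared state; copyToPages is an illustrative name, not a method added by this commit:

    // Illustrative sketch only: copy a byte[] into recycler pages via the
    // shared state and helpers of the new TransportDecompressor base class.
    protected int copyToPages(byte[] src, int srcOffset, int length) {
        int copied = 0;
        while (copied < length) {
            maybeAddNewPage(); // obtains a fresh recycler page only when the tail is full
            BytesRef page = pages.getLast().v();
            int toCopy = Math.min(length - copied, pageLength - pageOffset);
            System.arraycopy(src, srcOffset + copied, page.bytes, page.offset + pageOffset, toCopy);
            pageOffset += toCopy; // pageOffset marks how far the tail page is filled
            copied += toCopy;
        }
        return copied;
    }

This is essentially the loop the Lz4 diff below reduces to once its inline page-acquisition block is replaced by maybeAddNewPage().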

+ 4 - 44
server/src/main/java/org/elasticsearch/transport/DeflateTransportDecompressor.java

@@ -10,30 +10,21 @@ package org.elasticsearch.transport;
 
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefIterator;
-import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.recycler.Recycler;
 import org.elasticsearch.common.util.PageCacheRecycler;
 
 import java.io.IOException;
-import java.util.ArrayDeque;
 import java.util.zip.DataFormatException;
 import java.util.zip.Inflater;
 
-public class DeflateTransportDecompressor implements TransportDecompressor {
+public class DeflateTransportDecompressor extends TransportDecompressor {
 
     private final Inflater inflater;
-    private final Recycler<BytesRef> recycler;
-    private final ArrayDeque<Recycler.V<BytesRef>> pages;
-    private int pageOffset = 0;
-    private int pageLength = 0;
-    private boolean hasSkippedHeader = false;
 
     public DeflateTransportDecompressor(Recycler<BytesRef> recycler) {
-        this.recycler = recycler;
+        super(recycler);
         inflater = new Inflater(true);
-        pages = new ArrayDeque<>(4);
     }
 
     @Override
@@ -53,14 +44,7 @@ public class DeflateTransportDecompressor implements TransportDecompressor {
             bytesConsumed += ref.length;
             boolean continueInflating = true;
             while (continueInflating) {
-                final boolean isNewPage = pageOffset == pageLength;
-                if (isNewPage) {
-                    Recycler.V<BytesRef> newPage = recycler.obtain();
-                    pageOffset = 0;
-                    pageLength = newPage.v().length;
-                    assert newPage.v().length > 0;
-                    pages.add(newPage);
-                }
+                final boolean isNewPage = maybeAddNewPage();
                 final Recycler.V<BytesRef> page = pages.getLast();
 
                 BytesRef output = page.v();
@@ -96,28 +80,6 @@ public class DeflateTransportDecompressor implements TransportDecompressor {
         return inflater.finished();
     }
 
-    @Override
-    public ReleasableBytesReference pollDecompressedPage(boolean isEOS) {
-        if (pages.isEmpty()) {
-            return null;
-        } else if (pages.size() == 1) {
-            if (isEOS) {
-                assert isEOS();
-                Recycler.V<BytesRef> page = pages.pollFirst();
-                BytesArray delegate = new BytesArray(page.v().bytes, page.v().offset, pageOffset);
-                ReleasableBytesReference reference = new ReleasableBytesReference(delegate, page);
-                pageLength = 0;
-                pageOffset = 0;
-                return reference;
-            } else {
-                return null;
-            }
-        } else {
-            Recycler.V<BytesRef> page = pages.pollFirst();
-            return new ReleasableBytesReference(new BytesArray(page.v()), page);
-        }
-    }
-
     @Override
     public Compression.Scheme getScheme() {
         return Compression.Scheme.DEFLATE;
@@ -126,8 +88,6 @@ public class DeflateTransportDecompressor implements TransportDecompressor {
     @Override
     public void close() {
         inflater.end();
-        for (Recycler.V<BytesRef> page : pages) {
-            page.close();
-        }
+        super.close();
     }
 }
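
The decompress hunk above cuts off right after fetching the tail page. For orientation, a hedged reconstruction of how the loop body plausibly continues, assuming the pre-refactor inflate logic is otherwise untouched (only the page-acquisition block was lifted into maybeAddNewPage); the retained DataFormatException import suggests the inflate call still sits in a try/catch that rethrows as IOException:

    // Reconstruction, not part of this diff: inflate directly into the
    // free region of the tail page and advance the shared offset.
    int bytesInflated = inflater.inflate(output.bytes, output.offset + pageOffset, pageLength - pageOffset);
    pageOffset += bytesInflated;
    if (inflater.needsInput()) {
        continueInflating = false; // all buffered compressed input consumed
    }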

+ 6 - 54
server/src/main/java/org/elasticsearch/transport/Lz4TransportDecompressor.java

@@ -26,18 +26,15 @@ import net.jpountz.lz4.LZ4Exception;
 import net.jpountz.lz4.LZ4FastDecompressor;
 
 import org.apache.lucene.util.BytesRef;
-import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.recycler.Recycler;
 
 import java.io.IOException;
-import java.util.ArrayDeque;
 import java.util.Locale;
 
 /**
- * This file is forked from the https://netty.io project. In particular it forks the following file
+ * This file is forked from the https://netty.io project. In particular, it forks the following file
  * io.netty.handler.codec.compression.Lz4FrameDecoder.
  *
 * It modifies the original netty code to operate on byte arrays as opposed to ByteBufs.
@@ -46,7 +43,7 @@ import java.util.Locale;
  *
 * This class is necessary as Netty is not a dependency in the Elasticsearch server module.
  */
-public class Lz4TransportDecompressor implements TransportDecompressor {
+public class Lz4TransportDecompressor extends TransportDecompressor {
 
     private static final ThreadLocal<byte[]> DECOMPRESSED = ThreadLocal.withInitial(() -> BytesRef.EMPTY_BYTES);
     private static final ThreadLocal<byte[]> COMPRESSED = ThreadLocal.withInitial(() -> BytesRef.EMPTY_BYTES);
@@ -68,9 +65,7 @@ public class Lz4TransportDecompressor implements TransportDecompressor {
      */
     static final int COMPRESSION_LEVEL_BASE = 10;
 
-    static final int MIN_BLOCK_SIZE = 64;
     static final int MAX_BLOCK_SIZE = 1 << COMPRESSION_LEVEL_BASE + 0x0F;   // 32 M
-    static final int DEFAULT_BLOCK_SIZE = 1 << 16;  // 64 KB
 
     static final int BLOCK_TYPE_NON_COMPRESSED = 0x10;
     static final int BLOCK_TYPE_COMPRESSED = 0x20;
@@ -104,37 +99,9 @@ public class Lz4TransportDecompressor implements TransportDecompressor {
      */
     private int decompressedLength;
 
-    private final Recycler<BytesRef> recycler;
-    private final ArrayDeque<Recycler.V<BytesRef>> pages;
-    private int pageOffset = 0;
-    private int pageLength = 0;
-    private boolean hasSkippedESHeader = false;
-
     public Lz4TransportDecompressor(Recycler<BytesRef> recycler) {
+        super(recycler);
         this.decompressor = Compression.Scheme.lz4Decompressor();
-        this.recycler = recycler;
-        this.pages = new ArrayDeque<>(4);
-    }
-
-    @Override
-    public ReleasableBytesReference pollDecompressedPage(boolean isEOS) {
-        if (pages.isEmpty()) {
-            return null;
-        } else if (pages.size() == 1) {
-            if (isEOS) {
-                Recycler.V<BytesRef> page = pages.pollFirst();
-                BytesArray delegate = new BytesArray(page.v().bytes, page.v().offset, pageOffset);
-                ReleasableBytesReference reference = new ReleasableBytesReference(delegate, page);
-                pageLength = 0;
-                pageOffset = 0;
-                return reference;
-            } else {
-                return null;
-            }
-        } else {
-            Recycler.V<BytesRef> page = pages.pollFirst();
-            return new ReleasableBytesReference(new BytesArray(page.v()), page);
-        }
     }
 
     @Override
@@ -142,18 +109,11 @@ public class Lz4TransportDecompressor implements TransportDecompressor {
         return Compression.Scheme.LZ4;
     }
 
-    @Override
-    public void close() {
-        for (Recycler.V<BytesRef> page : pages) {
-            page.close();
-        }
-    }
-
     @Override
     public int decompress(BytesReference bytesReference) throws IOException {
         int bytesConsumed = 0;
-        if (hasSkippedESHeader == false) {
-            hasSkippedESHeader = true;
+        if (hasSkippedHeader == false) {
+            hasSkippedHeader = true;
             int esHeaderLength = Compression.Scheme.HEADER_LENGTH;
             bytesReference = bytesReference.slice(esHeaderLength, bytesReference.length() - esHeaderLength);
             bytesConsumed += esHeaderLength;
@@ -292,16 +252,8 @@ public class Lz4TransportDecompressor implements TransportDecompressor {
                         int bytesToCopy = decompressedLength;
                         int uncompressedOffset = 0;
                         while (bytesToCopy > 0) {
-                            final boolean isNewPage = pageOffset == pageLength;
-                            if (isNewPage) {
-                                Recycler.V<BytesRef> newPage = recycler.obtain();
-                                pageOffset = 0;
-                                pageLength = newPage.v().length;
-                                assert newPage.v().length > 0;
-                                pages.add(newPage);
-                            }
+                            maybeAddNewPage();
                             final Recycler.V<BytesRef> page = pages.getLast();
-
                             int toCopy = Math.min(bytesToCopy, pageLength - pageOffset);
                             System.arraycopy(decompressed, uncompressedOffset, page.v().bytes, page.v().offset + pageOffset, toCopy);
                             pageOffset += toCopy;
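
Before the base-class diff below, note the polling contract that the unified pollDecompressedPage encodes: fully filled pages can be polled at any time, but the last, partially filled page is only handed out once the stream has ended (isEOS == true), since pageOffset still marks where the next write would land. A hypothetical driver mirroring that contract; feed and handlePage are illustrative names, not part of this commit:

    // Hypothetical caller sketch for the new base class API.
    static void feed(TransportDecompressor decompressor, BytesReference bytes, boolean isEndOfStream) throws IOException {
        decompressor.decompress(bytes);
        ReleasableBytesReference page;
        while ((page = decompressor.pollDecompressedPage(isEndOfStream)) != null) {
            handlePage(page); // consumer takes ownership and must close the page to recycle it
        }
    }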

+ 58 - 6
server/src/main/java/org/elasticsearch/transport/TransportDecompressor.java

@@ -9,14 +9,28 @@
 package org.elasticsearch.transport;
 
 import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.recycler.Recycler;
 import org.elasticsearch.core.Releasable;
 
 import java.io.IOException;
+import java.util.ArrayDeque;
 
-public interface TransportDecompressor extends Releasable {
+public abstract class TransportDecompressor implements Releasable {
+
+    protected int pageOffset = 0;
+    protected int pageLength = 0;
+    protected boolean hasSkippedHeader = false;
+
+    protected final ArrayDeque<Recycler.V<BytesRef>> pages = new ArrayDeque<>(4);
+
+    private final Recycler<BytesRef> recycler;
+
+    protected TransportDecompressor(Recycler<BytesRef> recycler) {
+        this.recycler = recycler;
+    }
 
     /**
      * Decompress the provided bytes
@@ -24,16 +38,33 @@ public interface TransportDecompressor extends Releasable {
      * @param bytesReference to decompress
      * @return number of compressed bytes consumed
      */
-    int decompress(BytesReference bytesReference) throws IOException;
+    public abstract int decompress(BytesReference bytesReference) throws IOException;
 
-    ReleasableBytesReference pollDecompressedPage(boolean isEOS);
+    public ReleasableBytesReference pollDecompressedPage(boolean isEOS) {
+        if (pages.isEmpty()) {
+            return null;
+        } else if (pages.size() == 1) {
+            if (isEOS) {
+                return pollLastPage();
+            } else {
+                return null;
+            }
+        } else {
+            Recycler.V<BytesRef> page = pages.pollFirst();
+            return new ReleasableBytesReference(new BytesArray(page.v()), page);
+        }
+    }
 
-    Compression.Scheme getScheme();
+    public abstract Compression.Scheme getScheme();
 
     @Override
-    void close();
+    public void close() {
+        for (Recycler.V<BytesRef> page : pages) {
+            page.close();
+        }
+    }
 
-    static TransportDecompressor getDecompressor(Recycler<BytesRef> recycler, BytesReference bytes) throws IOException {
+    static TransportDecompressor getDecompressor(Recycler<BytesRef> recycler, BytesReference bytes) {
         if (bytes.length() < Compression.Scheme.HEADER_LENGTH) {
             return null;
         }
@@ -47,6 +78,27 @@ public interface TransportDecompressor extends Releasable {
         }
     }
 
+    protected ReleasableBytesReference pollLastPage() {
+        Recycler.V<BytesRef> page = pages.pollFirst();
+        BytesArray delegate = new BytesArray(page.v().bytes, page.v().offset, pageOffset);
+        ReleasableBytesReference reference = new ReleasableBytesReference(delegate, page);
+        pageLength = 0;
+        pageOffset = 0;
+        return reference;
+    }
+
+    protected boolean maybeAddNewPage() {
+        if (pageOffset == pageLength) {
+            Recycler.V<BytesRef> newPage = recycler.obtain();
+            pageOffset = 0;
+            pageLength = newPage.v().length;
+            assert newPage.v().length > 0;
+            pages.add(newPage);
+            return true;
+        }
+        return false;
+    }
+
     private static IllegalStateException createIllegalState(BytesReference bytes) {
         int maxToRead = Math.min(bytes.length(), 10);
         StringBuilder sb = new StringBuilder("stream marked as compressed, but no compressor found, first [").append(maxToRead)