
Add additional transport compression options (#74587)

This commit is related to #73497. It adds two new settings. The first setting
is transport.compression_scheme. This setting allows the user to configure
LZ4 or DEFLATE as the transport compression scheme. Additionally, it modifies
transport.compress to support the value indexing_data. When this setting is
set to indexing_data, only messages that are primarily composed of raw source
data are compressed: bulk, operations-based recovery, and shard changes
messages.
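
For illustration, a minimal sketch of how the two settings might be applied
together when building node settings (hedged: the setting keys come from this
commit; the chosen values and the Settings builder usage are only an example):

    import org.elasticsearch.common.settings.Settings;

    // Example node settings: compress only raw indexing payloads, using LZ4.
    Settings settings = Settings.builder()
        .put("transport.compress", "indexing_data")   // true | indexing_data | false
        .put("transport.compression_scheme", "lz4")   // deflate (default) | lz4
        .build();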
Tim Brooks 4 years ago
parent
commit
293d490ded
46 changed files with 1734 additions and 238 deletions
  1. docs/reference/modules/remote-clusters.asciidoc (+5 -4)
  2. docs/reference/modules/transport.asciidoc (+18 -2)
  3. qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java (+42 -2)
  4. server/build.gradle (+8 -0)
  5. server/licenses/lz4-java-1.8.0.jar.sha1 (+1 -0)
  6. server/licenses/lz4-java-LICENSE.txt (+202 -0)
  7. server/licenses/lz4-java-NOTICE.txt (+0 -0)
  8. server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java (+2 -1)
  9. server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java (+2 -0)
  10. server/src/main/java/org/elasticsearch/common/settings/Setting.java (+17 -0)
  11. server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java (+1 -1)
  12. server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java (+2 -1)
  13. server/src/main/java/org/elasticsearch/transport/Compression.java (+84 -0)
  14. server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java (+40 -10)
  15. server/src/main/java/org/elasticsearch/transport/DeflateTransportDecompressor.java (+122 -0)
  16. server/src/main/java/org/elasticsearch/transport/InboundDecoder.java (+25 -14)
  17. server/src/main/java/org/elasticsearch/transport/Lz4TransportDecompressor.java (+352 -0)
  18. server/src/main/java/org/elasticsearch/transport/NetworkMessage.java (+12 -2)
  19. server/src/main/java/org/elasticsearch/transport/OutboundHandler.java (+20 -5)
  20. server/src/main/java/org/elasticsearch/transport/OutboundMessage.java (+27 -20)
  21. server/src/main/java/org/elasticsearch/transport/RawIndexingDataTransportRequest.java (+17 -0)
  22. server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java (+9 -2)
  23. server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java (+9 -2)
  24. server/src/main/java/org/elasticsearch/transport/ReuseBuffersLZ4BlockOutputStream.java (+334 -0)
  25. server/src/main/java/org/elasticsearch/transport/TcpTransport.java (+6 -3)
  26. server/src/main/java/org/elasticsearch/transport/TransportDecompressor.java (+29 -108)
  27. server/src/main/java/org/elasticsearch/transport/TransportSettings.java (+6 -2)
  28. server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java (+1 -1)
  29. server/src/test/java/org/elasticsearch/transport/ConnectionProfileTests.java (+29 -2)
  30. server/src/test/java/org/elasticsearch/transport/DeflateTransportDecompressorTests.java (+13 -14)
  31. server/src/test/java/org/elasticsearch/transport/InboundDecoderTests.java (+15 -10)
  32. server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java (+2 -2)
  33. server/src/test/java/org/elasticsearch/transport/InboundPipelineTests.java (+24 -9)
  34. server/src/test/java/org/elasticsearch/transport/Lz4TransportDecompressorTests.java (+173 -0)
  35. server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java (+11 -3)
  36. server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java (+14 -3)
  37. server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java (+16 -3)
  38. server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java (+1 -1)
  39. server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java (+1 -1)
  40. test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java (+19 -4)
  41. test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java (+10 -2)
  42. test/framework/src/main/java/org/elasticsearch/transport/TestProfiles.java (+1 -0)
  43. test/framework/src/main/java/org/elasticsearch/transport/TestTransportChannels.java (+4 -1)
  44. test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java (+1 -0)
  45. x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java (+5 -2)
  46. x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java (+2 -1)

+ 5 - 4
docs/reference/modules/remote-clusters.asciidoc

@@ -288,11 +288,12 @@ separately.
 
 `cluster.remote.<cluster_alias>.transport.compress`::
 
-  Per cluster boolean setting that enables you to configure compression for
-  requests to a specific remote cluster. This setting impacts only requests
+  Per cluster setting that enables you to configure compression for requests
+  to a specific remote cluster. This setting impacts only requests
   sent to the remote cluster. If the inbound request is compressed,
-  Elasticsearch compresses the response. If unset, the global
-  `transport.compress` is used as the fallback setting.
+  Elasticsearch compresses the response. The setting options are `true`,
+  `indexing_data`, and `false`. The option `indexing_data` is experimental.
+  If unset, the global `transport.compress` is used as the fallback setting.
 
 [discrete]
 [[remote-cluster-sniff-settings]]
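
As a hedged usage sketch (mirroring this commit's full-cluster-restart test;
the cluster alias "eu-cluster" and the low-level REST client wiring are
illustrative, not part of this diff):

    import org.elasticsearch.client.Request;
    import org.elasticsearch.client.Response;
    import org.elasticsearch.client.RestClient;

    // Example: enable indexing-data compression for a single remote cluster.
    Request request = new Request("PUT", "/_cluster/settings");
    request.setJsonEntity(
        "{\"persistent\": {"
            + "\"cluster.remote.eu-cluster.transport.compress\": \"indexing_data\""
            + "}}");
    Response response = restClient.performRequest(request); // restClient: an existing RestClient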

+ 18 - 2
docs/reference/modules/transport.asciidoc

@@ -49,8 +49,19 @@ time setting format). Defaults to `30s`.
 
 `transport.compress`::
 (<<static-cluster-setting,Static>>)
-Set to `true` to enable compression (`DEFLATE`) between
-all nodes. Defaults to `false`.
+Set to `true`, `indexing_data`, or `false` to configure transport compression
+between nodes. The option `true` will compress all data. The option
+`indexing_data` will compress only the raw index data sent between nodes during
+ingest, CCR following (excluding bootstrap), and operations-based shard recovery
+(excluding transferring Lucene files). The `indexing_data` option is experimental.
+Defaults to `false`.
+
+`transport.compression_scheme`::
+(<<static-cluster-setting,Static>>)
+Configures the compression scheme for `transport.compress`. The options are
+`deflate` or `lz4`. The option `lz4` is experimental. If `lz4` is configured and
+the remote node has not been upgraded to a version supporting `lz4`, the traffic
+will be sent uncompressed. Defaults to `deflate`.
 
 `transport.ping_schedule`::
 (<<static-cluster-setting,Static>>)
@@ -172,6 +183,11 @@ normally makes sense for local cluster communication as compression has a
 noticeable CPU cost and local clusters tend to be set up with fast network
 connections between nodes.
 
+The `transport.compress` configuration option `indexing_data` will only
+compress requests that relate to the transport of raw indexing source data
+between nodes. This option primarily compresses data sent during ingest,
+CCR, and shard recovery. This option is experimental.
+
 The `transport.compress` setting always configures local cluster request
 compression and is the fallback setting for remote cluster request compression.
 If you want to configure remote request compression differently than local

+ 42 - 2
qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java

@@ -10,31 +10,35 @@ package org.elasticsearch.upgrades;
 
 import org.apache.http.util.EntityUtils;
 import org.elasticsearch.Version;
+import org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsResponse;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.MetadataIndexStateService;
-import org.elasticsearch.core.Booleans;
-import org.elasticsearch.core.CheckedFunction;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
+import org.elasticsearch.core.Booleans;
+import org.elasticsearch.core.CheckedFunction;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.rest.action.admin.indices.RestPutIndexTemplateAction;
 import org.elasticsearch.test.NotEqualMessageBuilder;
 import org.elasticsearch.test.XContentTestUtils;
 import org.elasticsearch.test.rest.ESRestTestCase;
 import org.elasticsearch.test.rest.yaml.ObjectPath;
+import org.elasticsearch.transport.Compression;
 import org.junit.Before;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Base64;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -51,6 +55,7 @@ import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.SYS
 import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
 import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_COMPRESS;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
@@ -1588,6 +1593,41 @@ public class FullClusterRestartIT extends AbstractFullClusterRestartTestCase {
         }
     }
 
+    /**
+     * In 7.14 the cluster.remote.*.transport.compress setting was changed from a boolean to an enum setting
+     * with true/false as options. This test ensures that the old boolean setting in cluster state is
+     * translated properly. This test can be removed in 9.0.
+     */
+    public void testTransportCompressionSetting() throws IOException {
+        if (isRunningAgainstOldCluster()) {
+            final Request putSettingsRequest = new Request("PUT", "/_cluster/settings");
+            try (XContentBuilder builder = jsonBuilder()) {
+                builder.startObject();
+                {
+                    builder.startObject("persistent");
+                    {
+                        builder.field("cluster.remote.foo.seeds", Collections.singletonList("localhost:9200"));
+                        builder.field("cluster.remote.foo.transport.compress", "true");
+                    }
+                    builder.endObject();
+                }
+                builder.endObject();
+                putSettingsRequest.setJsonEntity(Strings.toString(builder));
+            }
+            client().performRequest(putSettingsRequest);
+        } else {
+            final Request getSettingsRequest = new Request("GET", "/_cluster/settings");
+            final Response getSettingsResponse = client().performRequest(getSettingsRequest);
+            try (XContentParser parser = createParser(JsonXContent.jsonXContent, getSettingsResponse.getEntity().getContent())) {
+                final ClusterGetSettingsResponse clusterGetSettingsResponse = ClusterGetSettingsResponse.fromXContent(parser);
+                final Settings settings = clusterGetSettingsResponse.getPersistentSettings();
+                assertThat(
+                    REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace("foo").get(settings),
+                    equalTo(Compression.Enabled.TRUE));
+            }
+        }
+    }
+
     public static void assertNumHits(String index, int numHits, int totalShards) throws IOException {
         Map<String, Object> resp = entityAsMap(client().performRequest(new Request("GET", "/" + index + "/_search")));
         assertNoFailures(resp);

+ 8 - 0
server/build.gradle

@@ -50,6 +50,9 @@ dependencies {
   api project(":libs:elasticsearch-cli")
   api 'com.carrotsearch:hppc:0.8.1'
 
+  // LZ4
+  api 'org.lz4:lz4-java:1.8.0'
+
   // time handling, remove with java 8 time
   api "joda-time:joda-time:${versions.joda}"
 
@@ -266,6 +269,11 @@ tasks.named("thirdPartyAudit").configure {
             'com.google.common.geometry.S2LatLng'
     )
     ignoreMissingClasses 'javax.xml.bind.DatatypeConverter'
+
+    ignoreViolations(
+      // from lz4-java
+      'net.jpountz.util.UnsafeUtils'
+    )
 }
 
 tasks.named("dependencyLicenses").configure {

+ 1 - 0
server/licenses/lz4-java-1.8.0.jar.sha1

@@ -0,0 +1 @@
+4b986a99445e49ea5fbf5d149c4b63f6ed6c6780

+ 202 - 0
server/licenses/lz4-java-LICENSE.txt

@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.

+ 0 - 0
server/licenses/lz4-java-NOTICE.txt


+ 2 - 1
server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java

@@ -18,13 +18,14 @@ import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.transport.RawIndexingDataTransportRequest;
 
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.stream.Stream;
 
-public class BulkShardRequest extends ReplicatedWriteRequest<BulkShardRequest> implements Accountable {
+public class BulkShardRequest extends ReplicatedWriteRequest<BulkShardRequest> implements Accountable, RawIndexingDataTransportRequest {
 
     private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkShardRequest.class);
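
RawIndexingDataTransportRequest is a marker interface: implementing it tags a
request as carrying raw indexing source data. A hedged sketch of how the
outbound path can consult such a marker when transport.compress is set to
indexing_data (the method shape is illustrative, not the exact code from this
commit):

    // Decide per message whether to compress, given the configured mode.
    static boolean shouldCompress(Compression.Enabled mode, TransportRequest request) {
        switch (mode) {
            case TRUE:
                return true;                                               // compress everything
            case INDEXING_DATA:
                return request instanceof RawIndexingDataTransportRequest; // raw source payloads only
            default:
                return false;
        }
    }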
 

+ 2 - 0
server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

@@ -296,6 +296,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
             RemoteClusterService.REMOTE_NODE_ATTRIBUTE,
             RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE,
             RemoteClusterService.REMOTE_CLUSTER_COMPRESS,
+            RemoteClusterService.REMOTE_CLUSTER_COMPRESSION_SCHEME,
             RemoteConnectionStrategy.REMOTE_CONNECTION_MODE,
             ProxyConnectionStrategy.PROXY_ADDRESS,
             ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS,
@@ -320,6 +321,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
             TransportSettings.PUBLISH_PORT,
             TransportSettings.PUBLISH_PORT_PROFILE,
             TransportSettings.TRANSPORT_COMPRESS,
+            TransportSettings.TRANSPORT_COMPRESSION_SCHEME,
             TransportSettings.PING_SCHEDULE,
             TransportSettings.CONNECT_TIMEOUT,
             TransportSettings.DEFAULT_FEATURES_SETTING,

+ 17 - 0
server/src/main/java/org/elasticsearch/common/settings/Setting.java

@@ -1405,6 +1405,23 @@ public class Setting<T> implements ToXContentObject {
         return new Setting<>(key, defaultValue.toString(), e -> Enum.valueOf(clazz, e.toUpperCase(Locale.ROOT)), properties);
     }
 
+    /**
+     * Creates a setting where the allowed values are defined as enum constants. All enum constants must be uppercase.
+     *
+     * @param clazz the enum class
+     * @param key the key for the setting
+     * @param fallbackSetting the fallback setting for this setting
+     * @param validator validator for this setting
+     * @param properties properties for this setting like scope, filtering...
+     * @param <T> the generics type parameter reflecting the actual type of the enum
+     * @return the setting object
+     */
+    public static <T extends Enum<T>> Setting<T> enumSetting(Class<T> clazz, String key, Setting<T> fallbackSetting,
+                                                             Validator<T> validator, Property... properties) {
+        return new Setting<>(new SimpleKey(key), fallbackSetting, fallbackSetting::getRaw,
+            e -> Enum.valueOf(clazz, e.toUpperCase(Locale.ROOT)), validator, properties);
+    }
+
     /**
      * Creates a setting which specifies a memory size. This can either be
      * specified as an absolute bytes value or as a percentage of the heap
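
A hedged sketch of declaring a setting with this new overload (the constant
name and key are assumptions for illustration; the real remote-cluster setting
is declared in RemoteClusterService.java in this commit):

    // An enum setting that falls back to the global transport.compress setting.
    public static final Setting<Compression.Enabled> EXAMPLE_COMPRESS =
        Setting.enumSetting(
            Compression.Enabled.class,
            "example.transport.compress",
            TransportSettings.TRANSPORT_COMPRESS, // fallback when unset
            value -> {},                          // no additional validation
            Setting.Property.NodeScope);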

+ 1 - 1
server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java

@@ -74,7 +74,7 @@ public class HandshakingTransportAddressConnector implements TransportAddressCon
                 logger.trace("[{}] opening probe connection", thisConnectionAttempt);
                 transportService.openConnection(targetNode,
                     ConnectionProfile.buildSingleChannelProfile(Type.REG, probeConnectTimeout, probeHandshakeTimeout,
-                        TimeValue.MINUS_ONE, null), listener.delegateFailure((l, connection) -> {
+                        TimeValue.MINUS_ONE, null, null), listener.delegateFailure((l, connection) -> {
                         logger.trace("[{}] opened probe connection", thisConnectionAttempt);
 
                         // use NotifyOnceListener to make sure the following line does not result in onFailure being called when

+ 2 - 1
server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java

@@ -13,11 +13,12 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.index.seqno.RetentionLeases;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.transport.RawIndexingDataTransportRequest;
 
 import java.io.IOException;
 import java.util.List;
 
-public class RecoveryTranslogOperationsRequest extends RecoveryTransportRequest {
+public class RecoveryTranslogOperationsRequest extends RecoveryTransportRequest implements RawIndexingDataTransportRequest {
 
     private final long recoveryId;
     private final ShardId shardId;

+ 84 - 0
server/src/main/java/org/elasticsearch/transport/Compression.java

@@ -0,0 +1,84 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.transport;
+
+import net.jpountz.lz4.LZ4Factory;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.common.bytes.BytesReference;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class Compression {
+
+    public enum Scheme {
+        LZ4,
+        DEFLATE;
+
+        // TODO: Change after backport
+        static final Version LZ4_VERSION = Version.V_8_0_0;
+        static final int HEADER_LENGTH = 4;
+        private static final byte[] DEFLATE_HEADER = new byte[]{'D', 'F', 'L', '\0'};
+        private static final byte[] LZ4_HEADER = new byte[]{'L', 'Z', '4', '\0'};
+        private static final int LZ4_BLOCK_SIZE;
+
+        static {
+            String blockSizeString = System.getProperty("es.transport.compression.lz4_block_size");
+            if (blockSizeString != null) {
+                int lz4BlockSize = Integer.parseInt(blockSizeString);
+                if (lz4BlockSize < 1024 || lz4BlockSize > (512 * 1024)) {
+                    throw new IllegalArgumentException("lz4_block_size must be >= 1KB and <= 512KB");
+                }
+                LZ4_BLOCK_SIZE = lz4BlockSize;
+            } else {
+                LZ4_BLOCK_SIZE = 64 * 1024;
+            }
+        }
+
+        public static boolean isDeflate(BytesReference bytes) {
+            byte firstByte = bytes.get(0);
+            if (firstByte != Compression.Scheme.DEFLATE_HEADER[0]) {
+                return false;
+            } else {
+                return validateHeader(bytes, DEFLATE_HEADER);
+            }
+        }
+
+        public static boolean isLZ4(BytesReference bytes) {
+            byte firstByte = bytes.get(0);
+            if (firstByte != Scheme.LZ4_HEADER[0]) {
+                return false;
+            } else {
+                return validateHeader(bytes, LZ4_HEADER);
+            }
+        }
+
+        private static boolean validateHeader(BytesReference bytes, byte[] header) {
+            for (int i = 1; i < Compression.Scheme.HEADER_LENGTH; ++i) {
+                if (bytes.get(i) != header[i]) {
+                    return false;
+                }
+            }
+            return true;
+        }
+
+        public static OutputStream lz4OutputStream(OutputStream outputStream) throws IOException {
+            outputStream.write(LZ4_HEADER);
+            return new ReuseBuffersLZ4BlockOutputStream(outputStream, LZ4_BLOCK_SIZE, LZ4Factory.safeInstance().fastCompressor());
+        }
+    }
+
+    public enum Enabled {
+        TRUE,
+        INDEXING_DATA,
+        FALSE
+    }
+
+}
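
A hedged sketch of producing and recognizing an LZ4-framed payload with the
helpers above (the byte-array sink and payload are stand-ins; the production
code writes into recycled pages rather than a ByteArrayOutputStream):

    import java.io.ByteArrayOutputStream;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;

    byte[] payload = "example indexing data".getBytes(StandardCharsets.UTF_8);
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    try (OutputStream lz4 = Compression.Scheme.lz4OutputStream(sink)) {
        lz4.write(payload); // the helper already wrote the 'L','Z','4','\0' scheme header
    }
    // A receiver can call Compression.Scheme.isLZ4(bytes) on the first four
    // bytes to select the matching decompressor.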

+ 40 - 10
server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java

@@ -35,7 +35,8 @@ public final class ConnectionProfile {
         if (profile == null) {
             return fallbackProfile;
         } else if (profile.getConnectTimeout() != null && profile.getHandshakeTimeout() != null
-            && profile.getPingInterval() != null && profile.getCompressionEnabled() != null) {
+            && profile.getPingInterval() != null && profile.getCompressionEnabled() != null
+            && profile.getCompressionScheme() != null) {
             return profile;
         } else {
             ConnectionProfile.Builder builder = new ConnectionProfile.Builder(profile);
@@ -51,6 +52,9 @@ public final class ConnectionProfile {
             if (profile.getCompressionEnabled() == null) {
                 builder.setCompressionEnabled(fallbackProfile.getCompressionEnabled());
             }
+            if (profile.getCompressionScheme() == null) {
+                builder.setCompressionScheme(fallbackProfile.getCompressionScheme());
+            }
             return builder.build();
         }
     }
@@ -72,6 +76,7 @@ public final class ConnectionProfile {
         builder.setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings));
         builder.setPingInterval(TransportSettings.PING_SCHEDULE.get(settings));
         builder.setCompressionEnabled(TransportSettings.TRANSPORT_COMPRESS.get(settings));
+        builder.setCompressionScheme(TransportSettings.TRANSPORT_COMPRESSION_SCHEME.get(settings));
         builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
         builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
         // if we are not master eligible we don't need a dedicated channel to publish the state
@@ -89,7 +94,8 @@ public final class ConnectionProfile {
      */
     public static ConnectionProfile buildSingleChannelProfile(TransportRequestOptions.Type channelType, @Nullable TimeValue connectTimeout,
                                                               @Nullable TimeValue handshakeTimeout, @Nullable TimeValue pingInterval,
-                                                              @Nullable Boolean compressionEnabled) {
+                                                              @Nullable Compression.Enabled compressionEnabled,
+                                                              @Nullable Compression.Scheme compressionScheme) {
         Builder builder = new Builder();
         builder.addConnections(1, channelType);
         final EnumSet<TransportRequestOptions.Type> otherTypes = EnumSet.allOf(TransportRequestOptions.Type.class);
@@ -107,6 +113,9 @@ public final class ConnectionProfile {
         if (compressionEnabled != null) {
             builder.setCompressionEnabled(compressionEnabled);
         }
+        if (compressionScheme != null) {
+            builder.setCompressionScheme(compressionScheme);
+        }
         return builder.build();
     }
 
@@ -115,16 +124,19 @@ public final class ConnectionProfile {
     private final TimeValue connectTimeout;
     private final TimeValue handshakeTimeout;
     private final TimeValue pingInterval;
-    private final Boolean compressionEnabled;
+    private final Compression.Enabled compressionEnabled;
+    private final Compression.Scheme compressionScheme;
 
     private ConnectionProfile(List<ConnectionTypeHandle> handles, int numConnections, TimeValue connectTimeout,
-                              TimeValue handshakeTimeout, TimeValue pingInterval, Boolean compressionEnabled) {
+                              TimeValue handshakeTimeout, TimeValue pingInterval, Compression.Enabled compressionEnabled,
+                              Compression.Scheme compressionScheme) {
         this.handles = handles;
         this.numConnections = numConnections;
         this.connectTimeout = connectTimeout;
         this.handshakeTimeout = handshakeTimeout;
         this.pingInterval = pingInterval;
         this.compressionEnabled = compressionEnabled;
+        this.compressionScheme = compressionScheme;
     }
 
     /**
@@ -136,7 +148,8 @@ public final class ConnectionProfile {
         private int numConnections = 0;
         private TimeValue connectTimeout;
         private TimeValue handshakeTimeout;
-        private Boolean compressionEnabled;
+        private Compression.Enabled compressionEnabled;
+        private Compression.Scheme compressionScheme;
         private TimeValue pingInterval;
 
         /** create an empty builder */
@@ -151,6 +164,7 @@ public final class ConnectionProfile {
             connectTimeout = source.getConnectTimeout();
             handshakeTimeout = source.getHandshakeTimeout();
             compressionEnabled = source.getCompressionEnabled();
+            compressionScheme = source.getCompressionScheme();
             pingInterval = source.getPingInterval();
         }
         /**
@@ -184,13 +198,21 @@ public final class ConnectionProfile {
         }
 
         /**
-         * Sets compression enabled for this connection profile
+         * Sets compression enabled configuration for this connection profile
          */
-        public Builder setCompressionEnabled(boolean compressionEnabled) {
+        public Builder setCompressionEnabled(Compression.Enabled compressionEnabled) {
             this.compressionEnabled = compressionEnabled;
             return this;
         }
 
+        /**
+         * Sets compression scheme for this connection profile
+         */
+        public Builder setCompressionScheme(Compression.Scheme compressionScheme) {
+            this.compressionScheme = compressionScheme;
+            return this;
+        }
+
         /**
          * Adds a number of connections for one or more types. Each type can only be added once.
          * @param numConnections the number of connections to use in the pool for the given connection types
@@ -222,7 +244,7 @@ public final class ConnectionProfile {
                 throw new IllegalStateException("not all types are added for this connection profile - missing types: " + types);
             }
             return new ConnectionProfile(Collections.unmodifiableList(handles), numConnections, connectTimeout, handshakeTimeout,
-                pingInterval, compressionEnabled);
+                pingInterval, compressionEnabled, compressionScheme);
         }
 
     }
@@ -249,13 +271,21 @@ public final class ConnectionProfile {
     }
 
     /**
-     * Returns boolean indicating if compression is enabled or <code>null</code> if no explicit compression
+     * Returns the compression enabled configuration or <code>null</code> if no explicit compression configuration
      * is set on this profile.
      */
-    public Boolean getCompressionEnabled() {
+    public Compression.Enabled getCompressionEnabled() {
         return compressionEnabled;
     }
 
+    /**
+     * Returns the configured compression scheme or <code>null</code> if no explicit
+     * compression scheme is set on this profile.
+     */
+    public Compression.Scheme getCompressionScheme() {
+        return compressionScheme;
+    }
+
     /**
      * Returns the total number of connections for this profile
      */
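
A hedged sketch of building a profile that carries the new compression fields
(channel counts are example values; the builder requires every connection type
to be registered before build()):

    ConnectionProfile profile = new ConnectionProfile.Builder()
        .addConnections(1, TransportRequestOptions.Type.REG)
        .addConnections(0, TransportRequestOptions.Type.BULK,
            TransportRequestOptions.Type.PING, TransportRequestOptions.Type.STATE,
            TransportRequestOptions.Type.RECOVERY)
        .setCompressionEnabled(Compression.Enabled.INDEXING_DATA)
        .setCompressionScheme(Compression.Scheme.LZ4)
        .build();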

+ 122 - 0
server/src/main/java/org/elasticsearch/transport/DeflateTransportDecompressor.java

@@ -0,0 +1,122 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.transport;
+
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefIterator;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
+import org.elasticsearch.common.recycler.Recycler;
+import org.elasticsearch.common.util.PageCacheRecycler;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.zip.DataFormatException;
+import java.util.zip.Inflater;
+
+public class DeflateTransportDecompressor implements TransportDecompressor {
+
+    private final Inflater inflater;
+    private final PageCacheRecycler recycler;
+    private final ArrayDeque<Recycler.V<byte[]>> pages;
+    private int pageOffset = PageCacheRecycler.BYTE_PAGE_SIZE;
+    private boolean hasSkippedHeader = false;
+
+    public DeflateTransportDecompressor(PageCacheRecycler recycler) {
+        this.recycler = recycler;
+        inflater = new Inflater(true);
+        pages = new ArrayDeque<>(4);
+    }
+
+    @Override
+    public int decompress(BytesReference bytesReference) throws IOException {
+        int bytesConsumed = 0;
+        if (hasSkippedHeader == false) {
+            hasSkippedHeader = true;
+            int headerLength = Compression.Scheme.HEADER_LENGTH;
+            bytesReference = bytesReference.slice(headerLength, bytesReference.length() - headerLength);
+            bytesConsumed += headerLength;
+        }
+
+        BytesRefIterator refIterator = bytesReference.iterator();
+        BytesRef ref;
+        while ((ref = refIterator.next()) != null) {
+            inflater.setInput(ref.bytes, ref.offset, ref.length);
+            bytesConsumed += ref.length;
+            boolean continueInflating = true;
+            while (continueInflating) {
+                final boolean isNewPage = pageOffset == PageCacheRecycler.BYTE_PAGE_SIZE;
+                if (isNewPage) {
+                    pageOffset = 0;
+                    pages.add(recycler.bytePage(false));
+                }
+                final Recycler.V<byte[]> page = pages.getLast();
+
+                byte[] output = page.v();
+                try {
+                    int bytesInflated = inflater.inflate(output, pageOffset, PageCacheRecycler.BYTE_PAGE_SIZE - pageOffset);
+                    pageOffset += bytesInflated;
+                    if (isNewPage) {
+                        if (bytesInflated == 0) {
+                            Recycler.V<byte[]> removed = pages.pollLast();
+                            assert removed == page;
+                            removed.close();
+                            pageOffset = PageCacheRecycler.BYTE_PAGE_SIZE;
+                        }
+                    }
+                } catch (DataFormatException e) {
+                    throw new IOException("Exception while inflating bytes", e);
+                }
+                if (inflater.needsInput()) {
+                    continueInflating = false;
+                }
+                if (inflater.finished()) {
+                    bytesConsumed -= inflater.getRemaining();
+                    continueInflating = false;
+                }
+                assert inflater.needsDictionary() == false;
+            }
+        }
+
+        return bytesConsumed;
+    }
+
+    public boolean isEOS() {
+        return inflater.finished();
+    }
+
+    @Override
+    public ReleasableBytesReference pollDecompressedPage(boolean isEOS) {
+        if (pages.isEmpty()) {
+            return null;
+        } else if (pages.size() == 1) {
+            if (isEOS) {
+                assert isEOS();
+                Recycler.V<byte[]> page = pages.pollFirst();
+                ReleasableBytesReference reference = new ReleasableBytesReference(new BytesArray(page.v(), 0, pageOffset), page);
+                pageOffset = 0;
+                return reference;
+            } else {
+                return null;
+            }
+        } else {
+            Recycler.V<byte[]> page = pages.pollFirst();
+            return new ReleasableBytesReference(new BytesArray(page.v()), page);
+        }
+    }
+
+    @Override
+    public void close() {
+        inflater.end();
+        for (Recycler.V<byte[]> page : pages) {
+            page.close();
+        }
+    }
+}
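
A hedged driver sketch for the consume/poll contract above ('recycler' and
'incoming', a BytesReference of wire bytes, are assumed to exist; the fragment
consumer is hypothetical):

    DeflateTransportDecompressor decompressor = new DeflateTransportDecompressor(recycler);
    try {
        int consumed = decompressor.decompress(incoming);     // input bytes used
        boolean atEOS = decompressor.isEOS();                 // inflater reached stream end
        ReleasableBytesReference page;
        while ((page = decompressor.pollDecompressedPage(atEOS)) != null) {
            handleFragment(page);                             // hypothetical consumer
        }
    } finally {
        decompressor.close();                                 // return pooled pages
    }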

+ 25 - 14
server/src/main/java/org/elasticsearch/transport/InboundDecoder.java

@@ -29,6 +29,7 @@ public class InboundDecoder implements Releasable {
     private TransportDecompressor decompressor;
     private int totalNetworkSize = -1;
     private int bytesConsumed = 0;
+    private boolean isCompressed = false;
     private boolean isClosed = false;
 
     public InboundDecoder(Version version, PageCacheRecycler recycler) {
@@ -64,7 +65,7 @@ public class InboundDecoder implements Releasable {
                     Header header = readHeader(version, messageLength, reference);
                     bytesConsumed += headerBytesToRead;
                     if (header.isCompressed()) {
-                        decompressor = new TransportDecompressor(recycler);
+                        isCompressed = true;
                     }
                     fragmentConsumer.accept(header);
 
@@ -75,32 +76,42 @@ public class InboundDecoder implements Releasable {
                 }
             }
         } else {
-            // There are a minimum number of bytes required to start decompression
-            if (decompressor != null && decompressor.canDecompress(reference.length()) == false) {
-                return 0;
+            if (isCompressed && decompressor == null) {
+                // Attempt to initialize decompressor
+                TransportDecompressor decompressor = TransportDecompressor.getDecompressor(recycler, reference);
+                if (decompressor == null) {
+                    return 0;
+                } else {
+                    this.decompressor = decompressor;
+                }
             }
-            int bytesToConsume = Math.min(reference.length(), totalNetworkSize - bytesConsumed);
-            bytesConsumed += bytesToConsume;
+            int remainingToConsume = totalNetworkSize - bytesConsumed;
+            int maxBytesToConsume = Math.min(reference.length(), remainingToConsume);
             ReleasableBytesReference retainedContent;
-            if (isDone()) {
-                retainedContent = reference.retainedSlice(0, bytesToConsume);
+            if (maxBytesToConsume == remainingToConsume) {
+                retainedContent = reference.retainedSlice(0, maxBytesToConsume);
             } else {
                 retainedContent = reference.retain();
             }
+
+            int bytesConsumedThisDecode = 0;
             if (decompressor != null) {
-                decompress(retainedContent);
+                bytesConsumedThisDecode += decompress(retainedContent);
+                bytesConsumed += bytesConsumedThisDecode;
                 ReleasableBytesReference decompressed;
-                while ((decompressed = decompressor.pollDecompressedPage()) != null) {
+                while ((decompressed = decompressor.pollDecompressedPage(isDone())) != null) {
                     fragmentConsumer.accept(decompressed);
                 }
             } else {
+                bytesConsumedThisDecode += maxBytesToConsume;
+                bytesConsumed += maxBytesToConsume;
                 fragmentConsumer.accept(retainedContent);
             }
             if (isDone()) {
                 finishMessage(fragmentConsumer);
             }
 
-            return bytesToConsume;
+            return bytesConsumedThisDecode;
         }
     }
 
@@ -119,16 +130,16 @@ public class InboundDecoder implements Releasable {
         try {
             Releasables.closeExpectNoException(decompressor);
         } finally {
+            isCompressed = false;
             decompressor = null;
             totalNetworkSize = -1;
             bytesConsumed = 0;
         }
     }
 
-    private void decompress(ReleasableBytesReference content) throws IOException {
+    private int decompress(ReleasableBytesReference content) throws IOException {
         try (content) {
-            int consumed = decompressor.decompress(content);
-            assert consumed == content.length();
+            return decompressor.decompress(content);
         }
     }
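
The lazy initialization above relies on TransportDecompressor.getDecompressor
peeking at the four-byte scheme header. A hedged sketch of that dispatch (the
actual implementation lives in TransportDecompressor.java, which is not shown
in this excerpt, and may use a different error type):

    static TransportDecompressor getDecompressor(PageCacheRecycler recycler, BytesReference bytes) throws IOException {
        if (bytes.length() < Compression.Scheme.HEADER_LENGTH) {
            return null; // not enough bytes yet to identify the scheme
        }
        if (Compression.Scheme.isDeflate(bytes)) {
            return new DeflateTransportDecompressor(recycler);
        } else if (Compression.Scheme.isLZ4(bytes)) {
            return new Lz4TransportDecompressor(recycler);
        } else {
            throw new IOException("unknown compression scheme");
        }
    }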
 

+ 352 - 0
server/src/main/java/org/elasticsearch/transport/Lz4TransportDecompressor.java

@@ -0,0 +1,352 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.elasticsearch.transport;
+
+import net.jpountz.lz4.LZ4Exception;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.recycler.Recycler;
+import org.elasticsearch.common.util.PageCacheRecycler;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Locale;
+import java.util.zip.Checksum;
+
+/**
+ * This file is forked from the https://netty.io project. In particular it forks the following file
+ * io.netty.handler.codec.compression.Lz4FrameDecoder.
+ *
+ * It modifies the original Netty code to operate on byte arrays as opposed to ByteBufs.
+ * Additionally, it integrates the decompression code to work in the Elasticsearch transport
+ * pipeline. Finally, it replaces the custom Netty decoder exceptions.
+ *
+ * This class is necessary as Netty is not a dependency of the Elasticsearch server module.
+ */
+public class Lz4TransportDecompressor implements TransportDecompressor {
+
+    private static final ThreadLocal<byte[]> DECOMPRESSED = ThreadLocal.withInitial(() -> BytesRef.EMPTY_BYTES);
+    private static final ThreadLocal<byte[]> COMPRESSED = ThreadLocal.withInitial(() -> BytesRef.EMPTY_BYTES);
+
+    /**
+     * Magic number of LZ4 block.
+     */
+    static final long MAGIC_NUMBER = (long) 'L' << 56 |
+        (long) 'Z' << 48 |
+        (long) '4' << 40 |
+        (long) 'B' << 32 |
+        'l' << 24 |
+        'o' << 16 |
+        'c' << 8  |
+        'k';
+
+    static final int HEADER_LENGTH = 8 +  // magic number
+        1 +  // token
+        4 +  // compressed length
+        4 +  // decompressed length
+        4;   // checksum
+
+
+    /**
+     * Base value for compression level.
+     */
+    static final int COMPRESSION_LEVEL_BASE = 10;
+
+    static final int MIN_BLOCK_SIZE = 64;
+    static final int MAX_BLOCK_SIZE = 1 << COMPRESSION_LEVEL_BASE + 0x0F;   //  32 M
+    static final int DEFAULT_BLOCK_SIZE = 1 << 16;  // 64 KB
+
+    static final int BLOCK_TYPE_NON_COMPRESSED = 0x10;
+    static final int BLOCK_TYPE_COMPRESSED = 0x20;
+
+    private enum State {
+        INIT_BLOCK,
+        DECOMPRESS_DATA,
+        FINISHED,
+        CORRUPTED
+    }
+
+    private State currentState = State.INIT_BLOCK;
+
+    /**
+     * Underlying decompressor in use.
+     */
+    private LZ4FastDecompressor decompressor;
+
+    /**
+     * Underlying checksum calculator in use.
+     */
+    private Checksum checksum;
+
+    /**
+     * Type of current block.
+     */
+    private int blockType;
+
+    /**
+     * Compressed length of current incoming block.
+     */
+    private int compressedLength;
+
+    /**
+     * Decompressed length of current incoming block.
+     */
+    private int decompressedLength;
+
+    /**
+     * Checksum value of current incoming block.
+     */
+    private int currentChecksum;
+
+    private final PageCacheRecycler recycler;
+    private final ArrayDeque<Recycler.V<byte[]>> pages;
+    private int pageOffset = PageCacheRecycler.BYTE_PAGE_SIZE;
+    private boolean hasSkippedESHeader = false;
+
+    public Lz4TransportDecompressor(PageCacheRecycler recycler) {
+        this.decompressor = LZ4Factory.safeInstance().fastDecompressor();
+        this.recycler = recycler;
+        this.pages = new ArrayDeque<>(4);
+        this.checksum = null;
+    }
+
+    @Override
+    public ReleasableBytesReference pollDecompressedPage(boolean isEOS) {
+        if (pages.isEmpty()) {
+            return null;
+        } else if (pages.size() == 1) {
+            if (isEOS) {
+                Recycler.V<byte[]> page = pages.pollFirst();
+                ReleasableBytesReference reference = new ReleasableBytesReference(new BytesArray(page.v(), 0, pageOffset), page);
+                pageOffset = 0;
+                return reference;
+            } else {
+                return null;
+            }
+        } else {
+            Recycler.V<byte[]> page = pages.pollFirst();
+            return new ReleasableBytesReference(new BytesArray(page.v()), page);
+        }
+    }
+
+    @Override
+    public void close() {
+        for (Recycler.V<byte[]> page : pages) {
+            page.close();
+        }
+    }
+
+    @Override
+    public int decompress(BytesReference bytesReference) throws IOException {
+        int bytesConsumed = 0;
+        if (hasSkippedESHeader == false) {
+            hasSkippedESHeader = true;
+            int esHeaderLength = Compression.Scheme.HEADER_LENGTH;
+            bytesReference = bytesReference.slice(esHeaderLength, bytesReference.length() - esHeaderLength);
+            bytesConsumed += esHeaderLength;
+        }
+
+        while (true) {
+            int consumed = decodeBlock(bytesReference);
+            bytesConsumed += consumed;
+            int newLength = bytesReference.length() - consumed;
+            if (consumed > 0 && newLength > 0) {
+                bytesReference = bytesReference.slice(consumed, newLength);
+            } else {
+                break;
+            }
+        }
+
+        return bytesConsumed;
+    }
+
+    private int decodeBlock(BytesReference reference) throws IOException {
+        int bytesConsumed = 0;
+        try {
+            switch (currentState) {
+                case INIT_BLOCK:
+                    if (reference.length() < HEADER_LENGTH) {
+                        return bytesConsumed;
+                    }
+                    try (StreamInput in = reference.streamInput()) {
+                        final long magic = in.readLong();
+                        if (magic != MAGIC_NUMBER) {
+                            throw new IllegalStateException("unexpected block identifier");
+                        }
+
+                        final int token = in.readByte();
+                        final int compressionLevel = (token & 0x0F) + COMPRESSION_LEVEL_BASE;
+                        int blockType = token & 0xF0;
+
+                        int compressedLength = Integer.reverseBytes(in.readInt());
+                        if (compressedLength < 0 || compressedLength > MAX_BLOCK_SIZE) {
+                            throw new IllegalStateException(String.format(Locale.ROOT,
+                                "invalid compressedLength: %d (expected: 0-%d)",
+                                compressedLength, MAX_BLOCK_SIZE));
+                        }
+
+                        int decompressedLength = Integer.reverseBytes(in.readInt());
+                        final int maxDecompressedLength = 1 << compressionLevel;
+                        if (decompressedLength < 0 || decompressedLength > maxDecompressedLength) {
+                            throw new IllegalStateException(String.format(Locale.ROOT,
+                                "invalid decompressedLength: %d (expected: 0-%d)",
+                                decompressedLength, maxDecompressedLength));
+                        }
+                        if (decompressedLength == 0 && compressedLength != 0
+                            || decompressedLength != 0 && compressedLength == 0
+                            || blockType == BLOCK_TYPE_NON_COMPRESSED && decompressedLength != compressedLength) {
+                            throw new IllegalStateException(String.format(Locale.ROOT,
+                                "stream corrupted: compressedLength(%d) and decompressedLength(%d) mismatch",
+                                compressedLength, decompressedLength));
+                        }
+
+                        int currentChecksum = Integer.reverseBytes(in.readInt());
+                        bytesConsumed += HEADER_LENGTH;
+
+                        if (decompressedLength == 0) {
+                            if (currentChecksum != 0) {
+                                throw new IllegalStateException("stream corrupted: checksum error");
+                            }
+                            currentState = State.FINISHED;
+                            decompressor = null;
+                            checksum = null;
+                            break;
+                        }
+
+                        this.blockType = blockType;
+                        this.compressedLength = compressedLength;
+                        this.decompressedLength = decompressedLength;
+                        this.currentChecksum = currentChecksum;
+                    }
+
+                    currentState = State.DECOMPRESS_DATA;
+                    break;
+                case DECOMPRESS_DATA:
+                    if (reference.length() < compressedLength) {
+                        break;
+                    }
+
+                    final Checksum checksum = this.checksum;
+                    byte[] decompressed = getThreadLocalBuffer(DECOMPRESSED, decompressedLength);
+
+                    try {
+                        switch (blockType) {
+                            case BLOCK_TYPE_NON_COMPRESSED:
+                                try (StreamInput streamInput = reference.streamInput()) {
+                                    streamInput.readBytes(decompressed, 0, decompressedLength);
+                                }
+                                break;
+                            case BLOCK_TYPE_COMPRESSED:
+                                BytesRef ref = reference.iterator().next();
+                                final byte[] compressed;
+                                final int compressedOffset;
+                                if (ref.length >= compressedLength) {
+                                    compressed = ref.bytes;
+                                    compressedOffset = ref.offset;
+                                } else {
+                                    compressed = getThreadLocalBuffer(COMPRESSED, compressedLength);
+                                    compressedOffset = 0;
+                                    try (StreamInput streamInput = reference.streamInput()) {
+                                        streamInput.readBytes(compressed, 0, compressedLength);
+                                    }
+                                }
+                                decompressor.decompress(compressed, compressedOffset, decompressed, 0, decompressedLength);
+                                break;
+                            default:
+                                throw new IllegalStateException(String.format(Locale.ROOT,
+                                    "unexpected blockType: %d (expected: %d or %d)",
+                                    blockType, BLOCK_TYPE_NON_COMPRESSED, BLOCK_TYPE_COMPRESSED));
+                        }
+                        // Skip inbound bytes after we processed them.
+                        bytesConsumed += compressedLength;
+
+                        if (checksum != null) {
+                            checksum.reset();
+                            checksum.update(decompressed, 0, decompressedLength);
+                            final int checksumResult = (int) checksum.getValue();
+                            if (checksumResult != currentChecksum) {
+                                throw new IllegalStateException(String.format(Locale.ROOT,
+                                    "stream corrupted: mismatching checksum: %d (expected: %d)",
+                                    checksumResult, currentChecksum));
+                            }
+                        }
+
+                        int bytesToCopy = decompressedLength;
+                        int uncompressedOffset = 0;
+                        while (bytesToCopy > 0) {
+                            final boolean isNewPage = pageOffset == PageCacheRecycler.BYTE_PAGE_SIZE;
+                            if (isNewPage) {
+                                pageOffset = 0;
+                                pages.add(recycler.bytePage(false));
+                            }
+                            final Recycler.V<byte[]> page = pages.getLast();
+
+                            int toCopy = Math.min(bytesToCopy, PageCacheRecycler.BYTE_PAGE_SIZE - pageOffset);
+                            System.arraycopy(decompressed, uncompressedOffset, page.v(), pageOffset, toCopy);
+                            pageOffset += toCopy;
+                            bytesToCopy -= toCopy;
+                            uncompressedOffset += toCopy;
+                        }
+                        currentState = State.INIT_BLOCK;
+                    } catch (LZ4Exception e) {
+                        throw new IllegalStateException(e);
+                    }
+                    break;
+                case FINISHED:
+                    break;
+                case CORRUPTED:
+                    throw new IllegalStateException("LZ4 stream corrupted.");
+                default:
+                    throw new IllegalStateException();
+            }
+        } catch (IOException e) {
+            currentState = State.CORRUPTED;
+            throw e;
+        }
+        return bytesConsumed;
+    }
+
+    private byte[] getThreadLocalBuffer(ThreadLocal<byte[]> threadLocal, int requiredSize) {
+        byte[] buffer = threadLocal.get();
+        if (requiredSize > buffer.length) {
+            buffer = new byte[requiredSize];
+            threadLocal.set(buffer);
+        }
+        return buffer;
+    }
+
+    /**
+     * Returns {@code true} if and only if the end of the compressed stream
+     * has been reached.
+     */
+    public boolean isClosed() {
+        return currentState == State.FINISHED;
+    }
+}
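
Taken together, decompress(...) consumes whole LZ4 blocks as they arrive and pollDecompressedPage(...) hands back recycled pages of plain bytes. A minimal consumption loop might look like the following sketch, where the first buffer is assumed to start with the four-byte compression-scheme header that decompress(...) skips, and onMessage is a hypothetical handler:

    import java.io.IOException;

    import org.elasticsearch.common.bytes.BytesReference;
    import org.elasticsearch.common.bytes.ReleasableBytesReference;

    static int feed(Lz4TransportDecompressor decompressor, BytesReference chunk) throws IOException {
        // May consume less than chunk.length() if the buffer ends mid-block;
        // the caller re-offers the remaining bytes on the next call.
        int consumed = decompressor.decompress(chunk);
        ReleasableBytesReference page;
        while ((page = decompressor.pollDecompressedPage(decompressor.isClosed())) != null) {
            try (ReleasableBytesReference p = page) {
                onMessage(p); // hypothetical consumer of decompressed bytes
            }
        }
        return consumed;
    }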

+ 12 - 2
server/src/main/java/org/elasticsearch/transport/NetworkMessage.java

@@ -21,12 +21,18 @@ public abstract class NetworkMessage {
     protected final Writeable threadContext;
     protected final long requestId;
     protected final byte status;
+    protected final Compression.Scheme compressionScheme;
 
-    NetworkMessage(ThreadContext threadContext, Version version, byte status, long requestId) {
+    NetworkMessage(ThreadContext threadContext, Version version, byte status, long requestId, Compression.Scheme compressionScheme) {
         this.threadContext = threadContext.captureAsWriteable();
         this.version = version;
         this.requestId = requestId;
-        this.status = status;
+        this.compressionScheme = adjustedScheme(version, compressionScheme);
+        if (this.compressionScheme != null) {
+            this.status = TransportStatus.setCompress(status);
+        } else {
+            this.status = status;
+        }
     }
 
     public Version getVersion() {
@@ -56,4 +62,8 @@ public abstract class NetworkMessage {
     boolean isError() {
         return TransportStatus.isError(status);
     }
+
+    private static Compression.Scheme adjustedScheme(Version version, Compression.Scheme compressionScheme) {
+        return compressionScheme == Compression.Scheme.LZ4 && version.before(Compression.Scheme.LZ4_VERSION) ? null : compressionScheme;
+    }
 }
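
The net effect of adjustedScheme is that LZ4 is negotiated per message: when the receiving node predates the first LZ4-capable wire version, the message silently falls back to being sent uncompressed rather than failing the connection. A sketch of the gate, where peerVersion stands for the negotiated channel version:

    // DEFLATE predates this change and is never downgraded.
    Compression.Scheme requested = Compression.Scheme.LZ4;
    Compression.Scheme effective = peerVersion.before(Compression.Scheme.LZ4_VERSION)
        ? null        // old peer: send the message uncompressed
        : requested;  // new peer: keep LZ4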

+ 20 - 5
server/src/main/java/org/elasticsearch/transport/OutboundHandler.java

@@ -38,17 +38,20 @@ final class OutboundHandler {
     private final StatsTracker statsTracker;
     private final ThreadPool threadPool;
     private final BigArrays bigArrays;
+    private final Compression.Scheme configuredCompressionScheme;
 
     private volatile long slowLogThresholdMs = Long.MAX_VALUE;
 
     private volatile TransportMessageListener messageListener = TransportMessageListener.NOOP_LISTENER;
 
-    OutboundHandler(String nodeName, Version version, StatsTracker statsTracker, ThreadPool threadPool, BigArrays bigArrays) {
+    OutboundHandler(String nodeName, Version version, StatsTracker statsTracker, ThreadPool threadPool, BigArrays bigArrays,
+                    Compression.Scheme compressionScheme) {
         this.nodeName = nodeName;
         this.version = version;
         this.statsTracker = statsTracker;
         this.threadPool = threadPool;
         this.bigArrays = bigArrays;
+        this.configuredCompressionScheme = compressionScheme;
     }
 
     void setSlowLogThreshold(TimeValue slowLogThreshold) {
@@ -67,8 +70,14 @@ final class OutboundHandler {
                      final TransportRequest request, final TransportRequestOptions options, final Version channelVersion,
                      final boolean compressRequest, final boolean isHandshake) throws IOException, TransportException {
         Version version = Version.min(this.version, channelVersion);
+        final Compression.Scheme compressionScheme;
+        if (compressRequest) {
+            compressionScheme = configuredCompressionScheme;
+        } else {
+            compressionScheme = null;
+        }
         OutboundMessage.Request message =
-            new OutboundMessage.Request(threadPool.getThreadContext(), request, version, action, requestId, isHandshake, compressRequest);
+            new OutboundMessage.Request(threadPool.getThreadContext(), request, version, action, requestId, isHandshake, compressionScheme);
         if (request.tryIncRef() == false) {
             assert false : "request [" + request + "] has been released already";
             throw new AlreadyClosedException("request [" + request + "] has been released already");
@@ -90,10 +99,16 @@ final class OutboundHandler {
      * @see #sendErrorResponse(Version, TcpChannel, long, String, Exception) for sending error responses
      */
     void sendResponse(final Version nodeVersion, final TcpChannel channel, final long requestId, final String action,
-                      final TransportResponse response, final boolean compress, final boolean isHandshake) throws IOException {
+                      final TransportResponse response, final boolean compressResponse, final boolean isHandshake) throws IOException {
         Version version = Version.min(this.version, nodeVersion);
+        final Compression.Scheme compressionScheme;
+        if (compressResponse) {
+            compressionScheme = configuredCompressionScheme;
+        } else {
+            compressionScheme = null;
+        }
         OutboundMessage.Response message = new OutboundMessage.Response(threadPool.getThreadContext(), response, version,
-            requestId, isHandshake, compress);
+            requestId, isHandshake, compressionScheme);
         ActionListener<Void> listener = ActionListener.wrap(() -> messageListener.onResponseSent(requestId, action, response));
         sendMessage(channel, message, listener);
     }
@@ -107,7 +122,7 @@ final class OutboundHandler {
         TransportAddress address = new TransportAddress(channel.getLocalAddress());
         RemoteTransportException tx = new RemoteTransportException(nodeName, address, action, error);
         OutboundMessage.Response message = new OutboundMessage.Response(threadPool.getThreadContext(), tx, version, requestId,
-            false, false);
+            false, null);
         ActionListener<Void> listener = ActionListener.wrap(() -> messageListener.onResponseSent(requestId, action, error));
         sendMessage(channel, message, listener);
     }

+ 27 - 20
server/src/main/java/org/elasticsearch/transport/OutboundMessage.java

@@ -26,8 +26,9 @@ abstract class OutboundMessage extends NetworkMessage {
 
     protected final Writeable message;
 
-    OutboundMessage(ThreadContext threadContext, Version version, byte status, long requestId, Writeable message) {
-        super(threadContext, version, status, requestId);
+    OutboundMessage(ThreadContext threadContext, Version version, byte status, long requestId, Compression.Scheme compressionScheme,
+                    Writeable message) {
+        super(threadContext, version, status, requestId, compressionScheme);
         this.message = message;
     }
 
@@ -86,8 +87,14 @@ abstract class OutboundMessage extends NetworkMessage {
 
     // compressed stream wrapped bytes must be no-close wrapped since we need to close the compressed wrapper below to release
     // resources and write EOS marker bytes but must not yet release the bytes themselves
-    private OutputStreamStreamOutput wrapCompressed(BytesStreamOutput bytesStream) throws IOException {
-        return new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.noCloseStream(bytesStream)));
+    private StreamOutput wrapCompressed(BytesStreamOutput bytesStream) throws IOException {
+        if (compressionScheme == Compression.Scheme.DEFLATE) {
+            return new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.noCloseStream(bytesStream)));
+        } else if (compressionScheme == Compression.Scheme.LZ4) {
+            return new OutputStreamStreamOutput(Compression.Scheme.lz4OutputStream(Streams.noCloseStream(bytesStream)));
+        } else {
+            throw new IllegalArgumentException("Invalid compression scheme: " + compressionScheme);
+        }
     }
 
     protected void writeVariableHeader(StreamOutput stream) throws IOException {
@@ -99,8 +106,8 @@ abstract class OutboundMessage extends NetworkMessage {
         private final String action;
 
         Request(ThreadContext threadContext, Writeable message, Version version, String action, long requestId,
-                boolean isHandshake, boolean compress) {
-            super(threadContext, version, setStatus(compress, isHandshake, message), requestId, message);
+                boolean isHandshake, Compression.Scheme compressionScheme) {
+            super(threadContext, version, setStatus(isHandshake), requestId, adjustCompressionScheme(compressionScheme, message), message);
             this.action = action;
         }
 
@@ -114,12 +121,18 @@ abstract class OutboundMessage extends NetworkMessage {
             stream.writeString(action);
         }
 
-        private static byte setStatus(boolean compress, boolean isHandshake, Writeable message) {
+        // Do not compress instances of BytesTransportRequest
+        private static Compression.Scheme adjustCompressionScheme(Compression.Scheme compressionScheme, Writeable message) {
+            if (message instanceof BytesTransportRequest) {
+                return null;
+            } else {
+                return compressionScheme;
+            }
+        }
+
+        private static byte setStatus(boolean isHandshake) {
             byte status = 0;
             status = TransportStatus.setRequest(status);
-            if (compress && OutboundMessage.canCompress(message)) {
-                status = TransportStatus.setCompress(status);
-            }
             if (isHandshake) {
                 status = TransportStatus.setHandshake(status);
             }
@@ -136,19 +149,17 @@ abstract class OutboundMessage extends NetworkMessage {
 
     static class Response extends OutboundMessage {
 
-        Response(ThreadContext threadContext, Writeable message, Version version, long requestId, boolean isHandshake, boolean compress) {
-            super(threadContext, version, setStatus(compress, isHandshake, message), requestId, message);
+        Response(ThreadContext threadContext, Writeable message, Version version, long requestId, boolean isHandshake,
+                 Compression.Scheme compressionScheme) {
+            super(threadContext, version, setStatus(isHandshake, message), requestId, compressionScheme, message);
         }
 
-        private static byte setStatus(boolean compress, boolean isHandshake, Writeable message) {
+        private static byte setStatus(boolean isHandshake, Writeable message) {
             byte status = 0;
             status = TransportStatus.setResponse(status);
             if (message instanceof RemoteTransportException) {
                 status = TransportStatus.setError(status);
             }
-            if (compress) {
-                status = TransportStatus.setCompress(status);
-            }
             if (isHandshake) {
                 status = TransportStatus.setHandshake(status);
             }
@@ -162,8 +173,4 @@ abstract class OutboundMessage extends NetworkMessage {
                     + message.getClass() + "}";
         }
     }
-
-    private static boolean canCompress(Writeable message) {
-        return message instanceof BytesTransportRequest == false;
-    }
 }
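
The reworked wrapCompressed keeps the no-close pattern described in the comment above for both schemes. As a fragment-level sketch, assuming message is any Writeable being serialized into bytesStream:

    BytesStreamOutput bytesStream = new BytesStreamOutput();
    StreamOutput compressed = new OutputStreamStreamOutput(
        Compression.Scheme.lz4OutputStream(Streams.noCloseStream(bytesStream)));
    message.writeTo(compressed);
    compressed.close();                              // flushes the last block and writes the EOS marker
    BytesReference serialized = bytesStream.bytes(); // the underlying bytes stay open and readable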

+ 17 - 0
server/src/main/java/org/elasticsearch/transport/RawIndexingDataTransportRequest.java

@@ -0,0 +1,17 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.transport;
+
+/**
+ * Requests that implement this interface will be compressed when {@link TransportSettings#TRANSPORT_COMPRESS}
+ * is configured to {@link Compression.Enabled#INDEXING_DATA}. This is primarily intended for
+ * requests and responses that are mostly composed of raw source data.
+ */
+public interface RawIndexingDataTransportRequest {
+}
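
Opting a request into the indexing_data mode is only a matter of implementing the marker; there is nothing to override. A hypothetical example:

    // Hypothetical request type whose payload is mostly raw source bytes.
    public class SourceChunkRequest extends TransportRequest implements RawIndexingDataTransportRequest {
        // fields omitted; the marker interface itself declares no methods
    }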

+ 9 - 2
server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

@@ -45,6 +45,7 @@ import java.util.function.Function;
 import java.util.stream.Stream;
 
 import static org.elasticsearch.common.settings.Setting.boolSetting;
+import static org.elasticsearch.common.settings.Setting.enumSetting;
 import static org.elasticsearch.common.settings.Setting.timeSetting;
 
 /**
@@ -89,10 +90,16 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
         (ns, key) -> timeSetting(key, TransportSettings.PING_SCHEDULE, new RemoteConnectionEnabled<>(ns, key),
             Setting.Property.Dynamic, Setting.Property.NodeScope));
 
-    public static final Setting.AffixSetting<Boolean> REMOTE_CLUSTER_COMPRESS = Setting.affixKeySetting(
+    public static final Setting.AffixSetting<Compression.Enabled> REMOTE_CLUSTER_COMPRESS = Setting.affixKeySetting(
         "cluster.remote.",
         "transport.compress",
-        (ns, key) -> boolSetting(key, TransportSettings.TRANSPORT_COMPRESS,
+        (ns, key) -> enumSetting(Compression.Enabled.class, key, TransportSettings.TRANSPORT_COMPRESS,
+            new RemoteConnectionEnabled<>(ns, key), Setting.Property.Dynamic, Setting.Property.NodeScope));
+
+    public static final Setting.AffixSetting<Compression.Scheme> REMOTE_CLUSTER_COMPRESSION_SCHEME = Setting.affixKeySetting(
+        "cluster.remote.",
+        "transport.compression_scheme",
+        (ns, key) -> enumSetting(Compression.Scheme.class, key, TransportSettings.TRANSPORT_COMPRESSION_SCHEME,
             new RemoteConnectionEnabled<>(ns, key), Setting.Property.Dynamic, Setting.Property.NodeScope));
 
     private final boolean enabled;
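
Both affix settings are dynamic and scoped to a cluster alias. A configuration sketch, with the alias my_remote purely illustrative:

    Settings settings = Settings.builder()
        .put("cluster.remote.my_remote.transport.compress", "indexing_data")
        .put("cluster.remote.my_remote.transport.compression_scheme", "lz4")
        .build();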

+ 9 - 2
server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java

@@ -125,6 +125,8 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis
             .setConnectTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings))
             .setHandshakeTimeout(TransportSettings.CONNECT_TIMEOUT.get(settings))
             .setCompressionEnabled(RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings))
+            .setCompressionScheme(RemoteClusterService.REMOTE_CLUSTER_COMPRESSION_SCHEME
+                .getConcreteSettingForNamespace(clusterAlias).get(settings))
             .setPingInterval(RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace(clusterAlias).get(settings))
             .addConnections(0, TransportRequestOptions.Type.BULK, TransportRequestOptions.Type.STATE,
                 TransportRequestOptions.Type.RECOVERY, TransportRequestOptions.Type.PING)
@@ -276,7 +278,10 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis
         if (newMode.equals(strategyType()) == false) {
             return true;
         } else {
-            Boolean compressionEnabled = RemoteClusterService.REMOTE_CLUSTER_COMPRESS
+            Compression.Enabled compressionEnabled = RemoteClusterService.REMOTE_CLUSTER_COMPRESS
+                .getConcreteSettingForNamespace(clusterAlias)
+                .get(newSettings);
+            Compression.Scheme compressionScheme = RemoteClusterService.REMOTE_CLUSTER_COMPRESSION_SCHEME
                 .getConcreteSettingForNamespace(clusterAlias)
                 .get(newSettings);
             TimeValue pingSchedule = RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE
@@ -286,6 +291,7 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis
             ConnectionProfile oldProfile = connectionManager.getConnectionProfile();
             ConnectionProfile.Builder builder = new ConnectionProfile.Builder(oldProfile);
             builder.setCompressionEnabled(compressionEnabled);
+            builder.setCompressionScheme(compressionScheme);
             builder.setPingInterval(pingSchedule);
             ConnectionProfile newProfile = builder.build();
             return connectionProfileChanged(oldProfile, newProfile) || strategyMustBeRebuilt(newSettings);
@@ -354,7 +360,8 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis
 
     private boolean connectionProfileChanged(ConnectionProfile oldProfile, ConnectionProfile newProfile) {
         return Objects.equals(oldProfile.getCompressionEnabled(), newProfile.getCompressionEnabled()) == false
-            || Objects.equals(oldProfile.getPingInterval(), newProfile.getPingInterval()) == false;
+            || Objects.equals(oldProfile.getPingInterval(), newProfile.getPingInterval()) == false
+            || Objects.equals(oldProfile.getCompressionScheme(), newProfile.getCompressionScheme()) == false;
     }
 
     static class StrategyValidator<T> implements Setting.Validator<T> {

+ 334 - 0
server/src/main/java/org/elasticsearch/transport/ReuseBuffersLZ4BlockOutputStream.java

@@ -0,0 +1,334 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+/*
+ * Copyright 2020 Adrien Grand and the lz4-java contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.elasticsearch.transport;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.zip.Checksum;
+
+import net.jpountz.lz4.LZ4BlockInputStream;
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FrameOutputStream;
+import net.jpountz.util.SafeUtils;
+import net.jpountz.xxhash.StreamingXXHash32;
+import net.jpountz.xxhash.XXHashFactory;
+
+import org.apache.lucene.util.BytesRef;
+
+/**
+ * This file is forked from https://github.com/lz4/lz4-java. In particular it forks the following file
+ * net.jpountz.lz4.LZ4BlockOutputStream.
+ *
+ * It modifies the original lz4-java code to allow the reuse of thread-local byte arrays. This avoids
+ * allocating two new byte arrays every time a new stream is created. For the Elasticsearch use case, a
+ * single thread must fully compress the stream in one go to avoid memory corruption.
+ *
+ *
+ * Streaming LZ4 (not compatible with the LZ4 Frame format).
+ * This class compresses data into fixed-size blocks of compressed data.
+ * This class uses its own format and is not compatible with the LZ4 Frame format.
+ * For interoperability with other LZ4 tools, use {@link LZ4FrameOutputStream},
+ * which is compatible with the LZ4 Frame format. This class remains for backward compatibility.
+ * @see LZ4BlockInputStream
+ * @see LZ4FrameOutputStream
+ */
+public class ReuseBuffersLZ4BlockOutputStream extends FilterOutputStream {
+
+    private static class ArrayBox {
+        private byte[] uncompressed = BytesRef.EMPTY_BYTES;
+        private byte[] compressed = BytesRef.EMPTY_BYTES;
+        private boolean owned = false;
+
+        private void markOwnership(int uncompressedBlockSize, int compressedMaxSize) {
+            assert owned == false;
+            owned = true;
+            if (uncompressedBlockSize > uncompressed.length) {
+                uncompressed = new byte[uncompressedBlockSize];
+            }
+            if (compressedMaxSize > compressed.length) {
+                compressed = new byte[compressedMaxSize];
+            }
+        }
+
+        private void release() {
+            owned = false;
+        }
+    }
+
+    private static final ThreadLocal<ArrayBox> ARRAY_BOX = ThreadLocal.withInitial(ArrayBox::new);
+
+    static final byte[] MAGIC = new byte[] { 'L', 'Z', '4', 'B', 'l', 'o', 'c', 'k' };
+    static final int MAGIC_LENGTH = MAGIC.length;
+
+    static final int HEADER_LENGTH =
+        MAGIC_LENGTH // magic bytes
+            + 1          // token
+            + 4          // compressed length
+            + 4          // decompressed length
+            + 4;         // checksum
+
+    static final int COMPRESSION_LEVEL_BASE = 10;
+    static final int MIN_BLOCK_SIZE = 64;
+    static final int MAX_BLOCK_SIZE = 1 << (COMPRESSION_LEVEL_BASE + 0x0F);
+
+    static final int COMPRESSION_METHOD_RAW = 0x10;
+    static final int COMPRESSION_METHOD_LZ4 = 0x20;
+
+    static final int DEFAULT_SEED = 0x9747b28c;
+
+    private static int compressionLevel(int blockSize) {
+        if (blockSize < MIN_BLOCK_SIZE) {
+            throw new IllegalArgumentException("blockSize must be >= " + MIN_BLOCK_SIZE + ", got " + blockSize);
+        } else if (blockSize > MAX_BLOCK_SIZE) {
+            throw new IllegalArgumentException("blockSize must be <= " + MAX_BLOCK_SIZE + ", got " + blockSize);
+        }
+        int compressionLevel = 32 - Integer.numberOfLeadingZeros(blockSize - 1); // ceil of log2
+        assert (1 << compressionLevel) >= blockSize;
+        assert blockSize * 2 > (1 << compressionLevel);
+        compressionLevel = Math.max(0, compressionLevel - COMPRESSION_LEVEL_BASE);
+        assert compressionLevel >= 0 && compressionLevel <= 0x0F;
+        return compressionLevel;
+    }
+
+    private final int blockSize;
+    private final int compressionLevel;
+    private final LZ4Compressor compressor;
+    private final Checksum checksum;
+    private final ArrayBox arrayBox;
+    private final byte[] buffer;
+    private final byte[] compressedBuffer;
+    private final boolean syncFlush;
+    private boolean finished;
+    private int o;
+
+    /**
+     * Creates a new {@link OutputStream} with configurable block size. Large
+     * blocks require more memory at compression and decompression time but
+     * should improve the compression ratio.
+     *
+     * @param out         the {@link OutputStream} to feed
+     * @param blockSize   the maximum number of bytes to try to compress at once,
+     *                    must be &gt;= 64 and &lt;= 32 M
+     * @param compressor  the {@link LZ4Compressor} instance to use to compress
+     *                    data
+     * @param checksum    the {@link Checksum} instance to use to check data for
+     *                    integrity.
+     * @param syncFlush   true if pending data should also be flushed on {@link #flush()}
+     */
+    public ReuseBuffersLZ4BlockOutputStream(OutputStream out, int blockSize, LZ4Compressor compressor, Checksum checksum,
+                                            boolean syncFlush) {
+        super(out);
+        this.blockSize = blockSize;
+        this.compressor = compressor;
+        this.checksum = checksum;
+        this.compressionLevel = compressionLevel(blockSize);
+        final int compressedBlockSize = HEADER_LENGTH + compressor.maxCompressedLength(blockSize);
+        this.arrayBox = ARRAY_BOX.get();
+        arrayBox.markOwnership(blockSize, compressedBlockSize);
+        this.buffer = arrayBox.uncompressed;
+        this.compressedBuffer = arrayBox.compressed;
+        this.syncFlush = syncFlush;
+        o = 0;
+        finished = false;
+        System.arraycopy(MAGIC, 0, compressedBuffer, 0, MAGIC_LENGTH);
+    }
+
+    /**
+     * Creates a new instance which checks stream integrity using
+     * {@link StreamingXXHash32} and doesn't sync flush.
+     *
+     * @param out         the {@link OutputStream} to feed
+     * @param blockSize   the maximum number of bytes to try to compress at once,
+     *                    must be &gt;= 64 and &lt;= 32 M
+     * @param compressor  the {@link LZ4Compressor} instance to use to compress
+     *                    data
+     *
+     * @see #ReuseBuffersLZ4BlockOutputStream(OutputStream, int, LZ4Compressor, Checksum, boolean)
+     * @see StreamingXXHash32#asChecksum()
+     */
+    public ReuseBuffersLZ4BlockOutputStream(OutputStream out, int blockSize, LZ4Compressor compressor) {
+        this(out, blockSize, compressor, XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum(), false);
+    }
+
+    /**
+     * Creates a new instance which compresses with the standard LZ4 compression
+     * algorithm.
+     *
+     * @param out         the {@link OutputStream} to feed
+     * @param blockSize   the maximum number of bytes to try to compress at once,
+     *                    must be &gt;= 64 and &lt;= 32 M
+     *
+     * @see #ReuseBuffersLZ4BlockOutputStream(OutputStream, int, LZ4Compressor)
+     * @see LZ4Factory#fastCompressor()
+     */
+    public ReuseBuffersLZ4BlockOutputStream(OutputStream out, int blockSize) {
+        this(out, blockSize, LZ4Factory.fastestInstance().fastCompressor());
+    }
+
+    /**
+     * Creates a new instance which compresses into blocks of 64 KB.
+     *
+     * @param out         the {@link OutputStream} to feed
+     *
+     * @see #ReuseBuffersLZ4BlockOutputStream(OutputStream, int)
+     */
+    public ReuseBuffersLZ4BlockOutputStream(OutputStream out) {
+        this(out, 1 << 16);
+    }
+
+    private void ensureNotFinished() {
+        if (finished) {
+            throw new IllegalStateException("This stream is already closed");
+        }
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        ensureNotFinished();
+        if (o == blockSize) {
+            flushBufferedData();
+        }
+        buffer[o++] = (byte) b;
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        SafeUtils.checkRange(b, off, len);
+        ensureNotFinished();
+
+        while (o + len > blockSize) {
+            final int l = blockSize - o;
+            System.arraycopy(b, off, buffer, o, blockSize - o);
+            o = blockSize;
+            flushBufferedData();
+            off += l;
+            len -= l;
+        }
+        System.arraycopy(b, off, buffer, o, len);
+        o += len;
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        ensureNotFinished();
+        write(b, 0, b.length);
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            if (finished == false) {
+                finish();
+            }
+            if (out != null) {
+                out.close();
+                out = null;
+            }
+        } finally {
+            arrayBox.release();
+        }
+    }
+
+    private void flushBufferedData() throws IOException {
+        if (o == 0) {
+            return;
+        }
+        checksum.reset();
+        checksum.update(buffer, 0, o);
+        final int check = (int) checksum.getValue();
+        int compressedLength = compressor.compress(buffer, 0, o, compressedBuffer, HEADER_LENGTH);
+        final int compressMethod;
+        if (compressedLength >= o) {
+            compressMethod = COMPRESSION_METHOD_RAW;
+            compressedLength = o;
+            System.arraycopy(buffer, 0, compressedBuffer, HEADER_LENGTH, o);
+        } else {
+            compressMethod = COMPRESSION_METHOD_LZ4;
+        }
+
+        compressedBuffer[MAGIC_LENGTH] = (byte) (compressMethod | compressionLevel);
+        writeIntLE(compressedLength, compressedBuffer, MAGIC_LENGTH + 1);
+        writeIntLE(o, compressedBuffer, MAGIC_LENGTH + 5);
+        writeIntLE(check, compressedBuffer, MAGIC_LENGTH + 9);
+        assert MAGIC_LENGTH + 13 == HEADER_LENGTH;
+        out.write(compressedBuffer, 0, HEADER_LENGTH + compressedLength);
+        o = 0;
+    }
+
+    /**
+     * Flushes this compressed {@link OutputStream}.
+     *
+     * If the stream has been created with <code>syncFlush=true</code>, pending
+     * data will be compressed and appended to the underlying {@link OutputStream}
+     * before calling {@link OutputStream#flush()} on the underlying stream.
+     * Otherwise, this method just flushes the underlying stream, so pending
+     * data might not be available for reading until {@link #finish()} or
+     * {@link #close()} is called.
+     */
+    @Override
+    public void flush() throws IOException {
+        if (out != null) {
+            if (syncFlush) {
+                flushBufferedData();
+            }
+            out.flush();
+        }
+    }
+
+    /**
+     * Same as {@link #close()} except that it doesn't close the underlying stream.
+     * This can be useful if you want to keep on using the underlying stream.
+     *
+     * @throws IOException if an I/O error occurs.
+     */
+    public void finish() throws IOException {
+        ensureNotFinished();
+        flushBufferedData();
+        compressedBuffer[MAGIC_LENGTH] = (byte) (COMPRESSION_METHOD_RAW | compressionLevel);
+        writeIntLE(0, compressedBuffer, MAGIC_LENGTH + 1);
+        writeIntLE(0, compressedBuffer, MAGIC_LENGTH + 5);
+        writeIntLE(0, compressedBuffer, MAGIC_LENGTH + 9);
+        assert MAGIC_LENGTH + 13 == HEADER_LENGTH;
+        out.write(compressedBuffer, 0, HEADER_LENGTH);
+        finished = true;
+        out.flush();
+    }
+
+    private static void writeIntLE(int i, byte[] buf, int off) {
+        buf[off++] = (byte) i;
+        buf[off++] = (byte) (i >>> 8);
+        buf[off++] = (byte) (i >>> 16);
+        buf[off++] = (byte) (i >>> 24);
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + "(out=" + out + ", blockSize=" + blockSize
+            + ", compressor=" + compressor + ", checksum=" + checksum + ")";
+    }
+
+}
+
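
On the wire, each block this stream emits carries the header that Lz4TransportDecompressor decodes above: the 8-byte "LZ4Block" magic, a 1-byte token (compression method | compression level), then the little-endian compressed length, decompressed length, and xxhash32 checksum, followed by the payload. A usage sketch, where data is an assumed byte[] payload:

    import java.io.ByteArrayOutputStream;

    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    try (ReuseBuffersLZ4BlockOutputStream lz4 = new ReuseBuffersLZ4BlockOutputStream(sink)) {
        lz4.write(data); // buffered and compressed in 64 KB blocks by default
    }                    // close() appends a final empty block as the EOS marker
    byte[] wire = sink.toByteArray();

Because the stream borrows the thread-local ArrayBox for its buffers, it must be written and closed on a single thread, as the class comment requires.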

+ 6 - 3
server/src/main/java/org/elasticsearch/transport/TcpTransport.java

@@ -133,10 +133,11 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
         this.pageCacheRecycler = pageCacheRecycler;
         this.circuitBreakerService = circuitBreakerService;
         this.networkService = networkService;
+        Compression.Scheme compressionScheme = TransportSettings.TRANSPORT_COMPRESSION_SCHEME.get(settings);
         String nodeName = Node.NODE_NAME_SETTING.get(settings);
         BigArrays bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.IN_FLIGHT_REQUESTS);
 
-        this.outboundHandler = new OutboundHandler(nodeName, version, statsTracker, threadPool, bigArrays);
+        this.outboundHandler = new OutboundHandler(nodeName, version, statsTracker, threadPool, bigArrays, compressionScheme);
         this.handshaker = new TransportHandshaker(version, threadPool,
             (node, channel, requestId, v) -> outboundHandler.sendRequest(node, channel, requestId,
                 TransportHandshaker.HANDSHAKE_ACTION_NAME, new TransportHandshaker.HandshakeRequest(version),
@@ -183,7 +184,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
         private final List<TcpChannel> channels;
         private final DiscoveryNode node;
         private final Version version;
-        private final boolean compress;
+        private final Compression.Enabled compress;
         private final AtomicBoolean isClosing = new AtomicBoolean(false);
 
         NodeChannels(DiscoveryNode node, List<TcpChannel> channels, ConnectionProfile connectionProfile, Version handshakeVersion) {
@@ -242,7 +243,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
                 throw new NodeNotConnectedException(node, "connection already closed");
             }
             TcpChannel channel = channel(options.type());
-            outboundHandler.sendRequest(node, channel, requestId, action, request, options, getVersion(), compress, false);
+            boolean shouldCompress = compress == Compression.Enabled.TRUE ||
+                (compress == Compression.Enabled.INDEXING_DATA && request instanceof RawIndexingDataTransportRequest);
+            outboundHandler.sendRequest(node, channel, requestId, action, request, options, getVersion(), shouldCompress, false);
         }
 
         @Override
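
Spelled out, the per-request decision made in NodeChannels reduces to the following sketch, where mode is the connection's Compression.Enabled value:

    static boolean shouldCompress(Compression.Enabled mode, TransportRequest request) {
        switch (mode) {
            case TRUE:
                return true;  // compress every message with the configured scheme
            case INDEXING_DATA:
                return request instanceof RawIndexingDataTransportRequest; // raw-source payloads only
            default:
                return false; // FALSE: never compress
        }
    }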

+ 29 - 108
server/src/main/java/org/elasticsearch/transport/TransportDecompressor.java

@@ -8,130 +8,51 @@
 
 package org.elasticsearch.transport;
 
-import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.util.BytesRefIterator;
-import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.bytes.ReleasableBytesReference;
-import org.elasticsearch.common.compress.CompressorFactory;
-import org.elasticsearch.core.Releasable;
-import org.elasticsearch.common.recycler.Recycler;
 import org.elasticsearch.common.util.PageCacheRecycler;
+import org.elasticsearch.core.Releasable;
 
 import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.zip.DataFormatException;
-import java.util.zip.Inflater;
 
-public class TransportDecompressor implements Releasable {
+public interface TransportDecompressor extends Releasable {
 
-    private final Inflater inflater;
-    private final PageCacheRecycler recycler;
-    private final ArrayDeque<Recycler.V<byte[]>> pages;
-    private int pageOffset = PageCacheRecycler.BYTE_PAGE_SIZE;
-    private boolean hasReadHeader = false;
+    /**
+     * Decompress the provided bytes
+     *
+     * @param bytesReference to decompress
+     * @return number of compressed bytes consumed
+     */
+    int decompress(BytesReference bytesReference) throws IOException;
 
-    public TransportDecompressor(PageCacheRecycler recycler) {
-        this.recycler = recycler;
-        inflater = new Inflater(true);
-        pages = new ArrayDeque<>(4);
-    }
+    ReleasableBytesReference pollDecompressedPage(boolean isEOS);
 
-    public int decompress(BytesReference bytesReference) throws IOException {
-        int bytesConsumed = 0;
-        if (hasReadHeader == false) {
-            if (CompressorFactory.COMPRESSOR.isCompressed(bytesReference) == false) {
-                int maxToRead = Math.min(bytesReference.length(), 10);
-                StringBuilder sb = new StringBuilder("stream marked as compressed, but no compressor found, first [")
-                    .append(maxToRead).append("] content bytes out of [").append(bytesReference.length())
-                    .append("] readable bytes with message size [").append(bytesReference.length()).append("] ").append("] are [");
-                for (int i = 0; i < maxToRead; i++) {
-                    sb.append(bytesReference.get(i)).append(",");
-                }
-                sb.append("]");
-                throw new IllegalStateException(sb.toString());
-            }
-            hasReadHeader = true;
-            int headerLength = CompressorFactory.COMPRESSOR.headerLength();
-            bytesReference = bytesReference.slice(headerLength, bytesReference.length() - headerLength);
-            bytesConsumed += headerLength;
-        }
+    @Override
+    void close();
 
-        BytesRefIterator refIterator = bytesReference.iterator();
-        BytesRef ref;
-        while ((ref = refIterator.next()) != null) {
-            inflater.setInput(ref.bytes, ref.offset, ref.length);
-            bytesConsumed += ref.length;
-            boolean continueInflating = true;
-            while (continueInflating) {
-                final Recycler.V<byte[]> page;
-                final boolean isNewPage = pageOffset == PageCacheRecycler.BYTE_PAGE_SIZE;
-                if (isNewPage) {
-                    pageOffset = 0;
-                    page = recycler.bytePage(false);
-                } else {
-                    page = pages.getLast();
-                }
-                byte[] output = page.v();
-                try {
-                    int bytesInflated = inflater.inflate(output, pageOffset, PageCacheRecycler.BYTE_PAGE_SIZE - pageOffset);
-                    pageOffset += bytesInflated;
-                    if (isNewPage) {
-                        if (bytesInflated == 0) {
-                            page.close();
-                            pageOffset = PageCacheRecycler.BYTE_PAGE_SIZE;
-                        } else {
-                            pages.add(page);
-                        }
-                    }
-                } catch (DataFormatException e) {
-                    throw new IOException("Exception while inflating bytes", e);
-                }
-                if (inflater.needsInput()) {
-                    continueInflating = false;
-                }
-                if (inflater.finished()) {
-                    bytesConsumed -= inflater.getRemaining();
-                    continueInflating = false;
-                }
-                assert inflater.needsDictionary() == false;
-            }
+    static TransportDecompressor getDecompressor(PageCacheRecycler recycler, BytesReference bytes) throws IOException {
+        if (bytes.length() < Compression.Scheme.HEADER_LENGTH) {
+            return null;
         }
 
-        return bytesConsumed;
-    }
-
-    public boolean canDecompress(int bytesAvailable) {
-        return hasReadHeader || bytesAvailable >= CompressorFactory.COMPRESSOR.headerLength();
-    }
-
-    public boolean isEOS() {
-        return inflater.finished();
-    }
-
-    public ReleasableBytesReference pollDecompressedPage() {
-        if (pages.isEmpty()) {
-            return null;
-        } else if (pages.size() == 1) {
-            if (isEOS()) {
-                Recycler.V<byte[]> page = pages.pollFirst();
-                ReleasableBytesReference reference = new ReleasableBytesReference(new BytesArray(page.v(), 0, pageOffset), page);
-                pageOffset = 0;
-                return reference;
-            } else {
-                return null;
-            }
+        if (Compression.Scheme.isDeflate(bytes)) {
+            return new DeflateTransportDecompressor(recycler);
+        } else if (Compression.Scheme.isLZ4(bytes)) {
+            return new Lz4TransportDecompressor(recycler);
         } else {
-            Recycler.V<byte[]> page = pages.pollFirst();
-            return new ReleasableBytesReference(new BytesArray(page.v()), page);
+            throw createIllegalState(bytes);
         }
     }
 
-    @Override
-    public void close() {
-        inflater.end();
-        for (Recycler.V<byte[]> page : pages) {
-            page.close();
+    private static IllegalStateException createIllegalState(BytesReference bytes) {
+        int maxToRead = Math.min(bytes.length(), 10);
+        StringBuilder sb = new StringBuilder("stream marked as compressed, but no compressor found, first [")
+            .append(maxToRead).append("] content bytes out of [").append(bytes.length())
+            .append("] readable bytes with message size [").append(bytes.length()).append("] are [");
+        for (int i = 0; i < maxToRead; i++) {
+            sb.append(bytes.get(i)).append(",");
         }
+        sb.append("]");
+        return new IllegalStateException(sb.toString());
     }
 }
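
Note that a null return from getDecompressor means the four header bytes are not yet buffered, not that the stream is uncompressed; the caller retries once more bytes arrive. A sketch, where recycler and buffered are assumed to come from the inbound pipeline:

    TransportDecompressor decompressor = TransportDecompressor.getDecompressor(recycler, buffered);
    if (decompressor == null) {
        // fewer than Compression.Scheme.HEADER_LENGTH bytes buffered so far; wait for more
    } else {
        // a DeflateTransportDecompressor or Lz4TransportDecompressor, chosen by header sniffing
    }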

+ 6 - 2
server/src/main/java/org/elasticsearch/transport/TransportSettings.java

@@ -21,6 +21,7 @@ import java.util.function.Function;
 import static java.util.Collections.emptyList;
 import static org.elasticsearch.common.settings.Setting.affixKeySetting;
 import static org.elasticsearch.common.settings.Setting.boolSetting;
+import static org.elasticsearch.common.settings.Setting.enumSetting;
 import static org.elasticsearch.common.settings.Setting.intSetting;
 import static org.elasticsearch.common.settings.Setting.listSetting;
 import static org.elasticsearch.common.settings.Setting.timeSetting;
@@ -49,8 +50,11 @@ public final class TransportSettings {
         intSetting("transport.publish_port", -1, -1, Setting.Property.NodeScope);
     public static final Setting.AffixSetting<Integer> PUBLISH_PORT_PROFILE = affixKeySetting("transport.profiles.", "publish_port",
         key -> intSetting(key, -1, -1, Setting.Property.NodeScope));
-    public static final Setting<Boolean> TRANSPORT_COMPRESS =
-        boolSetting("transport.compress", false, Setting.Property.NodeScope);
+    public static final Setting<Compression.Enabled> TRANSPORT_COMPRESS =
+        enumSetting(Compression.Enabled.class, "transport.compress", Compression.Enabled.FALSE, Setting.Property.NodeScope);
+    public static final Setting<Compression.Scheme> TRANSPORT_COMPRESSION_SCHEME =
+        enumSetting(Compression.Scheme.class, "transport.compression_scheme", Compression.Scheme.DEFLATE,
+            Setting.Property.NodeScope);
     // the scheduled internal ping interval setting, defaults to disabled (-1)
     public static final Setting<TimeValue> PING_SCHEDULE =
         timeSetting("transport.ping_schedule", TimeValue.timeValueSeconds(-1), Setting.Property.NodeScope);

+ 1 - 1
server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java

@@ -56,7 +56,7 @@ public class ClusterConnectionManagerTests extends ESTestCase {
         TimeValue oneSecond = new TimeValue(1000);
         TimeValue oneMinute = TimeValue.timeValueMinutes(1);
         connectionProfile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, oneSecond, oneSecond,
-            oneMinute, false);
+            oneMinute, Compression.Enabled.FALSE, Compression.Scheme.DEFLATE);
     }
 
     @After

+ 29 - 2
server/src/test/java/org/elasticsearch/transport/ConnectionProfileTests.java

@@ -31,7 +31,10 @@ public class ConnectionProfileTests extends ESTestCase {
         TimeValue connectTimeout = TimeValue.timeValueMillis(randomIntBetween(1, 10));
         TimeValue handshakeTimeout = TimeValue.timeValueMillis(randomIntBetween(1, 10));
         TimeValue pingInterval = TimeValue.timeValueMillis(randomIntBetween(1, 10));
-        boolean compressionEnabled = randomBoolean();
+        Compression.Enabled compressionEnabled =
+            randomFrom(Compression.Enabled.TRUE, Compression.Enabled.FALSE, Compression.Enabled.INDEXING_DATA);
+        Compression.Scheme compressionScheme =
+            randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4);
         final boolean setConnectTimeout = randomBoolean();
         if (setConnectTimeout) {
             builder.setConnectTimeout(connectTimeout);
@@ -40,10 +43,16 @@ public class ConnectionProfileTests extends ESTestCase {
         if (setHandshakeTimeout) {
             builder.setHandshakeTimeout(handshakeTimeout);
         }
+
         final boolean setCompress = randomBoolean();
         if (setCompress) {
             builder.setCompressionEnabled(compressionEnabled);
         }
+
+        final boolean setCompressionScheme = randomBoolean();
+        if (setCompressionScheme) {
+            builder.setCompressionScheme(compressionScheme);
+        }
         final boolean setPingInterval = randomBoolean();
         if (setPingInterval) {
             builder.setPingInterval(pingInterval);
@@ -81,6 +90,12 @@ public class ConnectionProfileTests extends ESTestCase {
             assertNull(build.getCompressionEnabled());
         }
 
+        if (setCompressionScheme) {
+            assertEquals(compressionScheme, build.getCompressionScheme());
+        } else {
+            assertNull(build.getCompressionScheme());
+        }
+
         if (setPingInterval) {
             assertEquals(pingInterval, build.getPingInterval());
         } else {
@@ -171,7 +186,15 @@ public class ConnectionProfileTests extends ESTestCase {
         }
         final boolean connectionCompressSet = randomBoolean();
         if (connectionCompressSet) {
-            builder.setCompressionEnabled(randomBoolean());
+            Compression.Enabled compressionEnabled =
+                randomFrom(Compression.Enabled.TRUE, Compression.Enabled.FALSE, Compression.Enabled.INDEXING_DATA);
+            builder.setCompressionEnabled(compressionEnabled);
+        }
+        final boolean connectionCompressionScheme = randomBoolean();
+        if (connectionCompressionScheme) {
+            Compression.Scheme compressionScheme =
+                randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4);
+            builder.setCompressionScheme(compressionScheme);
         }
 
         final ConnectionProfile profile = builder.build();
@@ -188,6 +211,9 @@ public class ConnectionProfileTests extends ESTestCase {
             equalTo(pingIntervalSet ? profile.getPingInterval() : defaultProfile.getPingInterval()));
         assertThat(resolved.getCompressionEnabled(),
             equalTo(connectionCompressSet ? profile.getCompressionEnabled() : defaultProfile.getCompressionEnabled()));
+        assertThat(resolved.getCompressionScheme(),
+            equalTo(connectionCompressionScheme ? profile.getCompressionScheme() :
+                defaultProfile.getCompressionScheme()));
     }
 
     public void testDefaultConnectionProfile() {
@@ -201,6 +227,7 @@ public class ConnectionProfileTests extends ESTestCase {
         assertEquals(TransportSettings.CONNECT_TIMEOUT.get(Settings.EMPTY), profile.getConnectTimeout());
         assertEquals(TransportSettings.CONNECT_TIMEOUT.get(Settings.EMPTY), profile.getHandshakeTimeout());
         assertEquals(TransportSettings.TRANSPORT_COMPRESS.get(Settings.EMPTY), profile.getCompressionEnabled());
+        assertEquals(TransportSettings.TRANSPORT_COMPRESSION_SCHEME.get(Settings.EMPTY), profile.getCompressionScheme());
         assertEquals(TransportSettings.PING_SCHEDULE.get(Settings.EMPTY), profile.getPingInterval());
 
         profile = ConnectionProfile.buildDefaultConnectionProfile(nonMasterNode());

+ 13 - 14
server/src/test/java/org/elasticsearch/transport/TransportDecompressorTests.java → server/src/test/java/org/elasticsearch/transport/DeflateTransportDecompressorTests.java

@@ -24,7 +24,7 @@ import org.elasticsearch.test.ESTestCase;
 import java.io.IOException;
 import java.io.OutputStream;
 
-public class TransportDecompressorTests extends ESTestCase {
+public class DeflateTransportDecompressorTests extends ESTestCase {
 
     public void testSimpleCompression() throws IOException {
         try (BytesStreamOutput output = new BytesStreamOutput()) {
@@ -35,11 +35,11 @@ public class TransportDecompressorTests extends ESTestCase {
 
             BytesReference bytes = output.bytes();
 
-            TransportDecompressor decompressor = new TransportDecompressor(PageCacheRecycler.NON_RECYCLING_INSTANCE);
+            DeflateTransportDecompressor decompressor = new DeflateTransportDecompressor(PageCacheRecycler.NON_RECYCLING_INSTANCE);
             int bytesConsumed = decompressor.decompress(bytes);
             assertEquals(bytes.length(), bytesConsumed);
             assertTrue(decompressor.isEOS());
-            ReleasableBytesReference releasableBytesReference = decompressor.pollDecompressedPage();
+            ReleasableBytesReference releasableBytesReference = decompressor.pollDecompressedPage(true);
             assertEquals(randomByte, releasableBytesReference.get(0));
             releasableBytesReference.close();
 
@@ -57,14 +57,14 @@ public class TransportDecompressorTests extends ESTestCase {
 
             BytesReference bytes = output.bytes();
 
-            TransportDecompressor decompressor = new TransportDecompressor(PageCacheRecycler.NON_RECYCLING_INSTANCE);
+            DeflateTransportDecompressor decompressor = new DeflateTransportDecompressor(PageCacheRecycler.NON_RECYCLING_INSTANCE);
             int bytesConsumed = decompressor.decompress(bytes);
             assertEquals(bytes.length(), bytesConsumed);
             assertTrue(decompressor.isEOS());
-            ReleasableBytesReference reference1 = decompressor.pollDecompressedPage();
-            ReleasableBytesReference reference2 = decompressor.pollDecompressedPage();
-            ReleasableBytesReference reference3 = decompressor.pollDecompressedPage();
-            assertNull(decompressor.pollDecompressedPage());
+            ReleasableBytesReference reference1 = decompressor.pollDecompressedPage(false);
+            ReleasableBytesReference reference2 = decompressor.pollDecompressedPage(false);
+            ReleasableBytesReference reference3 = decompressor.pollDecompressedPage(true);
+            assertNull(decompressor.pollDecompressedPage(true));
             BytesReference composite = CompositeBytesReference.of(reference1, reference2, reference3);
             assertEquals(4 * 10000, composite.length());
             StreamInput streamInput = composite.streamInput();
@@ -86,7 +86,7 @@ public class TransportDecompressorTests extends ESTestCase {
 
             BytesReference bytes = output.bytes();
 
-            TransportDecompressor decompressor = new TransportDecompressor(PageCacheRecycler.NON_RECYCLING_INSTANCE);
+            DeflateTransportDecompressor decompressor = new DeflateTransportDecompressor(PageCacheRecycler.NON_RECYCLING_INSTANCE);
 
             int split1 = (int) (bytes.length() * 0.3);
             int split2 = (int) (bytes.length() * 0.65);
@@ -103,10 +103,10 @@ public class TransportDecompressorTests extends ESTestCase {
             int bytesConsumed3 = decompressor.decompress(inbound3);
             assertEquals(inbound3.length(), bytesConsumed3);
             assertTrue(decompressor.isEOS());
-            ReleasableBytesReference reference1 = decompressor.pollDecompressedPage();
-            ReleasableBytesReference reference2 = decompressor.pollDecompressedPage();
-            ReleasableBytesReference reference3 = decompressor.pollDecompressedPage();
-            assertNull(decompressor.pollDecompressedPage());
+            ReleasableBytesReference reference1 = decompressor.pollDecompressedPage(false);
+            ReleasableBytesReference reference2 = decompressor.pollDecompressedPage(false);
+            ReleasableBytesReference reference3 = decompressor.pollDecompressedPage(true);
+            assertNull(decompressor.pollDecompressedPage(false));
             BytesReference composite = CompositeBytesReference.of(reference1, reference2, reference3);
             assertEquals(4 * 10000, composite.length());
             StreamInput streamInput = composite.streamInput();
@@ -117,5 +117,4 @@ public class TransportDecompressorTests extends ESTestCase {
 
         }
     }
-
 }
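
The Deflate tests above drain the decompressor page by page with pollDecompressedPage, flagging the final poll with true. For orientation, the DEFLATE byte stream they produce and consume is the plain JDK format; a minimal stand-alone round trip, using only java.util.zip and none of the transport classes, is sketched below.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.zip.DeflaterOutputStream;
    import java.util.zip.InflaterInputStream;

    class DeflateRoundTrip {
        public static void main(String[] args) throws IOException {
            byte[] data = "payload".getBytes(StandardCharsets.UTF_8);
            ByteArrayOutputStream compressed = new ByteArrayOutputStream();
            try (DeflaterOutputStream deflate = new DeflaterOutputStream(compressed)) {
                deflate.write(data); // close() finishes the stream, writing the end-of-stream marker
            }
            byte[] inflated;
            try (InflaterInputStream inflate =
                     new InflaterInputStream(new ByteArrayInputStream(compressed.toByteArray()))) {
                inflated = inflate.readAllBytes();
            }
            assert Arrays.equals(data, inflated); // run with -ea to enable the check
        }
    }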

+ 15 - 10
server/src/test/java/org/elasticsearch/transport/InboundDecoderTests.java

@@ -47,10 +47,10 @@ public class InboundDecoderTests extends ESTestCase {
         OutboundMessage message;
         if (isRequest) {
             message = new OutboundMessage.Request(threadContext, new TestRequest(randomAlphaOfLength(100)),
-                Version.CURRENT, action, requestId, false, false);
+                Version.CURRENT, action, requestId, false, null);
         } else {
             message = new OutboundMessage.Response(threadContext, new TestResponse(randomAlphaOfLength(100)),
-                Version.CURRENT, requestId, false, false);
+                Version.CURRENT, requestId, false, null);
         }
 
         final BytesReference totalBytes = message.serialize(new BytesStreamOutput());
@@ -96,14 +96,14 @@ public class InboundDecoderTests extends ESTestCase {
 
     public void testDecodePreHeaderSizeVariableInt() throws IOException {
         // TODO: Can delete test on 9.0
-        boolean isCompressed = randomBoolean();
+        Compression.Scheme compressionScheme = randomFrom(Compression.Scheme.DEFLATE, null);
         String action = "test-request";
         long requestId = randomNonNegativeLong();
         final Version preHeaderVariableInt = Version.V_7_5_0;
         final String contentValue = randomAlphaOfLength(100);
         // 8.0 is only compatible with handshakes on a pre-variable int version
         final OutboundMessage message = new OutboundMessage.Request(threadContext, new TestRequest(contentValue),
-            preHeaderVariableInt, action, requestId, true, isCompressed);
+            preHeaderVariableInt, action, requestId, true, compressionScheme);
 
         final BytesReference totalBytes = message.serialize(new BytesStreamOutput());
         int partialHeaderSize = TcpHeader.headerSize(preHeaderVariableInt);
@@ -118,7 +118,11 @@ public class InboundDecoderTests extends ESTestCase {
         final Header header = (Header) fragments.get(0);
         assertEquals(requestId, header.getRequestId());
         assertEquals(preHeaderVariableInt, header.getVersion());
-        assertEquals(isCompressed, header.isCompressed());
+        if (compressionScheme == null) {
+            assertFalse(header.isCompressed());
+        } else {
+            assertTrue(header.isCompressed());
+        }
         assertTrue(header.isHandshake());
         assertTrue(header.isRequest());
         assertTrue(header.needsToReadVariableHeader());
@@ -140,7 +144,7 @@ public class InboundDecoderTests extends ESTestCase {
         threadContext.putHeader(headerKey, headerValue);
         Version handshakeCompat = Version.CURRENT.minimumCompatibilityVersion().minimumCompatibilityVersion();
         OutboundMessage message = new OutboundMessage.Request(threadContext, new TestRequest(randomAlphaOfLength(100)),
-            handshakeCompat, action, requestId, true, false);
+            handshakeCompat, action, requestId, true, null);
 
         final BytesReference bytes = message.serialize(new BytesStreamOutput());
         int totalHeaderSize = TcpHeader.headerSize(handshakeCompat);
@@ -176,12 +180,13 @@ public class InboundDecoderTests extends ESTestCase {
         }
         OutboundMessage message;
         TransportMessage transportMessage;
+        Compression.Scheme scheme = randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4);
         if (isRequest) {
             transportMessage = new TestRequest(randomAlphaOfLength(100));
-            message = new OutboundMessage.Request(threadContext, transportMessage, Version.CURRENT, action, requestId, false, true);
+            message = new OutboundMessage.Request(threadContext, transportMessage, Version.CURRENT, action, requestId, false, scheme);
         } else {
             transportMessage = new TestResponse(randomAlphaOfLength(100));
-            message = new OutboundMessage.Response(threadContext, transportMessage, Version.CURRENT, requestId, false, true);
+            message = new OutboundMessage.Response(threadContext, transportMessage, Version.CURRENT, requestId, false, scheme);
         }
 
         final BytesReference totalBytes = message.serialize(new BytesStreamOutput());
@@ -235,7 +240,7 @@ public class InboundDecoderTests extends ESTestCase {
         threadContext.putHeader(headerKey, headerValue);
         Version handshakeCompat = Version.CURRENT.minimumCompatibilityVersion().minimumCompatibilityVersion();
         OutboundMessage message = new OutboundMessage.Request(threadContext, new TestRequest(randomAlphaOfLength(100)),
-            handshakeCompat, action, requestId, true, true);
+            handshakeCompat, action, requestId, true, Compression.Scheme.DEFLATE);
 
         final BytesReference bytes = message.serialize(new BytesStreamOutput());
         int totalHeaderSize = TcpHeader.headerSize(handshakeCompat);
@@ -263,7 +268,7 @@ public class InboundDecoderTests extends ESTestCase {
         long requestId = randomNonNegativeLong();
         Version incompatibleVersion = Version.CURRENT.minimumCompatibilityVersion().minimumCompatibilityVersion();
         OutboundMessage message = new OutboundMessage.Request(threadContext, new TestRequest(randomAlphaOfLength(100)),
-            incompatibleVersion, action, requestId, false, true);
+            incompatibleVersion, action, requestId, false, Compression.Scheme.DEFLATE);
 
         final BytesReference bytes = message.serialize(new BytesStreamOutput());
 

+ 2 - 2
server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java

@@ -65,7 +65,7 @@ public class InboundHandlerTests extends ESTestCase {
         TransportHandshaker handshaker = new TransportHandshaker(version, threadPool, (n, c, r, v) -> {});
         TransportKeepAlive keepAlive = new TransportKeepAlive(threadPool, TcpChannel::sendMessage);
         OutboundHandler outboundHandler = new OutboundHandler("node", version, new StatsTracker(), threadPool,
-            BigArrays.NON_RECYCLING_INSTANCE);
+            BigArrays.NON_RECYCLING_INSTANCE, randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4));
         requestHandlers = new Transport.RequestHandlers();
         responseHandlers = new Transport.ResponseHandlers();
         handler = new InboundHandler(threadPool, outboundHandler, namedWriteableRegistry, handshaker, keepAlive, requestHandlers,
@@ -125,7 +125,7 @@ public class InboundHandlerTests extends ESTestCase {
         requestHandlers.registerHandler(registry);
         String requestValue = randomAlphaOfLength(10);
         OutboundMessage.Request request = new OutboundMessage.Request(threadPool.getThreadContext(),
-            new TestRequest(requestValue), version, action, requestId, false, false);
+            new TestRequest(requestValue), version, action, requestId, false, null);
 
         BytesReference fullRequestBytes = request.serialize(new BytesStreamOutput());
         BytesReference requestContent = fullRequestBytes.slice(headerSize, fullRequestBytes.length() - headerSize);

+ 24 - 9
server/src/test/java/org/elasticsearch/transport/InboundPipelineTests.java

@@ -93,9 +93,16 @@ public class InboundPipelineTests extends ESTestCase {
             try (BytesStreamOutput streamOutput = new BytesStreamOutput()) {
                 while (streamOutput.size() < BYTE_THRESHOLD) {
                     final Version version = randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion());
-                    final String value = randomAlphaOfLength(randomIntBetween(10, 200));
+                    final String value = randomRealisticUnicodeOfCodepointLength(randomIntBetween(200, 400));
                     final boolean isRequest = randomBoolean();
-                    final boolean isCompressed = randomBoolean();
+
+                    Compression.Scheme scheme;
+                    if (randomBoolean()) {
+                        scheme = null;
+                    } else {
+                        scheme = randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4);
+                    }
+                    boolean isCompressed = isCompressed(version, scheme);
                     final long requestId = totalMessages++;
 
                     final MessageData messageData;
@@ -106,17 +113,17 @@ public class InboundPipelineTests extends ESTestCase {
                         if (rarely()) {
                             messageData = new MessageData(version, requestId, true, isCompressed, breakThisAction, null);
                             message = new OutboundMessage.Request(threadContext, new TestRequest(value),
-                                version, breakThisAction, requestId, false, isCompressed);
+                                version, breakThisAction, requestId, false, scheme);
                             expectedExceptionClass = new CircuitBreakingException("", CircuitBreaker.Durability.PERMANENT);
                         } else {
                             messageData = new MessageData(version, requestId, true, isCompressed, actionName, value);
                             message = new OutboundMessage.Request(threadContext, new TestRequest(value),
-                                version, actionName, requestId, false, isCompressed);
+                                version, actionName, requestId, false, scheme);
                         }
                     } else {
                         messageData = new MessageData(version, requestId, false, isCompressed, null, value);
                         message = new OutboundMessage.Response(threadContext, new TestResponse(value),
-                            version, requestId, false, isCompressed);
+                            version, requestId, false, scheme);
                     }
 
                     expected.add(new Tuple<>(messageData, expectedExceptionClass));
@@ -165,6 +172,14 @@ public class InboundPipelineTests extends ESTestCase {
         }
     }
 
+    private static boolean isCompressed(Version version, Compression.Scheme scheme) {
+        if (version.before(Compression.Scheme.LZ4_VERSION) && scheme == Compression.Scheme.LZ4) {
+            return false;
+        } else {
+            return scheme != null;
+        }
+    }
+
     public void testDecodeExceptionIsPropagated() throws IOException {
         BiConsumer<TcpChannel, InboundMessage> messageHandler = (c, m) -> {};
         final StatsTracker statsTracker = new StatsTracker();
@@ -184,10 +199,10 @@ public class InboundPipelineTests extends ESTestCase {
             OutboundMessage message;
             if (isRequest) {
                 message = new OutboundMessage.Request(threadContext, new TestRequest(value),
-                    invalidVersion, actionName, requestId, false, false);
+                    invalidVersion, actionName, requestId, false, null);
             } else {
                 message = new OutboundMessage.Response(threadContext, new TestResponse(value),
-                    invalidVersion, requestId, false, false);
+                    invalidVersion, requestId, false, null);
             }
 
             final BytesReference reference = message.serialize(streamOutput);
@@ -221,10 +236,10 @@ public class InboundPipelineTests extends ESTestCase {
             OutboundMessage message;
             if (isRequest) {
                 message = new OutboundMessage.Request(threadContext, new TestRequest(value),
-                    version, actionName, requestId, false, false);
+                    version, actionName, requestId, false, null);
             } else {
                 message = new OutboundMessage.Response(threadContext, new TestResponse(value),
-                    version, requestId, false, false);
+                    version, requestId, false, null);
             }
 
             final BytesReference reference = message.serialize(streamOutput);
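
The isCompressed helper added above captures the compatibility rule these pipeline tests rely on: an LZ4-configured message bound for a node older than LZ4_VERSION goes out uncompressed. A hedged sketch of that gate as a standalone method (the name effectiveScheme is illustrative; the production check lives in the outbound path):

    // Illustrative only: mirrors the rule encoded by isCompressed(version, scheme).
    // Returning null means the message is written without compression.
    static Compression.Scheme effectiveScheme(Version remoteVersion, Compression.Scheme configured) {
        if (configured == Compression.Scheme.LZ4 && remoteVersion.before(Compression.Scheme.LZ4_VERSION)) {
            return null; // peer cannot decode LZ4 frames yet
        }
        return configured;
    }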

+ 173 - 0
server/src/test/java/org/elasticsearch/transport/Lz4TransportDecompressorTests.java

@@ -0,0 +1,173 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.transport;
+
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.CompositeBytesReference;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
+import org.elasticsearch.common.io.Streams;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.util.PageCacheRecycler;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import static org.hamcrest.Matchers.lessThan;
+
+public class Lz4TransportDecompressorTests extends ESTestCase {
+
+    public void testSimpleCompression() throws IOException {
+        try (BytesStreamOutput output = new BytesStreamOutput()) {
+            byte randomByte = randomByte();
+            try (OutputStream lz4BlockStream = Compression.Scheme.lz4OutputStream(Streams.noCloseStream(output))) {
+                lz4BlockStream.write(randomByte);
+            }
+
+            BytesReference bytes = output.bytes();
+
+            Lz4TransportDecompressor decompressor = new Lz4TransportDecompressor(PageCacheRecycler.NON_RECYCLING_INSTANCE);
+            int bytesConsumed = decompressor.decompress(bytes);
+            assertEquals(bytes.length(), bytesConsumed);
+            ReleasableBytesReference releasableBytesReference = decompressor.pollDecompressedPage(true);
+            assertEquals(randomByte, releasableBytesReference.get(0));
+            releasableBytesReference.close();
+        }
+    }
+
+    public void testMultiPageCompression() throws IOException {
+        int intsToWrite = 50000;
+        int uncompressedLength = intsToWrite * 4;
+
+        try (BytesStreamOutput output = new BytesStreamOutput()) {
+            try (StreamOutput lz4BlockStream = new OutputStreamStreamOutput(Compression.Scheme.lz4OutputStream(
+                Streams.flushOnCloseStream(output)))) {
+                for (int i = 0; i < intsToWrite; ++i) {
+                    int lowByte = (i & 0xFF);
+                    if (lowByte < 128) {
+                        lz4BlockStream.writeInt(0);
+                    } else if (lowByte < 200) {
+                        lz4BlockStream.writeInt(1);
+                    } else {
+                        lz4BlockStream.writeInt(i);
+                    }
+                }
+            }
+
+            BytesReference bytes = output.bytes();
+            // About 200 of every 256 values repeat, so the payload should compress to less than half its size
+            assertThat(bytes.length(), lessThan(uncompressedLength / 2));
+
+            Lz4TransportDecompressor decompressor = new Lz4TransportDecompressor(PageCacheRecycler.NON_RECYCLING_INSTANCE);
+            int bytesConsumed = decompressor.decompress(bytes);
+            assertEquals(bytes.length(), bytesConsumed);
+
+            int numOfUncompressedPages = uncompressedLength / PageCacheRecycler.BYTE_PAGE_SIZE;
+            if (bytes.length() % PageCacheRecycler.BYTE_PAGE_SIZE > 0) {
+                numOfUncompressedPages += 1;
+            }
+
+            ReleasableBytesReference[] polledReferences = new ReleasableBytesReference[numOfUncompressedPages];
+            for (int i = 0; i < numOfUncompressedPages - 1; ++i) {
+                polledReferences[i] = decompressor.pollDecompressedPage(false);
+            }
+
+            polledReferences[numOfUncompressedPages - 1] = decompressor.pollDecompressedPage(true);
+            assertNull(decompressor.pollDecompressedPage(true));
+
+            BytesReference composite = CompositeBytesReference.of(polledReferences);
+            assertEquals(uncompressedLength, composite.length());
+            StreamInput streamInput = composite.streamInput();
+            for (int i = 0; i < intsToWrite; ++i) {
+                int lowByte = (i & 0xFF);
+                if (lowByte < 128) {
+                    assertEquals(0, streamInput.readInt());
+                } else if (lowByte < 200) {
+                    assertEquals(1, streamInput.readInt());
+                } else {
+                    assertEquals(i, streamInput.readInt());
+                }
+            }
+            Releasables.close(polledReferences);
+        }
+    }
+
+    public void testIncrementalMultiPageCompression() throws IOException {
+        int intsToWrite = 50000;
+        int uncompressedLength = intsToWrite * 4;
+
+        try (BytesStreamOutput output = new BytesStreamOutput()) {
+            try (StreamOutput lz4BlockStream = new OutputStreamStreamOutput(
+                Compression.Scheme.lz4OutputStream(Streams.flushOnCloseStream(output)))) {
+                for (int i = 0; i < intsToWrite; ++i) {
+                    int lowByte = (i & 0xFF);
+                    if (lowByte < 128) {
+                        lz4BlockStream.writeInt(0);
+                    } else if (lowByte < 200) {
+                        lz4BlockStream.writeInt(1);
+                    } else {
+                        lz4BlockStream.writeInt(i);
+                    }
+                }
+            }
+
+            BytesReference bytes = output.bytes();
+            // About 200 of every 256 values repeat, so the payload should compress to less than half its size
+            assertThat(bytes.length(), lessThan(uncompressedLength / 2));
+
+            Lz4TransportDecompressor decompressor = new Lz4TransportDecompressor(PageCacheRecycler.NON_RECYCLING_INSTANCE);
+
+            int split1 = (int) (bytes.length() * 0.3);
+            int split2 = (int) (bytes.length() * 0.65);
+            BytesReference inbound1 = bytes.slice(0, split1);
+            BytesReference inbound2 = bytes.slice(split1, split2 - split1);
+            BytesReference inbound3 = bytes.slice(split2, bytes.length() - split2);
+
+            int bytesConsumed1 = decompressor.decompress(inbound1);
+            BytesReference next = CompositeBytesReference.of(inbound1.slice(bytesConsumed1, inbound1.length() - bytesConsumed1), inbound2);
+            int bytesConsumed2 = decompressor.decompress(next);
+            BytesReference next2 = CompositeBytesReference.of(next.slice(bytesConsumed2, next.length() - bytesConsumed2), inbound3);
+            int bytesConsumed3 = decompressor.decompress(next2);
+            assertEquals(bytes.length(), bytesConsumed1 + bytesConsumed2 + bytesConsumed3);
+
+            int numOfUncompressedPages = uncompressedLength / PageCacheRecycler.BYTE_PAGE_SIZE;
+            if (bytes.length() % PageCacheRecycler.BYTE_PAGE_SIZE > 0) {
+                numOfUncompressedPages += 1;
+            }
+
+            ReleasableBytesReference[] polledReferences = new ReleasableBytesReference[numOfUncompressedPages];
+            for (int i = 0; i < numOfUncompressedPages - 1; ++i) {
+                polledReferences[i] = decompressor.pollDecompressedPage(false);
+            }
+
+            polledReferences[numOfUncompressedPages - 1] = decompressor.pollDecompressedPage(true);
+            assertNull(decompressor.pollDecompressedPage(true));
+
+            BytesReference composite = CompositeBytesReference.of(polledReferences);
+            assertEquals(uncompressedLength, composite.length());
+            StreamInput streamInput = composite.streamInput();
+            for (int i = 0; i < intsToWrite; ++i) {
+                int lowByte = (i & 0xFF);
+                if (lowByte < 128) {
+                    assertEquals(0, streamInput.readInt());
+                } else if (lowByte < 200) {
+                    assertEquals(1, streamInput.readInt());
+                } else {
+                    assertEquals(i, streamInput.readInt());
+                }
+            }
+            Releasables.close(polledReferences);
+
+        }
+    }
+}
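
These tests build their input with Compression.Scheme.lz4OutputStream. The framing itself comes from the new lz4-java dependency; a stand-alone round trip through lz4-java's block streams (default 64 KB block size assumed, independent of the transport wrappers) looks roughly like this:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.Arrays;

    import net.jpountz.lz4.LZ4BlockInputStream;
    import net.jpountz.lz4.LZ4BlockOutputStream;

    class Lz4RoundTrip {
        public static void main(String[] args) throws IOException {
            byte[] data = new byte[4096]; // all zeros, so highly compressible
            ByteArrayOutputStream compressed = new ByteArrayOutputStream();
            try (LZ4BlockOutputStream lz4 = new LZ4BlockOutputStream(compressed)) {
                lz4.write(data);
            }
            byte[] decompressed;
            try (LZ4BlockInputStream in =
                     new LZ4BlockInputStream(new ByteArrayInputStream(compressed.toByteArray()))) {
                decompressed = in.readAllBytes();
            }
            assert Arrays.equals(data, decompressed);
        }
    }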

+ 11 - 3
server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java

@@ -58,6 +58,7 @@ public class OutboundHandlerTests extends ESTestCase {
     private OutboundHandler handler;
     private FakeTcpChannel channel;
     private DiscoveryNode node;
+    private Compression.Scheme compressionScheme;
 
     @Before
     public void setUp() throws Exception {
@@ -66,7 +67,9 @@ public class OutboundHandlerTests extends ESTestCase {
         TransportAddress transportAddress = buildNewFakeTransportAddress();
         node = new DiscoveryNode("", transportAddress, Version.CURRENT);
         StatsTracker statsTracker = new StatsTracker();
-        handler = new OutboundHandler("node", Version.CURRENT, statsTracker, threadPool, BigArrays.NON_RECYCLING_INSTANCE);
+        compressionScheme = randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4);
+        handler = new OutboundHandler("node", Version.CURRENT, statsTracker, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
+            compressionScheme);
 
         final LongSupplier millisSupplier = () -> TimeValue.nsecToMSec(System.nanoTime());
         final InboundDecoder decoder = new InboundDecoder(Version.CURRENT, PageCacheRecycler.NON_RECYCLING_INSTANCE);
@@ -120,6 +123,8 @@ public class OutboundHandlerTests extends ESTestCase {
         long requestId = randomLongBetween(0, 300);
         boolean isHandshake = randomBoolean();
         boolean compress = randomBoolean();
+        boolean compressUnsupportedDueToVersion = compressionScheme == Compression.Scheme.LZ4
+            && version.before(Compression.Scheme.LZ4_VERSION);
         String value = "message";
         threadContext.putHeader("header", "header_value");
         TestRequest request = new TestRequest(value);
@@ -166,7 +171,7 @@ public class OutboundHandlerTests extends ESTestCase {
         } else {
             assertFalse(header.isHandshake());
         }
-        if (compress) {
+        if (compress && compressUnsupportedDueToVersion == false) {
             assertTrue(header.isCompressed());
         } else {
             assertFalse(header.isCompressed());
@@ -183,6 +188,9 @@ public class OutboundHandlerTests extends ESTestCase {
         long requestId = randomLongBetween(0, 300);
         boolean isHandshake = randomBoolean();
         boolean compress = randomBoolean();
+        boolean compressUnsupportedDueToVersion = compressionScheme == Compression.Scheme.LZ4
+            && version.before(Compression.Scheme.LZ4_VERSION);
+
         String value = "message";
         threadContext.putHeader("header", "header_value");
         TestResponse response = new TestResponse(value);
@@ -225,7 +233,7 @@ public class OutboundHandlerTests extends ESTestCase {
         } else {
             assertFalse(header.isHandshake());
         }
-        if (compress) {
+        if (compress && compressUnsupportedDueToVersion == false) {
             assertTrue(header.isCompressed());
         } else {
             assertFalse(header.isCompressed());

+ 14 - 3
server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java

@@ -369,8 +369,13 @@ public class RemoteClusterServiceTests extends ESTestCase {
                     Settings.Builder settingsChange = Settings.builder();
                     TimeValue pingSchedule = TimeValue.timeValueSeconds(randomIntBetween(6, 8));
                     settingsChange.put("cluster.remote.cluster_1.transport.ping_schedule", pingSchedule);
-                    boolean compressionEnabled = true;
-                    settingsChange.put("cluster.remote.cluster_1.transport.compress", compressionEnabled);
+                    boolean changeCompressionScheme = randomBoolean();
+                    Compression.Enabled enabled = randomFrom(Compression.Enabled.TRUE, Compression.Enabled.INDEXING_DATA);
+                    if (changeCompressionScheme) {
+                        settingsChange.put("cluster.remote.cluster_1.transport.compression_scheme", Compression.Scheme.LZ4);
+                    } else {
+                        settingsChange.put("cluster.remote.cluster_1.transport.compress", enabled);
+                    }
                     settingsChange.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString());
                     service.validateAndUpdateRemoteCluster("cluster_1", settingsChange.build());
                     assertBusy(remoteClusterConnection::isClosed);
@@ -378,7 +383,13 @@ public class RemoteClusterServiceTests extends ESTestCase {
                     remoteClusterConnection = service.getRemoteClusterConnection("cluster_1");
                     ConnectionProfile connectionProfile = remoteClusterConnection.getConnectionManager().getConnectionProfile();
                     assertEquals(pingSchedule, connectionProfile.getPingInterval());
-                    assertEquals(compressionEnabled, connectionProfile.getCompressionEnabled());
+                    if (changeCompressionScheme) {
+                        assertEquals(Compression.Enabled.FALSE, connectionProfile.getCompressionEnabled());
+                        assertEquals(Compression.Scheme.LZ4, connectionProfile.getCompressionScheme());
+                    } else {
+                        assertEquals(enabled, connectionProfile.getCompressionEnabled());
+                        assertEquals(Compression.Scheme.DEFLATE, connectionProfile.getCompressionScheme());
+                    }
                 }
             }
         }
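
For reference, the two per-cluster keys this test toggles can also be combined; a sketch of a settings block that compresses only raw indexing data with LZ4 for the remote cluster_1 (values are examples, not defaults):

    // Example only: combines the new per-remote-cluster compression settings.
    Settings remoteCompression = Settings.builder()
        .put("cluster.remote.cluster_1.transport.compress", Compression.Enabled.INDEXING_DATA)
        .put("cluster.remote.cluster_1.transport.compression_scheme", Compression.Scheme.LZ4)
        .build();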

+ 16 - 3
server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java

@@ -45,7 +45,8 @@ public class RemoteConnectionStrategyTests extends ESTestCase {
     public void testChangeInConnectionProfileMeansTheStrategyMustBeRebuilt() {
         ClusterConnectionManager connectionManager = new ClusterConnectionManager(TestProfiles.LIGHT_PROFILE, mock(Transport.class));
         assertEquals(TimeValue.MINUS_ONE, connectionManager.getConnectionProfile().getPingInterval());
-        assertEquals(false, connectionManager.getConnectionProfile().getCompressionEnabled());
+        assertEquals(Compression.Enabled.FALSE, connectionManager.getConnectionProfile().getCompressionEnabled());
+        assertEquals(Compression.Scheme.DEFLATE, connectionManager.getConnectionProfile().getCompressionScheme());
         RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager("cluster-alias", connectionManager);
         FakeConnectionStrategy first = new FakeConnectionStrategy("cluster-alias", mock(TransportService.class), remoteConnectionManager,
             RemoteConnectionStrategy.ConnectionStrategy.PROXY);
@@ -53,11 +54,23 @@ public class RemoteConnectionStrategyTests extends ESTestCase {
         Settings.Builder newBuilder = Settings.builder();
         newBuilder.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace("cluster-alias").getKey(), "proxy");
         newBuilder.put(ProxyConnectionStrategy.PROXY_ADDRESS.getConcreteSettingForNamespace("cluster-alias").getKey(), "127.0.0.1:9300");
-        if (randomBoolean()) {
+        String ping = "ping";
+        String compress = "compress";
+        String compressionScheme = "compression_scheme";
+        String change = randomFrom(ping, compress, compressionScheme);
+        if (change.equals(ping)) {
             newBuilder.put(RemoteClusterService.REMOTE_CLUSTER_PING_SCHEDULE.getConcreteSettingForNamespace("cluster-alias").getKey(),
                 TimeValue.timeValueSeconds(5));
+        } else if (change.equals(compress)) {
+            newBuilder.put(RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace("cluster-alias").getKey(),
+                randomFrom(Compression.Enabled.INDEXING_DATA, Compression.Enabled.TRUE));
+        } else if (change.equals(compressionScheme)) {
+            newBuilder.put(
+                RemoteClusterService.REMOTE_CLUSTER_COMPRESSION_SCHEME.getConcreteSettingForNamespace("cluster-alias").getKey(),
+                Compression.Scheme.LZ4
+            );
         } else {
-            newBuilder.put(RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace("cluster-alias").getKey(), true);
+            throw new AssertionError("Unexpected option: " + change);
         }
         assertTrue(first.shouldRebuildConnection(newBuilder.build()));
     }

+ 1 - 1
server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java

@@ -403,7 +403,7 @@ public class TcpTransportTests extends ESTestCase {
 
             TcpTransport.handleException(channel, exception, lifecycle,
                 new OutboundHandler(randomAlphaOfLength(10), Version.CURRENT, new StatsTracker(), testThreadPool,
-                    BigArrays.NON_RECYCLING_INSTANCE));
+                    BigArrays.NON_RECYCLING_INSTANCE, randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4)));
 
             if (expectClosed) {
                 assertTrue(listener.isDone());

+ 1 - 1
server/src/test/java/org/elasticsearch/transport/TransportLoggerTests.java

@@ -78,7 +78,7 @@ public class TransportLoggerTests extends ESTestCase {
     }
 
     private BytesReference buildRequest() throws IOException {
-        boolean compress = randomBoolean();
+        Compression.Scheme compress = randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4, null);
         try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
             OutboundMessage.Request request = new OutboundMessage.Request(new ThreadContext(Settings.EMPTY), new ClusterStatsRequest(),
                 Version.CURRENT, ClusterStatsAction.NAME, randomInt(30), false, compress);

+ 19 - 4
test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

@@ -15,6 +15,7 @@ import com.carrotsearch.randomizedtesting.SeedUtils;
 import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
 import com.carrotsearch.randomizedtesting.generators.RandomPicks;
 import com.carrotsearch.randomizedtesting.generators.RandomStrings;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.store.AlreadyClosedException;
@@ -44,26 +45,26 @@ import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
 import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
 import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.component.LifecycleListener;
 import org.elasticsearch.common.io.FileSystemUtils;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
-import org.elasticsearch.core.Releasables;
 import org.elasticsearch.common.settings.MockSecureSettings;
 import org.elasticsearch.common.settings.SecureSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.Settings.Builder;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.NodeEnvironment;
@@ -97,6 +98,7 @@ import org.elasticsearch.search.SearchService;
 import org.elasticsearch.tasks.TaskManager;
 import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
 import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.transport.Compression;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.transport.TransportSettings;
 
@@ -439,7 +441,20 @@ public final class InternalTestCluster extends TestCluster {
     private static Settings getRandomNodeSettings(long seed) {
         Random random = new Random(seed);
         Builder builder = Settings.builder();
-        builder.put(TransportSettings.TRANSPORT_COMPRESS.getKey(), rarely(random));
+        if (rarely(random)) {
+            builder.put(TransportSettings.TRANSPORT_COMPRESS.getKey(), Compression.Enabled.TRUE);
+        } else {
+            if (random.nextBoolean()) {
+                builder.put(TransportSettings.TRANSPORT_COMPRESS.getKey(), Compression.Enabled.FALSE);
+            } else {
+                builder.put(TransportSettings.TRANSPORT_COMPRESS.getKey(), Compression.Enabled.INDEXING_DATA);
+            }
+        }
+        if (random.nextBoolean()) {
+            builder.put(TransportSettings.TRANSPORT_COMPRESSION_SCHEME.getKey(), Compression.Scheme.DEFLATE);
+        } else {
+            builder.put(TransportSettings.TRANSPORT_COMPRESSION_SCHEME.getKey(), Compression.Scheme.LZ4);
+        }
         if (random.nextBoolean()) {
             builder.put("cache.recycler.page.type", RandomPicks.randomFrom(random, PageCacheRecycler.Type.values()));
         }
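
The node-level equivalents randomized above use the TransportSettings constants; one concrete combination the block can produce, written out as a sketch:

    // One possible outcome of the randomized block: compress only indexing
    // data, and do it with LZ4 rather than the DEFLATE default.
    Settings nodeSettings = Settings.builder()
        .put(TransportSettings.TRANSPORT_COMPRESS.getKey(), Compression.Enabled.INDEXING_DATA)
        .put(TransportSettings.TRANSPORT_COMPRESSION_SCHEME.getKey(), Compression.Scheme.LZ4)
        .build();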

+ 10 - 2
test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java

@@ -555,7 +555,11 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
                     }
                 });
 
-            Settings settingsWithCompress = Settings.builder().put(TransportSettings.TRANSPORT_COMPRESS.getKey(), true).build();
+            Settings settingsWithCompress = Settings.builder()
+                .put(TransportSettings.TRANSPORT_COMPRESS.getKey(), Compression.Enabled.TRUE)
+                .put(TransportSettings.TRANSPORT_COMPRESSION_SCHEME.getKey(),
+                    randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4))
+                .build();
             ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(settingsWithCompress);
             connectToNode(serviceC, serviceA.getLocalDiscoNode(), connectionProfile);
 
@@ -598,7 +602,11 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
                     }
                 });
 
-            Settings settingsWithCompress = Settings.builder().put(TransportSettings.TRANSPORT_COMPRESS.getKey(), true).build();
+            Settings settingsWithCompress = Settings.builder()
+                .put(TransportSettings.TRANSPORT_COMPRESS.getKey(), Compression.Enabled.TRUE)
+                .put(TransportSettings.TRANSPORT_COMPRESSION_SCHEME.getKey(),
+                    randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4))
+                .build();
             ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(settingsWithCompress);
             connectToNode(serviceC, serviceA.getLocalDiscoNode(), connectionProfile);
 

+ 1 - 0
test/framework/src/main/java/org/elasticsearch/transport/TestProfiles.java

@@ -26,6 +26,7 @@ public final class TestProfiles {
         builder.setConnectTimeout(source.getConnectTimeout());
         builder.setHandshakeTimeout(source.getHandshakeTimeout());
         builder.setCompressionEnabled(source.getCompressionEnabled());
+        builder.setCompressionScheme(source.getCompressionScheme());
         builder.setPingInterval(source.getPingInterval());
         builder.addConnections(1,
             TransportRequestOptions.Type.BULK,

+ 4 - 1
test/framework/src/main/java/org/elasticsearch/transport/TestTransportChannels.java

@@ -12,12 +12,15 @@ import org.elasticsearch.Version;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.threadpool.ThreadPool;
 
+import static org.elasticsearch.test.ESTestCase.randomFrom;
+
 public class TestTransportChannels {
 
     public static TcpTransportChannel newFakeTcpTransportChannel(String nodeName, TcpChannel channel, ThreadPool threadPool,
                                                                  String action, long requestId, Version version) {
         return new TcpTransportChannel(
-            new OutboundHandler(nodeName, version, new StatsTracker(), threadPool, BigArrays.NON_RECYCLING_INSTANCE),
+            new OutboundHandler(nodeName, version, new StatsTracker(), threadPool, BigArrays.NON_RECYCLING_INSTANCE,
+                randomFrom(Compression.Scheme.DEFLATE, Compression.Scheme.LZ4)),
             channel, action, requestId, version, false, false, () -> {});
     }
 }

+ 1 - 0
test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java

@@ -171,6 +171,7 @@ public class MockNioTransport extends TcpTransport {
         builder.setConnectTimeout(connectionProfile.getConnectTimeout());
         builder.setPingInterval(connectionProfile.getPingInterval());
         builder.setCompressionEnabled(connectionProfile.getCompressionEnabled());
+        builder.setCompressionScheme(connectionProfile.getCompressionScheme());
         return builder.build();
     }
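
Both of the framework changes above copy the new scheme when rebuilding a ConnectionProfile. Building one by hand follows the same setters; a partial sketch (a complete profile also needs addConnections calls covering every TransportRequestOptions.Type):

    // Partial sketch: compression fields of a hand-built profile.
    ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
    builder.setCompressionEnabled(Compression.Enabled.TRUE);
    builder.setCompressionScheme(Compression.Scheme.LZ4);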
 

+ 5 - 2
x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java

@@ -78,6 +78,7 @@ import org.elasticsearch.snapshots.SnapshotRestoreException;
 import org.elasticsearch.tasks.TaskInfo;
 import org.elasticsearch.test.BackgroundIndexer;
 import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.transport.Compression;
 import org.elasticsearch.transport.NoSuchRemoteClusterException;
 import org.elasticsearch.transport.RemoteClusterService;
 import org.elasticsearch.transport.SniffConnectionStrategy;
@@ -1321,7 +1322,8 @@ public class IndexFollowingIT extends CcrIntegTestCase {
 
             ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest().masterNodeTimeout(TimeValue.MAX_VALUE);
             String address = getLeaderCluster().getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString();
-            Setting<Boolean> compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace("leader_cluster");
+            Setting<Compression.Enabled> compress =
+                RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace("leader_cluster");
             Setting<List<String>> seeds = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("leader_cluster");
             settingsRequest.persistentSettings(Settings.builder().put(compress.getKey(), true).put(seeds.getKey(), address));
             assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
@@ -1352,7 +1354,8 @@ public class IndexFollowingIT extends CcrIntegTestCase {
         } finally {
             ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest().masterNodeTimeout(TimeValue.MAX_VALUE);
             String address = getLeaderCluster().getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString();
-            Setting<Boolean> compress = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace("leader_cluster");
+            Setting<Compression.Enabled> compress =
+                RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace("leader_cluster");
             Setting<List<String>> seeds = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("leader_cluster");
             settingsRequest.persistentSettings(Settings.builder().put(compress.getKey(), compress.getDefault(Settings.EMPTY))
                 .put(seeds.getKey(), address));

+ 2 - 1
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java

@@ -41,6 +41,7 @@ import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.RawIndexingDataTransportRequest;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.ccr.Ccr;
 
@@ -66,7 +67,7 @@ public class ShardChangesAction extends ActionType<ShardChangesAction.Response>
         super(NAME, ShardChangesAction.Response::new);
     }
 
-    public static class Request extends SingleShardRequest<Request> {
+    public static class Request extends SingleShardRequest<Request> implements RawIndexingDataTransportRequest {
 
         private long fromSeqNo;
         private int maxOperationCount;