Ver Fonte

Use network recycler for publications (#80650)

Today we allocate memory for the cluster state to be sent in a
publication using `BigArrays`, but really this memory usage is
network-related so (as in #80111) we should allocate its memory via the
recycler used in the transport layers (i.e. Netty's allocator) instead.
This commit does that.
David Turner há 3 anos
pai
commit
e662874f98

+ 0 - 3
server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

@@ -47,7 +47,6 @@ import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
-import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.common.xcontent.XContentHelper;
@@ -167,7 +166,6 @@ public class Coordinator extends AbstractLifecycleComponent implements ClusterSt
         String nodeName,
         Settings settings,
         ClusterSettings clusterSettings,
-        BigArrays bigArrays,
         TransportService transportService,
         Client client,
         NamedWriteableRegistry namedWriteableRegistry,
@@ -228,7 +226,6 @@ public class Coordinator extends AbstractLifecycleComponent implements ClusterSt
             configuredHostsResolver
         );
         this.publicationHandler = new PublicationTransportHandler(
-            bigArrays,
             transportService,
             namedWriteableRegistry,
             this::handlePublishRequest,

+ 5 - 10
server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java

@@ -25,15 +25,13 @@ import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.compress.Compressor;
 import org.elasticsearch.common.compress.CompressorFactory;
 import org.elasticsearch.common.io.Streams;
-import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.InputStreamStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.PositionTrackingOutputStreamStreamOutput;
-import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
+import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.LazyInitializable;
 import org.elasticsearch.core.AbstractRefCounted;
 import org.elasticsearch.core.Releasables;
@@ -61,7 +59,6 @@ public class PublicationTransportHandler {
     public static final String PUBLISH_STATE_ACTION_NAME = "internal:cluster/coordination/publish_state";
     public static final String COMMIT_STATE_ACTION_NAME = "internal:cluster/coordination/commit_state";
 
-    private final BigArrays bigArrays;
     private final TransportService transportService;
     private final NamedWriteableRegistry namedWriteableRegistry;
     private final Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest;
@@ -88,13 +85,11 @@ public class PublicationTransportHandler {
     private final SerializationStatsTracker serializationStatsTracker = new SerializationStatsTracker();
 
     public PublicationTransportHandler(
-        BigArrays bigArrays,
         TransportService transportService,
         NamedWriteableRegistry namedWriteableRegistry,
         Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest,
         BiConsumer<ApplyCommitRequest, ActionListener<Void>> handleApplyCommit
     ) {
-        this.bigArrays = bigArrays;
         this.transportService = transportService;
         this.namedWriteableRegistry = namedWriteableRegistry;
         this.handlePublishRequest = handlePublishRequest;
@@ -225,7 +220,7 @@ public class PublicationTransportHandler {
 
     private ReleasableBytesReference serializeFullClusterState(ClusterState clusterState, DiscoveryNode node) {
         final Version nodeVersion = node.getVersion();
-        final BytesStreamOutput bytesStream = new ReleasableBytesStreamOutput(bigArrays);
+        final RecyclerBytesStreamOutput bytesStream = transportService.newNetworkBytesStream();
         boolean success = false;
         try {
             final long uncompressedBytes;
@@ -241,7 +236,7 @@ public class PublicationTransportHandler {
             } catch (IOException e) {
                 throw new ElasticsearchException("failed to serialize cluster state for publishing to node {}", e, node);
             }
-            final ReleasableBytesReference result = new ReleasableBytesReference(bytesStream.bytes(), bytesStream::close);
+            final ReleasableBytesReference result = new ReleasableBytesReference(bytesStream.bytes(), bytesStream);
             serializationStatsTracker.serializedFullState(uncompressedBytes, result.length());
             logger.trace(
                 "serialized full cluster state version [{}] for node version [{}] with size [{}]",
@@ -260,7 +255,7 @@ public class PublicationTransportHandler {
 
     private ReleasableBytesReference serializeDiffClusterState(long clusterStateVersion, Diff<ClusterState> diff, DiscoveryNode node) {
         final Version nodeVersion = node.getVersion();
-        final BytesStreamOutput bytesStream = new ReleasableBytesStreamOutput(bigArrays);
+        final RecyclerBytesStreamOutput bytesStream = transportService.newNetworkBytesStream();
         boolean success = false;
         try {
             final long uncompressedBytes;
@@ -276,7 +271,7 @@ public class PublicationTransportHandler {
             } catch (IOException e) {
                 throw new ElasticsearchException("failed to serialize cluster state diff for publishing to node {}", e, node);
             }
-            final ReleasableBytesReference result = new ReleasableBytesReference(bytesStream.bytes(), bytesStream::close);
+            final ReleasableBytesReference result = new ReleasableBytesReference(bytesStream.bytes(), bytesStream);
             serializationStatsTracker.serializedDiff(uncompressedBytes, result.length());
             logger.trace(
                 "serialized cluster state diff for version [{}] for node version [{}] with size [{}]",

+ 0 - 3
server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java

@@ -30,7 +30,6 @@ import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
-import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.gateway.GatewayMetaState;
 import org.elasticsearch.monitor.NodeHealthService;
 import org.elasticsearch.plugins.DiscoveryPlugin;
@@ -91,7 +90,6 @@ public class DiscoveryModule {
 
     public DiscoveryModule(
         Settings settings,
-        BigArrays bigArrays,
         TransportService transportService,
         Client client,
         NamedWriteableRegistry namedWriteableRegistry,
@@ -185,7 +183,6 @@ public class DiscoveryModule {
                 NODE_NAME_SETTING.get(settings),
                 settings,
                 clusterSettings,
-                bigArrays,
                 transportService,
                 client,
                 namedWriteableRegistry,

+ 0 - 1
server/src/main/java/org/elasticsearch/node/Node.java

@@ -799,7 +799,6 @@ public class Node implements Closeable {
 
             final DiscoveryModule discoveryModule = new DiscoveryModule(
                 settings,
-                bigArrays,
                 transportService,
                 client,
                 namedWriteableRegistry,

+ 2 - 0
server/src/main/java/org/elasticsearch/transport/BytesRefRecycler.java

@@ -14,6 +14,8 @@ import org.elasticsearch.common.util.PageCacheRecycler;
 
 public class BytesRefRecycler implements Recycler<BytesRef> {
 
+    public static final BytesRefRecycler NON_RECYCLING_INSTANCE = new BytesRefRecycler(PageCacheRecycler.NON_RECYCLING_INSTANCE);
+
     private final PageCacheRecycler recycler;
 
     public BytesRefRecycler(PageCacheRecycler recycler) {

+ 6 - 0
server/src/main/java/org/elasticsearch/transport/TcpTransport.java

@@ -26,6 +26,7 @@ import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.component.Lifecycle;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.network.CloseableChannel;
 import org.elasticsearch.common.network.NetworkAddress;
@@ -889,6 +890,11 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
         return Collections.unmodifiableSet(acceptedChannels);
     }
 
+    @Override
+    public RecyclerBytesStreamOutput newNetworkBytesStream() {
+        return new RecyclerBytesStreamOutput(recycler);
+    }
+
     /**
      * Ensures this transport is still started / open
      *

+ 7 - 0
server/src/main/java/org/elasticsearch/transport/Transport.java

@@ -12,6 +12,7 @@ import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.component.LifecycleComponent;
+import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
 import org.elasticsearch.common.transport.BoundTransportAddress;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.util.Maps;
@@ -29,6 +30,8 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
 
+import static org.elasticsearch.transport.BytesRefRecycler.NON_RECYCLING_INSTANCE;
+
 public interface Transport extends LifecycleComponent {
 
     /**
@@ -79,6 +82,10 @@ public interface Transport extends LifecycleComponent {
 
     RequestHandlers getRequestHandlers();
 
+    default RecyclerBytesStreamOutput newNetworkBytesStream() {
+        return new RecyclerBytesStreamOutput(NON_RECYCLING_INSTANCE);
+    }
+
     /**
      * A unidirectional connection to a {@link DiscoveryNode}
      */

+ 5 - 0
server/src/main/java/org/elasticsearch/transport/TransportService.java

@@ -19,6 +19,7 @@ import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
+import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -505,6 +506,10 @@ public class TransportService extends AbstractLifecycleComponent
         return connectionManager;
     }
 
+    public RecyclerBytesStreamOutput newNetworkBytesStream() {
+        return transport.newNetworkBytesStream();
+    }
+
     static class HandshakeRequest extends TransportRequest {
 
         public static final HandshakeRequest INSTANCE = new HandshakeRequest();

+ 0 - 4
server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java

@@ -26,12 +26,9 @@ import org.elasticsearch.cluster.service.MasterServiceTests;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.MockBigArrays;
-import org.elasticsearch.common.util.MockPageCacheRecycler;
 import org.elasticsearch.common.util.concurrent.BaseFuture;
 import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
 import org.elasticsearch.common.util.concurrent.FutureUtils;
-import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
 import org.elasticsearch.monitor.NodeHealthService;
 import org.elasticsearch.monitor.StatusInfo;
 import org.elasticsearch.node.Node;
@@ -205,7 +202,6 @@ public class NodeJoinTests extends ESTestCase {
             "test_node",
             Settings.EMPTY,
             clusterSettings,
-            new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()),
             transportService,
             null,
             writableRegistry(),

+ 17 - 6
server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTransportHandlerTests.java

@@ -22,20 +22,20 @@ import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.compress.Compressor;
 import org.elasticsearch.common.compress.CompressorFactory;
 import org.elasticsearch.common.io.stream.InputStreamStreamInput;
+import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.MockBigArrays;
-import org.elasticsearch.common.util.PageCacheRecycler;
+import org.elasticsearch.common.util.MockPageCacheRecycler;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.internal.io.IOUtils;
-import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.VersionUtils;
 import org.elasticsearch.test.transport.MockTransport;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.BytesRefRecycler;
 import org.elasticsearch.transport.BytesTransportRequest;
 import org.elasticsearch.transport.RemoteTransportException;
 import org.elasticsearch.transport.TransportRequest;
@@ -53,14 +53,19 @@ import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class PublicationTransportHandlerTests extends ESTestCase {
 
     public void testDiffSerializationFailure() {
         final DiscoveryNode localNode = new DiscoveryNode("localNode", buildNewFakeTransportAddress(), Version.CURRENT);
+
+        final TransportService transportService = mock(TransportService.class);
+        final BytesRefRecycler recycler = new BytesRefRecycler(new MockPageCacheRecycler(Settings.EMPTY));
+        when(transportService.newNetworkBytesStream()).then(invocation -> new RecyclerBytesStreamOutput(recycler));
+
         final PublicationTransportHandler handler = new PublicationTransportHandler(
-            new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService()),
-            mock(TransportService.class),
+            transportService,
             writableRegistry(),
             pu -> null,
             (pu, l) -> {}
@@ -130,6 +135,7 @@ public class PublicationTransportHandlerTests extends ESTestCase {
 
             final boolean simulateFailures = randomBoolean();
             final DiscoveryNode localNode = new DiscoveryNode("localNode", buildNewFakeTransportAddress(), Version.CURRENT);
+            final BytesRefRecycler recycler = new BytesRefRecycler(new MockPageCacheRecycler(Settings.EMPTY));
             final MockTransport mockTransport = new MockTransport() {
 
                 @Nullable
@@ -167,7 +173,13 @@ public class PublicationTransportHandlerTests extends ESTestCase {
                         handleError(requestId, new RemoteTransportException(node.getName(), node.getAddress(), action, exception));
                     }
                 }
+
+                @Override
+                public RecyclerBytesStreamOutput newNetworkBytesStream() {
+                    return new RecyclerBytesStreamOutput(recycler);
+                }
             };
+
             final TransportService transportService = mockTransport.createTransportService(
                 Settings.EMPTY,
                 threadPool,
@@ -177,7 +189,6 @@ public class PublicationTransportHandlerTests extends ESTestCase {
                 Collections.emptySet()
             );
             final PublicationTransportHandler handler = new PublicationTransportHandler(
-                new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService()),
                 transportService,
                 writableRegistry(),
                 pu -> null,

+ 0 - 2
server/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java

@@ -18,7 +18,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.network.NetworkService;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.gateway.GatewayMetaState;
 import org.elasticsearch.plugins.DiscoveryPlugin;
@@ -80,7 +79,6 @@ public class DiscoveryModuleTests extends ESTestCase {
     private DiscoveryModule newModule(Settings settings, List<DiscoveryPlugin> plugins) {
         return new DiscoveryModule(
             settings,
-            BigArrays.NON_RECYCLING_INSTANCE,
             transportService,
             null,
             namedWriteableRegistry,

+ 9 - 1
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -121,6 +121,7 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.cluster.service.FakeThreadPoolMasterService;
 import org.elasticsearch.cluster.service.MasterService;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
 import org.elasticsearch.common.network.NetworkModule;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.IndexScopedSettings;
@@ -174,6 +175,7 @@ import org.elasticsearch.search.fetch.FetchPhase;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.disruption.DisruptableMockTransport;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.BytesRefRecycler;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
 import org.junit.After;
@@ -1674,6 +1676,13 @@ public class SnapshotResiliencyTests extends ESTestCase {
                     protected NamedWriteableRegistry writeableRegistry() {
                         return namedWriteableRegistry;
                     }
+
+                    @Override
+                    public RecyclerBytesStreamOutput newNetworkBytesStream() {
+                        // skip leak checks in these tests since they do indeed leak
+                        return new RecyclerBytesStreamOutput(BytesRefRecycler.NON_RECYCLING_INSTANCE);
+                        // TODO fix these leaks and implement leak checking
+                    }
                 };
                 transportService = mockTransport.createTransportService(
                     settings,
@@ -2144,7 +2153,6 @@ public class SnapshotResiliencyTests extends ESTestCase {
                     node.getName(),
                     clusterService.getSettings(),
                     clusterService.getClusterSettings(),
-                    bigArrays,
                     transportService,
                     null,
                     namedWriteableRegistry,

+ 52 - 96
test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java

@@ -42,15 +42,17 @@ import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.recycler.Recycler;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.common.util.ByteArray;
 import org.elasticsearch.common.util.MockBigArrays;
 import org.elasticsearch.common.util.MockPageCacheRecycler;
+import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
 import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
 import org.elasticsearch.core.Nullable;
@@ -72,6 +74,7 @@ import org.elasticsearch.test.disruption.DisruptableMockTransport;
 import org.elasticsearch.test.disruption.DisruptableMockTransport.ConnectionStatus;
 import org.elasticsearch.threadpool.Scheduler;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.BytesRefRecycler;
 import org.elasticsearch.transport.TransportInterceptor;
 import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.transport.TransportRequestOptions;
@@ -262,6 +265,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
         private final Map<Long, ClusterState> committedStatesByVersion = new HashMap<>();
         private final LinearizabilityChecker linearizabilityChecker = new LinearizabilityChecker();
         private final History history = new History();
+        private final Recycler<BytesRef> recycler;
         private final BigArrays bigArrays;
         private final NodeHealthService nodeHealthService;
 
@@ -280,9 +284,14 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
 
         Cluster(int initialNodeCount, boolean allNodesMasterEligible, Settings nodeSettings, NodeHealthService nodeHealthService) {
             this.nodeHealthService = nodeHealthService;
-            bigArrays = usually()
-                ? BigArrays.NON_RECYCLING_INSTANCE
-                : new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
+            if (usually()) {
+                recycler = BytesRefRecycler.NON_RECYCLING_INSTANCE;
+                bigArrays = BigArrays.NON_RECYCLING_INSTANCE;
+            } else {
+                final PageCacheRecycler pageCacheRecycler = new MockPageCacheRecycler(Settings.EMPTY);
+                recycler = new BytesRefRecycler(pageCacheRecycler);
+                bigArrays = new MockBigArrays(pageCacheRecycler, new NoneCircuitBreakerService());
+            }
             deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY);
 
             assertThat(initialNodeCount, greaterThan(0));
@@ -1053,7 +1062,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
             private DisruptableMockTransport mockTransport;
             private final NodeHealthService nodeHealthService;
             List<BiConsumer<DiscoveryNode, ClusterState>> extraJoinValidators = new ArrayList<>();
-            private DelegatingBigArrays delegatingBigArrays;
+            private ClearableRecycler clearableRecycler;
 
             ClusterNode(int nodeIndex, boolean masterEligible, Settings nodeSettings, NodeHealthService nodeHealthService) {
                 this(
@@ -1091,6 +1100,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
 
             private void setUp() {
                 final ThreadPool threadPool = deterministicTaskQueue.getThreadPool(this::onNode);
+                clearableRecycler = new ClearableRecycler(recycler);
                 mockTransport = new DisruptableMockTransport(localNode, logger, deterministicTaskQueue) {
                     @Override
                     protected void execute(Runnable runnable) {
@@ -1140,6 +1150,11 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
 
                         super.onSendRequest(requestId, action, request, options, destinationTransport);
                     }
+
+                    @Override
+                    public RecyclerBytesStreamOutput newNetworkBytesStream() {
+                        return new RecyclerBytesStreamOutput(clearableRecycler);
+                    }
                 };
                 final Settings settings = nodeSettings.hasValue(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey())
                     ? nodeSettings
@@ -1181,7 +1196,6 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
                     (dn, cs) -> extraJoinValidators.forEach(validator -> validator.accept(dn, cs))
                 );
                 final AllocationService allocationService = ESAllocationTestCase.createAllocationService(Settings.EMPTY);
-                delegatingBigArrays = new DelegatingBigArrays(bigArrays);
                 final NodeClient client = new NodeClient(Settings.EMPTY, threadPool);
                 client.initialize(
                     singletonMap(
@@ -1198,7 +1212,6 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
                     "test_node",
                     settings,
                     clusterSettings,
-                    delegatingBigArrays,
                     transportService,
                     client,
                     getNamedWriteableRegistry(),
@@ -1272,7 +1285,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
                         nodeHealthService
                     );
                 } finally {
-                    delegatingBigArrays.releaseAll();
+                    clearableRecycler.clear();
                 }
             }
 
@@ -1813,107 +1826,50 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
     }
 
     /**
-     * A wrapper around a {@link BigArrays} which tracks the arrays it allocates so that they can be released if the node reboots. Only
-     * works for {@link ByteArray} allocations since that's all the {@link Coordinator} needs.
+     * A wrapper around a {@link Recycler<BytesRef>} which tracks the refs it allocates so they can be released if the node reboots.
      */
-    static class DelegatingBigArrays extends BigArrays {
-
-        private final BigArrays delegate;
+    static class ClearableRecycler implements Recycler<BytesRef> {
 
-        private final Set<DelegatingByteArray> trackedArrays = new HashSet<>();
+        private final Recycler<BytesRef> delegate;
+        private final Set<V<BytesRef>> trackedRefs = new HashSet<>();
 
-        DelegatingBigArrays(BigArrays delegate) {
-            super(null, null, null);
+        ClearableRecycler(Recycler<BytesRef> delegate) {
             this.delegate = delegate;
         }
 
         @Override
-        public ByteArray newByteArray(long size, boolean clearOnResize) {
-            return track(delegate.newByteArray(size, clearOnResize));
-        }
+        public V<BytesRef> obtain() {
+            V<BytesRef> innerRef = delegate.obtain();
+            final V<BytesRef> trackedRef = new V<>() {
+                @Override
+                public BytesRef v() {
+                    return innerRef.v();
+                }
 
-        @Override
-        public ByteArray resize(ByteArray array, long size) {
-            assert array instanceof DelegatingByteArray;
-            trackedArrays.remove(array);
-            return track(delegate.resize(((DelegatingByteArray) array).getDelegate(), size));
-        }
+                @Override
+                public boolean isRecycled() {
+                    return innerRef.isRecycled();
+                }
 
-        private ByteArray track(ByteArray byteArray) {
-            final DelegatingByteArray wrapped = new DelegatingByteArray(byteArray);
-            trackedArrays.add(wrapped);
-            return wrapped;
+                @Override
+                public void close() {
+                    innerRef.close();
+                    trackedRefs.remove(this);
+                }
+            };
+            trackedRefs.add(trackedRef);
+            return trackedRef;
         }
 
-        void releaseAll() {
-            for (DelegatingByteArray trackedArray : List.copyOf(trackedArrays)) {
-                trackedArray.close();
+        /**
+         * Release all tracked refs as if the node rebooted.
+         */
+        void clear() {
+            for (V<BytesRef> trackedRef : List.copyOf(trackedRefs)) {
+                trackedRef.close();
             }
-            assert trackedArrays.isEmpty() : trackedArrays;
+            assert trackedRefs.isEmpty() : trackedRefs;
         }
 
-        private class DelegatingByteArray implements ByteArray {
-
-            private final ByteArray delegate;
-
-            DelegatingByteArray(ByteArray delegate) {
-                this.delegate = delegate;
-            }
-
-            ByteArray getDelegate() {
-                return delegate;
-            }
-
-            @Override
-            public void close() {
-                delegate.close();
-                trackedArrays.remove(this);
-            }
-
-            @Override
-            public long size() {
-                return delegate.size();
-            }
-
-            @Override
-            public byte get(long index) {
-                return delegate.get(index);
-            }
-
-            @Override
-            public byte set(long index, byte value) {
-                return delegate.set(index, value);
-            }
-
-            @Override
-            public boolean get(long index, int len, BytesRef ref) {
-                return delegate.get(index, len, ref);
-            }
-
-            @Override
-            public void set(long index, byte[] buf, int offset, int len) {
-                delegate.set(index, buf, offset, len);
-            }
-
-            @Override
-            public void fill(long fromIndex, long toIndex, byte value) {
-                delegate.fill(fromIndex, toIndex, value);
-            }
-
-            @Override
-            public boolean hasArray() {
-                return delegate.hasArray();
-            }
-
-            @Override
-            public byte[] array() {
-                return delegate.array();
-            }
-
-            @Override
-            public long ramBytesUsed() {
-                return delegate.ramBytesUsed();
-            }
-        }
     }
 }

+ 11 - 0
test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java

@@ -13,9 +13,14 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.component.Lifecycle;
 import org.elasticsearch.common.component.LifecycleListener;
+import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.BoundTransportAddress;
 import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.util.MockPageCacheRecycler;
+import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.tasks.Task;
+import org.elasticsearch.transport.BytesRefRecycler;
 import org.elasticsearch.transport.ConnectionProfile;
 import org.elasticsearch.transport.RequestHandlerRegistry;
 import org.elasticsearch.transport.Transport;
@@ -44,6 +49,7 @@ public class StubbableTransport implements Transport {
     private volatile SendRequestBehavior defaultSendRequest = null;
     private volatile OpenConnectionBehavior defaultConnectBehavior = null;
     private final Transport delegate;
+    private final PageCacheRecycler recycler = new MockPageCacheRecycler(Settings.EMPTY);
 
     public StubbableTransport(Transport transport) {
         this.delegate = transport;
@@ -336,4 +342,9 @@ public class StubbableTransport implements Transport {
 
         default void clearCallback() {}
     }
+
+    @Override
+    public RecyclerBytesStreamOutput newNetworkBytesStream() {
+        return new RecyclerBytesStreamOutput(new BytesRefRecycler(recycler));
+    }
 }