Преглед изворни кода

Recycle buffers used for cluster state publication (#63537)

Uses a `ReleasableBytesStreamOutput` to serialize the cluster state (or
diff) and releases it on completion of the publication.
David Turner пре 5 година
родитељ
комит
faaf2e72bd

+ 24 - 13
server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

@@ -56,6 +56,7 @@ import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.common.xcontent.XContentHelper;
@@ -160,7 +161,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
                        NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService,
                        Supplier<CoordinationState.PersistedState> persistedStateSupplier, SeedHostsProvider seedHostsProvider,
                        ClusterApplier clusterApplier, Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, Random random,
-                       RerouteService rerouteService, ElectionStrategy electionStrategy, NodeHealthService nodeHealthService) {
+                       RerouteService rerouteService, ElectionStrategy electionStrategy, NodeHealthService nodeHealthService,
+                       BigArrays bigArrays) {
         this.settings = settings;
         this.transportService = transportService;
         this.masterService = masterService;
@@ -186,7 +188,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
         this.peerFinder = new CoordinatorPeerFinder(settings, transportService,
             new HandshakingTransportAddressConnector(settings, transportService), configuredHostsResolver);
         this.publicationHandler = new PublicationTransportHandler(transportService, namedWriteableRegistry,
-            this::handlePublishRequest, this::handleApplyCommit);
+            this::handlePublishRequest, this::handleApplyCommit, bigArrays);
         this.leaderChecker = new LeaderChecker(settings, transportService, this::onLeaderFailure, nodeHealthService);
         this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::removeNode,
             nodeHealthService);
@@ -1069,17 +1071,24 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
 
                 final PublicationTransportHandler.PublicationContext publicationContext =
                     publicationHandler.newPublicationContext(clusterChangedEvent);
-
-                final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState);
-                final CoordinatorPublication publication = new CoordinatorPublication(publishRequest, publicationContext,
-                    new ListenableFuture<>(), ackListener, publishListener);
-                currentPublication = Optional.of(publication);
-
-                final DiscoveryNodes publishNodes = publishRequest.getAcceptedState().nodes();
-                leaderChecker.setCurrentNodes(publishNodes);
-                followersChecker.setCurrentNodes(publishNodes);
-                lagDetector.setTrackedNodes(publishNodes);
-                publication.start(followersChecker.getFaultyNodes());
+                boolean publicationStarted = false;
+                try {
+                    final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState);
+                    final CoordinatorPublication publication = new CoordinatorPublication(publishRequest, publicationContext,
+                        new ListenableFuture<>(), ackListener, publishListener);
+                    currentPublication = Optional.of(publication);
+
+                    final DiscoveryNodes publishNodes = publishRequest.getAcceptedState().nodes();
+                    leaderChecker.setCurrentNodes(publishNodes);
+                    followersChecker.setCurrentNodes(publishNodes);
+                    lagDetector.setTrackedNodes(publishNodes);
+                    publication.start(followersChecker.getFaultyNodes());
+                    publicationStarted = true;
+                } finally {
+                    if (publicationStarted == false) {
+                        publicationContext.releaseSerializedStates();
+                    }
+                }
             }
         } catch (Exception e) {
             logger.debug(() -> new ParameterizedMessage("[{}] publishing failed", clusterChangedEvent.source()), e);
@@ -1351,6 +1360,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
         protected void onCompletion(boolean committed) {
             assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
 
+            publicationContext.releaseSerializedStates();
+
             localNodeAckEvent.addListener(new ActionListener<Void>() {
                 @Override
                 public void onResponse(Void ignore) {

+ 3 - 3
server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java

@@ -360,10 +360,10 @@ public abstract class Publication {
 
             @Override
             public void onFailure(Exception e) {
-                assert e instanceof TransportException;
-                final TransportException exp = (TransportException) e;
+                assert e instanceof ElasticsearchException : e;
+                final ElasticsearchException exp = (ElasticsearchException) e;
                 logger.debug(() -> new ParameterizedMessage("PublishResponseHandler: [{}] failed", discoveryNode), exp);
-                assert ((TransportException) e).getRootCause() instanceof Exception;
+                assert exp.getRootCause() instanceof Exception;
                 setFailed((Exception) exp.getRootCause());
                 onPossibleCommitFailure();
                 assert publicationCompletedIffAllTargetsInactiveOrCancelled();

+ 166 - 50
server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java

@@ -31,15 +31,18 @@ import org.elasticsearch.cluster.IncompatibleClusterStateVersionException;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.compress.Compressor;
 import org.elasticsearch.common.compress.CompressorFactory;
-import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.io.stream.InputStreamStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
+import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.BytesTransportRequest;
@@ -69,6 +72,7 @@ public class PublicationTransportHandler {
     private final TransportService transportService;
     private final NamedWriteableRegistry namedWriteableRegistry;
     private final Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest;
+    private final BigArrays bigArrays;
 
     private final AtomicReference<ClusterState> lastSeenClusterState = new AtomicReference<>();
 
@@ -89,10 +93,12 @@ public class PublicationTransportHandler {
 
     public PublicationTransportHandler(TransportService transportService, NamedWriteableRegistry namedWriteableRegistry,
                                        Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest,
-                                       BiConsumer<ApplyCommitRequest, ActionListener<Void>> handleApplyCommit) {
+                                       BiConsumer<ApplyCommitRequest, ActionListener<Void>> handleApplyCommit,
+                                       BigArrays bigArrays) {
         this.transportService = transportService;
         this.namedWriteableRegistry = namedWriteableRegistry;
         this.handlePublishRequest = handlePublishRequest;
+        this.bigArrays = bigArrays;
 
         transportService.registerRequestHandler(PUBLISH_STATE_ACTION_NAME, ThreadPool.Names.GENERIC, false, false,
             BytesTransportRequest::new, (request, channel, task) -> channel.sendResponse(handleIncomingPublishRequest(request)));
@@ -212,31 +218,60 @@ public class PublicationTransportHandler {
         // Build the serializations we expect to need now, early in the process, so that an error during serialization fails the publication
         // straight away. This isn't watertight since we send diffs on a best-effort basis and may fall back to sending a full state (and
         // therefore serializing it) if the diff-based publication fails.
-        publicationContext.buildDiffAndSerializeStates();
-        return publicationContext;
+        boolean success = false;
+        try {
+            publicationContext.buildDiffAndSerializeStates();
+            success = true;
+            return publicationContext;
+        } finally {
+            if (success == false) {
+                publicationContext.releaseSerializedStates();
+            }
+        }
     }
 
-    private static BytesReference serializeFullClusterState(ClusterState clusterState, Version nodeVersion) throws IOException {
-        final BytesStreamOutput bStream = new BytesStreamOutput();
-        try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream))) {
-            stream.setVersion(nodeVersion);
-            stream.writeBoolean(true);
-            clusterState.writeTo(stream);
+    private ReleasableBytesReference serializeFullClusterState(ClusterState clusterState, Version nodeVersion) throws IOException {
+        boolean success = false;
+        final ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);
+        try {
+            try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(
+                    Streams.flushOnCloseStream(bStream)))) {
+                stream.setVersion(nodeVersion);
+                stream.writeBoolean(true);
+                clusterState.writeTo(stream);
+            }
+            final BytesReference serializedState = bStream.bytes();
+            logger.trace("serialized full cluster state version [{}] for node version [{}] with size [{}]",
+                    clusterState.version(), nodeVersion, serializedState.length());
+            final ReleasableBytesReference releasableBytesReference = new ReleasableBytesReference(serializedState, bStream);
+            success = true;
+            return releasableBytesReference;
+        } finally {
+            if (success == false) {
+                IOUtils.closeWhileHandlingException(bStream);
+            }
         }
-        final BytesReference serializedState = bStream.bytes();
-        logger.trace("serialized full cluster state version [{}] for node version [{}] with size [{}]",
-            clusterState.version(), nodeVersion, serializedState.length());
-        return serializedState;
     }
 
-    private static BytesReference serializeDiffClusterState(Diff<ClusterState> diff, Version nodeVersion) throws IOException {
-        final BytesStreamOutput bStream = new BytesStreamOutput();
-        try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream))) {
-            stream.setVersion(nodeVersion);
-            stream.writeBoolean(false);
-            diff.writeTo(stream);
+    private ReleasableBytesReference serializeDiffClusterState(Diff<ClusterState> diff, Version nodeVersion) throws IOException {
+        boolean success = false;
+        final ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);
+        try {
+            try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(
+                    Streams.flushOnCloseStream(bStream)))) {
+                stream.setVersion(nodeVersion);
+                stream.writeBoolean(false);
+                diff.writeTo(stream);
+            }
+            final BytesReference serializedDiff = bStream.bytes();
+            final ReleasableBytesReference releasableBytesReference = new ReleasableBytesReference(serializedDiff, bStream);
+            success = true;
+            return releasableBytesReference;
+        } finally {
+            if (success == false) {
+                IOUtils.closeWhileHandlingException(bStream);
+            }
         }
-        return bStream.bytes();
     }
 
     /**
@@ -250,8 +285,11 @@ public class PublicationTransportHandler {
         private final ClusterState newState;
         private final ClusterState previousState;
         private final boolean sendFullVersion;
-        private final Map<Version, BytesReference> serializedStates = new HashMap<>();
-        private final Map<Version, BytesReference> serializedDiffs = new HashMap<>();
+
+        private final Object mutex = new Object(); // protects access to the following three fields
+        private boolean serializedStatesReleased;
+        private final Map<Version, ReleasableBytesReference> serializedStates = new HashMap<>();
+        private final Map<Version, ReleasableBytesReference> serializedDiffs = new HashMap<>();
 
         PublicationContext(ClusterChangedEvent clusterChangedEvent) {
             discoveryNodes = clusterChangedEvent.state().nodes();
@@ -261,27 +299,32 @@ public class PublicationTransportHandler {
         }
 
         void buildDiffAndSerializeStates() {
-            Diff<ClusterState> diff = null;
-            for (DiscoveryNode node : discoveryNodes) {
-                try {
-                    if (sendFullVersion || previousState.nodes().nodeExists(node) == false) {
-                        if (serializedStates.containsKey(node.getVersion()) == false) {
-                            serializedStates.put(node.getVersion(), serializeFullClusterState(newState, node.getVersion()));
-                        }
-                    } else {
-                        // will send a diff
-                        if (diff == null) {
-                            diff = newState.diff(previousState);
-                        }
-                        if (serializedDiffs.containsKey(node.getVersion()) == false) {
-                            final BytesReference serializedDiff = serializeDiffClusterState(diff, node.getVersion());
-                            serializedDiffs.put(node.getVersion(), serializedDiff);
-                            logger.trace("serialized cluster state diff for version [{}] in for node version [{}] with size [{}]",
-                                newState.version(), node.getVersion(), serializedDiff.length());
+            synchronized (mutex) {
+                Diff<ClusterState> diff = null;
+                for (DiscoveryNode node : discoveryNodes) {
+                    try {
+                        if (sendFullVersion || previousState.nodes().nodeExists(node) == false) {
+                            if (serializedStates.containsKey(node.getVersion()) == false) {
+                                final ReleasableBytesReference previousBytes
+                                        = serializedStates.put(node.getVersion(), serializeFullClusterState(newState, node.getVersion()));
+                                assert previousBytes == null : "leaked a bytes ref";
+                            }
+                        } else {
+                            // will send a diff
+                            if (diff == null) {
+                                diff = newState.diff(previousState);
+                            }
+                            if (serializedDiffs.containsKey(node.getVersion()) == false) {
+                                final ReleasableBytesReference serializedDiff = serializeDiffClusterState(diff, node.getVersion());
+                                final ReleasableBytesReference previousDiff = serializedDiffs.put(node.getVersion(), serializedDiff);
+                                assert previousDiff == null : "leaked a bytes ref";
+                                logger.trace("serialized cluster state diff for version [{}] in for node version [{}] with size [{}]",
+                                        newState.version(), node.getVersion(), serializedDiff.length());
+                            }
                         }
+                    } catch (IOException e) {
+                        throw new ElasticsearchException("failed to serialize cluster state for publishing to node {}", e, node);
                     }
-                } catch (IOException e) {
-                    throw new ElasticsearchException("failed to serialize cluster state for publishing to node {}", e, node);
                 }
             }
         }
@@ -351,29 +394,88 @@ public class PublicationTransportHandler {
         }
 
         private void sendFullClusterState(DiscoveryNode destination, ActionListener<PublishWithJoinResponse> listener) {
-            BytesReference bytes = serializedStates.get(destination.getVersion());
+            final boolean alreadyReleasedWhenReadingCache;
+            ReleasableBytesReference bytes;
+            synchronized (mutex) {
+                alreadyReleasedWhenReadingCache = serializedStatesReleased;
+                if (alreadyReleasedWhenReadingCache) {
+                    bytes = null; // not used
+                } else {
+                    bytes = serializedStates.get(destination.getVersion());
+                    if (bytes != null) {
+                        bytes.retain();
+                    }
+                }
+            }
+
+            if (alreadyReleasedWhenReadingCache) {
+                listener.onFailure(
+                        new ElasticsearchException("publication of cluster state version [" + newState.version() + "] has completed"));
+                return;
+            }
+
             if (bytes == null) {
+                // we weren't expecting to send a full state to this node, but the diff didn't work, so serialize the full cluster state...
                 try {
                     bytes = serializeFullClusterState(newState, destination.getVersion());
-                    serializedStates.put(destination.getVersion(), bytes);
                 } catch (Exception e) {
                     logger.warn(() -> new ParameterizedMessage(
                         "failed to serialize cluster state before publishing it to node {}", destination), e);
                     listener.onFailure(e);
                     return;
                 }
+
+                // ... and keep hold of it in case another node needs it too
+                final boolean alreadyReleasedWhenWritingCache;
+                synchronized (mutex) {
+                    alreadyReleasedWhenWritingCache = serializedStatesReleased;
+                    if (alreadyReleasedWhenWritingCache == false) {
+                        final ReleasableBytesReference existingBytes = serializedStates.putIfAbsent(destination.getVersion(), bytes);
+                        if (existingBytes != null) {
+                            // another thread got there first; discard the work we've done and use the cached value
+                            bytes.close();
+                            bytes = existingBytes;
+                        }
+                        bytes.retain();
+                    }
+                }
+                if (alreadyReleasedWhenWritingCache) {
+                    listener.onFailure(
+                            new ElasticsearchException("publication of cluster state version [" + newState.version() + "] has completed"));
+                    return;
+                }
             }
-            sendClusterState(destination, bytes, false, listener);
+
+            //noinspection ConstantConditions this assertion is always true but it's here for the benefit of readers
+            assert bytes != null;
+            sendClusterState(destination, bytes, false, listener); // releases retained bytes on completion
         }
 
         private void sendClusterStateDiff(DiscoveryNode destination, ActionListener<PublishWithJoinResponse> listener) {
-            final BytesReference bytes = serializedDiffs.get(destination.getVersion());
-            assert bytes != null
-                : "failed to find serialized diff for node " + destination + " of version [" + destination.getVersion() + "]";
-            sendClusterState(destination, bytes, true, listener);
+            final ReleasableBytesReference bytes;
+            final boolean alreadyReleased;
+            synchronized (mutex) {
+                alreadyReleased = serializedStatesReleased;
+                if (alreadyReleased) {
+                    bytes = null; // not used
+                } else {
+                    bytes = serializedDiffs.get(destination.getVersion());
+                    assert bytes != null
+                            : "failed to find serialized diff for node " + destination + " of version [" + destination.getVersion() + "]";
+                    bytes.retain();
+                }
+            }
+            if (alreadyReleased) {
+                listener.onFailure(
+                        new ElasticsearchException("publication of cluster state version [" + newState.version() + "] has completed"));
+            } else {
+                //noinspection ConstantConditions this assertion is always true but it's here for the benefit of readers
+                assert bytes != null;
+                sendClusterState(destination, bytes, true, listener); // releases retained bytes on completion
+            }
         }
 
-        private void sendClusterState(DiscoveryNode destination, BytesReference bytes, boolean retryWithFullClusterStateOnFailure,
+        private void sendClusterState(DiscoveryNode destination, ReleasableBytesReference bytes, boolean retryWithFullClusterStateOnFailure,
                                       ActionListener<PublishWithJoinResponse> listener) {
             try {
                 final BytesTransportRequest request = new BytesTransportRequest(bytes, destination.getVersion());
@@ -396,11 +498,13 @@ public class PublicationTransportHandler {
 
                         @Override
                         public void handleResponse(PublishWithJoinResponse response) {
+                            bytes.close();
                             listener.onResponse(response);
                         }
 
                         @Override
                         public void handleException(TransportException exp) {
+                            bytes.close();
                             transportExceptionHandler.accept(exp);
                         }
 
@@ -412,9 +516,21 @@ public class PublicationTransportHandler {
                 transportService.sendRequest(destination, PUBLISH_STATE_ACTION_NAME, request, stateRequestOptions, responseHandler);
             } catch (Exception e) {
                 logger.warn(() -> new ParameterizedMessage("error sending cluster state to {}", destination), e);
+                bytes.close();
                 listener.onFailure(e);
             }
         }
+
+        public void releaseSerializedStates() {
+            synchronized (mutex) {
+                assert serializedStatesReleased == false;
+                serializedStatesReleased = true;
+                serializedStates.values().forEach(ReleasableBytesReference::close);
+                serializedDiffs.values().forEach(ReleasableBytesReference::close);
+                serializedStates.clear();
+                serializedDiffs.clear();
+            }
+        }
     }
 
 }

+ 3 - 2
server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java

@@ -37,6 +37,7 @@ import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.gateway.GatewayMetaState;
 import org.elasticsearch.monitor.NodeHealthService;
 import org.elasticsearch.plugins.DiscoveryPlugin;
@@ -86,7 +87,7 @@ public class DiscoveryModule {
                            NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService, MasterService masterService,
                            ClusterApplier clusterApplier, ClusterSettings clusterSettings, List<DiscoveryPlugin> plugins,
                            AllocationService allocationService, Path configFile, GatewayMetaState gatewayMetaState,
-                           RerouteService rerouteService, NodeHealthService nodeHealthService) {
+                           RerouteService rerouteService, NodeHealthService nodeHealthService, BigArrays bigArrays) {
         final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators = new ArrayList<>();
         final Map<String, Supplier<SeedHostsProvider>> hostProviders = new HashMap<>();
         hostProviders.put("settings", () -> new SettingsBasedSeedHostsProvider(settings, transportService));
@@ -148,7 +149,7 @@ public class DiscoveryModule {
                 settings, clusterSettings,
                 transportService, namedWriteableRegistry, allocationService, masterService, gatewayMetaState::getPersistedState,
                 seedHostsProvider, clusterApplier, joinValidators, new Random(Randomness.get().nextLong()), rerouteService,
-                electionStrategy, nodeHealthService);
+                electionStrategy, nodeHealthService, bigArrays);
         } else {
             throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
         }

+ 1 - 1
server/src/main/java/org/elasticsearch/node/Node.java

@@ -582,7 +582,7 @@ public class Node implements Closeable {
                 networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),
                 clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),
                 clusterModule.getAllocationService(), environment.configFile(), gatewayMetaState, rerouteService,
-                fsHealthService);
+                fsHealthService, bigArrays);
             this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),
                 transportService, indicesService, pluginsService, circuitBreakerService, scriptService,
                 httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService,

+ 2 - 1
server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java

@@ -36,6 +36,7 @@ import org.elasticsearch.cluster.service.MasterServiceTests;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.BaseFuture;
 import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.monitor.NodeHealthService;
@@ -182,7 +183,7 @@ public class NodeJoinTests extends ESTestCase {
             () -> new InMemoryPersistedState(term, initialState), r -> emptyList(),
             new NoOpClusterApplier(),
             Collections.emptyList(),
-            random, (s, p, r) -> {}, ElectionStrategy.DEFAULT_INSTANCE, nodeHealthService);
+            random, (s, p, r) -> {}, ElectionStrategy.DEFAULT_INSTANCE, nodeHealthService, BigArrays.NON_RECYCLING_INSTANCE);
         transportService.start();
         transportService.acceptIncomingRequests();
         transport = capturingTransport;

+ 5 - 1
server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTransportHandlerTests.java

@@ -29,6 +29,9 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.MockBigArrays;
+import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.transport.CapturingTransport;
@@ -52,8 +55,9 @@ public class PublicationTransportHandlerTests extends ESTestCase {
             TransportService.NOOP_TRANSPORT_INTERCEPTOR,
             x -> localNode,
             clusterSettings, Collections.emptySet());
+        final BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, null);
         final PublicationTransportHandler handler = new PublicationTransportHandler(transportService,
-            writableRegistry(), pu -> null, (pu, l) -> {});
+            writableRegistry(), pu -> null, (pu, l) -> {}, bigArrays);
         transportService.start();
         transportService.acceptIncomingRequests();
 

+ 2 - 1
server/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java

@@ -29,6 +29,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.network.NetworkService;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.gateway.GatewayMetaState;
 import org.elasticsearch.plugins.DiscoveryPlugin;
@@ -87,7 +88,7 @@ public class DiscoveryModuleTests extends ESTestCase {
     private DiscoveryModule newModule(Settings settings, List<DiscoveryPlugin> plugins) {
         return new DiscoveryModule(settings, transportService, namedWriteableRegistry, null, masterService,
             clusterApplier, clusterSettings, plugins, null, createTempDir().toAbsolutePath(), gatewayMetaState,
-            mock(RerouteService.class), null);
+            mock(RerouteService.class), null, BigArrays.NON_RECYCLING_INSTANCE);
     }
 
     public void testDefaults() {

+ 3 - 2
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -142,6 +142,7 @@ import org.elasticsearch.common.settings.IndexScopedSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.MockBigArrays;
 import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
@@ -1502,7 +1503,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                 rerouteServiceSetOnce.set(rerouteService);
                 final IndexScopedSettings indexScopedSettings =
                     new IndexScopedSettings(settings, IndexScopedSettings.BUILT_IN_INDEX_SETTINGS);
-                bigArrays = new BigArrays(new PageCacheRecycler(settings), null, "test");
+                bigArrays = new MockBigArrays(new PageCacheRecycler(settings), null);
                 final MapperRegistry mapperRegistry = new IndicesModule(Collections.emptyList()).getMapperRegistry();
                 indicesService = new IndicesService(
                     settings,
@@ -1727,7 +1728,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                         .map(n -> n.node.getAddress()).collect(Collectors.toList()),
                     clusterService.getClusterApplierService(), Collections.emptyList(), random(),
                     rerouteService, ElectionStrategy.DEFAULT_INSTANCE,
-                    () -> new StatusInfo(HEALTHY, "healthy-info"));
+                    () -> new StatusInfo(HEALTHY, "healthy-info"), bigArrays);
                 masterService.setClusterStatePublisher(coordinator);
                 coordinator.start();
                 clusterService.getClusterApplierService().setNodeConnectionsService(nodeConnectionsService);

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java

@@ -974,7 +974,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
                 coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(),
                     allocationService, masterService, this::getPersistedState,
                     Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get(), (s, p, r) -> {},
-                    getElectionStrategy(), nodeHealthService);
+                    getElectionStrategy(), nodeHealthService, bigArrays);
                 masterService.setClusterStatePublisher(coordinator);
                 final GatewayService gatewayService
                     = new GatewayService(settings, allocationService, clusterService, threadPool, coordinator, null);