@@ -31,18 +31,15 @@ import org.elasticsearch.cluster.IncompatibleClusterStateVersionException;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.compress.Compressor;
 import org.elasticsearch.common.compress.CompressorFactory;
-import org.elasticsearch.common.io.Streams;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.InputStreamStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
-import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.BytesTransportRequest;
@@ -72,7 +69,6 @@ public class PublicationTransportHandler {
     private final TransportService transportService;
     private final NamedWriteableRegistry namedWriteableRegistry;
     private final Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest;
-    private final BigArrays bigArrays;
 
     private final AtomicReference<ClusterState> lastSeenClusterState = new AtomicReference<>();
 
@@ -93,12 +89,10 @@ public class PublicationTransportHandler {
 
     public PublicationTransportHandler(TransportService transportService, NamedWriteableRegistry namedWriteableRegistry,
                                        Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest,
-                                       BiConsumer<ApplyCommitRequest, ActionListener<Void>> handleApplyCommit,
-                                       BigArrays bigArrays) {
+                                       BiConsumer<ApplyCommitRequest, ActionListener<Void>> handleApplyCommit) {
         this.transportService = transportService;
         this.namedWriteableRegistry = namedWriteableRegistry;
         this.handlePublishRequest = handlePublishRequest;
-        this.bigArrays = bigArrays;
 
         transportService.registerRequestHandler(PUBLISH_STATE_ACTION_NAME, ThreadPool.Names.GENERIC, false, false,
             BytesTransportRequest::new, (request, channel, task) -> channel.sendResponse(handleIncomingPublishRequest(request)));
@@ -218,60 +212,31 @@ public class PublicationTransportHandler {
         // Build the serializations we expect to need now, early in the process, so that an error during serialization fails the publication
         // straight away. This isn't watertight since we send diffs on a best-effort basis and may fall back to sending a full state (and
         // therefore serializing it) if the diff-based publication fails.
-        boolean success = false;
-        try {
-            publicationContext.buildDiffAndSerializeStates();
-            success = true;
-            return publicationContext;
-        } finally {
-            if (success == false) {
-                publicationContext.releaseSerializedStates();
-            }
-        }
+        publicationContext.buildDiffAndSerializeStates();
+        return publicationContext;
     }
 
-    private ReleasableBytesReference serializeFullClusterState(ClusterState clusterState, Version nodeVersion) throws IOException {
-        boolean success = false;
-        final ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);
-        try {
-            try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(
-                Streams.flushOnCloseStream(bStream)))) {
-                stream.setVersion(nodeVersion);
-                stream.writeBoolean(true);
-                clusterState.writeTo(stream);
-            }
-            final BytesReference serializedState = bStream.bytes();
-            logger.trace("serialized full cluster state version [{}] for node version [{}] with size [{}]",
-                clusterState.version(), nodeVersion, serializedState.length());
-            final ReleasableBytesReference releasableBytesReference = new ReleasableBytesReference(serializedState, bStream);
-            success = true;
-            return releasableBytesReference;
-        } finally {
-            if (success == false) {
-                IOUtils.closeWhileHandlingException(bStream);
-            }
+    private static BytesReference serializeFullClusterState(ClusterState clusterState, Version nodeVersion) throws IOException {
+        final BytesStreamOutput bStream = new BytesStreamOutput();
+        try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream))) {
+            stream.setVersion(nodeVersion);
+            stream.writeBoolean(true);
+            clusterState.writeTo(stream);
         }
+        final BytesReference serializedState = bStream.bytes();
+        logger.trace("serialized full cluster state version [{}] for node version [{}] with size [{}]",
+            clusterState.version(), nodeVersion, serializedState.length());
+        return serializedState;
     }
 
-    private ReleasableBytesReference serializeDiffClusterState(Diff<ClusterState> diff, Version nodeVersion) throws IOException {
-        boolean success = false;
-        final ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);
-        try {
-            try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(
-                Streams.flushOnCloseStream(bStream)))) {
-                stream.setVersion(nodeVersion);
-                stream.writeBoolean(false);
-                diff.writeTo(stream);
-            }
-            final BytesReference serializedDiff = bStream.bytes();
-            final ReleasableBytesReference releasableBytesReference = new ReleasableBytesReference(serializedDiff, bStream);
-            success = true;
-            return releasableBytesReference;
-        } finally {
-            if (success == false) {
-                IOUtils.closeWhileHandlingException(bStream);
-            }
+    private static BytesReference serializeDiffClusterState(Diff<ClusterState> diff, Version nodeVersion) throws IOException {
+        final BytesStreamOutput bStream = new BytesStreamOutput();
+        try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream))) {
+            stream.setVersion(nodeVersion);
+            stream.writeBoolean(false);
+            diff.writeTo(stream);
         }
+        return bStream.bytes();
     }
 
     /**
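Note on the wire format used by the two serializers above: the payload is compressed, the stream is pinned to the destination node's wire version, and a leading boolean tells the receiver whether to decode a full cluster state (true) or a diff (false). A minimal sketch of that framing follows, using only the stream utilities this class already imports; the class name PublicationSerializationSketch and the generic Writeable parameter are illustrative and not part of this change.

    import java.io.IOException;

    import org.elasticsearch.Version;
    import org.elasticsearch.common.bytes.BytesReference;
    import org.elasticsearch.common.compress.CompressorFactory;
    import org.elasticsearch.common.io.stream.BytesStreamOutput;
    import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
    import org.elasticsearch.common.io.stream.StreamOutput;
    import org.elasticsearch.common.io.stream.Writeable;

    public final class PublicationSerializationSketch {

        // Hypothetical helper, not part of the PR: serialize any Writeable with the same layout
        // as serializeFullClusterState / serializeDiffClusterState (compressed, version-pinned,
        // boolean-prefixed).
        public static BytesReference serialize(Writeable payload, boolean isFullState, Version nodeVersion) throws IOException {
            final BytesStreamOutput bStream = new BytesStreamOutput(); // plain heap-backed buffer, as in this change
            try (StreamOutput stream = new OutputStreamStreamOutput(
                    CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream))) {
                stream.setVersion(nodeVersion);   // the receiver decodes with its negotiated wire version
                stream.writeBoolean(isFullState); // marks full state vs diff
                payload.writeTo(stream);          // the payload's own wire format
            }
            return bStream.bytes();               // nothing to release; the garbage collector owns the buffer
        }
    }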
@@ -285,11 +250,8 @@ public class PublicationTransportHandler {
         private final ClusterState newState;
         private final ClusterState previousState;
         private final boolean sendFullVersion;
-
-        private final Object mutex = new Object(); // protects access to the following three fields
-        private boolean serializedStatesReleased;
-        private final Map<Version, ReleasableBytesReference> serializedStates = new HashMap<>();
-        private final Map<Version, ReleasableBytesReference> serializedDiffs = new HashMap<>();
+        private final Map<Version, BytesReference> serializedStates = new HashMap<>();
+        private final Map<Version, BytesReference> serializedDiffs = new HashMap<>();
 
         PublicationContext(ClusterChangedEvent clusterChangedEvent) {
             discoveryNodes = clusterChangedEvent.state().nodes();
@@ -299,32 +261,27 @@ public class PublicationTransportHandler {
         }
 
         void buildDiffAndSerializeStates() {
-            synchronized (mutex) {
-                Diff<ClusterState> diff = null;
-                for (DiscoveryNode node : discoveryNodes) {
-                    try {
-                        if (sendFullVersion || previousState.nodes().nodeExists(node) == false) {
-                            if (serializedStates.containsKey(node.getVersion()) == false) {
-                                final ReleasableBytesReference previousBytes
-                                    = serializedStates.put(node.getVersion(), serializeFullClusterState(newState, node.getVersion()));
-                                assert previousBytes == null : "leaked a bytes ref";
-                            }
-                        } else {
-                            // will send a diff
-                            if (diff == null) {
-                                diff = newState.diff(previousState);
-                            }
-                            if (serializedDiffs.containsKey(node.getVersion()) == false) {
-                                final ReleasableBytesReference serializedDiff = serializeDiffClusterState(diff, node.getVersion());
-                                final ReleasableBytesReference previousDiff = serializedDiffs.put(node.getVersion(), serializedDiff);
-                                assert previousDiff == null : "leaked a bytes ref";
-                                logger.trace("serialized cluster state diff for version [{}] in for node version [{}] with size [{}]",
-                                    newState.version(), node.getVersion(), serializedDiff.length());
-                            }
+            Diff<ClusterState> diff = null;
+            for (DiscoveryNode node : discoveryNodes) {
+                try {
+                    if (sendFullVersion || previousState.nodes().nodeExists(node) == false) {
+                        if (serializedStates.containsKey(node.getVersion()) == false) {
+                            serializedStates.put(node.getVersion(), serializeFullClusterState(newState, node.getVersion()));
+                        }
+                    } else {
+                        // will send a diff
+                        if (diff == null) {
+                            diff = newState.diff(previousState);
+                        }
+                        if (serializedDiffs.containsKey(node.getVersion()) == false) {
+                            final BytesReference serializedDiff = serializeDiffClusterState(diff, node.getVersion());
+                            serializedDiffs.put(node.getVersion(), serializedDiff);
+                            logger.trace("serialized cluster state diff for version [{}] for node version [{}] with size [{}]",
+                                newState.version(), node.getVersion(), serializedDiff.length());
                         }
-                    } catch (IOException e) {
-                        throw new ElasticsearchException("failed to serialize cluster state for publishing to node {}", e, node);
                     }
+                } catch (IOException e) {
+                    throw new ElasticsearchException("failed to serialize cluster state for publishing to node {}", e, node);
                 }
             }
         }
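The loop in buildDiffAndSerializeStates() above serializes at most once per distinct node version and reuses those bytes for every node speaking that version. The toy model below (plain Java, not Elasticsearch code; the class name and version strings are made up) illustrates that effect; the real code uses containsKey/put rather than computeIfAbsent because the serializers throw IOException.

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public final class PerVersionCacheSketch {
        public static void main(String[] args) {
            final List<String> nodeVersions = List.of("8.0.0", "8.0.0", "8.0.0", "7.17.0");
            final Map<String, byte[]> serializedPerVersion = new HashMap<>();
            for (String version : nodeVersions) {
                serializedPerVersion.computeIfAbsent(version, v -> {
                    System.out.println("serializing once for version " + v);
                    return ("state-for-" + v).getBytes();
                });
            }
            // four nodes, but only two distinct versions were serialized
            System.out.println("distinct serializations: " + serializedPerVersion.size());
        }
    }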
@@ -394,88 +351,29 @@ public class PublicationTransportHandler {
         }
 
         private void sendFullClusterState(DiscoveryNode destination, ActionListener<PublishWithJoinResponse> listener) {
-            final boolean alreadyReleasedWhenReadingCache;
-            ReleasableBytesReference bytes;
-            synchronized (mutex) {
-                alreadyReleasedWhenReadingCache = serializedStatesReleased;
-                if (alreadyReleasedWhenReadingCache) {
-                    bytes = null; // not used
-                } else {
-                    bytes = serializedStates.get(destination.getVersion());
-                    if (bytes != null) {
-                        bytes.retain();
-                    }
-                }
-            }
-
-            if (alreadyReleasedWhenReadingCache) {
-                listener.onFailure(
-                    new ElasticsearchException("publication of cluster state version [" + newState.version() + "] has completed"));
-                return;
-            }
-
+            BytesReference bytes = serializedStates.get(destination.getVersion());
             if (bytes == null) {
-                // we weren't expecting to send a full state to this node, but the diff didn't work, so serialize the full cluster state...
                 try {
                     bytes = serializeFullClusterState(newState, destination.getVersion());
+                    serializedStates.put(destination.getVersion(), bytes);
                 } catch (Exception e) {
                     logger.warn(() -> new ParameterizedMessage(
                         "failed to serialize cluster state before publishing it to node {}", destination), e);
                     listener.onFailure(e);
                     return;
                 }
-
-                // ... and keep hold of it in case another node needs it too
-                final boolean alreadyReleasedWhenWritingCache;
-                synchronized (mutex) {
-                    alreadyReleasedWhenWritingCache = serializedStatesReleased;
-                    if (alreadyReleasedWhenWritingCache == false) {
-                        final ReleasableBytesReference existingBytes = serializedStates.putIfAbsent(destination.getVersion(), bytes);
-                        if (existingBytes != null) {
-                            // another thread got there first; discard the work we've done and use the cached value
-                            bytes.close();
-                            bytes = existingBytes;
-                        }
-                        bytes.retain();
-                    }
-                }
-                if (alreadyReleasedWhenWritingCache) {
-                    listener.onFailure(
-                        new ElasticsearchException("publication of cluster state version [" + newState.version() + "] has completed"));
-                    return;
-                }
             }
-
-            //noinspection ConstantConditions this assertion is always true but it's here for the benefit of readers
-            assert bytes != null;
-            sendClusterState(destination, bytes, false, listener); // releases retained bytes on completion
+            sendClusterState(destination, bytes, false, listener);
         }
 
         private void sendClusterStateDiff(DiscoveryNode destination, ActionListener<PublishWithJoinResponse> listener) {
-            final ReleasableBytesReference bytes;
-            final boolean alreadyReleased;
-            synchronized (mutex) {
-                alreadyReleased = serializedStatesReleased;
-                if (alreadyReleased) {
-                    bytes = null; // not used
-                } else {
-                    bytes = serializedDiffs.get(destination.getVersion());
-                    assert bytes != null
-                        : "failed to find serialized diff for node " + destination + " of version [" + destination.getVersion() + "]";
-                    bytes.retain();
-                }
-            }
-            if (alreadyReleased) {
-                listener.onFailure(
-                    new ElasticsearchException("publication of cluster state version [" + newState.version() + "] has completed"));
-            } else {
-                //noinspection ConstantConditions this assertion is always true but it's here for the benefit of readers
-                assert bytes != null;
-                sendClusterState(destination, bytes, true, listener); // releases retained bytes on completion
-            }
+            final BytesReference bytes = serializedDiffs.get(destination.getVersion());
+            assert bytes != null
+                : "failed to find serialized diff for node " + destination + " of version [" + destination.getVersion() + "]";
+            sendClusterState(destination, bytes, true, listener);
         }
 
-        private void sendClusterState(DiscoveryNode destination, ReleasableBytesReference bytes, boolean retryWithFullClusterStateOnFailure,
+        private void sendClusterState(DiscoveryNode destination, BytesReference bytes, boolean retryWithFullClusterStateOnFailure,
                                       ActionListener<PublishWithJoinResponse> listener) {
             try {
                 final BytesTransportRequest request = new BytesTransportRequest(bytes, destination.getVersion());
@@ -498,13 +396,11 @@ public class PublicationTransportHandler {
 
                     @Override
                     public void handleResponse(PublishWithJoinResponse response) {
-                        bytes.close();
                         listener.onResponse(response);
                     }
 
                     @Override
                     public void handleException(TransportException exp) {
-                        bytes.close();
                         transportExceptionHandler.accept(exp);
                     }
 
@@ -516,21 +412,9 @@ public class PublicationTransportHandler {
                 transportService.sendRequest(destination, PUBLISH_STATE_ACTION_NAME, request, stateRequestOptions, responseHandler);
             } catch (Exception e) {
                 logger.warn(() -> new ParameterizedMessage("error sending cluster state to {}", destination), e);
-                bytes.close();
                 listener.onFailure(e);
             }
         }
-
-        public void releaseSerializedStates() {
-            synchronized (mutex) {
-                assert serializedStatesReleased == false;
-                serializedStatesReleased = true;
-                serializedStates.values().forEach(ReleasableBytesReference::close);
-                serializedDiffs.values().forEach(ReleasableBytesReference::close);
-                serializedStates.clear();
-                serializedDiffs.clear();
-            }
-        }
     }
 
 }