浏览代码

Add toString() for various tasks (#90166)

There are various tasks involved in cluster coordination (i.e.
`CoordinatorTests`) which do not have a useful `toString()`
implementation. This obscures their meanings in trace logs, so with this
commit we add proper `toString()` implementations as appropriate.
David Turner 3 年之前
父节点
当前提交
289533ba39

+ 16 - 1
server/src/main/java/org/elasticsearch/action/ActionRunnable.java

@@ -44,7 +44,17 @@ public abstract class ActionRunnable<Response> extends AbstractRunnable {
      * @return Wrapped {@code Runnable}
      */
     public static <T> ActionRunnable<T> supply(ActionListener<T> listener, CheckedSupplier<T, Exception> supplier) {
-        return ActionRunnable.wrap(listener, l -> l.onResponse(supplier.get()));
+        return ActionRunnable.wrap(listener, new CheckedConsumer<>() {
+            @Override
+            public void accept(ActionListener<T> l) throws Exception {
+                l.onResponse(supplier.get());
+            }
+
+            @Override
+            public String toString() {
+                return supplier.toString();
+            }
+        });
     }
 
     /**
@@ -61,6 +71,11 @@ public abstract class ActionRunnable<Response> extends AbstractRunnable {
             protected void doRun() throws Exception {
                 consumer.accept(listener);
             }
+
+            @Override
+            public String toString() {
+                return "ActionRunnable#wrap[" + consumer + "]";
+            }
         };
     }
 

+ 22 - 2
server/src/main/java/org/elasticsearch/cluster/ClusterState.java

@@ -358,10 +358,30 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
      */
     public void initializeAsync(Executor executor) {
         if (routingNodes == null) {
-            executor.execute(this::getRoutingNodes);
+            executor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    getRoutingNodes();
+                }
+
+                @Override
+                public String toString() {
+                    return "async initialization of routing nodes for cluster state " + version();
+                }
+            });
         }
         if (metadata.indicesLookupInitialized() == false) {
-            executor.execute(metadata::getIndicesLookup);
+            executor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    metadata.getIndicesLookup();
+                }
+
+                @Override
+                public String toString() {
+                    return "async initialization of indices lookup for cluster state " + version();
+                }
+            });
         }
     }
 

+ 11 - 2
server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

@@ -768,8 +768,17 @@ public class Coordinator extends AbstractLifecycleComponent implements ClusterSt
         if (mode == Mode.LEADER && applierState.nodes().size() == 1) {
             if (singleNodeClusterChecker == null) {
                 // Make a single-node checker if none exists
-                singleNodeClusterChecker = transportService.getThreadPool()
-                    .scheduleWithFixedDelay(() -> { checkSingleNodeCluster(); }, this.singleNodeClusterSeedHostsCheckInterval, Names.SAME);
+                singleNodeClusterChecker = transportService.getThreadPool().scheduleWithFixedDelay(new Runnable() {
+                    @Override
+                    public void run() {
+                        Coordinator.this.checkSingleNodeCluster();
+                    }
+
+                    @Override
+                    public String toString() {
+                        return "single-node cluster checker";
+                    }
+                }, this.singleNodeClusterSeedHostsCheckInterval, Names.SAME);
             }
             return;
         }

+ 16 - 5
server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java

@@ -16,6 +16,7 @@ import org.elasticsearch.action.support.ChannelActionListener;
 import org.elasticsearch.cluster.coordination.Coordinator.Mode;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.common.CheckedSupplier;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.settings.Setting;
@@ -199,11 +200,21 @@ public class FollowersChecker {
             throw new CoordinationStateRejectedException("rejecting " + request + " since local state is " + this);
         }
 
-        transportService.getThreadPool().executor(Names.CLUSTER_COORDINATION).execute(ActionRunnable.supply(listener, () -> {
-            logger.trace("responding to {} on slow path", request);
-            handleRequestAndUpdateState.accept(request);
-            return Empty.INSTANCE;
-        }));
+        transportService.getThreadPool()
+            .executor(Names.CLUSTER_COORDINATION)
+            .execute(ActionRunnable.supply(listener, new CheckedSupplier<>() {
+                @Override
+                public Empty get() {
+                    logger.trace("responding to {} on slow path", request);
+                    handleRequestAndUpdateState.accept(request);
+                    return Empty.INSTANCE;
+                }
+
+                @Override
+                public String toString() {
+                    return "responding to [" + request + "] on slow path";
+                }
+            }));
     }
 
     /**

+ 29 - 15
server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java

@@ -21,6 +21,7 @@ import org.elasticsearch.cluster.IncompatibleClusterStateVersionException;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.common.CheckedSupplier;
 import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.compress.Compressor;
 import org.elasticsearch.common.compress.CompressorFactory;
@@ -329,6 +330,7 @@ public class PublicationTransportHandler {
             assert refCount() > 0;
             assert publishRequest.getAcceptedState() == newState : "state got switched on us";
             assert transportService.getThreadPool().getThreadContext().isSystemContext();
+            final var newStateVersion = newState.version();
             if (destination.equals(discoveryNodes.getLocalNode())) {
 
                 // The transport service normally avoids serializing/deserializing requests to the local node but here we have special
@@ -339,26 +341,38 @@ public class PublicationTransportHandler {
                 // because it only makes sense on the local node (e.g. UnassignedInfo#unassignedTimeNanos).
 
                 final boolean isVotingOnlyNode = discoveryNodes.getLocalNode().getRoles().contains(DiscoveryNodeRole.VOTING_ONLY_NODE_ROLE);
-                logger.trace("handling cluster state version [{}] locally on [{}]", newState.version(), destination);
+                logger.trace("handling cluster state version [{}] locally on [{}]", newStateVersion, destination);
                 transportService.getThreadPool()
                     .executor(ThreadPool.Names.CLUSTER_COORDINATION)
-                    .execute(transportService.getThreadPool().getThreadContext().preserveContext(ActionRunnable.supply(listener, () -> {
-                        if (isVotingOnlyNode) {
-                            // Voting-only nodes publish their cluster state to other nodes in order to freshen the state held on other full
-                            // master nodes, but then fail the publication before committing. However there's no need to freshen our local
-                            // state so we can fail right away.
-                            throw new TransportException(
-                                new ElasticsearchException("voting-only node skipping local publication to " + destination)
-                            );
-                        } else {
-                            return handlePublishRequest.apply(publishRequest);
-                        }
-                    })));
+                    .execute(
+                        transportService.getThreadPool()
+                            .getThreadContext()
+                            .preserveContext(ActionRunnable.supply(listener, new CheckedSupplier<>() {
+                                @Override
+                                public PublishWithJoinResponse get() {
+                                    if (isVotingOnlyNode) {
+                                        // Voting-only nodes publish their cluster state to other nodes in order to freshen the state held
+                                        // on other full master nodes, but then fail the publication before committing. However there's no
+                                        // need to freshen our local state so we can fail right away.
+                                        throw new TransportException(
+                                            new ElasticsearchException("voting-only node skipping local publication to " + destination)
+                                        );
+                                    } else {
+                                        return handlePublishRequest.apply(publishRequest);
+                                    }
+                                }
+
+                                @Override
+                                public String toString() {
+                                    return "handling cluster state version [" + newStateVersion + "] locally on [" + destination + "]";
+                                }
+                            }))
+                    );
             } else if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) {
-                logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination);
+                logger.trace("sending full cluster state version [{}] to [{}]", newStateVersion, destination);
                 sendFullClusterState(destination, listener);
             } else {
-                logger.trace("sending cluster state diff for version [{}] to [{}]", newState.version(), destination);
+                logger.trace("sending cluster state diff for version [{}] to [{}]", newStateVersion, destination);
                 sendClusterStateDiff(destination, listener);
             }
         }