
Compute master task batch summary lazily (#86210)

Today we construct a string description of every batch of master tasks
in case we need to log it, grouping the tasks by `source` and calling
`toString()` on each one. Yet we almost never emit the description in
the logs, so this effort is usually wasted. This commit makes the summary
computation lazy, deferring the effort until we discover it's actually
needed and avoiding it entirely in most cases.
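To illustrate the pattern before the diff itself, here is a minimal, hedged sketch of deferring and memoizing an expensive description behind toString(); the class and field names are made up for the example, and the real change instead uses the BatchSummary class (backed by LazyInitializable) added below:

import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Minimal stand-in for the lazy batch summary idea: constructing the object
// is cheap (it only captures the inputs), and the string is built on the
// first toString() call, then cached (benign race: it may be computed twice
// under contention, which is harmless for an idempotent description).
final class LazySummary {
    private final Supplier<String> compute;
    private volatile String cached;

    LazySummary(List<String> sources) {
        // No string work happens here.
        this.compute = () -> sources.stream().distinct().collect(Collectors.joining(", "));
    }

    @Override
    public String toString() {
        String s = cached;
        if (s == null) {
            s = compute.get(); // pay the cost only when somebody actually asks
            cached = s;
        }
        return s;
    }

    public static void main(String[] args) {
        var summary = new LazySummary(List.of("create-index", "create-index", "put-mapping"));
        // The join happens only now, inside the string concatenation:
        System.out.println("processing [" + summary + "]"); // processing [create-index, put-mapping]
    }
}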
David Turner, 3 years ago
commit a733f4287a

+ 5 - 0
docs/changelog/86210.yaml

@@ -0,0 +1,5 @@
+pr: 86210
+summary: Compute master task batch summary lazily
+area: Cluster Coordination
+type: enhancement
+issues: []

+ 5 - 3
server/src/main/java/org/elasticsearch/cluster/ClusterStatePublicationEvent.java

@@ -8,6 +8,8 @@
 
 package org.elasticsearch.cluster;
 
+import org.elasticsearch.cluster.service.BatchSummary;
+
 /**
  * Represents a cluster state update computed by the {@link org.elasticsearch.cluster.service.MasterService} for publication to the cluster.
  * If publication is successful then this creates a {@link ClusterChangedEvent} which is applied on every node.
@@ -19,7 +21,7 @@ public class ClusterStatePublicationEvent {
      */
     private static final long NOT_SET = -1L;
 
-    private final String summary;
+    private final BatchSummary summary;
     private final ClusterState oldState;
     private final ClusterState newState;
     private final long computationTimeMillis;
@@ -30,7 +32,7 @@ public class ClusterStatePublicationEvent {
     private volatile long masterApplyElapsedMillis = NOT_SET;
 
     public ClusterStatePublicationEvent(
-        String summary,
+        BatchSummary summary,
         ClusterState oldState,
         ClusterState newState,
         long computationTimeMillis,
@@ -43,7 +45,7 @@ public class ClusterStatePublicationEvent {
         this.publicationStartTimeMillis = publicationStartTimeMillis;
     }
 
-    public String getSummary() {
+    public BatchSummary getSummary() {
         return summary;
     }
 

+ 52 - 0
server/src/main/java/org/elasticsearch/cluster/service/BatchSummary.java

@@ -0,0 +1,52 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.cluster.service;
+
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.util.LazyInitializable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class BatchSummary {
+
+    static final int MAX_TASK_DESCRIPTION_CHARS = 8 * 1024;
+
+    private final LazyInitializable<String, RuntimeException> lazyDescription;
+
+    public BatchSummary(TaskBatcher.BatchedTask firstTask, List<TaskBatcher.BatchedTask> allTasks) {
+        lazyDescription = new LazyInitializable<>(() -> {
+            final Map<String, List<TaskBatcher.BatchedTask>> processTasksBySource = new HashMap<>();
+            for (final var task : allTasks) {
+                processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);
+            }
+            final StringBuilder output = new StringBuilder();
+            Strings.collectionToDelimitedStringWithLimit((Iterable<String>) () -> processTasksBySource.entrySet().stream().map(entry -> {
+                String tasks = firstTask.describeTasks(entry.getValue());
+                return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
+            }).filter(s -> s.isEmpty() == false).iterator(), ", ", "", "", MAX_TASK_DESCRIPTION_CHARS, output);
+            if (output.length() > MAX_TASK_DESCRIPTION_CHARS) {
+                output.append(" (").append(allTasks.size()).append(" tasks in total)");
+            }
+            return output.toString();
+        });
+    }
+
+    // for tests
+    public BatchSummary(String string) {
+        lazyDescription = new LazyInitializable<>(() -> string);
+    }
+
+    @Override
+    public String toString() {
+        return lazyDescription.getOrCompute();
+    }
+}
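
The payoff comes at the MasterService call sites in the next file, which pass the BatchSummary object straight into parameterised log calls such as logger.debug("processing [{}]", summary). A self-contained sketch of that effect (the CountingSummary class is hypothetical and exists only for the demonstration; it assumes log4j-api is on the classpath and that the DEBUG level is disabled, as it is under the default configuration): the argument's toString() is never invoked when the statement does not fire.

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.atomic.AtomicInteger;

public class LazyLoggingDemo {
    private static final Logger logger = LogManager.getLogger(LazyLoggingDemo.class);

    // Counts how often the "expensive" description is actually built.
    static final class CountingSummary {
        final AtomicInteger toStringCalls = new AtomicInteger();

        @Override
        public String toString() {
            toStringCalls.incrementAndGet();
            return "an expensive description";
        }
    }

    public static void main(String[] args) {
        CountingSummary summary = new CountingSummary();

        // With DEBUG disabled, log4j skips message formatting entirely,
        // so summary.toString() is never called here.
        logger.debug("executing cluster state update for [{}]", summary);

        System.out.println("toString() calls: " + summary.toStringCalls.get()); // 0
    }
}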

+ 6 - 6
server/src/main/java/org/elasticsearch/cluster/service/MasterService.java

@@ -150,7 +150,7 @@ public class MasterService extends AbstractLifecycleComponent {
         }
 
         @Override
-        protected void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary) {
+        protected void run(Object batchingKey, List<? extends BatchedTask> tasks, BatchSummary tasksSummary) {
             runTasks((ClusterStateTaskExecutor<ClusterStateTaskListener>) batchingKey, (List<UpdateTask>) tasks, tasksSummary);
         }
 
@@ -231,7 +231,7 @@ public class MasterService extends AbstractLifecycleComponent {
     private void runTasks(
         ClusterStateTaskExecutor<ClusterStateTaskListener> executor,
         List<Batcher.UpdateTask> updateTasks,
-        String summary
+        BatchSummary summary
     ) {
         if (lifecycle.started() == false) {
             logger.debug("processing [{}]: ignoring, master service not started", summary);
@@ -422,7 +422,7 @@ public class MasterService extends AbstractLifecycleComponent {
         );
     }
 
-    private void handleException(String summary, long startTimeMillis, ClusterState newClusterState, Exception e) {
+    private void handleException(BatchSummary summary, long startTimeMillis, ClusterState newClusterState, Exception e) {
         final TimeValue executionTime = getTimeSince(startTimeMillis);
         final long version = newClusterState.version();
         final String stateUUID = newClusterState.stateUUID();
@@ -587,7 +587,7 @@ public class MasterService extends AbstractLifecycleComponent {
         return threadPoolExecutor.getMaxTaskWaitTime();
     }
 
-    private void logExecutionTime(TimeValue executionTime, String activity, String summary) {
+    private void logExecutionTime(TimeValue executionTime, String activity, BatchSummary summary) {
         if (executionTime.getMillis() > slowTaskLoggingThreshold.getMillis()) {
             logger.warn(
                 "took [{}/{}ms] to {} for [{}], which exceeds the warn threshold of [{}]",
@@ -898,7 +898,7 @@ public class MasterService extends AbstractLifecycleComponent {
         ClusterState previousClusterState,
         List<ExecutionResult<ClusterStateTaskListener>> executionResults,
         ClusterStateTaskExecutor<ClusterStateTaskListener> executor,
-        String summary
+        BatchSummary summary
     ) {
         final var resultingState = innerExecuteTasks(previousClusterState, executionResults, executor, summary);
         if (previousClusterState != resultingState
@@ -926,7 +926,7 @@ public class MasterService extends AbstractLifecycleComponent {
         ClusterState previousClusterState,
         List<ExecutionResult<ClusterStateTaskListener>> executionResults,
         ClusterStateTaskExecutor<ClusterStateTaskListener> executor,
-        String summary
+        BatchSummary summary
     ) {
         final var taskContexts = castTaskContexts(executionResults);
         try {

+ 2 - 24
server/src/main/java/org/elasticsearch/cluster/service/TaskBatcher.java

@@ -10,7 +10,6 @@ package org.elasticsearch.cluster.service;
 
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.common.Priority;
-import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
 import org.elasticsearch.core.Nullable;
@@ -18,7 +17,6 @@ import org.elasticsearch.core.TimeValue;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -92,7 +90,6 @@ public abstract class TaskBatcher {
         // to give other tasks with different batching key a chance to execute.
         if (updateTask.processed.get() == false) {
             final List<BatchedTask> toExecute = new ArrayList<>();
-            final Map<String, List<BatchedTask>> processTasksBySource = new HashMap<>();
             final Set<BatchedTask> pending = tasksPerBatchingKey.remove(updateTask.batchingKey);
             if (pending != null) {
                 // pending is a java.util.Collections.SynchronizedSet so we can safely iterate holding its mutex
@@ -102,7 +99,6 @@ public abstract class TaskBatcher {
                         if (task.processed.getAndSet(true) == false) {
                             logger.trace("will process {}", task);
                             toExecute.add(task);
-                            processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);
                         } else {
                             logger.trace("skipping {}, already processed", task);
                         }
@@ -111,34 +107,16 @@ public abstract class TaskBatcher {
             }
 
             if (toExecute.isEmpty() == false) {
-                run(updateTask.batchingKey, toExecute, buildTasksDescription(updateTask, toExecute, processTasksBySource));
+                run(updateTask.batchingKey, toExecute, new BatchSummary(updateTask, toExecute));
             }
         }
     }
 
-    private static final int MAX_TASK_DESCRIPTION_CHARS = 8 * 1024;
-
-    private String buildTasksDescription(
-        BatchedTask updateTask,
-        List<BatchedTask> toExecute,
-        Map<String, List<BatchedTask>> processTasksBySource
-    ) {
-        final StringBuilder output = new StringBuilder();
-        Strings.collectionToDelimitedStringWithLimit((Iterable<String>) () -> processTasksBySource.entrySet().stream().map(entry -> {
-            String tasks = updateTask.describeTasks(entry.getValue());
-            return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
-        }).filter(s -> s.isEmpty() == false).iterator(), ", ", "", "", MAX_TASK_DESCRIPTION_CHARS, output);
-        if (output.length() > MAX_TASK_DESCRIPTION_CHARS) {
-            output.append(" (").append(toExecute.size()).append(" tasks in total)");
-        }
-        return output.toString();
-    }
-
     /**
      * Action to be implemented by the specific batching implementation
      * All tasks have the given batching key.
      */
-    protected abstract void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary);
+    protected abstract void run(Object batchingKey, List<? extends BatchedTask> tasks, BatchSummary tasksSummarySupplier);
 
     /**
      * Represents a runnable task that supports batching.

+ 5 - 2
server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTransportHandlerTests.java

@@ -19,6 +19,7 @@ import org.elasticsearch.cluster.coordination.CoordinationMetadata.VotingConfigu
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.service.BatchSummary;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.compress.Compressor;
 import org.elasticsearch.common.compress.CompressorFactory;
@@ -99,7 +100,9 @@ public class PublicationTransportHandlerTests extends ESTestCase {
 
         final ElasticsearchException e = expectThrows(
             ElasticsearchException.class,
-            () -> handler.newPublicationContext(new ClusterStatePublicationEvent("test", clusterState, unserializableClusterState, 0L, 0L))
+            () -> handler.newPublicationContext(
+                new ClusterStatePublicationEvent(new BatchSummary("test"), clusterState, unserializableClusterState, 0L, 0L)
+            )
         );
         assertNotNull(e.getCause());
         assertThat(e.getCause(), instanceOf(IOException.class));
@@ -273,7 +276,7 @@ public class PublicationTransportHandlerTests extends ESTestCase {
             final PublicationTransportHandler.PublicationContext context;
             try {
                 context = handler.newPublicationContext(
-                    new ClusterStatePublicationEvent("test", prevClusterState, nextClusterState, 0L, 0L)
+                    new ClusterStatePublicationEvent(new BatchSummary("test"), prevClusterState, nextClusterState, 0L, 0L)
                 );
             } catch (ElasticsearchException e) {
                 assertTrue(simulateFailures);

+ 143 - 2
server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java

@@ -11,6 +11,7 @@ package org.elasticsearch.cluster.service;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
@@ -1093,11 +1094,11 @@ public class MasterServiceTests extends ESTestCase {
             final AtomicReference<ClusterState> clusterStateRef = new AtomicReference<>(initialClusterState);
             masterService.setClusterStatePublisher((clusterStatePublicationEvent, publishListener, ackListener) -> {
                 ClusterServiceUtils.setAllElapsedMillis(clusterStatePublicationEvent);
-                if (clusterStatePublicationEvent.getSummary().contains("test5")) {
+                if (clusterStatePublicationEvent.getSummary().toString().contains("test5")) {
                     relativeTimeInMillis += MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY).millis()
                         + randomLongBetween(1, 1000000);
                 }
-                if (clusterStatePublicationEvent.getSummary().contains("test6")) {
+                if (clusterStatePublicationEvent.getSummary().toString().contains("test6")) {
                     relativeTimeInMillis += MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY).millis()
                         + randomLongBetween(1, 1000000);
                     throw new ElasticsearchException("simulated error during slow publication which should trigger logging");
@@ -1663,6 +1664,146 @@ public class MasterServiceTests extends ESTestCase {
         }
     }
 
+    @TestLogging(
+        value = "org.elasticsearch.cluster.service:DEBUG",
+        reason = "to ensure that we log the right batch description, which only happens at DEBUG level"
+    )
+    public void testBatchedUpdateSummaryLogging() throws Exception {
+        MockLogAppender mockAppender = new MockLogAppender();
+        mockAppender.start();
+
+        Logger masterServiceLogger = LogManager.getLogger(MasterService.class);
+        Loggers.addAppender(masterServiceLogger, mockAppender);
+        try (MasterService masterService = createMasterService(true)) {
+
+            final var barrier = new CyclicBarrier(2);
+            final var blockingTask = new ClusterStateUpdateTask() {
+                @Override
+                public ClusterState execute(ClusterState currentState) throws Exception {
+                    barrier.await(10, TimeUnit.SECONDS);
+                    barrier.await(10, TimeUnit.SECONDS);
+                    return currentState;
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    fail();
+                }
+            };
+
+            class Task implements ClusterStateTaskListener {
+                private final String description;
+
+                Task(String description) {
+                    this.description = description;
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    fail();
+                }
+
+                @Override
+                public String toString() {
+                    return description;
+                }
+            }
+
+            class Executor implements ClusterStateTaskExecutor<Task> {
+
+                final Semaphore semaphore = new Semaphore(0);
+
+                @Override
+                public ClusterState execute(ClusterState currentState, List<TaskContext<Task>> taskContexts) {
+                    for (final var taskContext : taskContexts) {
+                        taskContext.success(ActionListener.wrap(semaphore::release));
+                    }
+                    return currentState;
+                }
+            }
+
+            masterService.submitUnbatchedStateUpdateTask("block", blockingTask);
+            barrier.await(10, TimeUnit.SECONDS);
+
+            final var smallBatchExecutor = new Executor();
+            for (int source = 0; source < 2; source++) {
+                for (int task = 0; task < 2; task++) {
+                    masterService.submitStateUpdateTask(
+                        "source-" + source,
+                        new Task("task-" + task),
+                        ClusterStateTaskConfig.build(Priority.NORMAL),
+                        smallBatchExecutor
+                    );
+                }
+                mockAppender.addExpectation(
+                    new MockLogAppender.SeenEventExpectation(
+                        "mention of tasks source-" + source,
+                        MasterService.class.getCanonicalName(),
+                        Level.DEBUG,
+                        "executing cluster state update for [*source-" + source + "[task-0, task-1]*"
+                    )
+                );
+            }
+
+            final var manySourceExecutor = new Executor();
+            for (int source = 0; source < 1024; source++) {
+                for (int task = 0; task < 2; task++) {
+                    masterService.submitStateUpdateTask(
+                        "source-" + source,
+                        new Task("task-" + task),
+                        ClusterStateTaskConfig.build(Priority.NORMAL),
+                        manySourceExecutor
+                    );
+                }
+            }
+            mockAppender.addExpectation(
+                new MockLogAppender.SeenEventExpectation(
+                    "truncated description of batch with many sources",
+                    MasterService.class.getCanonicalName(),
+                    Level.DEBUG,
+                    "executing cluster state update for [* ... (1024 in total, *) (2048 tasks in total)]"
+                ) {
+                    @Override
+                    public boolean innerMatch(LogEvent event) {
+                        return event.getMessage().getFormattedMessage().length() < BatchSummary.MAX_TASK_DESCRIPTION_CHARS + 200;
+                    }
+                }
+            );
+
+            final var manyTasksPerSourceExecutor = new Executor();
+            for (int task = 0; task < 2048; task++) {
+                masterService.submitStateUpdateTask(
+                    "unique-source",
+                    new Task("task-" + task),
+                    ClusterStateTaskConfig.build(Priority.NORMAL),
+                    manyTasksPerSourceExecutor
+                );
+            }
+            mockAppender.addExpectation(
+                new MockLogAppender.SeenEventExpectation(
+                    "truncated description of batch with many tasks from a single source",
+                    MasterService.class.getCanonicalName(),
+                    Level.DEBUG,
+                    "executing cluster state update for [unique-source[task-0, task-1, task-2, task-3, task-4, * ... (2048 in total, *)]]"
+                ) {
+                    @Override
+                    public boolean innerMatch(LogEvent event) {
+                        return event.getMessage().getFormattedMessage().length() < 1500;
+                    }
+                }
+            );
+
+            barrier.await(10, TimeUnit.SECONDS);
+            assertTrue(smallBatchExecutor.semaphore.tryAcquire(4, 10, TimeUnit.SECONDS));
+            assertTrue(manySourceExecutor.semaphore.tryAcquire(2048, 10, TimeUnit.SECONDS));
+            assertTrue(manyTasksPerSourceExecutor.semaphore.tryAcquire(2048, 10, TimeUnit.SECONDS));
+            mockAppender.assertAllExpectationsMatched();
+        } finally {
+            Loggers.removeAppender(masterServiceLogger, mockAppender);
+            mockAppender.stop();
+        }
+    }
+
     /**
      * Returns the cluster state that the master service uses (and that is provided by the discovery layer)
      */

+ 1 - 1
server/src/test/java/org/elasticsearch/cluster/service/TaskBatcherTests.java

@@ -47,7 +47,7 @@ public class TaskBatcherTests extends TaskExecutorTests {
 
         @SuppressWarnings("unchecked")
         @Override
-        protected void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary) {
+        protected void run(Object batchingKey, List<? extends BatchedTask> tasks, BatchSummary tasksSummary) {
             List<UpdateTask> updateTasks = (List<UpdateTask>) tasks;
             ((TestExecutor<Object>) batchingKey).execute(updateTasks.stream().map(t -> t.task).toList());
             updateTasks.forEach(updateTask -> updateTask.listener.processed());