|
@@ -411,7 +411,8 @@ public class ClusterService extends AbstractLifecycleComponent {
|
|
|
List<UpdateTask> existingTasks = updateTasksPerExecutor.computeIfAbsent(executor, k -> new ArrayList<>());
|
|
|
for (@SuppressWarnings("unchecked") UpdateTask<T> existing : existingTasks) {
|
|
|
if (tasksIdentity.containsKey(existing.task)) {
|
|
|
- throw new IllegalStateException("task [" + existing.task + "] with source [" + source + "] is already queued");
|
|
|
+ throw new IllegalStateException("task [" + executor.describeTasks(Collections.singletonList(existing.task)) +
|
|
|
+ "] with source [" + source + "] is already queued");
|
|
|
}
|
|
|
}
|
|
|
existingTasks.addAll(updateTasks);
|
|
@@ -517,11 +518,11 @@ public class ClusterService extends AbstractLifecycleComponent {
|
|
|
if (pending != null) {
|
|
|
for (UpdateTask<T> task : pending) {
|
|
|
if (task.processed.getAndSet(true) == false) {
|
|
|
- logger.trace("will process [{}[{}]]", task.source, task.task);
|
|
|
+ logger.trace("will process {}", task.toString(executor));
|
|
|
toExecute.add(task);
|
|
|
processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task.task);
|
|
|
} else {
|
|
|
- logger.trace("skipping [{}[{}]], already processed", task.source, task.task);
|
|
|
+ logger.trace("skipping {}, already processed", task.toString(executor));
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -571,7 +572,8 @@ public class ClusterService extends AbstractLifecycleComponent {
|
|
|
assert (assertsEnabled = true);
|
|
|
if (assertsEnabled) {
|
|
|
for (UpdateTask<T> updateTask : toExecute) {
|
|
|
- assert batchResult.executionResults.containsKey(updateTask.task) : "missing task result for [" + updateTask.task + "]";
|
|
|
+ assert batchResult.executionResults.containsKey(updateTask.task) :
|
|
|
+ "missing task result for " + updateTask.toString(executor);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -579,13 +581,13 @@ public class ClusterService extends AbstractLifecycleComponent {
|
|
|
final ArrayList<UpdateTask<T>> proccessedListeners = new ArrayList<>();
|
|
|
// fail all tasks that have failed and extract those that are waiting for results
|
|
|
for (UpdateTask<T> updateTask : toExecute) {
|
|
|
- assert batchResult.executionResults.containsKey(updateTask.task) : "missing " + updateTask.task.toString();
|
|
|
+ assert batchResult.executionResults.containsKey(updateTask.task) : "missing " + updateTask.toString(executor);
|
|
|
final ClusterStateTaskExecutor.TaskResult executionResult =
|
|
|
batchResult.executionResults.get(updateTask.task);
|
|
|
executionResult.handle(
|
|
|
() -> proccessedListeners.add(updateTask),
|
|
|
ex -> {
|
|
|
- logger.debug("cluster state update task [{}] failed", ex, updateTask.source);
|
|
|
+ logger.debug("cluster state update task {} failed", ex, updateTask.toString(executor));
|
|
|
updateTask.listener.onFailure(updateTask.source, ex);
|
|
|
}
|
|
|
);
|
|
@@ -854,6 +856,15 @@ public class ClusterService extends AbstractLifecycleComponent {
|
|
|
public void run() {
|
|
|
runTasksForExecutor(executor);
|
|
|
}
|
|
|
+
|
|
|
+ public String toString(ClusterStateTaskExecutor<T> executor) {
|
|
|
+ String taskDescription = executor.describeTasks(Collections.singletonList(task));
|
|
|
+ if (taskDescription.isEmpty()) {
|
|
|
+ return "[" + source + "]";
|
|
|
+ } else {
|
|
|
+ return "[" + source + "[" + taskDescription + "]]";
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private void warnAboutSlowTaskIfNeeded(TimeValue executionTime, String source) {
|