|
@@ -29,6 +29,7 @@ import org.elasticsearch.Assertions;
|
|
import org.elasticsearch.ElasticsearchException;
|
|
import org.elasticsearch.ElasticsearchException;
|
|
import org.elasticsearch.ElasticsearchTimeoutException;
|
|
import org.elasticsearch.ElasticsearchTimeoutException;
|
|
import org.elasticsearch.ExceptionsHelper;
|
|
import org.elasticsearch.ExceptionsHelper;
|
|
|
|
+import org.elasticsearch.Version;
|
|
import org.elasticsearch.action.ActionListener;
|
|
import org.elasticsearch.action.ActionListener;
|
|
import org.elasticsearch.action.ActionRequest;
|
|
import org.elasticsearch.action.ActionRequest;
|
|
import org.elasticsearch.action.ActionResponse;
|
|
import org.elasticsearch.action.ActionResponse;
|
|
@@ -47,8 +48,11 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
|
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
|
|
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
|
|
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
|
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
|
|
+import org.elasticsearch.transport.TaskTransportChannel;
|
|
import org.elasticsearch.transport.TcpChannel;
|
|
import org.elasticsearch.transport.TcpChannel;
|
|
|
|
+import org.elasticsearch.transport.TcpTransportChannel;
|
|
import org.elasticsearch.transport.Transport;
|
|
import org.elasticsearch.transport.Transport;
|
|
|
|
+import org.elasticsearch.transport.TransportChannel;
|
|
import org.elasticsearch.transport.TransportService;
|
|
import org.elasticsearch.transport.TransportService;
|
|
|
|
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
@@ -56,6 +60,7 @@ import java.util.ArrayList;
|
|
import java.util.Collection;
|
|
import java.util.Collection;
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
|
|
+import java.util.HashSet;
|
|
import java.util.Iterator;
|
|
import java.util.Iterator;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
@@ -66,6 +71,7 @@ import java.util.concurrent.Semaphore;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
import java.util.function.BiConsumer;
|
|
import java.util.function.BiConsumer;
|
|
|
|
+import java.util.function.Consumer;
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.StreamSupport;
|
|
import java.util.stream.StreamSupport;
|
|
|
|
|
|
@@ -92,7 +98,7 @@ public class TaskManager implements ClusterStateApplier {
|
|
|
|
|
|
private final AtomicLong taskIdGenerator = new AtomicLong();
|
|
private final AtomicLong taskIdGenerator = new AtomicLong();
|
|
|
|
|
|
- private final Map<TaskId, String> banedParents = new ConcurrentHashMap<>();
|
|
|
|
|
|
+ private final Map<TaskId, Ban> bannedParents = new ConcurrentHashMap<>();
|
|
|
|
|
|
private TaskResultsService taskResultsService;
|
|
private TaskResultsService taskResultsService;
|
|
|
|
|
|
@@ -196,13 +202,13 @@ public class TaskManager implements ClusterStateApplier {
|
|
CancellableTaskHolder oldHolder = cancellableTasks.put(task.getId(), holder);
|
|
CancellableTaskHolder oldHolder = cancellableTasks.put(task.getId(), holder);
|
|
assert oldHolder == null;
|
|
assert oldHolder == null;
|
|
// Check if this task was banned before we start it. The empty check is used to avoid
|
|
// Check if this task was banned before we start it. The empty check is used to avoid
|
|
- // computing the hash code of the parent taskId as most of the time banedParents is empty.
|
|
|
|
- if (task.getParentTaskId().isSet() && banedParents.isEmpty() == false) {
|
|
|
|
- String reason = banedParents.get(task.getParentTaskId());
|
|
|
|
- if (reason != null) {
|
|
|
|
|
|
+ // computing the hash code of the parent taskId as most of the time bannedParents is empty.
|
|
|
|
+ if (task.getParentTaskId().isSet() && bannedParents.isEmpty() == false) {
|
|
|
|
+ final Ban ban = bannedParents.get(task.getParentTaskId());
|
|
|
|
+ if (ban != null) {
|
|
try {
|
|
try {
|
|
- holder.cancel(reason);
|
|
|
|
- throw new TaskCancelledException("Task cancelled before it started: " + reason);
|
|
|
|
|
|
+ holder.cancel(ban.reason);
|
|
|
|
+ throw new TaskCancelledException("Task cancelled before it started: " + ban.reason);
|
|
} finally {
|
|
} finally {
|
|
// let's clean up the registration
|
|
// let's clean up the registration
|
|
unregister(task);
|
|
unregister(task);
|
|
@@ -383,7 +389,7 @@ public class TaskManager implements ClusterStateApplier {
|
|
* Will be used in task manager stats and for debugging.
|
|
* Will be used in task manager stats and for debugging.
|
|
*/
|
|
*/
|
|
public int getBanCount() {
|
|
public int getBanCount() {
|
|
- return banedParents.size();
|
|
|
|
|
|
+ return bannedParents.size();
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -392,14 +398,27 @@ public class TaskManager implements ClusterStateApplier {
|
|
* This method is called when a parent task that has children is cancelled.
|
|
* This method is called when a parent task that has children is cancelled.
|
|
* @return a list of pending cancellable child tasks
|
|
* @return a list of pending cancellable child tasks
|
|
*/
|
|
*/
|
|
- public List<CancellableTask> setBan(TaskId parentTaskId, String reason) {
|
|
|
|
|
|
+ public List<CancellableTask> setBan(TaskId parentTaskId, String reason, TransportChannel channel) {
|
|
logger.trace("setting ban for the parent task {} {}", parentTaskId, reason);
|
|
logger.trace("setting ban for the parent task {} {}", parentTaskId, reason);
|
|
-
|
|
|
|
- // Set the ban first, so the newly created tasks cannot be registered
|
|
|
|
- synchronized (banedParents) {
|
|
|
|
- if (lastDiscoveryNodes.nodeExists(parentTaskId.getNodeId())) {
|
|
|
|
- // Only set the ban if the node is the part of the cluster
|
|
|
|
- banedParents.put(parentTaskId, reason);
|
|
|
|
|
|
+ synchronized (bannedParents) {
|
|
|
|
+ if (channel.getVersion().onOrAfter(Version.V_8_0_0)) {
|
|
|
|
+ final Ban ban = bannedParents.computeIfAbsent(parentTaskId, k -> new Ban(reason, true));
|
|
|
|
+ assert ban.perChannel : "not a ban per channel";
|
|
|
|
+ while (channel instanceof TaskTransportChannel) {
|
|
|
|
+ channel = ((TaskTransportChannel) channel).getChannel();
|
|
|
|
+ }
|
|
|
|
+ if (channel instanceof TcpTransportChannel) {
|
|
|
|
+ startTrackingChannel(((TcpTransportChannel) channel).getChannel(), ban::registerChannel);
|
|
|
|
+ } else {
|
|
|
|
+ assert channel.getChannelType().equals("direct") : "expect direct channel; got [" + channel + "]";
|
|
|
|
+ ban.registerChannel(DIRECT_CHANNEL_TRACKER);
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ if (lastDiscoveryNodes.nodeExists(parentTaskId.getNodeId())) {
|
|
|
|
+ // Only set the ban if the node is the part of the cluster
|
|
|
|
+ final Ban existing = bannedParents.put(parentTaskId, new Ban(reason, false));
|
|
|
|
+ assert existing == null || existing.perChannel == false : "not a ban per node";
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return cancellableTasks.values().stream()
|
|
return cancellableTasks.values().stream()
|
|
@@ -415,12 +434,52 @@ public class TaskManager implements ClusterStateApplier {
|
|
*/
|
|
*/
|
|
public void removeBan(TaskId parentTaskId) {
|
|
public void removeBan(TaskId parentTaskId) {
|
|
logger.trace("removing ban for the parent task {}", parentTaskId);
|
|
logger.trace("removing ban for the parent task {}", parentTaskId);
|
|
- banedParents.remove(parentTaskId);
|
|
|
|
|
|
+ bannedParents.remove(parentTaskId);
|
|
}
|
|
}
|
|
|
|
|
|
// for testing
|
|
// for testing
|
|
public Set<TaskId> getBannedTaskIds() {
|
|
public Set<TaskId> getBannedTaskIds() {
|
|
- return Collections.unmodifiableSet(banedParents.keySet());
|
|
|
|
|
|
+ return Collections.unmodifiableSet(bannedParents.keySet());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private class Ban {
|
|
|
|
+ final String reason;
|
|
|
|
+ final boolean perChannel; // TODO: Remove this in 8.0
|
|
|
|
+ final Set<ChannelPendingTaskTracker> channels;
|
|
|
|
+
|
|
|
|
+ Ban(String reason, boolean perChannel) {
|
|
|
|
+ assert Thread.holdsLock(bannedParents);
|
|
|
|
+ this.reason = reason;
|
|
|
|
+ this.perChannel = perChannel;
|
|
|
|
+ if (perChannel) {
|
|
|
|
+ this.channels = new HashSet<>();
|
|
|
|
+ } else {
|
|
|
|
+ this.channels = Set.of();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ void registerChannel(ChannelPendingTaskTracker channel) {
|
|
|
|
+ assert Thread.holdsLock(bannedParents);
|
|
|
|
+ assert perChannel : "not a ban per channel";
|
|
|
|
+ channels.add(channel);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ boolean unregisterChannel(ChannelPendingTaskTracker channel) {
|
|
|
|
+ assert Thread.holdsLock(bannedParents);
|
|
|
|
+ assert perChannel : "not a ban per channel";
|
|
|
|
+ return channels.remove(channel);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ int registeredChannels() {
|
|
|
|
+ assert Thread.holdsLock(bannedParents);
|
|
|
|
+ assert perChannel : "not a ban per channel";
|
|
|
|
+ return channels.size();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public String toString() {
|
|
|
|
+ return "Ban{" + "reason='" + reason + '\'' + ", perChannel=" + perChannel + ", channels=" + channels + '}';
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -444,15 +503,15 @@ public class TaskManager implements ClusterStateApplier {
|
|
public void applyClusterState(ClusterChangedEvent event) {
|
|
public void applyClusterState(ClusterChangedEvent event) {
|
|
lastDiscoveryNodes = event.state().getNodes();
|
|
lastDiscoveryNodes = event.state().getNodes();
|
|
if (event.nodesRemoved()) {
|
|
if (event.nodesRemoved()) {
|
|
- synchronized (banedParents) {
|
|
|
|
|
|
+ synchronized (bannedParents) {
|
|
lastDiscoveryNodes = event.state().getNodes();
|
|
lastDiscoveryNodes = event.state().getNodes();
|
|
// Remove all bans that were registered by nodes that are no longer in the cluster state
|
|
// Remove all bans that were registered by nodes that are no longer in the cluster state
|
|
- Iterator<TaskId> banIterator = banedParents.keySet().iterator();
|
|
|
|
|
|
+ final Iterator<Map.Entry<TaskId, Ban>> banIterator = bannedParents.entrySet().iterator();
|
|
while (banIterator.hasNext()) {
|
|
while (banIterator.hasNext()) {
|
|
- TaskId taskId = banIterator.next();
|
|
|
|
- if (lastDiscoveryNodes.nodeExists(taskId.getNodeId()) == false) {
|
|
|
|
- logger.debug("Removing ban for the parent [{}] on the node [{}], reason: the parent node is gone", taskId,
|
|
|
|
- event.state().getNodes().getLocalNode());
|
|
|
|
|
|
+ final Map.Entry<TaskId, Ban> ban = banIterator.next();
|
|
|
|
+ if (ban.getValue().perChannel == false && lastDiscoveryNodes.nodeExists(ban.getKey().getNodeId()) == false) {
|
|
|
|
+ logger.debug("Removing ban for the parent [{}] on the node [{}], reason: the parent node is gone",
|
|
|
|
+ ban.getKey(), event.state().getNodes().getLocalNode());
|
|
banIterator.remove();
|
|
banIterator.remove();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -619,11 +678,16 @@ public class TaskManager implements ClusterStateApplier {
|
|
*/
|
|
*/
|
|
public Releasable startTrackingCancellableChannelTask(TcpChannel channel, CancellableTask task) {
|
|
public Releasable startTrackingCancellableChannelTask(TcpChannel channel, CancellableTask task) {
|
|
assert cancellableTasks.containsKey(task.getId()) : "task [" + task.getId() + "] is not registered yet";
|
|
assert cancellableTasks.containsKey(task.getId()) : "task [" + task.getId() + "] is not registered yet";
|
|
|
|
+ final ChannelPendingTaskTracker tracker = startTrackingChannel(channel, trackerChannel -> trackerChannel.addTask(task));
|
|
|
|
+ return () -> tracker.removeTask(task);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private ChannelPendingTaskTracker startTrackingChannel(TcpChannel channel, Consumer<ChannelPendingTaskTracker> onRegister) {
|
|
final ChannelPendingTaskTracker tracker = channelPendingTaskTrackers.compute(channel, (k, curr) -> {
|
|
final ChannelPendingTaskTracker tracker = channelPendingTaskTrackers.compute(channel, (k, curr) -> {
|
|
if (curr == null) {
|
|
if (curr == null) {
|
|
curr = new ChannelPendingTaskTracker();
|
|
curr = new ChannelPendingTaskTracker();
|
|
}
|
|
}
|
|
- curr.addTask(task);
|
|
|
|
|
|
+ onRegister.accept(curr);
|
|
return curr;
|
|
return curr;
|
|
});
|
|
});
|
|
if (tracker.registered.compareAndSet(false, true)) {
|
|
if (tracker.registered.compareAndSet(false, true)) {
|
|
@@ -631,13 +695,13 @@ public class TaskManager implements ClusterStateApplier {
|
|
r -> {
|
|
r -> {
|
|
final ChannelPendingTaskTracker removedTracker = channelPendingTaskTrackers.remove(channel);
|
|
final ChannelPendingTaskTracker removedTracker = channelPendingTaskTrackers.remove(channel);
|
|
assert removedTracker == tracker;
|
|
assert removedTracker == tracker;
|
|
- cancelTasksOnChannelClosed(tracker.drainTasks());
|
|
|
|
|
|
+ onChannelClosed(tracker);
|
|
},
|
|
},
|
|
e -> {
|
|
e -> {
|
|
assert false : new AssertionError("must not be here", e);
|
|
assert false : new AssertionError("must not be here", e);
|
|
}));
|
|
}));
|
|
}
|
|
}
|
|
- return () -> tracker.removeTask(task);
|
|
|
|
|
|
+ return tracker;
|
|
}
|
|
}
|
|
|
|
|
|
// for testing
|
|
// for testing
|
|
@@ -645,6 +709,8 @@ public class TaskManager implements ClusterStateApplier {
|
|
return channelPendingTaskTrackers.size();
|
|
return channelPendingTaskTrackers.size();
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private static final ChannelPendingTaskTracker DIRECT_CHANNEL_TRACKER = new ChannelPendingTaskTracker();
|
|
|
|
+
|
|
private static class ChannelPendingTaskTracker {
|
|
private static class ChannelPendingTaskTracker {
|
|
final AtomicBoolean registered = new AtomicBoolean();
|
|
final AtomicBoolean registered = new AtomicBoolean();
|
|
final Semaphore permits = Assertions.ENABLED ? new Semaphore(Integer.MAX_VALUE) : null;
|
|
final Semaphore permits = Assertions.ENABLED ? new Semaphore(Integer.MAX_VALUE) : null;
|
|
@@ -678,7 +744,8 @@ public class TaskManager implements ClusterStateApplier {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private void cancelTasksOnChannelClosed(Set<CancellableTask> tasks) {
|
|
|
|
|
|
+ private void onChannelClosed(ChannelPendingTaskTracker channel) {
|
|
|
|
+ final Set<CancellableTask> tasks = channel.drainTasks();
|
|
if (tasks.isEmpty() == false) {
|
|
if (tasks.isEmpty() == false) {
|
|
threadPool.generic().execute(new AbstractRunnable() {
|
|
threadPool.generic().execute(new AbstractRunnable() {
|
|
@Override
|
|
@Override
|
|
@@ -694,6 +761,20 @@ public class TaskManager implements ClusterStateApplier {
|
|
}
|
|
}
|
|
});
|
|
});
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ // Unregister the closing channel and remove bans whose has no registered channels
|
|
|
|
+ synchronized (bannedParents) {
|
|
|
|
+ final Iterator<Map.Entry<TaskId, Ban>> iterator = bannedParents.entrySet().iterator();
|
|
|
|
+ while (iterator.hasNext()) {
|
|
|
|
+ final Map.Entry<TaskId, Ban> entry = iterator.next();
|
|
|
|
+ final Ban ban = entry.getValue();
|
|
|
|
+ if (ban.perChannel) {
|
|
|
|
+ if (ban.unregisterChannel(channel) && entry.getValue().registeredChannels() == 0) {
|
|
|
|
+ iterator.remove();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
public void cancelTaskAndDescendants(CancellableTask task, String reason, boolean waitForCompletion, ActionListener<Void> listener) {
|
|
public void cancelTaskAndDescendants(CancellableTask task, String reason, boolean waitForCompletion, ActionListener<Void> listener) {
|