|
@@ -24,7 +24,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
|
|
import org.elasticsearch.action.ActionListener;
|
|
import org.elasticsearch.action.ActionListener;
|
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
import org.elasticsearch.common.CheckedBiConsumer;
|
|
import org.elasticsearch.common.CheckedBiConsumer;
|
|
-import org.elasticsearch.common.component.Lifecycle;
|
|
|
|
import org.elasticsearch.common.lease.Releasable;
|
|
import org.elasticsearch.common.lease.Releasable;
|
|
import org.elasticsearch.common.settings.Settings;
|
|
import org.elasticsearch.common.settings.Settings;
|
|
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
|
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
|
@@ -58,8 +57,7 @@ public class ConnectionManager implements Closeable {
|
|
private final Transport transport;
|
|
private final Transport transport;
|
|
private final ThreadPool threadPool;
|
|
private final ThreadPool threadPool;
|
|
private final ConnectionProfile defaultProfile;
|
|
private final ConnectionProfile defaultProfile;
|
|
- private final Lifecycle lifecycle = new Lifecycle();
|
|
|
|
- private final AtomicBoolean closed = new AtomicBoolean(false);
|
|
|
|
|
|
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
|
|
private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
|
|
private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
|
|
private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener();
|
|
private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener();
|
|
|
|
|
|
@@ -71,7 +69,6 @@ public class ConnectionManager implements Closeable {
|
|
this.transport = transport;
|
|
this.transport = transport;
|
|
this.threadPool = threadPool;
|
|
this.threadPool = threadPool;
|
|
this.defaultProfile = connectionProfile;
|
|
this.defaultProfile = connectionProfile;
|
|
- this.lifecycle.moveToStarted();
|
|
|
|
}
|
|
}
|
|
|
|
|
|
public void addListener(TransportConnectionListener listener) {
|
|
public void addListener(TransportConnectionListener listener) {
|
|
@@ -187,8 +184,7 @@ public class ConnectionManager implements Closeable {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void close() {
|
|
public void close() {
|
|
- if (closed.compareAndSet(false, true)) {
|
|
|
|
- lifecycle.moveToStopped();
|
|
|
|
|
|
+ if (isClosed.compareAndSet(false, true)) {
|
|
CountDownLatch latch = new CountDownLatch(1);
|
|
CountDownLatch latch = new CountDownLatch(1);
|
|
|
|
|
|
// TODO: Consider moving all read/write lock (in Transport and this class) to the TransportService
|
|
// TODO: Consider moving all read/write lock (in Transport and this class) to the TransportService
|
|
@@ -213,14 +209,10 @@ public class ConnectionManager implements Closeable {
|
|
});
|
|
});
|
|
|
|
|
|
try {
|
|
try {
|
|
- try {
|
|
|
|
- latch.await(30, TimeUnit.SECONDS);
|
|
|
|
- } catch (InterruptedException e) {
|
|
|
|
- Thread.currentThread().interrupt();
|
|
|
|
- // ignore
|
|
|
|
- }
|
|
|
|
- } finally {
|
|
|
|
- lifecycle.moveToClosed();
|
|
|
|
|
|
+ latch.await(30, TimeUnit.SECONDS);
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
|
+ // ignore
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -239,7 +231,7 @@ public class ConnectionManager implements Closeable {
|
|
}
|
|
}
|
|
|
|
|
|
private void ensureOpen() {
|
|
private void ensureOpen() {
|
|
- if (lifecycle.started() == false) {
|
|
|
|
|
|
+ if (isClosed.get()) {
|
|
throw new IllegalStateException("connection manager is closed");
|
|
throw new IllegalStateException("connection manager is closed");
|
|
}
|
|
}
|
|
}
|
|
}
|