|
@@ -21,6 +21,7 @@ import org.elasticsearch.client.RestClient;
|
|
|
import org.elasticsearch.client.RestClientBuilder;
|
|
|
import org.elasticsearch.client.sniff.ElasticsearchNodesSniffer;
|
|
|
import org.elasticsearch.client.sniff.Sniffer;
|
|
|
+import org.elasticsearch.cluster.ClusterStateListener;
|
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
|
import org.elasticsearch.common.Nullable;
|
|
|
import org.elasticsearch.common.Strings;
|
|
@@ -363,7 +364,7 @@ public class HttpExporter extends Exporter {
|
|
|
private static final ConcurrentHashMap<String, SecureString> SECURE_AUTH_PASSWORDS = new ConcurrentHashMap<>();
|
|
|
private final ThreadContext threadContext;
|
|
|
private final DateFormatter dateTimeFormatter;
|
|
|
-
|
|
|
+ private final ClusterStateListener onLocalMasterListener;
|
|
|
/**
|
|
|
* Create an {@link HttpExporter}.
|
|
|
*
|
|
@@ -424,6 +425,14 @@ public class HttpExporter extends Exporter {
|
|
|
|
|
|
// mark resources as dirty after any node failure or license change
|
|
|
listener.setResource(resource);
|
|
|
+
|
|
|
+ //for a mixed cluster upgrade, ensure that if master changes and this is the master, allow the resources to re-publish
|
|
|
+ onLocalMasterListener = clusterChangedEvent -> {
|
|
|
+ if (clusterChangedEvent.nodesDelta().masterNodeChanged() && clusterChangedEvent.localNodeMaster()) {
|
|
|
+ resource.markDirty();
|
|
|
+ }
|
|
|
+ };
|
|
|
+ config.clusterService().addListener(onLocalMasterListener);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -864,9 +873,11 @@ public class HttpExporter extends Exporter {
|
|
|
@Override
|
|
|
public void doClose() {
|
|
|
try {
|
|
|
+ config.clusterService().removeListener(onLocalMasterListener);
|
|
|
if (sniffer != null) {
|
|
|
sniffer.close();
|
|
|
}
|
|
|
+
|
|
|
} catch (Exception e) {
|
|
|
logger.error("an error occurred while closing the internal client sniffer", e);
|
|
|
} finally {
|