|
@@ -28,10 +28,13 @@ import org.elasticsearch.common.Nullable;
|
|
|
import org.elasticsearch.common.bytes.BytesArray;
|
|
|
import org.elasticsearch.common.bytes.BytesReference;
|
|
|
import org.elasticsearch.common.regex.Regex;
|
|
|
+import org.elasticsearch.common.settings.Setting;
|
|
|
+import org.elasticsearch.common.settings.Setting.Property;
|
|
|
import org.elasticsearch.common.time.DateFormatter;
|
|
|
import org.elasticsearch.common.unit.TimeValue;
|
|
|
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
|
|
import org.elasticsearch.common.xcontent.XContentType;
|
|
|
+import org.elasticsearch.gateway.GatewayService;
|
|
|
import org.elasticsearch.index.IndexNotFoundException;
|
|
|
import org.elasticsearch.ingest.IngestMetadata;
|
|
|
import org.elasticsearch.ingest.PipelineConfiguration;
|
|
@@ -86,6 +89,16 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
|
|
|
|
|
|
public static final String TYPE = "local";
|
|
|
|
|
|
+ /**
|
|
|
+ * Time to wait for the master node to setup local exporter for monitoring.
|
|
|
+ * After that, the non-master nodes will warn the user for possible missing configuration.
|
|
|
+ */
|
|
|
+ public static final Setting.AffixSetting<TimeValue> WAIT_MASTER_TIMEOUT_SETTING = Setting.affixKeySetting(
|
|
|
+ "xpack.monitoring.exporters.",
|
|
|
+ "wait_master.timeout",
|
|
|
+ (key) -> Setting.timeSetting(key, TimeValue.timeValueSeconds(30), Property.Dynamic, Property.NodeScope)
|
|
|
+ );
|
|
|
+
|
|
|
private final Client client;
|
|
|
private final ClusterService clusterService;
|
|
|
private final XPackLicenseState licenseState;
|
|
@@ -96,8 +109,10 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
|
|
|
|
|
|
private final AtomicReference<State> state = new AtomicReference<>(State.INITIALIZED);
|
|
|
private final AtomicBoolean installingSomething = new AtomicBoolean(false);
|
|
|
- private final AtomicBoolean waitedForSetup = new AtomicBoolean(false);
|
|
|
private final AtomicBoolean watcherSetup = new AtomicBoolean(false);
|
|
|
+ private final AtomicBoolean stateInitialized = new AtomicBoolean(false);
|
|
|
+
|
|
|
+ private long stateInitializedTime;
|
|
|
|
|
|
public LocalExporter(Exporter.Config config, Client client, CleanerService cleanerService) {
|
|
|
super(config);
|
|
@@ -116,6 +131,13 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
|
|
|
|
|
|
@Override
|
|
|
public void clusterChanged(ClusterChangedEvent event) {
|
|
|
+ // Save the time right after the cluster state is initialized/recovered
|
|
|
+ // to use it later for LocalExporter#WAIT_MASTER_TIMEOUT_SETTING
|
|
|
+ if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK) == false) {
|
|
|
+ if (stateInitialized.getAndSet(true) == false) {
|
|
|
+ stateInitializedTime = client.threadPool().relativeTimeInMillis();
|
|
|
+ }
|
|
|
+ }
|
|
|
if (state.get() == State.INITIALIZED) {
|
|
|
resolveBulk(event.state(), true);
|
|
|
}
|
|
@@ -144,6 +166,17 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
|
|
|
@Override
|
|
|
public void openBulk(final ActionListener<ExportBulk> listener) {
|
|
|
if (state.get() != State.RUNNING) {
|
|
|
+ // wait for some time before informing the user for possible missing x-pack configuration on master
|
|
|
+ final TimeValue masterTimeout = WAIT_MASTER_TIMEOUT_SETTING.getConcreteSettingForNamespace(config.name())
|
|
|
+ .get(config.settings());
|
|
|
+ TimeValue timeElapsed = TimeValue.timeValueMillis(client.threadPool().relativeTimeInMillis() - stateInitializedTime);
|
|
|
+ if (timeElapsed.compareTo(masterTimeout) > 0) {
|
|
|
+ logger.info(
|
|
|
+ "waiting for elected master node [{}] to setup local exporter [{}] (does it have x-pack installed?)",
|
|
|
+ clusterService.state().nodes().getMasterNode(),
|
|
|
+ config.name()
|
|
|
+ );
|
|
|
+ }
|
|
|
listener.onResponse(null);
|
|
|
} else {
|
|
|
try {
|
|
@@ -179,14 +212,8 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
|
|
|
// elected master node needs to setup templates; non-master nodes need to wait for it to be setup
|
|
|
if (clusterService.state().nodes().isLocalNodeElectedMaster()) {
|
|
|
setup = setupIfElectedMaster(clusterState, templates, clusterStateChange);
|
|
|
- } else if (setupIfNotElectedMaster(clusterState, templates.keySet()) == false) {
|
|
|
- // the first pass will be false so that we don't bother users if the master took one-go to setup
|
|
|
- if (waitedForSetup.getAndSet(true)) {
|
|
|
- logger.info("waiting for elected master node [{}] to setup local exporter [{}] (does it have x-pack installed?)",
|
|
|
- clusterService.state().nodes().getMasterNode(), config.name());
|
|
|
- }
|
|
|
-
|
|
|
- setup = false;
|
|
|
+ } else {
|
|
|
+ setup = setupIfNotElectedMaster(clusterState, templates.keySet());
|
|
|
}
|
|
|
|
|
|
// any failure/delay to setup the local exporter stops it until the next pass (10s by default)
|
|
@@ -651,4 +678,8 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
|
|
|
|
|
|
}
|
|
|
|
|
|
+ public static List<Setting.AffixSetting<?>> getSettings() {
|
|
|
+ return List.of(WAIT_MASTER_TIMEOUT_SETTING);
|
|
|
+ }
|
|
|
+
|
|
|
}
|