|
@@ -12,11 +12,13 @@ import org.apache.logging.log4j.LogManager;
|
|
|
import org.apache.logging.log4j.Logger;
|
|
|
import org.elasticsearch.ResourceAlreadyExistsException;
|
|
|
import org.elasticsearch.ResourceNotFoundException;
|
|
|
+import org.elasticsearch.Version;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
import org.elasticsearch.client.internal.Client;
|
|
|
import org.elasticsearch.client.internal.OriginSettingClient;
|
|
|
import org.elasticsearch.cluster.ClusterChangedEvent;
|
|
|
import org.elasticsearch.cluster.ClusterStateListener;
|
|
|
+import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
|
import org.elasticsearch.common.settings.Setting;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
@@ -29,6 +31,7 @@ import org.elasticsearch.persistent.PersistentTasksExecutor;
|
|
|
import org.elasticsearch.persistent.PersistentTasksService;
|
|
|
import org.elasticsearch.tasks.TaskId;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
|
+import org.elasticsearch.transport.RemoteTransportException;
|
|
|
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
@@ -128,14 +131,18 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor<G
|
|
|
// wait for state recovered
|
|
|
return;
|
|
|
}
|
|
|
- // bootstrap downloader after first cluster start
|
|
|
+
|
|
|
+ DiscoveryNode masterNode = event.state().nodes().getMasterNode();
|
|
|
+ if (masterNode == null || masterNode.getVersion().before(Version.V_7_14_0)) {
|
|
|
+ // wait for master to be upgraded so it understands geoip task
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
clusterService.removeListener(this);
|
|
|
- if (event.localNodeMaster()) {
|
|
|
- if (ENABLED_SETTING.get(event.state().getMetadata().settings(), settings)) {
|
|
|
- startTask(() -> clusterService.addListener(this));
|
|
|
- } else {
|
|
|
- stopTask(() -> clusterService.addListener(this));
|
|
|
- }
|
|
|
+ if (ENABLED_SETTING.get(event.state().getMetadata().settings(), settings)) {
|
|
|
+ startTask(() -> clusterService.addListener(this));
|
|
|
+ } else {
|
|
|
+ stopTask(() -> clusterService.addListener(this));
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -144,8 +151,9 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor<G
|
|
|
GEOIP_DOWNLOADER,
|
|
|
GEOIP_DOWNLOADER,
|
|
|
new GeoIpTaskParams(),
|
|
|
- ActionListener.wrap(r -> {}, e -> {
|
|
|
- if (e instanceof ResourceAlreadyExistsException == false) {
|
|
|
+ ActionListener.wrap(r -> logger.debug("Started geoip downloader task"), e -> {
|
|
|
+ Throwable t = e instanceof RemoteTransportException ? e.getCause() : e;
|
|
|
+ if (t instanceof ResourceAlreadyExistsException == false) {
|
|
|
logger.error("failed to create geoip downloader task", e);
|
|
|
onFailure.run();
|
|
|
}
|
|
@@ -154,18 +162,23 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor<G
|
|
|
}
|
|
|
|
|
|
private void stopTask(Runnable onFailure) {
|
|
|
- ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = ActionListener.wrap(r -> {}, e -> {
|
|
|
- if (e instanceof ResourceNotFoundException == false) {
|
|
|
- logger.error("failed to remove geoip downloader task", e);
|
|
|
- onFailure.run();
|
|
|
+ ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = ActionListener.wrap(
|
|
|
+ r -> logger.debug("Stopped geoip downloader task"),
|
|
|
+ e -> {
|
|
|
+ Throwable t = e instanceof RemoteTransportException ? e.getCause() : e;
|
|
|
+ if (t instanceof ResourceNotFoundException == false) {
|
|
|
+ logger.error("failed to remove geoip downloader task", e);
|
|
|
+ onFailure.run();
|
|
|
+ }
|
|
|
}
|
|
|
- });
|
|
|
+ );
|
|
|
persistentTasksService.sendRemoveRequest(
|
|
|
GEOIP_DOWNLOADER,
|
|
|
ActionListener.runAfter(
|
|
|
listener,
|
|
|
() -> client.admin().indices().prepareDelete(DATABASES_INDEX).execute(ActionListener.wrap(rr -> {}, e -> {
|
|
|
- if (e instanceof ResourceNotFoundException == false) {
|
|
|
+ Throwable t = e instanceof RemoteTransportException ? e.getCause() : e;
|
|
|
+ if (t instanceof ResourceNotFoundException == false) {
|
|
|
logger.warn("failed to remove " + DATABASES_INDEX, e);
|
|
|
}
|
|
|
}))
|