|
@@ -19,17 +19,21 @@
|
|
|
|
|
|
package org.elasticsearch.cluster;
|
|
|
|
|
|
+import java.util.List;
|
|
|
+import java.util.concurrent.CopyOnWriteArrayList;
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+
|
|
|
import org.apache.logging.log4j.Logger;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
import org.elasticsearch.action.LatchedActionListener;
|
|
|
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
|
|
|
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest;
|
|
|
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
|
|
|
-import org.elasticsearch.action.admin.cluster.node.stats.TransportNodesStatsAction;
|
|
|
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
|
|
|
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
|
|
import org.elasticsearch.action.admin.indices.stats.ShardStats;
|
|
|
-import org.elasticsearch.action.admin.indices.stats.TransportIndicesStatsAction;
|
|
|
+import org.elasticsearch.client.node.NodeClient;
|
|
|
import org.elasticsearch.cluster.block.ClusterBlockException;
|
|
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
|
|
import org.elasticsearch.cluster.metadata.MetaData;
|
|
@@ -39,7 +43,6 @@ import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
|
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
|
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
|
|
import org.elasticsearch.common.component.AbstractComponent;
|
|
|
-import org.elasticsearch.common.inject.Inject;
|
|
|
import org.elasticsearch.common.settings.ClusterSettings;
|
|
|
import org.elasticsearch.common.settings.Setting;
|
|
|
import org.elasticsearch.common.settings.Setting.Property;
|
|
@@ -50,11 +53,6 @@ import org.elasticsearch.monitor.fs.FsInfo;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
|
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
|
|
|
|
|
|
-import java.util.List;
|
|
|
-import java.util.concurrent.CopyOnWriteArrayList;
|
|
|
-import java.util.concurrent.CountDownLatch;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
-
|
|
|
/**
|
|
|
* InternalClusterInfoService provides the ClusterInfoService interface,
|
|
|
* routinely updated on a timer. The timer can be dynamically changed by
|
|
@@ -84,29 +82,24 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu
|
|
|
private volatile boolean isMaster = false;
|
|
|
private volatile boolean enabled;
|
|
|
private volatile TimeValue fetchTimeout;
|
|
|
- private final TransportNodesStatsAction transportNodesStatsAction;
|
|
|
- private final TransportIndicesStatsAction transportIndicesStatsAction;
|
|
|
private final ClusterService clusterService;
|
|
|
private final ThreadPool threadPool;
|
|
|
+ private final NodeClient client;
|
|
|
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
|
|
|
|
|
|
- @Inject
|
|
|
- public InternalClusterInfoService(Settings settings, ClusterSettings clusterSettings,
|
|
|
- TransportNodesStatsAction transportNodesStatsAction,
|
|
|
- TransportIndicesStatsAction transportIndicesStatsAction, ClusterService clusterService,
|
|
|
- ThreadPool threadPool) {
|
|
|
+ public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) {
|
|
|
super(settings);
|
|
|
this.leastAvailableSpaceUsages = ImmutableOpenMap.of();
|
|
|
this.mostAvailableSpaceUsages = ImmutableOpenMap.of();
|
|
|
this.shardRoutingToDataPath = ImmutableOpenMap.of();
|
|
|
this.shardSizes = ImmutableOpenMap.of();
|
|
|
- this.transportNodesStatsAction = transportNodesStatsAction;
|
|
|
- this.transportIndicesStatsAction = transportIndicesStatsAction;
|
|
|
this.clusterService = clusterService;
|
|
|
this.threadPool = threadPool;
|
|
|
+ this.client = client;
|
|
|
this.updateFrequency = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings);
|
|
|
this.fetchTimeout = INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING.get(settings);
|
|
|
this.enabled = DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings);
|
|
|
+ ClusterSettings clusterSettings = clusterService.getClusterSettings();
|
|
|
clusterSettings.addSettingsUpdateConsumer(INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING, this::setFetchTimeout);
|
|
|
clusterSettings.addSettingsUpdateConsumer(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING, this::setUpdateFrequency);
|
|
|
clusterSettings.addSettingsUpdateConsumer(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, this::setEnabled);
|
|
@@ -259,8 +252,7 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu
|
|
|
nodesStatsRequest.clear();
|
|
|
nodesStatsRequest.fs(true);
|
|
|
nodesStatsRequest.timeout(fetchTimeout);
|
|
|
-
|
|
|
- transportNodesStatsAction.execute(nodesStatsRequest, new LatchedActionListener<>(listener, latch));
|
|
|
+ client.admin().cluster().nodesStats(nodesStatsRequest, new LatchedActionListener<>(listener, latch));
|
|
|
return latch;
|
|
|
}
|
|
|
|
|
@@ -274,7 +266,7 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu
|
|
|
indicesStatsRequest.clear();
|
|
|
indicesStatsRequest.store(true);
|
|
|
|
|
|
- transportIndicesStatsAction.execute(indicesStatsRequest, new LatchedActionListener<>(listener, latch));
|
|
|
+ client.admin().indices().stats(indicesStatsRequest, new LatchedActionListener<>(listener, latch));
|
|
|
return latch;
|
|
|
}
|
|
|
|