|
@@ -10,14 +10,15 @@ package org.elasticsearch.xpack.profiling;
|
|
|
import org.apache.logging.log4j.LogManager;
|
|
|
import org.apache.logging.log4j.Logger;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
+import org.elasticsearch.action.search.SearchRequest;
|
|
|
import org.elasticsearch.action.support.ActionFilters;
|
|
|
+import org.elasticsearch.action.support.IndicesOptions;
|
|
|
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
|
|
+import org.elasticsearch.client.internal.node.NodeClient;
|
|
|
import org.elasticsearch.cluster.ClusterState;
|
|
|
import org.elasticsearch.cluster.ClusterStateObserver;
|
|
|
import org.elasticsearch.cluster.block.ClusterBlockException;
|
|
|
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
|
|
-import org.elasticsearch.cluster.metadata.DataStream;
|
|
|
-import org.elasticsearch.cluster.metadata.IndexMetadata;
|
|
|
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
|
|
import org.elasticsearch.cluster.metadata.Metadata;
|
|
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
@@ -26,11 +27,8 @@ import org.elasticsearch.common.inject.Inject;
|
|
|
import org.elasticsearch.common.settings.Setting;
|
|
|
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
|
|
import org.elasticsearch.core.TimeValue;
|
|
|
-import org.elasticsearch.index.Index;
|
|
|
-import org.elasticsearch.index.IndexService;
|
|
|
-import org.elasticsearch.index.shard.IndexShard;
|
|
|
-import org.elasticsearch.indices.IndicesService;
|
|
|
import org.elasticsearch.node.NodeClosedException;
|
|
|
+import org.elasticsearch.search.builder.SearchSourceBuilder;
|
|
|
import org.elasticsearch.tasks.Task;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
|
import org.elasticsearch.transport.TransportService;
|
|
@@ -45,7 +43,7 @@ public class TransportGetStatusAction extends TransportMasterNodeAction<GetStatu
|
|
|
public TransportGetStatusAction(
|
|
|
TransportService transportService,
|
|
|
ClusterService clusterService,
|
|
|
- IndicesService indicesService,
|
|
|
+ NodeClient nodeClient,
|
|
|
ThreadPool threadPool,
|
|
|
ActionFilters actionFilters,
|
|
|
IndexNameExpressionResolver indexNameExpressionResolver
|
|
@@ -61,7 +59,7 @@ public class TransportGetStatusAction extends TransportMasterNodeAction<GetStatu
|
|
|
GetStatusAction.Response::new,
|
|
|
EsExecutors.DIRECT_EXECUTOR_SERVICE
|
|
|
);
|
|
|
- this.resolver = new StatusResolver(clusterService, indicesService);
|
|
|
+ this.resolver = new StatusResolver(clusterService, nodeClient);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -74,7 +72,7 @@ public class TransportGetStatusAction extends TransportMasterNodeAction<GetStatu
|
|
|
if (request.waitForResourcesCreated()) {
|
|
|
createAndRegisterListener(listener, request.timeout());
|
|
|
} else {
|
|
|
- listener.onResponse(resolver.getResponse(state));
|
|
|
+ resolver.execute(state, listener);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -84,7 +82,7 @@ public class TransportGetStatusAction extends TransportMasterNodeAction<GetStatu
|
|
|
clusterService,
|
|
|
threadPool.getThreadContext(),
|
|
|
new StatusListener(listener, localNode, clusterService, resolver),
|
|
|
- clusterState -> resolver.getResponse(clusterState).isResourcesCreated(),
|
|
|
+ resolver::isResourcesCreated,
|
|
|
timeout,
|
|
|
log
|
|
|
);
|
|
@@ -98,7 +96,6 @@ public class TransportGetStatusAction extends TransportMasterNodeAction<GetStatu
|
|
|
private static class StatusListener implements ClusterStateObserver.Listener {
|
|
|
private final ActionListener<GetStatusAction.Response> listener;
|
|
|
private final DiscoveryNode localNode;
|
|
|
-
|
|
|
private final ClusterService clusterService;
|
|
|
private final StatusResolver resolver;
|
|
|
|
|
@@ -116,7 +113,7 @@ public class TransportGetStatusAction extends TransportMasterNodeAction<GetStatu
|
|
|
|
|
|
@Override
|
|
|
public void onNewClusterState(ClusterState state) {
|
|
|
- listener.onResponse(resolver.getResponse(state));
|
|
|
+ resolver.execute(state, listener);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -126,62 +123,64 @@ public class TransportGetStatusAction extends TransportMasterNodeAction<GetStatu
|
|
|
|
|
|
@Override
|
|
|
public void onTimeout(TimeValue timeout) {
|
|
|
- GetStatusAction.Response response = resolver.getResponse(clusterService.state());
|
|
|
- response.setTimedOut(true);
|
|
|
- listener.onResponse(response);
|
|
|
+ resolver.execute(clusterService.state(), ActionListener.wrap(response -> {
|
|
|
+ response.setTimedOut(true);
|
|
|
+ listener.onResponse(response);
|
|
|
+ }, listener::onFailure));
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private static class StatusResolver {
|
|
|
private final ClusterService clusterService;
|
|
|
- private final IndicesService indicesService;
|
|
|
+ private final NodeClient nodeClient;
|
|
|
|
|
|
- private StatusResolver(ClusterService clusterService, IndicesService indicesService) {
|
|
|
+ private StatusResolver(ClusterService clusterService, NodeClient nodeClient) {
|
|
|
this.clusterService = clusterService;
|
|
|
- this.indicesService = indicesService;
|
|
|
+ this.nodeClient = nodeClient;
|
|
|
}
|
|
|
|
|
|
- private GetStatusAction.Response getResponse(ClusterState state) {
|
|
|
- IndexStateResolver indexStateResolver = new IndexStateResolver(
|
|
|
- getValue(state, ProfilingPlugin.PROFILING_CHECK_OUTDATED_INDICES)
|
|
|
- );
|
|
|
-
|
|
|
- boolean pluginEnabled = getValue(state, XPackSettings.PROFILING_ENABLED);
|
|
|
- boolean resourceManagementEnabled = getValue(state, ProfilingPlugin.PROFILING_TEMPLATES_ENABLED);
|
|
|
-
|
|
|
+ private boolean isResourcesCreated(ClusterState state) {
|
|
|
+ IndexStateResolver indexStateResolver = indexStateResolver(state);
|
|
|
boolean templatesCreated = ProfilingIndexTemplateRegistry.isAllResourcesCreated(state, clusterService.getSettings());
|
|
|
boolean indicesCreated = ProfilingIndexManager.isAllResourcesCreated(state, indexStateResolver);
|
|
|
boolean dataStreamsCreated = ProfilingDataStreamManager.isAllResourcesCreated(state, indexStateResolver);
|
|
|
- boolean resourcesCreated = templatesCreated && indicesCreated && dataStreamsCreated;
|
|
|
+ return templatesCreated && indicesCreated && dataStreamsCreated;
|
|
|
+ }
|
|
|
|
|
|
+ private boolean isAnyPre891Data(ClusterState state) {
|
|
|
+ IndexStateResolver indexStateResolver = indexStateResolver(state);
|
|
|
boolean indicesPre891 = ProfilingIndexManager.isAnyResourceTooOld(state, indexStateResolver);
|
|
|
boolean dataStreamsPre891 = ProfilingDataStreamManager.isAnyResourceTooOld(state, indexStateResolver);
|
|
|
- boolean anyPre891Data = indicesPre891 || dataStreamsPre891;
|
|
|
+ return indicesPre891 || dataStreamsPre891;
|
|
|
+ }
|
|
|
|
|
|
- return new GetStatusAction.Response(pluginEnabled, resourceManagementEnabled, resourcesCreated, anyPre891Data, hasData(state));
|
|
|
+ private IndexStateResolver indexStateResolver(ClusterState state) {
|
|
|
+ return new IndexStateResolver(getValue(state, ProfilingPlugin.PROFILING_CHECK_OUTDATED_INDICES));
|
|
|
}
|
|
|
|
|
|
- private boolean hasData(ClusterState state) {
|
|
|
- DataStream dataStream = state.metadata().dataStreams().get(EventsIndex.FULL_INDEX.getName());
|
|
|
- if (dataStream == null) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- for (Index index : dataStream.getIndices()) {
|
|
|
- IndexMetadata meta = state.metadata().index(index);
|
|
|
- if (meta == null) {
|
|
|
- continue;
|
|
|
- }
|
|
|
- // It should not happen that we have index metadata but no corresponding index service. Be extra defensive and skip.
|
|
|
- IndexService indexService = indicesService.indexService(meta.getIndex());
|
|
|
- if (indexService != null) {
|
|
|
- for (IndexShard indexShard : indexService) {
|
|
|
- if (indexShard.isReadAllowed() && indexShard.docStats().getCount() > 0L) {
|
|
|
- return true;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ private void execute(ClusterState state, ActionListener<GetStatusAction.Response> listener) {
|
|
|
+ boolean pluginEnabled = getValue(state, XPackSettings.PROFILING_ENABLED);
|
|
|
+ boolean resourceManagementEnabled = getValue(state, ProfilingPlugin.PROFILING_TEMPLATES_ENABLED);
|
|
|
+ boolean resourcesCreated = isResourcesCreated(state);
|
|
|
+ boolean anyPre891Data = isAnyPre891Data(state);
|
|
|
+ // only issue a search if there is any chance that we have data
|
|
|
+ if (resourcesCreated) {
|
|
|
+ SearchRequest countRequest = new SearchRequest(EventsIndex.FULL_INDEX.getName());
|
|
|
+ countRequest.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
|
|
|
+ countRequest.allowPartialSearchResults(true);
|
|
|
+ // we don't need an exact hit count, just whether there are any data at all
|
|
|
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(0).trackTotalHits(true).trackTotalHitsUpTo(1);
|
|
|
+ countRequest.source(searchSourceBuilder);
|
|
|
+
|
|
|
+ nodeClient.search(countRequest, ActionListener.wrap(searchResponse -> {
|
|
|
+ boolean hasData = searchResponse.getHits().getTotalHits().value > 0;
|
|
|
+ listener.onResponse(
|
|
|
+ new GetStatusAction.Response(pluginEnabled, resourceManagementEnabled, resourcesCreated, anyPre891Data, hasData)
|
|
|
+ );
|
|
|
+ }, listener::onFailure));
|
|
|
+ } else {
|
|
|
+ listener.onResponse(new GetStatusAction.Response(pluginEnabled, resourceManagementEnabled, false, anyPre891Data, false));
|
|
|
}
|
|
|
- return false;
|
|
|
}
|
|
|
|
|
|
private boolean getValue(ClusterState state, Setting<Boolean> setting) {
|