|
@@ -9,7 +9,6 @@
|
|
|
|
|
|
package org.elasticsearch.ingest.geoip.direct;
|
|
|
|
|
|
-import org.elasticsearch.ResourceNotFoundException;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
import org.elasticsearch.action.FailedNodeException;
|
|
|
import org.elasticsearch.action.support.ActionFilters;
|
|
@@ -19,19 +18,28 @@ import org.elasticsearch.cluster.service.ClusterService;
|
|
|
import org.elasticsearch.common.io.stream.StreamInput;
|
|
|
import org.elasticsearch.common.regex.Regex;
|
|
|
import org.elasticsearch.features.FeatureService;
|
|
|
+import org.elasticsearch.ingest.geoip.DatabaseNodeService;
|
|
|
+import org.elasticsearch.ingest.geoip.GeoIpTaskState;
|
|
|
import org.elasticsearch.ingest.geoip.IngestGeoIpMetadata;
|
|
|
import org.elasticsearch.injection.guice.Inject;
|
|
|
+import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
|
|
|
import org.elasticsearch.tasks.Task;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
|
import org.elasticsearch.transport.TransportService;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.nio.charset.StandardCharsets;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
+import java.util.Base64;
|
|
|
+import java.util.Collection;
|
|
|
+import java.util.Comparator;
|
|
|
import java.util.LinkedHashSet;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Optional;
|
|
|
import java.util.Set;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
import static org.elasticsearch.ingest.IngestGeoIpFeatures.GET_DATABASE_CONFIGURATION_ACTION_MULTI_NODE;
|
|
|
|
|
@@ -43,6 +51,7 @@ public class TransportGetDatabaseConfigurationAction extends TransportNodesActio
|
|
|
List<DatabaseConfigurationMetadata>> {
|
|
|
|
|
|
private final FeatureService featureService;
|
|
|
+ private final DatabaseNodeService databaseNodeService;
|
|
|
|
|
|
@Inject
|
|
|
public TransportGetDatabaseConfigurationAction(
|
|
@@ -50,7 +59,8 @@ public class TransportGetDatabaseConfigurationAction extends TransportNodesActio
|
|
|
ClusterService clusterService,
|
|
|
ThreadPool threadPool,
|
|
|
ActionFilters actionFilters,
|
|
|
- FeatureService featureService
|
|
|
+ FeatureService featureService,
|
|
|
+ DatabaseNodeService databaseNodeService
|
|
|
) {
|
|
|
super(
|
|
|
GetDatabaseConfigurationAction.NAME,
|
|
@@ -61,6 +71,7 @@ public class TransportGetDatabaseConfigurationAction extends TransportNodesActio
|
|
|
threadPool.executor(ThreadPool.Names.MANAGEMENT)
|
|
|
);
|
|
|
this.featureService = featureService;
|
|
|
+ this.databaseNodeService = databaseNodeService;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -74,9 +85,19 @@ public class TransportGetDatabaseConfigurationAction extends TransportNodesActio
|
|
|
* TransportGetDatabaseConfigurationAction used to be a TransportMasterNodeAction, and not all nodes in the cluster have been
|
|
|
* updated. So we don't want to send node requests to the other nodes because they will blow up. Instead, we just return
|
|
|
* the information that we used to return from the master node (it doesn't make any difference that this might not be the master
|
|
|
- * node, because we're only reading the cluster state).
|
|
|
+ * node, because we're only reading the cluster state). Because older nodes only know about the Maxmind provider type, we filter
|
|
|
+ * out all others here to avoid causing problems on those nodes.
|
|
|
*/
|
|
|
- newResponseAsync(task, request, createActionContext(task, request), List.of(), List.of(), listener);
|
|
|
+ newResponseAsync(
|
|
|
+ task,
|
|
|
+ request,
|
|
|
+ createActionContext(task, request).stream()
|
|
|
+ .filter(database -> database.database().provider() instanceof DatabaseConfiguration.Maxmind)
|
|
|
+ .toList(),
|
|
|
+ List.of(),
|
|
|
+ List.of(),
|
|
|
+ listener
|
|
|
+ );
|
|
|
} else {
|
|
|
super.doExecute(task, request, listener);
|
|
|
}
|
|
@@ -97,28 +118,79 @@ public class TransportGetDatabaseConfigurationAction extends TransportNodesActio
|
|
|
);
|
|
|
}
|
|
|
|
|
|
- final IngestGeoIpMetadata geoIpMeta = clusterService.state().metadata().custom(IngestGeoIpMetadata.TYPE, IngestGeoIpMetadata.EMPTY);
|
|
|
List<DatabaseConfigurationMetadata> results = new ArrayList<>();
|
|
|
-
|
|
|
+ PersistentTasksCustomMetadata tasksMetadata = PersistentTasksCustomMetadata.getPersistentTasksCustomMetadata(
|
|
|
+ clusterService.state()
|
|
|
+ );
|
|
|
for (String id : ids) {
|
|
|
- if (Regex.isSimpleMatchPattern(id)) {
|
|
|
- for (Map.Entry<String, DatabaseConfigurationMetadata> entry : geoIpMeta.getDatabases().entrySet()) {
|
|
|
- if (Regex.simpleMatch(id, entry.getKey())) {
|
|
|
- results.add(entry.getValue());
|
|
|
+ results.addAll(getWebDatabases(tasksMetadata, id));
|
|
|
+ results.addAll(getMaxmindDatabases(clusterService, id));
|
|
|
+ }
|
|
|
+ return results;
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * This returns read-only database information about the databases managed by the standard downloader
|
|
|
+ */
|
|
|
+ private static Collection<DatabaseConfigurationMetadata> getWebDatabases(PersistentTasksCustomMetadata tasksMetadata, String id) {
|
|
|
+ List<DatabaseConfigurationMetadata> webDatabases = new ArrayList<>();
|
|
|
+ if (tasksMetadata != null) {
|
|
|
+ PersistentTasksCustomMetadata.PersistentTask<?> maybeGeoIpTask = tasksMetadata.getTask("geoip-downloader");
|
|
|
+ if (maybeGeoIpTask != null) {
|
|
|
+ GeoIpTaskState geoIpTaskState = (GeoIpTaskState) maybeGeoIpTask.getState();
|
|
|
+ if (geoIpTaskState != null) {
|
|
|
+ Map<String, GeoIpTaskState.Metadata> databases = geoIpTaskState.getDatabases();
|
|
|
+ for (String databaseFileName : databases.keySet()) {
|
|
|
+ String databaseName = getDatabaseNameForFileName(databaseFileName);
|
|
|
+ String databaseId = getDatabaseIdForFileName(DatabaseConfiguration.Web.NAME, databaseFileName);
|
|
|
+ if ((Regex.isSimpleMatchPattern(id) && Regex.simpleMatch(id, databaseId)) || id.equals(databaseId)) {
|
|
|
+ webDatabases.add(
|
|
|
+ new DatabaseConfigurationMetadata(
|
|
|
+ new DatabaseConfiguration(databaseId, databaseName, new DatabaseConfiguration.Web()),
|
|
|
+ -1,
|
|
|
+ databases.get(databaseFileName).lastUpdate()
|
|
|
+ )
|
|
|
+ );
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
- } else {
|
|
|
- DatabaseConfigurationMetadata meta = geoIpMeta.getDatabases().get(id);
|
|
|
- if (meta == null) {
|
|
|
- throw new ResourceNotFoundException("database configuration not found: {}", id);
|
|
|
- } else {
|
|
|
- results.add(meta);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return webDatabases;
|
|
|
+ }
|
|
|
+
|
|
|
+ private static String getDatabaseIdForFileName(String providerType, String databaseFileName) {
|
|
|
+ return "_" + providerType + "_" + Base64.getEncoder().encodeToString(databaseFileName.getBytes(StandardCharsets.UTF_8));
|
|
|
+ }
|
|
|
+
|
|
|
+ private static String getDatabaseNameForFileName(String databaseFileName) {
|
|
|
+ return databaseFileName.endsWith(".mmdb")
|
|
|
+ ? databaseFileName.substring(0, databaseFileName.length() + 1 - ".mmmdb".length())
|
|
|
+ : databaseFileName;
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * This returns information about databases that are downloaded from maxmind.
|
|
|
+ */
|
|
|
+ private static Collection<DatabaseConfigurationMetadata> getMaxmindDatabases(ClusterService clusterService, String id) {
|
|
|
+ List<DatabaseConfigurationMetadata> maxmindDatabases = new ArrayList<>();
|
|
|
+ final IngestGeoIpMetadata geoIpMeta = clusterService.state().metadata().custom(IngestGeoIpMetadata.TYPE, IngestGeoIpMetadata.EMPTY);
|
|
|
+ if (Regex.isSimpleMatchPattern(id)) {
|
|
|
+ for (Map.Entry<String, DatabaseConfigurationMetadata> entry : geoIpMeta.getDatabases().entrySet()) {
|
|
|
+ if (Regex.simpleMatch(id, entry.getKey())) {
|
|
|
+ maxmindDatabases.add(entry.getValue());
|
|
|
}
|
|
|
}
|
|
|
+ } else {
|
|
|
+ DatabaseConfigurationMetadata meta = geoIpMeta.getDatabases().get(id);
|
|
|
+ if (meta != null) {
|
|
|
+ maxmindDatabases.add(meta);
|
|
|
+ }
|
|
|
}
|
|
|
- return results;
|
|
|
+ return maxmindDatabases;
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
protected void newResponseAsync(
|
|
|
Task task,
|
|
|
GetDatabaseConfigurationAction.Request request,
|
|
@@ -127,13 +199,47 @@ public class TransportGetDatabaseConfigurationAction extends TransportNodesActio
|
|
|
List<FailedNodeException> failures,
|
|
|
ActionListener<GetDatabaseConfigurationAction.Response> listener
|
|
|
) {
|
|
|
- ActionListener.run(
|
|
|
- listener,
|
|
|
- l -> ActionListener.respondAndRelease(
|
|
|
+ ActionListener.run(listener, l -> {
|
|
|
+ List<DatabaseConfigurationMetadata> combinedResults = new ArrayList<>(results);
|
|
|
+ combinedResults.addAll(
|
|
|
+ deduplicateNodeResponses(responses, results.stream().map(result -> result.database().name()).collect(Collectors.toSet()))
|
|
|
+ );
|
|
|
+ ActionListener.respondAndRelease(
|
|
|
l,
|
|
|
- new GetDatabaseConfigurationAction.Response(results, clusterService.getClusterName(), responses, failures)
|
|
|
+ new GetDatabaseConfigurationAction.Response(combinedResults, clusterService.getClusterName(), responses, failures)
|
|
|
+ );
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * This deduplicates the nodeResponses by name, favoring the most recent. This is because each node is reporting the local databases
|
|
|
+ * that it has, and we don't want to report duplicates to the user. It also filters out any that already exist in the set of
|
|
|
+ * preExistingNames. This is because the non-local databases take precedence, so any local database with the same name as a non-local
|
|
|
+ * one will not be used.
|
|
|
+ * Non-private for unit testing
|
|
|
+ */
|
|
|
+ static Collection<DatabaseConfigurationMetadata> deduplicateNodeResponses(
|
|
|
+ List<GetDatabaseConfigurationAction.NodeResponse> nodeResponses,
|
|
|
+ Set<String> preExistingNames
|
|
|
+ ) {
|
|
|
+ /*
|
|
|
+ * Each node reports the list of databases that are in its config/ingest-geoip directory. For the sake of this API we assume all
|
|
|
+ * local databases with the same name are the same database, and deduplicate by name and just return the newest.
|
|
|
+ */
|
|
|
+ return nodeResponses.stream()
|
|
|
+ .flatMap(response -> response.getDatabases().stream())
|
|
|
+ .collect(
|
|
|
+ Collectors.groupingBy(
|
|
|
+ database -> database.database().name(),
|
|
|
+ Collectors.maxBy(Comparator.comparing(DatabaseConfigurationMetadata::modifiedDate))
|
|
|
+ )
|
|
|
)
|
|
|
- );
|
|
|
+ .values()
|
|
|
+ .stream()
|
|
|
+ .filter(Optional::isPresent)
|
|
|
+ .map(Optional::get)
|
|
|
+ .filter(database -> preExistingNames.contains(database.database().name()) == false)
|
|
|
+ .toList();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -157,7 +263,48 @@ public class TransportGetDatabaseConfigurationAction extends TransportNodesActio
|
|
|
|
|
|
@Override
|
|
|
protected GetDatabaseConfigurationAction.NodeResponse nodeOperation(GetDatabaseConfigurationAction.NodeRequest request, Task task) {
|
|
|
- return new GetDatabaseConfigurationAction.NodeResponse(transportService.getLocalNode(), List.of());
|
|
|
+ final Set<String> ids;
|
|
|
+ if (request.getDatabaseIds().length == 0) {
|
|
|
+ // if we did not ask for a specific name, then return all databases
|
|
|
+ ids = Set.of("*");
|
|
|
+ } else {
|
|
|
+ ids = new LinkedHashSet<>(Arrays.asList(request.getDatabaseIds()));
|
|
|
+ }
|
|
|
+ if (ids.size() > 1 && ids.stream().anyMatch(Regex::isSimpleMatchPattern)) {
|
|
|
+ throw new IllegalArgumentException(
|
|
|
+ "wildcard only supports a single value, please use comma-separated values or a single wildcard value"
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ List<DatabaseConfigurationMetadata> results = new ArrayList<>();
|
|
|
+ for (String id : ids) {
|
|
|
+ results.addAll(getLocalDatabases(databaseNodeService, id));
|
|
|
+ }
|
|
|
+ return new GetDatabaseConfigurationAction.NodeResponse(transportService.getLocalNode(), results);
|
|
|
}
|
|
|
|
|
|
+ /*
|
|
|
+ * This returns information about the databases that users have put in the config/ingest-geoip directory on the node.
|
|
|
+ */
|
|
|
+ private static List<DatabaseConfigurationMetadata> getLocalDatabases(DatabaseNodeService databaseNodeService, String id) {
|
|
|
+ List<DatabaseConfigurationMetadata> localDatabases = new ArrayList<>();
|
|
|
+ Map<String, DatabaseNodeService.ConfigDatabaseDetail> configDatabases = databaseNodeService.getConfigDatabasesDetail();
|
|
|
+ for (DatabaseNodeService.ConfigDatabaseDetail configDatabase : configDatabases.values()) {
|
|
|
+ String databaseId = getDatabaseIdForFileName(DatabaseConfiguration.Local.NAME, configDatabase.name());
|
|
|
+ if ((Regex.isSimpleMatchPattern(id) && Regex.simpleMatch(id, databaseId)) || id.equals(databaseId)) {
|
|
|
+ localDatabases.add(
|
|
|
+ new DatabaseConfigurationMetadata(
|
|
|
+ new DatabaseConfiguration(
|
|
|
+ databaseId,
|
|
|
+ getDatabaseNameForFileName(configDatabase.name()),
|
|
|
+ new DatabaseConfiguration.Local(configDatabase.type())
|
|
|
+ ),
|
|
|
+ -1,
|
|
|
+ configDatabase.buildDateInMillis() == null ? -1 : configDatabase.buildDateInMillis()
|
|
|
+ )
|
|
|
+ );
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return localDatabases;
|
|
|
+ }
|
|
|
}
|