|  | @@ -16,6 +16,7 @@ import org.elasticsearch.action.search.SearchResponse;
 | 
	
		
			
				|  |  |  import org.elasticsearch.client.internal.Client;
 | 
	
		
			
				|  |  |  import org.elasticsearch.client.internal.OriginSettingClient;
 | 
	
		
			
				|  |  |  import org.elasticsearch.cluster.ClusterState;
 | 
	
		
			
				|  |  | +import org.elasticsearch.cluster.ProjectState;
 | 
	
		
			
				|  |  |  import org.elasticsearch.cluster.metadata.IndexAbstraction;
 | 
	
		
			
				|  |  |  import org.elasticsearch.cluster.metadata.ProjectId;
 | 
	
		
			
				|  |  |  import org.elasticsearch.cluster.metadata.ProjectMetadata;
 | 
	
	
		
			
				|  | @@ -288,104 +289,102 @@ public final class DatabaseNodeService implements IpDatabaseProvider {
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          // Optimization: only load the .geoip_databases for projects that are allocated to this node
 | 
	
		
			
				|  |  | -        for (ProjectMetadata projectMetadata : state.metadata().projects().values()) {
 | 
	
		
			
				|  |  | -            ProjectId projectId = projectMetadata.id();
 | 
	
		
			
				|  |  | +        state.forEachProject(this::checkDatabases);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -            PersistentTasksCustomMetadata persistentTasks = projectMetadata.custom(PersistentTasksCustomMetadata.TYPE);
 | 
	
		
			
				|  |  | -            if (persistentTasks == null) {
 | 
	
		
			
				|  |  | -                logger.trace("Not checking databases for project [{}] because persistent tasks are null", projectId);
 | 
	
		
			
				|  |  | -                continue;
 | 
	
		
			
				|  |  | -            }
 | 
	
		
			
				|  |  | +    void checkDatabases(ProjectState projectState) {
 | 
	
		
			
				|  |  | +        ProjectId projectId = projectState.projectId();
 | 
	
		
			
				|  |  | +        ProjectMetadata projectMetadata = projectState.metadata();
 | 
	
		
			
				|  |  | +        PersistentTasksCustomMetadata persistentTasks = projectMetadata.custom(PersistentTasksCustomMetadata.TYPE);
 | 
	
		
			
				|  |  | +        if (persistentTasks == null) {
 | 
	
		
			
				|  |  | +            logger.trace("Not checking databases for project [{}] because persistent tasks are null", projectId);
 | 
	
		
			
				|  |  | +            return;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -            IndexAbstraction databasesAbstraction = projectMetadata.getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX);
 | 
	
		
			
				|  |  | -            if (databasesAbstraction == null) {
 | 
	
		
			
				|  |  | -                logger.trace("Not checking databases because geoip databases index does not exist for project [{}]", projectId);
 | 
	
		
			
				|  |  | +        IndexAbstraction databasesAbstraction = projectMetadata.getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX);
 | 
	
		
			
				|  |  | +        if (databasesAbstraction == null) {
 | 
	
		
			
				|  |  | +            logger.trace("Not checking databases because geoip databases index does not exist for project [{}]", projectId);
 | 
	
		
			
				|  |  | +            return;
 | 
	
		
			
				|  |  | +        } else {
 | 
	
		
			
				|  |  | +            // regardless of whether DATABASES_INDEX is an alias, resolve it to a concrete index
 | 
	
		
			
				|  |  | +            Index databasesIndex = databasesAbstraction.getWriteIndex();
 | 
	
		
			
				|  |  | +            IndexRoutingTable databasesIndexRT = projectState.routingTable().index(databasesIndex);
 | 
	
		
			
				|  |  | +            if (databasesIndexRT == null || databasesIndexRT.allPrimaryShardsActive() == false) {
 | 
	
		
			
				|  |  | +                logger.trace(
 | 
	
		
			
				|  |  | +                    "Not checking databases because geoip databases index does not have all active primary shards for project [{}]",
 | 
	
		
			
				|  |  | +                    projectId
 | 
	
		
			
				|  |  | +                );
 | 
	
		
			
				|  |  |                  return;
 | 
	
		
			
				|  |  | -            } else {
 | 
	
		
			
				|  |  | -                // regardless of whether DATABASES_INDEX is an alias, resolve it to a concrete index
 | 
	
		
			
				|  |  | -                Index databasesIndex = databasesAbstraction.getWriteIndex();
 | 
	
		
			
				|  |  | -                IndexRoutingTable databasesIndexRT = state.routingTable(projectId).index(databasesIndex);
 | 
	
		
			
				|  |  | -                if (databasesIndexRT == null || databasesIndexRT.allPrimaryShardsActive() == false) {
 | 
	
		
			
				|  |  | -                    logger.trace(
 | 
	
		
			
				|  |  | -                        "Not checking databases because geoip databases index does not have all active primary shards for"
 | 
	
		
			
				|  |  | -                            + " project [{}]",
 | 
	
		
			
				|  |  | -                        projectId
 | 
	
		
			
				|  |  | -                    );
 | 
	
		
			
				|  |  | -                    return;
 | 
	
		
			
				|  |  | -                }
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -            // we'll consult each of the geoip downloaders to build up a list of database metadatas to work with
 | 
	
		
			
				|  |  | -            List<Tuple<String, GeoIpTaskState.Metadata>> validMetadatas = new ArrayList<>();
 | 
	
		
			
				|  |  | +        // we'll consult each of the geoip downloaders to build up a list of database metadatas to work with
 | 
	
		
			
				|  |  | +        List<Tuple<String, GeoIpTaskState.Metadata>> validMetadatas = new ArrayList<>();
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -            // process the geoip task state for the (ordinary) geoip downloader
 | 
	
		
			
				|  |  | -            {
 | 
	
		
			
				|  |  | -                GeoIpTaskState taskState = getGeoIpTaskState(
 | 
	
		
			
				|  |  | -                    projectMetadata,
 | 
	
		
			
				|  |  | -                    getTaskId(projectId, projectResolver.supportsMultipleProjects())
 | 
	
		
			
				|  |  | -                );
 | 
	
		
			
				|  |  | -                if (taskState == null) {
 | 
	
		
			
				|  |  | -                    // Note: an empty state will purge stale entries in databases map
 | 
	
		
			
				|  |  | -                    taskState = GeoIpTaskState.EMPTY;
 | 
	
		
			
				|  |  | -                }
 | 
	
		
			
				|  |  | -                validMetadatas.addAll(
 | 
	
		
			
				|  |  | -                    taskState.getDatabases()
 | 
	
		
			
				|  |  | -                        .entrySet()
 | 
	
		
			
				|  |  | -                        .stream()
 | 
	
		
			
				|  |  | -                        .filter(e -> e.getValue().isNewEnough(state.getMetadata().settings()))
 | 
	
		
			
				|  |  | -                        .map(entry -> Tuple.tuple(entry.getKey(), entry.getValue()))
 | 
	
		
			
				|  |  | -                        .toList()
 | 
	
		
			
				|  |  | -                );
 | 
	
		
			
				|  |  | +        // process the geoip task state for the (ordinary) geoip downloader
 | 
	
		
			
				|  |  | +        {
 | 
	
		
			
				|  |  | +            GeoIpTaskState taskState = getGeoIpTaskState(projectMetadata, getTaskId(projectId, projectResolver.supportsMultipleProjects()));
 | 
	
		
			
				|  |  | +            if (taskState == null) {
 | 
	
		
			
				|  |  | +                // Note: an empty state will purge stale entries in databases map
 | 
	
		
			
				|  |  | +                taskState = GeoIpTaskState.EMPTY;
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  | +            validMetadatas.addAll(
 | 
	
		
			
				|  |  | +                taskState.getDatabases()
 | 
	
		
			
				|  |  | +                    .entrySet()
 | 
	
		
			
				|  |  | +                    .stream()
 | 
	
		
			
				|  |  | +                    .filter(e -> e.getValue().isNewEnough(projectState.cluster().metadata().settings()))
 | 
	
		
			
				|  |  | +                    .map(entry -> Tuple.tuple(entry.getKey(), entry.getValue()))
 | 
	
		
			
				|  |  | +                    .toList()
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -            // process the geoip task state for the enterprise geoip downloader
 | 
	
		
			
				|  |  | -            {
 | 
	
		
			
				|  |  | -                EnterpriseGeoIpTaskState taskState = getEnterpriseGeoIpTaskState(state);
 | 
	
		
			
				|  |  | -                if (taskState == null) {
 | 
	
		
			
				|  |  | -                    // Note: an empty state will purge stale entries in databases map
 | 
	
		
			
				|  |  | -                    taskState = EnterpriseGeoIpTaskState.EMPTY;
 | 
	
		
			
				|  |  | -                }
 | 
	
		
			
				|  |  | -                validMetadatas.addAll(
 | 
	
		
			
				|  |  | -                    taskState.getDatabases()
 | 
	
		
			
				|  |  | -                        .entrySet()
 | 
	
		
			
				|  |  | -                        .stream()
 | 
	
		
			
				|  |  | -                        .filter(e -> e.getValue().isNewEnough(state.getMetadata().settings()))
 | 
	
		
			
				|  |  | -                        .map(entry -> Tuple.tuple(entry.getKey(), entry.getValue()))
 | 
	
		
			
				|  |  | -                        .toList()
 | 
	
		
			
				|  |  | -                );
 | 
	
		
			
				|  |  | +        // process the geoip task state for the enterprise geoip downloader
 | 
	
		
			
				|  |  | +        {
 | 
	
		
			
				|  |  | +            EnterpriseGeoIpTaskState taskState = getEnterpriseGeoIpTaskState(projectState.metadata());
 | 
	
		
			
				|  |  | +            if (taskState == null) {
 | 
	
		
			
				|  |  | +                // Note: an empty state will purge stale entries in databases map
 | 
	
		
			
				|  |  | +                taskState = EnterpriseGeoIpTaskState.EMPTY;
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  | +            validMetadatas.addAll(
 | 
	
		
			
				|  |  | +                taskState.getDatabases()
 | 
	
		
			
				|  |  | +                    .entrySet()
 | 
	
		
			
				|  |  | +                    .stream()
 | 
	
		
			
				|  |  | +                    .filter(e -> e.getValue().isNewEnough(projectState.cluster().metadata().settings()))
 | 
	
		
			
				|  |  | +                    .map(entry -> Tuple.tuple(entry.getKey(), entry.getValue()))
 | 
	
		
			
				|  |  | +                    .toList()
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -            // run through all the valid metadatas, regardless of source, and retrieve them if the persistent downloader task
 | 
	
		
			
				|  |  | -            // has downloaded a new version of the databases
 | 
	
		
			
				|  |  | -            validMetadatas.forEach(e -> {
 | 
	
		
			
				|  |  | -                String name = e.v1();
 | 
	
		
			
				|  |  | -                GeoIpTaskState.Metadata metadata = e.v2();
 | 
	
		
			
				|  |  | -                DatabaseReaderLazyLoader reference = getProjectLazyLoader(projectId, name);
 | 
	
		
			
				|  |  | -                String remoteMd5 = metadata.md5();
 | 
	
		
			
				|  |  | -                String localMd5 = reference != null ? reference.getMd5() : null;
 | 
	
		
			
				|  |  | -                if (Objects.equals(localMd5, remoteMd5)) {
 | 
	
		
			
				|  |  | -                    logger.debug("[{}] is up to date [{}] with cluster state [{}]", name, localMd5, remoteMd5);
 | 
	
		
			
				|  |  | -                    return;
 | 
	
		
			
				|  |  | -                }
 | 
	
		
			
				|  |  | +        // run through all the valid metadatas, regardless of source, and retrieve them if the persistent downloader task
 | 
	
		
			
				|  |  | +        // has downloaded a new version of the databases
 | 
	
		
			
				|  |  | +        validMetadatas.forEach(e -> {
 | 
	
		
			
				|  |  | +            String name = e.v1();
 | 
	
		
			
				|  |  | +            GeoIpTaskState.Metadata metadata = e.v2();
 | 
	
		
			
				|  |  | +            DatabaseReaderLazyLoader reference = getProjectLazyLoader(projectId, name);
 | 
	
		
			
				|  |  | +            String remoteMd5 = metadata.md5();
 | 
	
		
			
				|  |  | +            String localMd5 = reference != null ? reference.getMd5() : null;
 | 
	
		
			
				|  |  | +            if (Objects.equals(localMd5, remoteMd5)) {
 | 
	
		
			
				|  |  | +                logger.debug("[{}] is up to date [{}] with cluster state [{}]", name, localMd5, remoteMd5);
 | 
	
		
			
				|  |  | +                return;
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -                try {
 | 
	
		
			
				|  |  | -                    retrieveAndUpdateDatabase(projectId, name, metadata);
 | 
	
		
			
				|  |  | -                } catch (Exception ex) {
 | 
	
		
			
				|  |  | -                    logger.error(() -> "failed to retrieve database [" + name + "]", ex);
 | 
	
		
			
				|  |  | -                }
 | 
	
		
			
				|  |  | -            });
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -            // TODO perhaps we need to handle the license flap persistent task state better than we do
 | 
	
		
			
				|  |  | -            // i think the ideal end state is that we *do not* drop the files that the enterprise downloader
 | 
	
		
			
				|  |  | -            // handled if they fall out -- which means we need to track that in the databases map itself
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -            // start with the list of all databases we currently know about in this service,
 | 
	
		
			
				|  |  | -            // then drop the ones that didn't check out as valid from the task states
 | 
	
		
			
				|  |  | -            if (databases.containsKey(projectId)) {
 | 
	
		
			
				|  |  | -                Set<String> staleDatabases = new HashSet<>(databases.get(projectId).keySet());
 | 
	
		
			
				|  |  | -                staleDatabases.removeAll(validMetadatas.stream().map(Tuple::v1).collect(Collectors.toSet()));
 | 
	
		
			
				|  |  | -                removeStaleEntries(projectId, staleDatabases);
 | 
	
		
			
				|  |  | +            try {
 | 
	
		
			
				|  |  | +                retrieveAndUpdateDatabase(projectId, name, metadata);
 | 
	
		
			
				|  |  | +            } catch (Exception ex) {
 | 
	
		
			
				|  |  | +                logger.error(() -> "failed to retrieve database [" + name + "]", ex);
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  | +        });
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // TODO perhaps we need to handle the license flap persistent task state better than we do
 | 
	
		
			
				|  |  | +        // i think the ideal end state is that we *do not* drop the files that the enterprise downloader
 | 
	
		
			
				|  |  | +        // handled if they fall out -- which means we need to track that in the databases map itself
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // start with the list of all databases we currently know about in this service,
 | 
	
		
			
				|  |  | +        // then drop the ones that didn't check out as valid from the task states
 | 
	
		
			
				|  |  | +        if (databases.containsKey(projectId)) {
 | 
	
		
			
				|  |  | +            Set<String> staleDatabases = new HashSet<>(databases.get(projectId).keySet());
 | 
	
		
			
				|  |  | +            staleDatabases.removeAll(validMetadatas.stream().map(Tuple::v1).collect(Collectors.toSet()));
 | 
	
		
			
				|  |  | +            removeStaleEntries(projectId, staleDatabases);
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 |