|
|
@@ -9,7 +9,6 @@ package org.elasticsearch.ingest.geoip;
|
|
|
|
|
|
import org.apache.logging.log4j.LogManager;
|
|
|
import org.apache.logging.log4j.Logger;
|
|
|
-import org.apache.logging.log4j.util.Supplier;
|
|
|
import org.elasticsearch.ResourceNotFoundException;
|
|
|
import org.elasticsearch.action.search.SearchRequest;
|
|
|
import org.elasticsearch.action.search.SearchResponse;
|
|
|
@@ -87,7 +86,7 @@ import static org.elasticsearch.persistent.PersistentTasksCustomMetadata.getTask
|
|
|
*/
|
|
|
public final class DatabaseNodeService implements GeoIpDatabaseProvider, Closeable {
|
|
|
|
|
|
- private static final Logger LOGGER = LogManager.getLogger(DatabaseNodeService.class);
|
|
|
+ private static final Logger logger = LogManager.getLogger(DatabaseNodeService.class);
|
|
|
|
|
|
private final Client client;
|
|
|
private final GeoIpCache cache;
|
|
|
@@ -145,10 +144,10 @@ public final class DatabaseNodeService implements GeoIpDatabaseProvider, Closeab
|
|
|
@Override
|
|
|
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
|
|
|
try {
|
|
|
- LOGGER.info("deleting stale file [{}]", file);
|
|
|
+ logger.info("deleting stale file [{}]", file);
|
|
|
Files.deleteIfExists(file);
|
|
|
} catch (IOException e) {
|
|
|
- LOGGER.warn("can't delete stale file [" + file + "]", e);
|
|
|
+ logger.warn("can't delete stale file [" + file + "]", e);
|
|
|
}
|
|
|
return FileVisitResult.CONTINUE;
|
|
|
}
|
|
|
@@ -156,7 +155,7 @@ public final class DatabaseNodeService implements GeoIpDatabaseProvider, Closeab
|
|
|
@Override
|
|
|
public FileVisitResult visitFileFailed(Path file, IOException e) {
|
|
|
if (e instanceof NoSuchFileException == false) {
|
|
|
- LOGGER.warn("can't delete stale file [" + file + "]", e);
|
|
|
+ logger.warn("can't delete stale file [" + file + "]", e);
|
|
|
}
|
|
|
return FileVisitResult.CONTINUE;
|
|
|
}
|
|
|
@@ -169,7 +168,7 @@ public final class DatabaseNodeService implements GeoIpDatabaseProvider, Closeab
|
|
|
if (Files.exists(geoipTmpDirectory) == false) {
|
|
|
Files.createDirectories(geoipTmpDirectory);
|
|
|
}
|
|
|
- LOGGER.debug("initialized database node service, using geoip-databases directory [{}]", geoipTmpDirectory);
|
|
|
+ logger.debug("initialized database node service, using geoip-databases directory [{}]", geoipTmpDirectory);
|
|
|
this.ingestService = ingestServiceArg;
|
|
|
clusterService.addListener(event -> checkDatabases(event.state()));
|
|
|
}
|
|
|
@@ -246,26 +245,26 @@ public final class DatabaseNodeService implements GeoIpDatabaseProvider, Closeab
|
|
|
|
|
|
DiscoveryNode localNode = state.nodes().getLocalNode();
|
|
|
if (localNode.isIngestNode() == false) {
|
|
|
- LOGGER.trace("Not checking databases because local node is not ingest node");
|
|
|
+ logger.trace("Not checking databases because local node is not ingest node");
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
PersistentTasksCustomMetadata persistentTasks = state.metadata().custom(PersistentTasksCustomMetadata.TYPE);
|
|
|
if (persistentTasks == null) {
|
|
|
- LOGGER.trace("Not checking databases because persistent tasks are null");
|
|
|
+ logger.trace("Not checking databases because persistent tasks are null");
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
IndexAbstraction databasesAbstraction = state.getMetadata().getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX);
|
|
|
if (databasesAbstraction == null) {
|
|
|
- LOGGER.trace("Not checking databases because geoip databases index does not exist");
|
|
|
+ logger.trace("Not checking databases because geoip databases index does not exist");
|
|
|
return;
|
|
|
} else {
|
|
|
// regardless of whether DATABASES_INDEX is an alias, resolve it to a concrete index
|
|
|
Index databasesIndex = databasesAbstraction.getWriteIndex();
|
|
|
IndexRoutingTable databasesIndexRT = state.getRoutingTable().index(databasesIndex);
|
|
|
if (databasesIndexRT == null || databasesIndexRT.allPrimaryShardsActive() == false) {
|
|
|
- LOGGER.trace("Not checking databases because geoip databases index does not have all active primary shards");
|
|
|
+ logger.trace("Not checking databases because geoip databases index does not have all active primary shards");
|
|
|
return;
|
|
|
}
|
|
|
}
|
|
|
@@ -284,14 +283,14 @@ public final class DatabaseNodeService implements GeoIpDatabaseProvider, Closeab
|
|
|
String remoteMd5 = metadata.md5();
|
|
|
String localMd5 = reference != null ? reference.getMd5() : null;
|
|
|
if (Objects.equals(localMd5, remoteMd5)) {
|
|
|
- LOGGER.debug("Current reference of [{}] is up to date [{}] with was recorded in CS [{}]", name, localMd5, remoteMd5);
|
|
|
+ logger.debug("Current reference of [{}] is up to date [{}] with was recorded in CS [{}]", name, localMd5, remoteMd5);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
try {
|
|
|
retrieveAndUpdateDatabase(name, metadata);
|
|
|
} catch (Exception ex) {
|
|
|
- LOGGER.error((Supplier<?>) () -> "attempt to download database [" + name + "] failed", ex);
|
|
|
+ logger.error(() -> "attempt to download database [" + name + "] failed", ex);
|
|
|
}
|
|
|
});
|
|
|
|
|
|
@@ -308,7 +307,7 @@ public final class DatabaseNodeService implements GeoIpDatabaseProvider, Closeab
|
|
|
}
|
|
|
|
|
|
void retrieveAndUpdateDatabase(String databaseName, GeoIpTaskState.Metadata metadata) throws IOException {
|
|
|
- LOGGER.trace("Retrieving database {}", databaseName);
|
|
|
+ logger.trace("Retrieving database {}", databaseName);
|
|
|
final String recordedMd5 = metadata.md5();
|
|
|
|
|
|
// This acts as a lock, if this method for a specific db is executed later and downloaded for this db is still ongoing then
|
|
|
@@ -318,7 +317,7 @@ public final class DatabaseNodeService implements GeoIpDatabaseProvider, Closeab
|
|
|
try {
|
|
|
databaseTmpGzFile = Files.createFile(geoipTmpDirectory.resolve(databaseName + ".tmp.gz"));
|
|
|
} catch (FileAlreadyExistsException e) {
|
|
|
- LOGGER.debug("database update [{}] already in progress, skipping...", databaseName);
|
|
|
+ logger.debug("database update [{}] already in progress, skipping...", databaseName);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
@@ -330,20 +329,20 @@ public final class DatabaseNodeService implements GeoIpDatabaseProvider, Closeab
|
|
|
// twice. This check is here to avoid this:
|
|
|
DatabaseReaderLazyLoader lazyLoader = databases.get(databaseName);
|
|
|
if (lazyLoader != null && recordedMd5.equals(lazyLoader.getMd5())) {
|
|
|
- LOGGER.debug("deleting tmp file because database [{}] has already been updated.", databaseName);
|
|
|
+ logger.debug("deleting tmp file because database [{}] has already been updated.", databaseName);
|
|
|
Files.delete(databaseTmpGzFile);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
final Path databaseTmpFile = Files.createFile(geoipTmpDirectory.resolve(databaseName + ".tmp"));
|
|
|
- LOGGER.debug("retrieve geoip database [{}] from [{}] to [{}]", databaseName, GeoIpDownloader.DATABASES_INDEX, databaseTmpGzFile);
|
|
|
+ logger.debug("retrieve geoip database [{}] from [{}] to [{}]", databaseName, GeoIpDownloader.DATABASES_INDEX, databaseTmpGzFile);
|
|
|
retrieveDatabase(
|
|
|
databaseName,
|
|
|
recordedMd5,
|
|
|
metadata,
|
|
|
bytes -> Files.write(databaseTmpGzFile, bytes, StandardOpenOption.APPEND),
|
|
|
() -> {
|
|
|
- LOGGER.debug("decompressing [{}]", databaseTmpGzFile.getFileName());
|
|
|
+ logger.debug("decompressing [{}]", databaseTmpGzFile.getFileName());
|
|
|
|
|
|
Path databaseFile = geoipTmpDirectory.resolve(databaseName);
|
|
|
// tarball contains <database_name>.mmdb, LICENSE.txt, COPYRIGHTS.txt and optional README.txt files.
|
|
|
@@ -369,19 +368,19 @@ public final class DatabaseNodeService implements GeoIpDatabaseProvider, Closeab
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- LOGGER.debug("moving database from [{}] to [{}]", databaseTmpFile, databaseFile);
|
|
|
+ logger.debug("moving database from [{}] to [{}]", databaseTmpFile, databaseFile);
|
|
|
Files.move(databaseTmpFile, databaseFile, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
|
|
|
updateDatabase(databaseName, recordedMd5, databaseFile);
|
|
|
Files.delete(databaseTmpGzFile);
|
|
|
},
|
|
|
failure -> {
|
|
|
- LOGGER.error((Supplier<?>) () -> "failed to retrieve database [" + databaseName + "]", failure);
|
|
|
+ logger.error(() -> "failed to retrieve database [" + databaseName + "]", failure);
|
|
|
try {
|
|
|
Files.deleteIfExists(databaseTmpFile);
|
|
|
Files.deleteIfExists(databaseTmpGzFile);
|
|
|
} catch (IOException ioe) {
|
|
|
ioe.addSuppressed(failure);
|
|
|
- LOGGER.error("Unable to delete tmp database file after failure", ioe);
|
|
|
+ logger.error("Unable to delete tmp database file after failure", ioe);
|
|
|
}
|
|
|
}
|
|
|
);
|
|
|
@@ -389,7 +388,7 @@ public final class DatabaseNodeService implements GeoIpDatabaseProvider, Closeab
|
|
|
|
|
|
void updateDatabase(String databaseFileName, String recordedMd5, Path file) {
|
|
|
try {
|
|
|
- LOGGER.debug("starting reload of changed geoip database file [{}]", file);
|
|
|
+ logger.debug("starting reload of changed geoip database file [{}]", file);
|
|
|
DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(cache, file, recordedMd5);
|
|
|
DatabaseReaderLazyLoader existing = databases.put(databaseFileName, loader);
|
|
|
if (existing != null) {
|
|
|
@@ -399,41 +398,41 @@ public final class DatabaseNodeService implements GeoIpDatabaseProvider, Closeab
|
|
|
Predicate<GeoIpProcessor.DatabaseUnavailableProcessor> predicate = p -> databaseFileName.equals(p.getDatabaseName());
|
|
|
var ids = ingestService.getPipelineWithProcessorType(GeoIpProcessor.DatabaseUnavailableProcessor.class, predicate);
|
|
|
if (ids.isEmpty() == false) {
|
|
|
- LOGGER.debug("pipelines [{}] found to reload", ids);
|
|
|
+ logger.debug("pipelines [{}] found to reload", ids);
|
|
|
for (var id : ids) {
|
|
|
try {
|
|
|
ingestService.reloadPipeline(id);
|
|
|
- LOGGER.trace(
|
|
|
+ logger.trace(
|
|
|
"successfully reloaded pipeline [{}] after downloading of database [{}] for the first time",
|
|
|
id,
|
|
|
databaseFileName
|
|
|
);
|
|
|
} catch (Exception e) {
|
|
|
- LOGGER.debug(
|
|
|
+ logger.debug(
|
|
|
() -> format("failed to reload pipeline [%s] after downloading of database [%s]", id, databaseFileName),
|
|
|
e
|
|
|
);
|
|
|
}
|
|
|
}
|
|
|
} else {
|
|
|
- LOGGER.debug("no pipelines found to reload");
|
|
|
+ logger.debug("no pipelines found to reload");
|
|
|
}
|
|
|
}
|
|
|
- LOGGER.info("successfully loaded geoip database file [{}]", file.getFileName());
|
|
|
+ logger.info("successfully loaded geoip database file [{}]", file.getFileName());
|
|
|
} catch (Exception e) {
|
|
|
- LOGGER.error((Supplier<?>) () -> "failed to update database [" + databaseFileName + "]", e);
|
|
|
+ logger.error(() -> "failed to update database [" + databaseFileName + "]", e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
void removeStaleEntries(Collection<String> staleEntries) {
|
|
|
for (String staleEntry : staleEntries) {
|
|
|
try {
|
|
|
- LOGGER.debug("database [{}] no longer exists, cleaning up...", staleEntry);
|
|
|
+ logger.debug("database [{}] no longer exists, cleaning up...", staleEntry);
|
|
|
DatabaseReaderLazyLoader existing = databases.remove(staleEntry);
|
|
|
assert existing != null;
|
|
|
existing.close(true);
|
|
|
} catch (Exception e) {
|
|
|
- LOGGER.error((Supplier<?>) () -> "failed to clean database [" + staleEntry + "]", e);
|
|
|
+ logger.error(() -> "failed to clean database [" + staleEntry + "]", e);
|
|
|
}
|
|
|
}
|
|
|
}
|