|
|
@@ -17,11 +17,13 @@ import com.maxmind.geoip2.record.Continent;
|
|
|
import com.maxmind.geoip2.record.Country;
|
|
|
import com.maxmind.geoip2.record.Location;
|
|
|
import com.maxmind.geoip2.record.Subdivision;
|
|
|
+
|
|
|
import org.elasticsearch.ElasticsearchParseException;
|
|
|
import org.elasticsearch.ResourceNotFoundException;
|
|
|
import org.elasticsearch.cluster.ClusterState;
|
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
|
import org.elasticsearch.common.CheckedSupplier;
|
|
|
+import org.elasticsearch.common.logging.HeaderWarning;
|
|
|
import org.elasticsearch.common.network.InetAddresses;
|
|
|
import org.elasticsearch.common.network.NetworkAddress;
|
|
|
import org.elasticsearch.ingest.AbstractProcessor;
|
|
|
@@ -65,10 +67,11 @@ public final class GeoIpProcessor extends AbstractProcessor {
|
|
|
|
|
|
/**
|
|
|
* Construct a geo-IP processor.
|
|
|
+ *
|
|
|
* @param tag the processor tag
|
|
|
* @param description the processor description
|
|
|
* @param field the source field to geo-IP map
|
|
|
- * @param supplier a supplier of a geo-IP database reader; ideally this is lazily-loaded once on first use
|
|
|
+ * @param supplier a supplier of a geo-IP database reader; ideally this is lazily-loaded once on first use
|
|
|
* @param isValid
|
|
|
* @param targetField the target field
|
|
|
* @param properties the properties; ideally this is lazily-loaded once on first use
|
|
|
@@ -104,7 +107,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
|
|
|
Object ip = ingestDocument.getFieldValue(field, Object.class, ignoreMissing);
|
|
|
|
|
|
if (isValid.get() == false) {
|
|
|
- ingestDocument.appendFieldValue("tags","_geoip_expired_database", false);
|
|
|
+ ingestDocument.appendFieldValue("tags", "_geoip_expired_database", false);
|
|
|
return ingestDocument;
|
|
|
} else if (ip == null && ignoreMissing) {
|
|
|
return ingestDocument;
|
|
|
@@ -367,9 +370,9 @@ public final class GeoIpProcessor extends AbstractProcessor {
|
|
|
|
|
|
@Override
|
|
|
public GeoIpProcessor create(
|
|
|
- final Map<String, Processor.Factory> registry,
|
|
|
- final String processorTag,
|
|
|
- final String description, final Map<String, Object> config) throws IOException {
|
|
|
+ final Map<String, Processor.Factory> registry,
|
|
|
+ final String processorTag,
|
|
|
+ final String description, final Map<String, Object> config) throws IOException {
|
|
|
String ipField = readStringProperty(TYPE, processorTag, config, "field");
|
|
|
String targetField = readStringProperty(TYPE, processorTag, config, "target_field", "geoip");
|
|
|
String databaseFile = readStringProperty(TYPE, processorTag, config, "database_file", "GeoLite2-City.mmdb");
|
|
|
@@ -438,7 +441,17 @@ public final class GeoIpProcessor extends AbstractProcessor {
|
|
|
GeoIpTaskState state = (GeoIpTaskState) task.getState();
|
|
|
GeoIpTaskState.Metadata metadata = state.getDatabases().get(databaseFile);
|
|
|
// we never remove metadata from cluster state, if metadata is null we deal with built-in database, which is always valid
|
|
|
- return metadata == null || metadata.isValid(currentState.metadata().settings());
|
|
|
+ if (metadata == null) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean valid = metadata.isValid(currentState.metadata().settings());
|
|
|
+ if (valid && metadata.isCloseToExpiration()) {
|
|
|
+ HeaderWarning.addWarning("database [{}] was not updated for over 25 days, geoip processor will stop working if there " +
|
|
|
+ "is no update for 30 days", databaseFile);
|
|
|
+ }
|
|
|
+
|
|
|
+ return valid;
|
|
|
};
|
|
|
return new GeoIpProcessor(processorTag, description, ipField, supplier, isValid, targetField, properties, ignoreMissing,
|
|
|
firstOnly);
|