Browse Source

Network community_id processor for ingest pipelines (#66534)

Dan Hermann 4 years ago
parent
commit
1bb90a7626

+ 562 - 0
x-pack/plugin/ingest/src/main/java/org/elasticsearch/xpack/ingest/CommunityIdProcessor.java

@@ -0,0 +1,562 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.ingest;
+
+import org.elasticsearch.common.network.InetAddresses;
+import org.elasticsearch.ingest.AbstractProcessor;
+import org.elasticsearch.ingest.ConfigurationUtils;
+import org.elasticsearch.ingest.IngestDocument;
+import org.elasticsearch.ingest.Processor;
+
+import java.math.BigInteger;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
+import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty;
+
+public final class CommunityIdProcessor extends AbstractProcessor {
+
+    public static final String TYPE = "community_id";
+
+    private final String sourceIpField;
+    private final String sourcePortField;
+    private final String destinationIpField;
+    private final String destinationPortField;
+    private final String ianaNumberField;
+    private final String transportField;
+    private final String icmpTypeField;
+    private final String icmpCodeField;
+    private final String targetField;
+    private final MessageDigest messageDigest;
+    private final byte[] seed;
+    private final boolean ignoreMissing;
+
+    CommunityIdProcessor(
+        String tag,
+        String description,
+        String sourceIpField,
+        String sourcePortField,
+        String destinationIpField,
+        String destinationPortField,
+        String ianaNumberField,
+        String transportField,
+        String icmpTypeField,
+        String icmpCodeField,
+        String targetField,
+        MessageDigest messageDigest,
+        byte[] seed,
+        boolean ignoreMissing
+    ) {
+        super(tag, description);
+        this.sourceIpField = sourceIpField;
+        this.sourcePortField = sourcePortField;
+        this.destinationIpField = destinationIpField;
+        this.destinationPortField = destinationPortField;
+        this.ianaNumberField = ianaNumberField;
+        this.transportField = transportField;
+        this.icmpTypeField = icmpTypeField;
+        this.icmpCodeField = icmpCodeField;
+        this.targetField = targetField;
+        this.messageDigest = messageDigest;
+        this.seed = seed;
+        this.ignoreMissing = ignoreMissing;
+    }
+
+    public String getSourceIpField() {
+        return sourceIpField;
+    }
+
+    public String getSourcePortField() {
+        return sourcePortField;
+    }
+
+    public String getDestinationIpField() {
+        return destinationIpField;
+    }
+
+    public String getDestinationPortField() {
+        return destinationPortField;
+    }
+
+    public String getIanaNumberField() {
+        return ianaNumberField;
+    }
+
+    public String getTransportField() {
+        return transportField;
+    }
+
+    public String getIcmpTypeField() {
+        return icmpTypeField;
+    }
+
+    public String getIcmpCodeField() {
+        return icmpCodeField;
+    }
+
+    public String getTargetField() {
+        return targetField;
+    }
+
+    public MessageDigest getMessageDigest() {
+        return messageDigest;
+    }
+
+    public byte[] getSeed() {
+        return seed;
+    }
+
+    public boolean getIgnoreMissing() {
+        return ignoreMissing;
+    }
+
+    @Override
+    public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
+        Flow flow = buildFlow(ingestDocument);
+        if (flow == null) {
+            if (ignoreMissing) {
+                return ingestDocument;
+            } else {
+                throw new IllegalArgumentException("unable to construct flow from document");
+            }
+        }
+
+        ingestDocument.setFieldValue(targetField, flow.toCommunityId(messageDigest, seed));
+        return ingestDocument;
+    }
+
+    private Flow buildFlow(IngestDocument d) {
+        String sourceIpAddrString = d.getFieldValue(sourceIpField, String.class, ignoreMissing);
+        if (sourceIpAddrString == null) {
+            return null;
+        }
+
+        String destIpAddrString = d.getFieldValue(destinationIpField, String.class, ignoreMissing);
+        if (destIpAddrString == null) {
+            return null;
+        }
+
+        Flow flow = new Flow();
+        flow.source = InetAddresses.forString(sourceIpAddrString);
+        flow.destination = InetAddresses.forString(destIpAddrString);
+
+        Object protocol = d.getFieldValue(ianaNumberField, Object.class, true);
+        if (protocol == null) {
+            protocol = d.getFieldValue(transportField, Object.class, ignoreMissing);
+            if (protocol == null) {
+                return null;
+            }
+        }
+        flow.protocol = Transport.fromObject(protocol);
+
+        switch (flow.protocol) {
+            case Tcp:
+            case Udp:
+            case Sctp:
+                flow.sourcePort = parseIntFromObjectOrString(d.getFieldValue(sourcePortField, Object.class, ignoreMissing), "source port");
+                if (flow.sourcePort == 0) {
+                    throw new IllegalArgumentException("invalid source port [0]");
+                }
+
+                flow.destinationPort = parseIntFromObjectOrString(
+                    d.getFieldValue(destinationPortField, Object.class, ignoreMissing),
+                    "destination port"
+                );
+                if (flow.destinationPort == 0) {
+                    throw new IllegalArgumentException("invalid destination port [0]");
+                }
+                break;
+            case Icmp:
+            case IcmpIpV6:
+                // tolerate missing or invalid ICMP types and codes
+                flow.icmpType = parseIntFromObjectOrString(d.getFieldValue(icmpTypeField, Object.class, true), "icmp type");
+                flow.icmpCode = parseIntFromObjectOrString(d.getFieldValue(icmpCodeField, Object.class, true), "icmp code");
+                break;
+        }
+
+        return flow;
+    }
+
+    @Override
+    public String getType() {
+        return TYPE;
+    }
+
+    /**
+     * Converts an integer in the range of an unsigned 16-bit integer to a big-endian byte pair
+     */
+    static byte[] toUint16(int num) {
+        if (num < 0 || num > 65535) {
+            throw new IllegalStateException("number [" + num + "] must be a value between 0 and 65535");
+        }
+        return new byte[] { (byte) (num >> 8), (byte) num };
+    }
+
+    /**
+     * Attempts to coerce an object to an integer
+     */
+    static int parseIntFromObjectOrString(Object o, String fieldName) {
+        if (o == null) {
+            return 0;
+        } else if (o instanceof Number) {
+            return (int) o;
+        } else if (o instanceof String) {
+            try {
+                return Integer.parseInt((String) o);
+            } catch (NumberFormatException e) {
+                // fall through to IllegalArgumentException below
+            }
+        }
+        throw new IllegalArgumentException("unable to parse " + fieldName + " [" + o + "]");
+    }
+
+    public static final class Factory implements Processor.Factory {
+
+        static final String DEFAULT_SOURCE_IP = "source.ip";
+        static final String DEFAULT_SOURCE_PORT = "source.port";
+        static final String DEFAULT_DEST_IP = "destination.ip";
+        static final String DEFAULT_DEST_PORT = "destination.port";
+        static final String DEFAULT_IANA_NUMBER = "network.iana_number";
+        static final String DEFAULT_TRANSPORT = "network.transport";
+        static final String DEFAULT_ICMP_TYPE = "icmp.type";
+        static final String DEFAULT_ICMP_CODE = "icmp.code";
+        static final String DEFAULT_TARGET = "network.community_id";
+
+        @Override
+        public CommunityIdProcessor create(
+            Map<String, Processor.Factory> registry,
+            String processorTag,
+            String description,
+            Map<String, Object> config
+        ) throws Exception {
+            String sourceIpField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "source_ip", DEFAULT_SOURCE_IP);
+            String sourcePortField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "source_port", DEFAULT_SOURCE_PORT);
+            String destIpField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "destination_ip", DEFAULT_DEST_IP);
+            String destPortField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "destination_port", DEFAULT_DEST_PORT);
+            String ianaNumberField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "iana_number", DEFAULT_IANA_NUMBER);
+            String transportField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "transport", DEFAULT_TRANSPORT);
+            String icmpTypeField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "icmp_type", DEFAULT_ICMP_TYPE);
+            String icmpCodeField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "icmp_code", DEFAULT_ICMP_CODE);
+            String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field", DEFAULT_TARGET);
+            int seedInt = ConfigurationUtils.readIntProperty(TYPE, processorTag, config, "seed", 0);
+            if (seedInt < 0 || seedInt > 65535) {
+                throw newConfigurationException(TYPE, processorTag, "seed", "must be a value between 0 and 65535");
+            }
+            MessageDigest messageDigest;
+            try {
+                messageDigest = MessageDigest.getInstance("SHA-1");
+            } catch (NoSuchAlgorithmException e) {
+                throw new IllegalStateException("unable to obtain SHA-1 hasher", e);
+            }
+
+            boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, "ignore_missing", true);
+            return new CommunityIdProcessor(
+                processorTag,
+                description,
+                sourceIpField,
+                sourcePortField,
+                destIpField,
+                destPortField,
+                ianaNumberField,
+                transportField,
+                icmpTypeField,
+                icmpCodeField,
+                targetField,
+                messageDigest,
+                toUint16(seedInt),
+                ignoreMissing
+            );
+        }
+    }
+
+    /**
+     * Represents flow data per https://github.com/corelight/community-id-spec
+     */
+    public static final class Flow {
+
+        private static final List<Transport> TRANSPORTS_WITH_PORTS = List.of(
+            Transport.Tcp,
+            Transport.Udp,
+            Transport.Sctp,
+            Transport.Icmp,
+            Transport.IcmpIpV6
+        );
+
+        InetAddress source;
+        InetAddress destination;
+        Transport protocol;
+        int sourcePort;
+        int destinationPort;
+        int icmpType;
+        int icmpCode;
+
+        /**
+         * @return true iff the source address/port is numerically less than the destination address/port as described
+         * at https://github.com/corelight/community-id-spec
+         */
+        boolean isOrdered() {
+            int result = new BigInteger(1, source.getAddress()).compareTo(new BigInteger(1, destination.getAddress()));
+            return result < 0 || (result == 0 && sourcePort < destinationPort);
+        }
+
+        byte[] toBytes() {
+            boolean hasPort = TRANSPORTS_WITH_PORTS.contains(protocol);
+            int len = source.getAddress().length + destination.getAddress().length + 2 + (hasPort ? 4 : 0);
+            ByteBuffer bb = ByteBuffer.allocate(len);
+
+            boolean isOneWay = false;
+            if (protocol == Transport.Icmp || protocol == Transport.IcmpIpV6) {
+                // ICMP protocols populate port fields with ICMP data
+                Integer equivalent = IcmpType.codeEquivalent(icmpType, protocol == Transport.IcmpIpV6);
+                isOneWay = equivalent == null;
+                sourcePort = icmpType;
+                destinationPort = equivalent == null ? icmpCode : equivalent;
+            }
+
+            boolean keepOrder = isOrdered() || ((protocol == Transport.Icmp || protocol == Transport.IcmpIpV6) && isOneWay);
+            bb.put(keepOrder ? source.getAddress() : destination.getAddress());
+            bb.put(keepOrder ? destination.getAddress() : source.getAddress());
+            bb.put(toUint16(protocol.getTransportNumber() << 8));
+
+            if (hasPort) {
+                bb.put(keepOrder ? toUint16(sourcePort) : toUint16(destinationPort));
+                bb.put(keepOrder ? toUint16(destinationPort) : toUint16(sourcePort));
+            }
+
+            return bb.array();
+        }
+
+        String toCommunityId(MessageDigest md, byte[] seed) {
+            md.update(seed);
+            byte[] encodedBytes = Base64.getEncoder().encode(md.digest(toBytes()));
+            return "1:" + new String(encodedBytes, StandardCharsets.UTF_8);
+        }
+    }
+
+    public enum Transport {
+        Icmp(1),
+        Igmp(2),
+        Tcp(6),
+        Udp(17),
+        Gre(47),
+        IcmpIpV6(58),
+        Eigrp(88),
+        Ospf(89),
+        Pim(103),
+        Sctp(132);
+
+        private final int transportNumber;
+
+        private static final Map<String, Transport> TRANSPORT_NAMES;
+
+        static {
+            TRANSPORT_NAMES = new HashMap<>();
+            TRANSPORT_NAMES.put("icmp", Icmp);
+            TRANSPORT_NAMES.put("igmp", Igmp);
+            TRANSPORT_NAMES.put("tcp", Tcp);
+            TRANSPORT_NAMES.put("udp", Udp);
+            TRANSPORT_NAMES.put("gre", Gre);
+            TRANSPORT_NAMES.put("ipv6-icmp", IcmpIpV6);
+            TRANSPORT_NAMES.put("icmpv6", IcmpIpV6);
+            TRANSPORT_NAMES.put("eigrp", Eigrp);
+            TRANSPORT_NAMES.put("ospf", Ospf);
+            TRANSPORT_NAMES.put("pim", Pim);
+            TRANSPORT_NAMES.put("sctp", Sctp);
+        }
+
+        Transport(int transportNumber) {
+            this.transportNumber = transportNumber;
+        }
+
+        public int getTransportNumber() {
+            return transportNumber;
+        }
+
+        public static Transport fromNumber(int transportNumber) {
+            switch (transportNumber) {
+                case 1:
+                    return Icmp;
+                case 2:
+                    return Igmp;
+                case 6:
+                    return Tcp;
+                case 17:
+                    return Udp;
+                case 47:
+                    return Gre;
+                case 58:
+                    return IcmpIpV6;
+                case 88:
+                    return Eigrp;
+                case 89:
+                    return Ospf;
+                case 103:
+                    return Pim;
+                case 132:
+                    return Sctp;
+                default:
+                    throw new IllegalArgumentException("unknown transport protocol number [" + transportNumber + "]");
+            }
+        }
+
+        public static Transport fromObject(Object o) {
+            if (o instanceof Number) {
+                return fromNumber(((Number) o).intValue());
+            } else if (o instanceof String) {
+                String protocolStr = (String) o;
+
+                // check if matches protocol name
+                if (TRANSPORT_NAMES.containsKey(protocolStr.toLowerCase(Locale.ROOT))) {
+                    return TRANSPORT_NAMES.get(protocolStr.toLowerCase(Locale.ROOT));
+                }
+
+                // check if convertible to protocol number
+                try {
+                    int protocolNumber = Integer.parseInt(protocolStr);
+                    return fromNumber(protocolNumber);
+                } catch (NumberFormatException e) {
+                    // fall through to IllegalArgumentException
+                }
+
+                throw new IllegalArgumentException("could not convert string [" + protocolStr + "] to transport protocol");
+            } else {
+                throw new IllegalArgumentException(
+                    "could not convert value of type [" + o.getClass().getName() + "] to transport protocol"
+                );
+            }
+        }
+    }
+
+    public enum IcmpType {
+        EchoReply(0),
+        EchoRequest(8),
+        RouterAdvertisement(9),
+        RouterSolicitation(10),
+        TimestampRequest(13),
+        TimestampReply(14),
+        InfoRequest(15),
+        InfoReply(16),
+        AddressMaskRequest(17),
+        AddressMaskReply(18),
+        V6EchoRequest(128),
+        V6EchoReply(129),
+        V6RouterSolicitation(133),
+        V6RouterAdvertisement(134),
+        V6NeighborSolicitation(135),
+        V6NeighborAdvertisement(136),
+        V6MLDv1MulticastListenerQueryMessage(130),
+        V6MLDv1MulticastListenerReportMessage(131),
+        V6WhoAreYouRequest(139),
+        V6WhoAreYouReply(140),
+        V6HomeAddressDiscoveryRequest(144),
+        V6HomeAddressDiscoveryResponse(145);
+
+        private static final Map<Integer, Integer> ICMP_V4_CODE_EQUIVALENTS;
+        private static final Map<Integer, Integer> ICMP_V6_CODE_EQUIVALENTS;
+
+        static {
+            ICMP_V4_CODE_EQUIVALENTS = new HashMap<>();
+            ICMP_V4_CODE_EQUIVALENTS.put(EchoRequest.getType(), EchoReply.getType());
+            ICMP_V4_CODE_EQUIVALENTS.put(EchoReply.getType(), EchoRequest.getType());
+            ICMP_V4_CODE_EQUIVALENTS.put(TimestampRequest.getType(), TimestampReply.getType());
+            ICMP_V4_CODE_EQUIVALENTS.put(TimestampReply.getType(), TimestampRequest.getType());
+            ICMP_V4_CODE_EQUIVALENTS.put(InfoRequest.getType(), InfoReply.getType());
+            ICMP_V4_CODE_EQUIVALENTS.put(RouterSolicitation.getType(), RouterAdvertisement.getType());
+            ICMP_V4_CODE_EQUIVALENTS.put(RouterAdvertisement.getType(), RouterSolicitation.getType());
+            ICMP_V4_CODE_EQUIVALENTS.put(AddressMaskRequest.getType(), AddressMaskReply.getType());
+            ICMP_V4_CODE_EQUIVALENTS.put(AddressMaskReply.getType(), AddressMaskRequest.getType());
+
+            ICMP_V6_CODE_EQUIVALENTS = new HashMap<>();
+            ICMP_V6_CODE_EQUIVALENTS.put(V6EchoRequest.getType(), V6EchoReply.getType());
+            ICMP_V6_CODE_EQUIVALENTS.put(V6EchoReply.getType(), V6EchoRequest.getType());
+            ICMP_V6_CODE_EQUIVALENTS.put(V6RouterSolicitation.getType(), V6RouterAdvertisement.getType());
+            ICMP_V6_CODE_EQUIVALENTS.put(V6RouterAdvertisement.getType(), V6RouterSolicitation.getType());
+            ICMP_V6_CODE_EQUIVALENTS.put(V6NeighborAdvertisement.getType(), V6NeighborSolicitation.getType());
+            ICMP_V6_CODE_EQUIVALENTS.put(V6NeighborSolicitation.getType(), V6NeighborAdvertisement.getType());
+            ICMP_V6_CODE_EQUIVALENTS.put(V6MLDv1MulticastListenerQueryMessage.getType(), V6MLDv1MulticastListenerReportMessage.getType());
+            ICMP_V6_CODE_EQUIVALENTS.put(V6WhoAreYouRequest.getType(), V6WhoAreYouReply.getType());
+            ICMP_V6_CODE_EQUIVALENTS.put(V6WhoAreYouReply.getType(), V6WhoAreYouRequest.getType());
+            ICMP_V6_CODE_EQUIVALENTS.put(V6HomeAddressDiscoveryRequest.getType(), V6HomeAddressDiscoveryResponse.getType());
+            ICMP_V6_CODE_EQUIVALENTS.put(V6HomeAddressDiscoveryResponse.getType(), V6HomeAddressDiscoveryRequest.getType());
+        }
+
+        private final int type;
+
+        IcmpType(int type) {
+            this.type = type;
+        }
+
+        public int getType() {
+            return type;
+        }
+
+        public static IcmpType fromNumber(int type) {
+            switch (type) {
+                case 0:
+                    return EchoReply;
+                case 8:
+                    return EchoRequest;
+                case 9:
+                    return RouterAdvertisement;
+                case 10:
+                    return RouterSolicitation;
+                case 13:
+                    return TimestampRequest;
+                case 14:
+                    return TimestampReply;
+                case 15:
+                    return InfoRequest;
+                case 16:
+                    return InfoReply;
+                case 17:
+                    return AddressMaskRequest;
+                case 18:
+                    return AddressMaskReply;
+                case 128:
+                    return V6EchoRequest;
+                case 129:
+                    return V6EchoReply;
+                case 133:
+                    return V6RouterSolicitation;
+                case 134:
+                    return V6RouterAdvertisement;
+                case 135:
+                    return V6NeighborSolicitation;
+                case 136:
+                    return V6NeighborAdvertisement;
+                case 130:
+                    return V6MLDv1MulticastListenerQueryMessage;
+                case 131:
+                    return V6MLDv1MulticastListenerReportMessage;
+                case 139:
+                    return V6WhoAreYouRequest;
+                case 140:
+                    return V6WhoAreYouReply;
+                case 144:
+                    return V6HomeAddressDiscoveryRequest;
+                case 145:
+                    return V6HomeAddressDiscoveryResponse;
+                default:
+                    // don't fail if the type is unknown
+                    return EchoReply;
+            }
+        }
+
+        public static Integer codeEquivalent(int icmpType, boolean isIpV6) {
+            return isIpV6 ? ICMP_V6_CODE_EQUIVALENTS.get(icmpType) : ICMP_V4_CODE_EQUIVALENTS.get(icmpType);
+        }
+    }
+}

+ 6 - 1
x-pack/plugin/ingest/src/main/java/org/elasticsearch/xpack/ingest/IngestPlugin.java

@@ -15,6 +15,11 @@ public class IngestPlugin extends Plugin implements org.elasticsearch.plugins.In
 
     @Override
     public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
-        return Map.of(UriPartsProcessor.TYPE, new UriPartsProcessor.Factory());
+        return Map.of(
+            UriPartsProcessor.TYPE,
+            new UriPartsProcessor.Factory(),
+            CommunityIdProcessor.TYPE,
+            new CommunityIdProcessor.Factory()
+        );
     }
 }

+ 121 - 0
x-pack/plugin/ingest/src/test/java/org/elasticsearch/xpack/ingest/CommunityIdProcessorFactoryTests.java

@@ -0,0 +1,121 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.ingest;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.test.ESTestCase;
+import org.junit.Before;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_DEST_IP;
+import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_DEST_PORT;
+import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_IANA_NUMBER;
+import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_ICMP_CODE;
+import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_ICMP_TYPE;
+import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_SOURCE_IP;
+import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_SOURCE_PORT;
+import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_TARGET;
+import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_TRANSPORT;
+import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.toUint16;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
+
+public class CommunityIdProcessorFactoryTests extends ESTestCase {
+
+    private CommunityIdProcessor.Factory factory;
+
+    @Before
+    public void init() {
+        factory = new CommunityIdProcessor.Factory();
+    }
+
+    public void testCreate() throws Exception {
+        Map<String, Object> config = new HashMap<>();
+
+        String sourceIpField = randomAlphaOfLength(6);
+        config.put("source_ip", sourceIpField);
+        String sourcePortField = randomAlphaOfLength(6);
+        config.put("source_port", sourcePortField);
+        String destIpField = randomAlphaOfLength(6);
+        config.put("destination_ip", destIpField);
+        String destPortField = randomAlphaOfLength(6);
+        config.put("destination_port", destPortField);
+        String ianaNumberField = randomAlphaOfLength(6);
+        config.put("iana_number", ianaNumberField);
+        String transportField = randomAlphaOfLength(6);
+        config.put("transport", transportField);
+        String icmpTypeField = randomAlphaOfLength(6);
+        config.put("icmp_type", icmpTypeField);
+        String icmpCodeField = randomAlphaOfLength(6);
+        config.put("icmp_code", icmpCodeField);
+        String targetField = randomAlphaOfLength(6);
+        config.put("target_field", targetField);
+        int seedInt = randomIntBetween(0, 65535);
+        config.put("seed", Integer.toString(seedInt));
+        boolean ignoreMissing = randomBoolean();
+        config.put("ignore_missing", ignoreMissing);
+
+        String processorTag = randomAlphaOfLength(10);
+        CommunityIdProcessor communityIdProcessor = factory.create(null, processorTag, null, config);
+        assertThat(communityIdProcessor.getTag(), equalTo(processorTag));
+        assertThat(communityIdProcessor.getSourceIpField(), equalTo(sourceIpField));
+        assertThat(communityIdProcessor.getSourcePortField(), equalTo(sourcePortField));
+        assertThat(communityIdProcessor.getDestinationIpField(), equalTo(destIpField));
+        assertThat(communityIdProcessor.getDestinationPortField(), equalTo(destPortField));
+        assertThat(communityIdProcessor.getIanaNumberField(), equalTo(ianaNumberField));
+        assertThat(communityIdProcessor.getTransportField(), equalTo(transportField));
+        assertThat(communityIdProcessor.getIcmpTypeField(), equalTo(icmpTypeField));
+        assertThat(communityIdProcessor.getIcmpCodeField(), equalTo(icmpCodeField));
+        assertThat(communityIdProcessor.getTargetField(), equalTo(targetField));
+        assertThat(communityIdProcessor.getSeed(), equalTo(toUint16(seedInt)));
+        assertThat(communityIdProcessor.getIgnoreMissing(), equalTo(ignoreMissing));
+    }
+
+    public void testSeed() throws Exception {
+        Map<String, Object> config = new HashMap<>();
+        String processorTag = randomAlphaOfLength(10);
+
+        // negative seeds are rejected
+        int tooSmallSeed = randomIntBetween(Integer.MIN_VALUE, -1);
+        config.put("seed", Integer.toString(tooSmallSeed));
+        ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> factory.create(null, processorTag, null, config));
+        assertThat(e.getMessage(), containsString("must be a value between 0 and 65535"));
+
+        // seeds >= 2^16 are rejected
+        int tooBigSeed = randomIntBetween(65536, Integer.MAX_VALUE);
+        config.put("seed", Integer.toString(tooBigSeed));
+        e = expectThrows(ElasticsearchException.class, () -> factory.create(null, processorTag, null, config));
+        assertThat(e.getMessage(), containsString("must be a value between 0 and 65535"));
+
+        // seeds between 0 and 2^16-1 are accepted
+        int justRightSeed = randomIntBetween(0, 65535);
+        byte[] expectedSeed = new byte[] { (byte) (justRightSeed >> 8), (byte) justRightSeed };
+        config.put("seed", Integer.toString(justRightSeed));
+        CommunityIdProcessor communityIdProcessor = factory.create(null, processorTag, null, config);
+        assertThat(communityIdProcessor.getSeed(), equalTo(expectedSeed));
+    }
+
+    public void testRequiredFields() throws Exception {
+        HashMap<String, Object> config = new HashMap<>();
+        String processorTag = randomAlphaOfLength(10);
+        CommunityIdProcessor communityIdProcessor = factory.create(null, processorTag, null, config);
+        assertThat(communityIdProcessor.getTag(), equalTo(processorTag));
+        assertThat(communityIdProcessor.getSourceIpField(), equalTo(DEFAULT_SOURCE_IP));
+        assertThat(communityIdProcessor.getSourcePortField(), equalTo(DEFAULT_SOURCE_PORT));
+        assertThat(communityIdProcessor.getDestinationIpField(), equalTo(DEFAULT_DEST_IP));
+        assertThat(communityIdProcessor.getDestinationPortField(), equalTo(DEFAULT_DEST_PORT));
+        assertThat(communityIdProcessor.getIanaNumberField(), equalTo(DEFAULT_IANA_NUMBER));
+        assertThat(communityIdProcessor.getTransportField(), equalTo(DEFAULT_TRANSPORT));
+        assertThat(communityIdProcessor.getIcmpTypeField(), equalTo(DEFAULT_ICMP_TYPE));
+        assertThat(communityIdProcessor.getIcmpCodeField(), equalTo(DEFAULT_ICMP_CODE));
+        assertThat(communityIdProcessor.getTargetField(), equalTo(DEFAULT_TARGET));
+        assertThat(communityIdProcessor.getSeed(), equalTo(toUint16(0)));
+        assertThat(communityIdProcessor.getIgnoreMissing(), equalTo(true));
+    }
+}

+ 316 - 0
x-pack/plugin/ingest/src/test/java/org/elasticsearch/xpack/ingest/CommunityIdProcessorTests.java

@@ -0,0 +1,316 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.ingest;
+
+import org.elasticsearch.ingest.IngestDocument;
+import org.elasticsearch.test.ESTestCase;
+import org.junit.Before;
+
+import java.security.MessageDigest;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_DEST_IP;
+import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_DEST_PORT;
+import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_IANA_NUMBER;
+import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_ICMP_CODE;
+import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_ICMP_TYPE;
+import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_SOURCE_IP;
+import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_SOURCE_PORT;
+import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_TARGET;
+import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_TRANSPORT;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+
+public class CommunityIdProcessorTests extends ESTestCase {
+
+    // NOTE: all test methods beginning with "testBeats" are intended to duplicate the unit tests for the Beats
+    // community_id processor (see Github link below) to ensure that this processor produces the same values. To
+    // the extent possible, these tests should be kept in sync.
+    //
+    // https://github.com/elastic/beats/blob/master/libbeat/processors/communityid/communityid_test.go
+
+    private Map<String, Object> event;
+    private MessageDigest messageDigest;
+
+    @Before
+    public void setup() throws Exception {
+        messageDigest = MessageDigest.getInstance("SHA-1");
+        event = buildEvent();
+    }
+
+    private Map<String, Object> buildEvent() {
+        event = new HashMap<>();
+        var source = new HashMap<String, Object>();
+        source.put("ip", "128.232.110.120");
+        source.put("port", 34855);
+        event.put("source", source);
+        var destination = new HashMap<String, Object>();
+        destination.put("ip", "66.35.250.204");
+        destination.put("port", 80);
+        event.put("destination", destination);
+        var network = new HashMap<String, Object>();
+        network.put("transport", "TCP");
+        event.put("network", network);
+        return event;
+    }
+
+    public void testBeatsValid() throws Exception {
+        testCommunityIdProcessor(event, "1:LQU9qZlK+B5F3KDmev6m5PMibrg=");
+    }
+
+    public void testBeatsSeed() throws Exception {
+        testCommunityIdProcessor(event, 123, "1:hTSGlFQnR58UCk+NfKRZzA32dPg=");
+    }
+
+    public void testBeatsInvalidSourceIp() throws Exception {
+        @SuppressWarnings("unchecked")
+        var source = (Map<String, Object>) event.get("source");
+        source.put("ip", 2162716280L);
+        IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> testCommunityIdProcessor(event, null));
+        assertThat(e.getMessage(), containsString("field [source.ip] of type [java.lang.Long] cannot be cast to [java.lang.String]"));
+    }
+
+    public void testBeatsInvalidSourcePort() throws Exception {
+        @SuppressWarnings("unchecked")
+        var source = (Map<String, Object>) event.get("source");
+        source.put("port", 0);
+        IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> testCommunityIdProcessor(event, null));
+        assertThat(e.getMessage(), containsString("invalid source port"));
+    }
+
+    public void testBeatsInvalidDestinationIp() throws Exception {
+        @SuppressWarnings("unchecked")
+        var destination = (Map<String, Object>) event.get("destination");
+        String invalidIp = "308.111.1.2.3";
+        destination.put("ip", invalidIp);
+        IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> testCommunityIdProcessor(event, null));
+        assertThat(e.getMessage(), containsString("'" + invalidIp + "' is not an IP string literal"));
+    }
+
+    public void testBeatsInvalidDestinationPort() throws Exception {
+        @SuppressWarnings("unchecked")
+        var destination = (Map<String, Object>) event.get("destination");
+        destination.put("port", null);
+        IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> testCommunityIdProcessor(event, null));
+        assertThat(e.getMessage(), containsString("invalid destination port [0]"));
+    }
+
+    public void testBeatsUnknownProtocol() throws Exception {
+        @SuppressWarnings("unchecked")
+        var network = (Map<String, Object>) event.get("network");
+        network.put("transport", "xyz");
+        IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> testCommunityIdProcessor(event, null));
+        assertThat(e.getMessage(), containsString("could not convert string [xyz] to transport protocol"));
+    }
+
+    public void testBeatsIcmp() throws Exception {
+        @SuppressWarnings("unchecked")
+        var network = (Map<String, Object>) event.get("network");
+        network.put("transport", "icmp");
+        var icmp = new HashMap<String, Object>();
+        icmp.put("type", 3);
+        icmp.put("code", 3);
+        event.put("icmp", icmp);
+        testCommunityIdProcessor(event, "1:KF3iG9XD24nhlSy4r1TcYIr5mfE=");
+    }
+
+    public void testBeatsIcmpWithoutTypeOrCode() throws Exception {
+        @SuppressWarnings("unchecked")
+        var network = (Map<String, Object>) event.get("network");
+        network.put("transport", "icmp");
+        testCommunityIdProcessor(event, "1:PAE85ZfR4SbNXl5URZwWYyDehwU=");
+    }
+
+    public void testBeatsIgmp() throws Exception {
+        @SuppressWarnings("unchecked")
+        var network = (Map<String, Object>) event.get("network");
+        network.put("transport", "igmp");
+        @SuppressWarnings("unchecked")
+        var source = (Map<String, Object>) event.get("source");
+        source.remove("port");
+        @SuppressWarnings("unchecked")
+        var destination = (Map<String, Object>) event.get("destination");
+        destination.remove("port");
+        testCommunityIdProcessor(event, "1:D3t8Q1aFA6Ev0A/AO4i9PnU3AeI=");
+    }
+
+    public void testBeatsProtocolNumberAsString() throws Exception {
+        @SuppressWarnings("unchecked")
+        var source = (Map<String, Object>) event.get("source");
+        source.remove("port");
+        @SuppressWarnings("unchecked")
+        var destination = (Map<String, Object>) event.get("destination");
+        destination.remove("port");
+        @SuppressWarnings("unchecked")
+        var network = (Map<String, Object>) event.get("network");
+        network.put("transport", "2");
+        testCommunityIdProcessor(event, "1:D3t8Q1aFA6Ev0A/AO4i9PnU3AeI=");
+    }
+
+    public void testBeatsProtocolNumber() throws Exception {
+        @SuppressWarnings("unchecked")
+        var source = (Map<String, Object>) event.get("source");
+        source.remove("port");
+        @SuppressWarnings("unchecked")
+        var destination = (Map<String, Object>) event.get("destination");
+        destination.remove("port");
+        @SuppressWarnings("unchecked")
+        var network = (Map<String, Object>) event.get("network");
+        network.put("transport", 2);
+        testCommunityIdProcessor(event, "1:D3t8Q1aFA6Ev0A/AO4i9PnU3AeI=");
+    }
+
+    public void testBeatsIanaNumber() throws Exception {
+        @SuppressWarnings("unchecked")
+        var network = (Map<String, Object>) event.get("network");
+        network.remove("transport");
+        network.put("iana_number", CommunityIdProcessor.Transport.Tcp.getTransportNumber());
+        testCommunityIdProcessor(event, "1:LQU9qZlK+B5F3KDmev6m5PMibrg=");
+    }
+
+    public void testIpv6() throws Exception {
+        @SuppressWarnings("unchecked")
+        var source = (Map<String, Object>) event.get("source");
+        source.put("ip", "2001:0db8:85a3:0000:0000:8a2e:0370:7334");
+        @SuppressWarnings("unchecked")
+        var destination = (Map<String, Object>) event.get("destination");
+        destination.put("ip", "2001:0:9d38:6ab8:1c48:3a1c:a95a:b1c2");
+        testCommunityIdProcessor(event, "1:YC1+javPJ2LpK5xVyw1udfT83Qs=");
+    }
+
+    public void testIcmpWithCodeEquivalent() throws Exception {
+        @SuppressWarnings("unchecked")
+        var network = (Map<String, Object>) event.get("network");
+        network.put("transport", "icmp");
+        var icmp = new HashMap<String, Object>();
+        icmp.put("type", 10);
+        icmp.put("code", 3);
+        event.put("icmp", icmp);
+        testCommunityIdProcessor(event, "1:L8wnzpmRHIESLqLBy+zTqW3Pmqs=");
+    }
+
+    public void testStringAndNumber() throws Exception {
+        // iana
+        event = buildEvent();
+        @SuppressWarnings("unchecked")
+        var network = (Map<String, Object>) event.get("network");
+        network.remove("transport");
+        network.put("iana_number", CommunityIdProcessor.Transport.Tcp.getTransportNumber());
+        testCommunityIdProcessor(event, "1:LQU9qZlK+B5F3KDmev6m5PMibrg=");
+
+        network.put("iana_number", Integer.toString(CommunityIdProcessor.Transport.Tcp.getTransportNumber()));
+        testCommunityIdProcessor(event, "1:LQU9qZlK+B5F3KDmev6m5PMibrg=");
+
+        // protocol number
+        event = buildEvent();
+        @SuppressWarnings("unchecked")
+        var source = (Map<String, Object>) event.get("source");
+        source.remove("port");
+        @SuppressWarnings("unchecked")
+        var destination = (Map<String, Object>) event.get("destination");
+        destination.remove("port");
+        @SuppressWarnings("unchecked")
+        var network2 = (Map<String, Object>) event.get("network");
+        network2.put("transport", 2);
+        testCommunityIdProcessor(event, "1:D3t8Q1aFA6Ev0A/AO4i9PnU3AeI=");
+
+        network2.put("transport", "2");
+        testCommunityIdProcessor(event, "1:D3t8Q1aFA6Ev0A/AO4i9PnU3AeI=");
+
+        // source port
+        event = buildEvent();
+        @SuppressWarnings("unchecked")
+        var source2 = (Map<String, Object>) event.get("source");
+        source2.put("port", 34855);
+        testCommunityIdProcessor(event, "1:LQU9qZlK+B5F3KDmev6m5PMibrg=");
+
+        source2.put("port", "34855");
+        testCommunityIdProcessor(event, "1:LQU9qZlK+B5F3KDmev6m5PMibrg=");
+
+        // dest port
+        event = buildEvent();
+        @SuppressWarnings("unchecked")
+        var dest2 = (Map<String, Object>) event.get("destination");
+        dest2.put("port", 80);
+        testCommunityIdProcessor(event, "1:LQU9qZlK+B5F3KDmev6m5PMibrg=");
+
+        dest2.put("port", "80");
+        testCommunityIdProcessor(event, "1:LQU9qZlK+B5F3KDmev6m5PMibrg=");
+
+        // icmp type and code
+        event = buildEvent();
+        @SuppressWarnings("unchecked")
+        var network3 = (Map<String, Object>) event.get("network");
+        network3.put("transport", "icmp");
+        var icmp = new HashMap<String, Object>();
+        icmp.put("type", 3);
+        icmp.put("code", 3);
+        event.put("icmp", icmp);
+        testCommunityIdProcessor(event, "1:KF3iG9XD24nhlSy4r1TcYIr5mfE=");
+
+        icmp = new HashMap<String, Object>();
+        icmp.put("type", "3");
+        icmp.put("code", "3");
+        event.put("icmp", icmp);
+        testCommunityIdProcessor(event, "1:KF3iG9XD24nhlSy4r1TcYIr5mfE=");
+    }
+
+    public void testIgnoreMissing() throws Exception {
+        @SuppressWarnings("unchecked")
+        var network = (Map<String, Object>) event.get("network");
+        network.remove("transport");
+        testCommunityIdProcessor(event, 0, null, true);
+    }
+
+    private void testCommunityIdProcessor(Map<String, Object> source, String expectedHash) throws Exception {
+        testCommunityIdProcessor(source, 0, expectedHash);
+    }
+
+    private void testCommunityIdProcessor(Map<String, Object> source, int seed, String expectedHash) throws Exception {
+        testCommunityIdProcessor(source, seed, expectedHash, false);
+    }
+
+    private void testCommunityIdProcessor(Map<String, Object> source, int seed, String expectedHash, boolean ignoreMissing)
+        throws Exception {
+
+        var processor = new CommunityIdProcessor(
+            null,
+            null,
+            DEFAULT_SOURCE_IP,
+            DEFAULT_SOURCE_PORT,
+            DEFAULT_DEST_IP,
+            DEFAULT_DEST_PORT,
+            DEFAULT_IANA_NUMBER,
+            DEFAULT_TRANSPORT,
+            DEFAULT_ICMP_TYPE,
+            DEFAULT_ICMP_CODE,
+            DEFAULT_TARGET,
+            messageDigest,
+            CommunityIdProcessor.toUint16(seed),
+            ignoreMissing
+        );
+
+        IngestDocument input = new IngestDocument(source, Map.of());
+        IngestDocument output = processor.execute(input);
+
+        String hash = output.getFieldValue(DEFAULT_TARGET, String.class, ignoreMissing);
+        assertThat(hash, equalTo(expectedHash));
+    }
+
+    public void testTransportEnum() {
+        for (CommunityIdProcessor.Transport t : CommunityIdProcessor.Transport.values()) {
+            assertThat(CommunityIdProcessor.Transport.fromNumber(t.getTransportNumber()), equalTo(t));
+        }
+    }
+
+    public void testIcmpTypeEnum() {
+        for (CommunityIdProcessor.IcmpType i : CommunityIdProcessor.IcmpType.values()) {
+            assertThat(CommunityIdProcessor.IcmpType.fromNumber(i.getType()), equalTo(i));
+        }
+    }
+}