浏览代码

Network direction processor (#66644)

Andrew Stucki 4 年之前
父节点
当前提交
eb2f344fb4

+ 3 - 9
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/expression/function/scalar/string/CIDRUtils.java → x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/network/CIDRUtils.java

@@ -4,11 +4,10 @@
  * you may not use this file except in compliance with the Elastic License.
  */
 
-package org.elasticsearch.xpack.eql.expression.function.scalar.string;
+package org.elasticsearch.xpack.core.common.network;
 
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.network.InetAddresses;
-import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
 
 import java.net.InetAddress;
 import java.util.Arrays;
@@ -21,7 +20,6 @@ public class CIDRUtils {
     }
 
     public static boolean isInRange(String address, String... cidrAddresses) {
-        try {
             // Check if address is parsable first
             byte[] addr = InetAddresses.forString(address).getAddress();
 
@@ -42,10 +40,6 @@ public class CIDRUtils {
                 }
                 if (isBetween(addr, lower, upper)) return true;
             }
-        } catch (IllegalArgumentException e) {
-            throw new EqlIllegalArgumentException(e.getMessage());
-        }
-
         return false;
     }
 
@@ -55,7 +49,7 @@ public class CIDRUtils {
 
         if (prefixLength < 0 || prefixLength > 8 * value.getAddress().length) {
             throw new IllegalArgumentException("illegal prefixLength '" + prefixLength +
-                    "'. Must be 0-32 for IPv4 ranges, 0-128 for IPv6 ranges");
+                "'. Must be 0-32 for IPv4 ranges, 0-128 for IPv6 ranges");
         }
 
         byte[] lower = value.getAddress();
@@ -77,7 +71,7 @@ public class CIDRUtils {
             upper = encode(upper);
         }
         return Arrays.compareUnsigned(lower, addr) <= 0 &&
-                Arrays.compareUnsigned(upper, addr) >= 0;
+            Arrays.compareUnsigned(upper, addr) >= 0;
     }
 
     // Borrowed from Lucene to make this consistent IP fields matching for the mix of IPv4 and IPv6 values

+ 1 - 1
x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/expression/function/scalar/string/CIDRUtilsTests.java → x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/network/CIDRUtilsTests.java

@@ -4,7 +4,7 @@
  * you may not use this file except in compliance with the Elastic License.
  */
 
-package org.elasticsearch.xpack.eql.expression.function.scalar.string;
+package org.elasticsearch.xpack.core.common.network;
 
 import org.elasticsearch.test.ESTestCase;
 

+ 7 - 1
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/expression/function/scalar/string/CIDRMatchFunctionProcessor.java

@@ -7,7 +7,9 @@ package org.elasticsearch.xpack.eql.expression.function.scalar.string;
 
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
 import org.elasticsearch.xpack.ql.expression.gen.processor.Processor;
+import org.elasticsearch.xpack.core.common.network.CIDRUtils;
 import org.elasticsearch.xpack.ql.util.Check;
 
 import java.io.IOException;
@@ -66,7 +68,11 @@ public class CIDRMatchFunctionProcessor implements Processor {
             Check.isString(address);
             arr[i++] = (String)address;
         }
-        return CIDRUtils.isInRange((String)source, arr);
+        try {
+            return CIDRUtils.isInRange((String)source, arr);
+        } catch (IllegalArgumentException e) {
+            throw new EqlIllegalArgumentException(e.getMessage());
+        }
     }
 
     protected Processor source() {

+ 2 - 0
x-pack/plugin/ingest/src/main/java/org/elasticsearch/xpack/ingest/IngestPlugin.java

@@ -18,6 +18,8 @@ public class IngestPlugin extends Plugin implements org.elasticsearch.plugins.In
         return Map.of(
             UriPartsProcessor.TYPE,
             new UriPartsProcessor.Factory(),
+            NetworkDirectionProcessor.TYPE,
+            new NetworkDirectionProcessor.Factory(),
             CommunityIdProcessor.TYPE,
             new CommunityIdProcessor.Factory()
         );

+ 257 - 0
x-pack/plugin/ingest/src/main/java/org/elasticsearch/xpack/ingest/NetworkDirectionProcessor.java

@@ -0,0 +1,257 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.ingest;
+
+import org.elasticsearch.ingest.AbstractProcessor;
+import org.elasticsearch.ingest.ConfigurationUtils;
+import org.elasticsearch.ingest.IngestDocument;
+import org.elasticsearch.ingest.Processor;
+import org.elasticsearch.common.network.InetAddresses;
+import org.elasticsearch.xpack.core.common.network.CIDRUtils;
+
+import java.net.InetAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.Arrays;
+
+import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty;
+
+public class NetworkDirectionProcessor extends AbstractProcessor {
+    static final byte[] UNDEFINED_IP4 = new byte[] { 0, 0, 0, 0 };
+    static final byte[] UNDEFINED_IP6 = new byte[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
+    static final byte[] BROADCAST_IP4 = new byte[] { (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff };
+
+    public static final String TYPE = "network_direction";
+
+    public static final String DIRECTION_INTERNAL = "internal";
+    public static final String DIRECTION_EXTERNAL = "external";
+    public static final String DIRECTION_INBOUND = "inbound";
+    public static final String DIRECTION_OUTBOUND = "outbound";
+
+    private static final String LOOPBACK_NAMED_NETWORK = "loopback";
+    private static final String GLOBAL_UNICAST_NAMED_NETWORK = "global_unicast";
+    private static final String UNICAST_NAMED_NETWORK = "unicast";
+    private static final String LINK_LOCAL_UNICAST_NAMED_NETWORK = "link_local_unicast";
+    private static final String INTERFACE_LOCAL_NAMED_NETWORK = "interface_local_multicast";
+    private static final String LINK_LOCAL_MULTICAST_NAMED_NETWORK = "link_local_multicast";
+    private static final String MULTICAST_NAMED_NETWORK = "multicast";
+    private static final String UNSPECIFIED_NAMED_NETWORK = "unspecified";
+    private static final String PRIVATE_NAMED_NETWORK = "private";
+    private static final String PUBLIC_NAMED_NETWORK = "public";
+
+    private final String sourceIpField;
+    private final String destinationIpField;
+    private final String targetField;
+    private final List<String> internalNetworks;
+    private final boolean ignoreMissing;
+
+    NetworkDirectionProcessor(
+        String tag,
+        String description,
+        String sourceIpField,
+        String destinationIpField,
+        String targetField,
+        List<String> internalNetworks,
+        boolean ignoreMissing
+    ) {
+        super(tag, description);
+        this.sourceIpField = sourceIpField;
+        this.destinationIpField = destinationIpField;
+        this.targetField = targetField;
+        this.internalNetworks = internalNetworks;
+        this.ignoreMissing = ignoreMissing;
+    }
+
+    public String getSourceIpField() {
+        return sourceIpField;
+    }
+
+    public String getDestinationIpField() {
+        return destinationIpField;
+    }
+
+    public String getTargetField() {
+        return targetField;
+    }
+
+    public List<String> getInternalNetworks() {
+        return internalNetworks;
+    }
+
+    public boolean getIgnoreMissing() {
+        return ignoreMissing;
+    }
+
+    @Override
+    public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
+        String direction = getDirection(ingestDocument);
+        if (direction == null) {
+            if (ignoreMissing) {
+                return ingestDocument;
+            } else {
+                throw new IllegalArgumentException("unable to calculate network direction from document");
+            }
+        }
+
+        ingestDocument.setFieldValue(targetField, direction);
+        return ingestDocument;
+    }
+
+    private String getDirection(IngestDocument d) {
+        if (internalNetworks == null) {
+            return null;
+        }
+
+        String sourceIpAddrString = d.getFieldValue(sourceIpField, String.class, ignoreMissing);
+        if (sourceIpAddrString == null) {
+            return null;
+        }
+
+        String destIpAddrString = d.getFieldValue(destinationIpField, String.class, ignoreMissing);
+        if (destIpAddrString == null) {
+            return null;
+        }
+
+        boolean sourceInternal = isInternal(sourceIpAddrString);
+        boolean destinationInternal = isInternal(destIpAddrString);
+
+        if (sourceInternal && destinationInternal) {
+            return DIRECTION_INTERNAL;
+        }
+        if (sourceInternal) {
+            return DIRECTION_OUTBOUND;
+        }
+        if (destinationInternal) {
+            return DIRECTION_INBOUND;
+        }
+        return DIRECTION_EXTERNAL;
+    }
+
+    private boolean isInternal(String ip) {
+        for (String network : internalNetworks) {
+            if (inNetwork(ip, network)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private boolean inNetwork(String ip, String network) {
+        InetAddress address = InetAddresses.forString(ip);
+        switch (network) {
+            case LOOPBACK_NAMED_NETWORK:
+                return isLoopback(address);
+            case GLOBAL_UNICAST_NAMED_NETWORK:
+            case UNICAST_NAMED_NETWORK:
+                return isUnicast(address);
+            case LINK_LOCAL_UNICAST_NAMED_NETWORK:
+                return isLinkLocalUnicast(address);
+            case INTERFACE_LOCAL_NAMED_NETWORK:
+                return isInterfaceLocalMulticast(address);
+            case LINK_LOCAL_MULTICAST_NAMED_NETWORK:
+                return isLinkLocalMulticast(address);
+            case MULTICAST_NAMED_NETWORK:
+                return isMulticast(address);
+            case UNSPECIFIED_NAMED_NETWORK:
+                return isUnspecified(address);
+            case PRIVATE_NAMED_NETWORK:
+                return isPrivate(ip);
+            case PUBLIC_NAMED_NETWORK:
+                return isPublic(ip);
+            default:
+                return CIDRUtils.isInRange(ip, network);
+        }
+    }
+
+    private boolean isLoopback(InetAddress ip) {
+        return ip.isLoopbackAddress();
+    }
+
+    private boolean isUnicast(InetAddress ip) {
+        return Arrays.equals(ip.getAddress(), BROADCAST_IP4) == false
+            && isUnspecified(ip) == false
+            && isLoopback(ip) == false
+            && isMulticast(ip) == false
+            && isLinkLocalUnicast(ip) == false;
+    }
+
+    private boolean isLinkLocalUnicast(InetAddress ip) {
+        return ip.isLinkLocalAddress();
+    }
+
+    private boolean isInterfaceLocalMulticast(InetAddress ip) {
+        return ip.isMCNodeLocal();
+    }
+
+    private boolean isLinkLocalMulticast(InetAddress ip) {
+        return ip.isMCLinkLocal();
+    }
+
+    private boolean isMulticast(InetAddress ip) {
+        return ip.isMulticastAddress();
+    }
+
+    private boolean isUnspecified(InetAddress ip) {
+        var address = ip.getAddress();
+        return Arrays.equals(UNDEFINED_IP4, address) || Arrays.equals(UNDEFINED_IP6, address);
+    }
+
+    private boolean isPrivate(String ip) {
+        return CIDRUtils.isInRange(ip, "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "fd00::/8");
+    }
+
+    private boolean isPublic(String ip) {
+        return isLocalOrPrivate(ip) == false;
+    }
+
+    private boolean isLocalOrPrivate(String ip) {
+        var address = InetAddresses.forString(ip);
+        return isPrivate(ip)
+            || isLoopback(address)
+            || isUnspecified(address)
+            || isLinkLocalUnicast(address)
+            || isLinkLocalMulticast(address)
+            || isInterfaceLocalMulticast(address)
+            || Arrays.equals(address.getAddress(), BROADCAST_IP4);
+    }
+
+    @Override
+    public String getType() {
+        return TYPE;
+    }
+
+    public static final class Factory implements Processor.Factory {
+
+        static final String DEFAULT_SOURCE_IP = "source.ip";
+        static final String DEFAULT_DEST_IP = "destination.ip";
+        static final String DEFAULT_TARGET = "network.direction";
+
+        @Override
+        public NetworkDirectionProcessor create(
+            Map<String, Processor.Factory> registry,
+            String processorTag,
+            String description,
+            Map<String, Object> config
+        ) throws Exception {
+            String sourceIpField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "source_ip", DEFAULT_SOURCE_IP);
+            String destIpField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "destination_ip", DEFAULT_DEST_IP);
+            String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field", DEFAULT_TARGET);
+            List<String> internalNetworks = ConfigurationUtils.readList(TYPE, processorTag, config, "internal_networks");
+            boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, "ignore_missing", true);
+
+            return new NetworkDirectionProcessor(
+                processorTag,
+                description,
+                sourceIpField,
+                destIpField,
+                targetField,
+                internalNetworks,
+                ignoreMissing
+            );
+        }
+    }
+}

+ 82 - 0
x-pack/plugin/ingest/src/test/java/org/elasticsearch/xpack/ingest/NetworkDirectionProcessorFactoryTests.java

@@ -0,0 +1,82 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.ingest;
+
+import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.test.ESTestCase;
+import org.junit.Before;
+
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.xpack.ingest.NetworkDirectionProcessor.Factory.DEFAULT_DEST_IP;
+import static org.elasticsearch.xpack.ingest.NetworkDirectionProcessor.Factory.DEFAULT_SOURCE_IP;
+import static org.elasticsearch.xpack.ingest.NetworkDirectionProcessor.Factory.DEFAULT_TARGET;
+import static org.hamcrest.CoreMatchers.equalTo;
+
+public class NetworkDirectionProcessorFactoryTests extends ESTestCase {
+
+    private NetworkDirectionProcessor.Factory factory;
+
+    @Before
+    public void init() {
+        factory = new NetworkDirectionProcessor.Factory();
+    }
+
+    public void testCreate() throws Exception {
+        Map<String, Object> config = new HashMap<>();
+
+        String sourceIpField = randomAlphaOfLength(6);
+        config.put("source_ip", sourceIpField);
+        String destIpField = randomAlphaOfLength(6);
+        config.put("destination_ip", destIpField);
+        String targetField = randomAlphaOfLength(6);
+        config.put("target_field", targetField);
+        List<String> internalNetworks = new ArrayList<>();
+        internalNetworks.add("10.0.0.0/8");
+        config.put("internal_networks", internalNetworks);
+        boolean ignoreMissing = randomBoolean();
+        config.put("ignore_missing", ignoreMissing);
+
+        String processorTag = randomAlphaOfLength(10);
+        NetworkDirectionProcessor networkProcessor = factory.create(null, processorTag, null, config);
+        assertThat(networkProcessor.getTag(), equalTo(processorTag));
+        assertThat(networkProcessor.getSourceIpField(), equalTo(sourceIpField));
+        assertThat(networkProcessor.getDestinationIpField(), equalTo(destIpField));
+        assertThat(networkProcessor.getTargetField(), equalTo(targetField));
+        assertThat(networkProcessor.getInternalNetworks(), equalTo(internalNetworks));
+        assertThat(networkProcessor.getIgnoreMissing(), equalTo(ignoreMissing));
+    }
+
+    public void testRequiredFields() throws Exception {
+        HashMap<String, Object> config = new HashMap<>();
+        String processorTag = randomAlphaOfLength(10);
+        try {
+            factory.create(null, processorTag, null, config);
+            fail("factory create should have failed");
+        } catch (ElasticsearchParseException e) {
+            assertThat(e.getMessage(), equalTo("[internal_networks] required property is missing"));
+        }
+    }
+
+    public void testDefaultFields() throws Exception {
+        HashMap<String, Object> config = new HashMap<>();
+        String processorTag = randomAlphaOfLength(10);
+        List<String> internalNetworks = new ArrayList<>();
+        internalNetworks.add("10.0.0.0/8");
+        config.put("internal_networks", internalNetworks);
+
+        NetworkDirectionProcessor networkProcessor = factory.create(null, processorTag, null, config);
+        assertThat(networkProcessor.getTag(), equalTo(processorTag));
+        assertThat(networkProcessor.getSourceIpField(), equalTo(DEFAULT_SOURCE_IP));
+        assertThat(networkProcessor.getDestinationIpField(), equalTo(DEFAULT_DEST_IP));
+        assertThat(networkProcessor.getTargetField(), equalTo(DEFAULT_TARGET));
+        assertThat(networkProcessor.getIgnoreMissing(), equalTo(true));
+    }
+}

+ 157 - 0
x-pack/plugin/ingest/src/test/java/org/elasticsearch/xpack/ingest/NetworkDirectionProcessorTests.java

@@ -0,0 +1,157 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.ingest;
+
+import org.elasticsearch.ingest.IngestDocument;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.xpack.ingest.NetworkDirectionProcessor.Factory.DEFAULT_DEST_IP;
+import static org.elasticsearch.xpack.ingest.NetworkDirectionProcessor.Factory.DEFAULT_SOURCE_IP;
+import static org.elasticsearch.xpack.ingest.NetworkDirectionProcessor.Factory.DEFAULT_TARGET;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+
+public class NetworkDirectionProcessorTests extends ESTestCase {
+    private Map<String, Object> buildEvent() {
+        return buildEvent("128.232.110.120");
+    }
+
+    private Map<String, Object> buildEvent(String source) {
+        return buildEvent(source, "66.35.250.204");
+    }
+
+    private Map<String, Object> buildEvent(String source, String destination) {
+        return new HashMap<>() {
+            {
+                put("source", new HashMap<String, Object>() {
+                    {
+                        put("ip", source);
+                    }
+                });
+                put("destination", new HashMap<String, Object>() {
+                    {
+                        put("ip", destination);
+                    }
+                });
+            }
+        };
+    }
+
+    public void testNoInternalNetworks() throws Exception {
+        IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> testNetworkDirectionProcessor(buildEvent(), null));
+        assertThat(e.getMessage(), containsString("unable to calculate network direction from document"));
+    }
+
+    public void testNoSource() throws Exception {
+        IllegalArgumentException e = expectThrows(
+            IllegalArgumentException.class,
+            () -> testNetworkDirectionProcessor(buildEvent(null), new String[] { "10.0.0.0/8" })
+        );
+        assertThat(e.getMessage(), containsString("unable to calculate network direction from document"));
+    }
+
+    public void testNoDestination() throws Exception {
+        IllegalArgumentException e = expectThrows(
+            IllegalArgumentException.class,
+            () -> testNetworkDirectionProcessor(buildEvent("192.168.1.1", null), new String[] { "10.0.0.0/8" })
+        );
+        assertThat(e.getMessage(), containsString("unable to calculate network direction from document"));
+    }
+
+    public void testIgnoreMissing() throws Exception {
+        testNetworkDirectionProcessor(buildEvent(null), new String[] { "10.0.0.0/8" }, null, true);
+    }
+
+    public void testInvalidSource() throws Exception {
+        IllegalArgumentException e = expectThrows(
+            IllegalArgumentException.class,
+            () -> testNetworkDirectionProcessor(buildEvent("invalid"), new String[] { "10.0.0.0/8" })
+        );
+        assertThat(e.getMessage(), containsString("'invalid' is not an IP string literal."));
+    }
+
+    public void testInvalidDestination() throws Exception {
+        IllegalArgumentException e = expectThrows(
+            IllegalArgumentException.class,
+            () -> testNetworkDirectionProcessor(buildEvent("192.168.1.1", "invalid"), new String[] { "10.0.0.0/8" })
+        );
+        assertThat(e.getMessage(), containsString("'invalid' is not an IP string literal."));
+    }
+
+    public void testInvalidNetwork() throws Exception {
+        IllegalArgumentException e = expectThrows(
+            IllegalArgumentException.class,
+            () -> testNetworkDirectionProcessor(buildEvent("192.168.1.1", "192.168.1.1"), new String[] { "10.0.0.0/8", "invalid" })
+        );
+        assertThat(e.getMessage(), containsString("'invalid' is not an IP string literal."));
+    }
+
+    public void testCIDR() throws Exception {
+        testNetworkDirectionProcessor(buildEvent("10.0.1.1", "192.168.1.2"), new String[] { "10.0.0.0/8" }, "outbound");
+        testNetworkDirectionProcessor(buildEvent("192.168.1.2", "10.0.1.1"), new String[] { "10.0.0.0/8" }, "inbound");
+    }
+
+    public void testUnspecified() throws Exception {
+        testNetworkDirectionProcessor(buildEvent("0.0.0.0", "0.0.0.0"), new String[] { "unspecified" }, "internal");
+        testNetworkDirectionProcessor(buildEvent("::", "::"), new String[] { "unspecified" }, "internal");
+    }
+
+    public void testNetworkPrivate() throws Exception {
+        testNetworkDirectionProcessor(buildEvent("192.168.1.1", "192.168.1.2"), new String[] { "private" }, "internal");
+        testNetworkDirectionProcessor(buildEvent("10.0.1.1", "192.168.1.2"), new String[] { "private" }, "internal");
+        testNetworkDirectionProcessor(buildEvent("192.168.1.1", "172.16.0.1"), new String[] { "private" }, "internal");
+        testNetworkDirectionProcessor(buildEvent("192.168.1.1", "fd12:3456:789a:1::1"), new String[] { "private" }, "internal");
+    }
+
+    public void testNetworkPublic() throws Exception {
+        testNetworkDirectionProcessor(buildEvent("192.168.1.1", "192.168.1.2"), new String[] { "public" }, "external");
+        testNetworkDirectionProcessor(buildEvent("10.0.1.1", "192.168.1.2"), new String[] { "public" }, "external");
+        testNetworkDirectionProcessor(buildEvent("192.168.1.1", "172.16.0.1"), new String[] { "public" }, "external");
+        testNetworkDirectionProcessor(buildEvent("192.168.1.1", "fd12:3456:789a:1::1"), new String[] { "public" }, "external");
+    }
+
+    private void testNetworkDirectionProcessor(Map<String, Object> source, String[] internalNetworks) throws Exception {
+        testNetworkDirectionProcessor(source, internalNetworks, "");
+    }
+
+    private void testNetworkDirectionProcessor(Map<String, Object> source, String[] internalNetworks, String expectedDirection)
+        throws Exception {
+        testNetworkDirectionProcessor(source, internalNetworks, expectedDirection, false);
+    }
+
+    private void testNetworkDirectionProcessor(
+        Map<String, Object> source,
+        String[] internalNetworks,
+        String expectedDirection,
+        boolean ignoreMissing
+    ) throws Exception {
+        List<String> networks = null;
+
+        if (internalNetworks != null) networks = Arrays.asList(internalNetworks);
+
+        var processor = new NetworkDirectionProcessor(
+            null,
+            null,
+            DEFAULT_SOURCE_IP,
+            DEFAULT_DEST_IP,
+            DEFAULT_TARGET,
+            networks,
+            ignoreMissing
+        );
+
+        IngestDocument input = new IngestDocument(source, Map.of());
+        IngestDocument output = processor.execute(input);
+
+        String hash = output.getFieldValue(DEFAULT_TARGET, String.class, ignoreMissing);
+        assertThat(hash, equalTo(expectedDirection));
+    }
+}