Browse Source

Make enrich cache based on memory usage (#111412)

The max enrich cache size setting now also supports an absolute max size in bytes (of used heap space) and a percentage of the max heap space, next to the existing flat document count. The default is 1% of the max heap space.

This should prevent issues where the enrich cache takes up a lot of memory when there are large documents in the cache.
Niels Bauman 1 year ago
parent
commit
e0c1ccbc1e

+ 6 - 0
docs/changelog/111412.yaml

@@ -0,0 +1,6 @@
+pr: 111412
+summary: Make enrich cache based on memory usage
+area: Ingest Node
+type: enhancement
+issues:
+ - 106081

+ 13 - 7
docs/reference/ingest/enrich.asciidoc

@@ -230,12 +230,12 @@ Instead, you can:
 [[ingest-enrich-components]]
 ==== Enrich components
 
-The enrich coordinator is a component that manages and performs the searches 
+The enrich coordinator is a component that manages and performs the searches
 required to enrich documents on each ingest node. It combines searches from all enrich
 processors in all pipelines into bulk <<search-multi-search,multi-searches>>.
 
-The enrich policy executor is a component that manages the executions of all 
-enrich policies. When an enrich policy is executed, this component creates 
+The enrich policy executor is a component that manages the executions of all
+enrich policies. When an enrich policy is executed, this component creates
 a new enrich index and removes the previous enrich index. The enrich policy
 executions are managed from the elected master node. The execution of these
 policies occurs on a different node.
@@ -249,9 +249,15 @@ enrich policy executor.
 The enrich coordinator supports the following node settings:
 
 `enrich.cache_size`::
-Maximum number of searches to cache for enriching documents. Defaults to `1000`.
-There is a single cache for all enrich processors in the cluster. This setting
-determines the size of that cache.
+Maximum size of the cache that caches searches for enriching documents.
+The size can be specified in three units: the raw number of
+cached searches (e.g. `1000`), an absolute size in bytes (e.g. `100Mb`),
+or a percentage of the max heap space of the node (e.g. `1%`).
+Both for the absolute byte size and the percentage of heap space,
+{es} does not guarantee that the enrich cache size will adhere exactly to that maximum,
+as {es} uses the byte size of the serialized search response
+which is is a good representation of the used space on the heap, but not an exact match.
+Defaults to `1%`. There is a single cache for all enrich processors in the cluster.
 
 `enrich.coordinator_proxy.max_concurrent_requests`::
 Maximum number of concurrent <<search-multi-search,multi-search requests>> to
@@ -280,4 +286,4 @@ Maximum number of enrich policies to execute concurrently. Defaults to `50`.
 
 include::geo-match-enrich-policy-type-ex.asciidoc[]
 include::match-enrich-policy-type-ex.asciidoc[]
-include::range-enrich-policy-type-ex.asciidoc[]
+include::range-enrich-policy-type-ex.asciidoc[]

+ 21 - 2
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichCache.java

@@ -14,6 +14,7 @@ import org.elasticsearch.cluster.metadata.IndexAbstraction;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.common.cache.Cache;
 import org.elasticsearch.common.cache.CacheBuilder;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.Maps;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.IndexNotFoundException;
@@ -29,6 +30,7 @@ import java.util.Objects;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
 import java.util.function.LongSupplier;
+import java.util.function.ToLongBiFunction;
 
 /**
  * A simple cache for enrich that uses {@link Cache}. There is one instance of this cache and
@@ -61,12 +63,29 @@ public final class EnrichCache {
         this(maxSize, System::nanoTime);
     }
 
+    EnrichCache(ByteSizeValue maxByteSize) {
+        this(maxByteSize, System::nanoTime);
+    }
+
     // non-private for unit testing only
     EnrichCache(long maxSize, LongSupplier relativeNanoTimeProvider) {
         this.relativeNanoTimeProvider = relativeNanoTimeProvider;
-        this.cache = CacheBuilder.<CacheKey, CacheValue>builder().setMaximumWeight(maxSize).removalListener(notification -> {
+        this.cache = createCache(maxSize, null);
+    }
+
+    EnrichCache(ByteSizeValue maxByteSize, LongSupplier relativeNanoTimeProvider) {
+        this.relativeNanoTimeProvider = relativeNanoTimeProvider;
+        this.cache = createCache(maxByteSize.getBytes(), (key, value) -> value.sizeInBytes);
+    }
+
+    private Cache<CacheKey, CacheValue> createCache(long maxWeight, ToLongBiFunction<CacheKey, CacheValue> weigher) {
+        var builder = CacheBuilder.<CacheKey, CacheValue>builder().setMaximumWeight(maxWeight).removalListener(notification -> {
             sizeInBytes.getAndAdd(-1 * notification.getValue().sizeInBytes);
-        }).build();
+        });
+        if (weigher != null) {
+            builder.weigher(weigher);
+        }
+        return builder.build();
     }
 
     /**

+ 63 - 2
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java

@@ -12,17 +12,22 @@ import org.elasticsearch.cluster.NamedDiff;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.IndexScopedSettings;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.SettingsFilter;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.MemorySizeValue;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.features.NodeFeature;
 import org.elasticsearch.indices.SystemIndexDescriptor;
 import org.elasticsearch.ingest.Processor;
 import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.monitor.jvm.JvmInfo;
 import org.elasticsearch.plugins.IngestPlugin;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.plugins.SystemIndexPlugin;
@@ -121,14 +126,29 @@ public class EnrichPlugin extends Plugin implements SystemIndexPlugin, IngestPlu
         return String.valueOf(maxConcurrentRequests * maxLookupsPerRequest);
     }, val -> Setting.parseInt(val, 1, Integer.MAX_VALUE, QUEUE_CAPACITY_SETTING_NAME), Setting.Property.NodeScope);
 
-    public static final Setting<Long> CACHE_SIZE = Setting.longSetting("enrich.cache_size", 1000, 0, Setting.Property.NodeScope);
+    public static final String CACHE_SIZE_SETTING_NAME = "enrich.cache.size";
+    public static final Setting<FlatNumberOrByteSizeValue> CACHE_SIZE = new Setting<>(
+        "enrich.cache.size",
+        (String) null,
+        (String s) -> FlatNumberOrByteSizeValue.parse(
+            s,
+            CACHE_SIZE_SETTING_NAME,
+            new FlatNumberOrByteSizeValue(ByteSizeValue.ofBytes((long) (0.01 * JvmInfo.jvmInfo().getConfiguredMaxHeapSize())))
+        ),
+        Setting.Property.NodeScope
+    );
 
     private final Settings settings;
     private final EnrichCache enrichCache;
 
     public EnrichPlugin(final Settings settings) {
         this.settings = settings;
-        this.enrichCache = new EnrichCache(CACHE_SIZE.get(settings));
+        FlatNumberOrByteSizeValue maxSize = CACHE_SIZE.get(settings);
+        if (maxSize.byteSizeValue() != null) {
+            this.enrichCache = new EnrichCache(maxSize.byteSizeValue());
+        } else {
+            this.enrichCache = new EnrichCache(maxSize.flatNumber());
+        }
     }
 
     @Override
@@ -265,4 +285,45 @@ public class EnrichPlugin extends Plugin implements SystemIndexPlugin, IngestPlu
     public String getFeatureDescription() {
         return "Manages data related to Enrich policies";
     }
+
+    /**
+     * A class that specifies either a flat (unit-less) number or a byte size value.
+     */
+    public static class FlatNumberOrByteSizeValue {
+
+        @Nullable
+        private final Long flatNumber;
+        @Nullable
+        private final ByteSizeValue byteSizeValue;
+
+        public FlatNumberOrByteSizeValue(ByteSizeValue byteSizeValue) {
+            this.byteSizeValue = byteSizeValue;
+            this.flatNumber = null;
+        }
+
+        public FlatNumberOrByteSizeValue(Long flatNumber) {
+            this.flatNumber = flatNumber;
+            this.byteSizeValue = null;
+        }
+
+        public static FlatNumberOrByteSizeValue parse(String value, String settingName, FlatNumberOrByteSizeValue defaultValue) {
+            if (Strings.hasText(value) == false) {
+                return defaultValue;
+            }
+            if (Character.isDigit(value.charAt(value.length() - 1)) == false) {
+                return new FlatNumberOrByteSizeValue(MemorySizeValue.parseBytesSizeValueOrHeapRatio(value, settingName));
+            }
+            return new FlatNumberOrByteSizeValue(Long.parseLong(value));
+        }
+
+        @Nullable
+        public ByteSizeValue byteSizeValue() {
+            return byteSizeValue;
+        }
+
+        @Nullable
+        public Long flatNumber() {
+            return flatNumber;
+        }
+    }
 }

+ 59 - 0
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/FlatNumberOrByteSizeValueTests.java

@@ -0,0 +1,59 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.enrich;
+
+import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.monitor.jvm.JvmInfo;
+import org.elasticsearch.test.ESTestCase;
+
+import static org.elasticsearch.xpack.enrich.EnrichPlugin.FlatNumberOrByteSizeValue;
+
+public class FlatNumberOrByteSizeValueTests extends ESTestCase {
+
+    private static final String SETTING_NAME = "test.setting";
+
+    public void testParse() {
+        int number = randomIntBetween(1, Integer.MAX_VALUE);
+        assertEquals(
+            new FlatNumberOrByteSizeValue((long) number),
+            FlatNumberOrByteSizeValue.parse(Integer.toString(number), SETTING_NAME, null)
+        );
+        assertEquals(
+            new FlatNumberOrByteSizeValue(ByteSizeValue.ofGb(number)),
+            FlatNumberOrByteSizeValue.parse(number + "GB", SETTING_NAME, null)
+        );
+        assertEquals(
+            new FlatNumberOrByteSizeValue(ByteSizeValue.ofGb(number)),
+            FlatNumberOrByteSizeValue.parse(number + "g", SETTING_NAME, null)
+        );
+        int percentage = randomIntBetween(0, 100);
+        assertEquals(
+            new FlatNumberOrByteSizeValue(
+                ByteSizeValue.ofBytes((long) ((double) percentage / 100 * JvmInfo.jvmInfo().getConfiguredMaxHeapSize()))
+            ),
+            FlatNumberOrByteSizeValue.parse(percentage + "%", SETTING_NAME, null)
+        );
+        assertEquals(new FlatNumberOrByteSizeValue(0L), FlatNumberOrByteSizeValue.parse("0", SETTING_NAME, null));
+        assertEquals(new FlatNumberOrByteSizeValue(ByteSizeValue.ZERO), FlatNumberOrByteSizeValue.parse("0GB", SETTING_NAME, null));
+        assertEquals(new FlatNumberOrByteSizeValue(ByteSizeValue.ZERO), FlatNumberOrByteSizeValue.parse("0%", SETTING_NAME, null));
+        // Assert default value.
+        assertEquals(
+            new FlatNumberOrByteSizeValue((long) number),
+            FlatNumberOrByteSizeValue.parse(null, SETTING_NAME, new FlatNumberOrByteSizeValue((long) number))
+        );
+        assertThrows(ElasticsearchParseException.class, () -> FlatNumberOrByteSizeValue.parse("5GB%", SETTING_NAME, null));
+        assertThrows(ElasticsearchParseException.class, () -> FlatNumberOrByteSizeValue.parse("5%GB", SETTING_NAME, null));
+        assertThrows(ElasticsearchParseException.class, () -> FlatNumberOrByteSizeValue.parse("5GBX", SETTING_NAME, null));
+    }
+
+    private void assertEquals(FlatNumberOrByteSizeValue expected, FlatNumberOrByteSizeValue actual) {
+        assertEquals(expected.byteSizeValue(), actual.byteSizeValue());
+        assertEquals(expected.flatNumber(), actual.flatNumber());
+    }
+}