Browse Source

Enable BloomFilter for _id of non-datastream indices (#88409)

This PR adds BloomFilter to Elasticsearch and enables it for the _id 
field of non-data stream indices. BloomFilter should speed up the
performance of mget and update requests at a small expense of refresh,
merge, and storage.
Nhat Nguyen 3 years ago
parent
commit
cfad420cde
23 changed files with 781 additions and 28 deletions
  1. 5 0
      docs/changelog/88409.yaml
  2. 2 1
      server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java
  3. 3 0
      server/src/main/java/module-info.java
  4. 1 0
      server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java
  5. 12 0
      server/src/main/java/org/elasticsearch/index/IndexSettings.java
  6. 4 3
      server/src/main/java/org/elasticsearch/index/codec/CodecService.java
  7. 24 5
      server/src/main/java/org/elasticsearch/index/codec/PerFieldMapperCodec.java
  8. 568 0
      server/src/main/java/org/elasticsearch/index/codec/bloomfilter/ES85BloomFilterPostingsFormat.java
  9. 1 1
      server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
  10. 3 0
      server/src/main/java/org/elasticsearch/index/store/LuceneFilesExtensions.java
  11. 1 0
      server/src/main/resources/META-INF/services/org.apache.lucene.codecs.PostingsFormat
  12. 2 1
      server/src/test/java/org/elasticsearch/index/codec/CodecTests.java
  13. 127 0
      server/src/test/java/org/elasticsearch/index/codec/bloomfilter/ES85BloomFilterPostingsFormatTests.java
  14. 3 3
      server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
  15. 2 1
      server/src/test/java/org/elasticsearch/index/mapper/CompletionFieldMapperTests.java
  16. 2 1
      server/src/test/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldMapperTests.java
  17. 2 1
      server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
  18. 1 1
      server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java
  19. 2 1
      server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java
  20. 10 6
      test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
  21. 3 1
      test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
  22. 2 1
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java
  23. 1 1
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java

+ 5 - 0
docs/changelog/88409.yaml

@@ -0,0 +1,5 @@
+pr: 88409
+summary: Enable `BloomFilter` for `_id` of non-datastream indices
+area: Search
+type: enhancement
+issues: []

+ 2 - 1
server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java

@@ -9,6 +9,7 @@ package org.elasticsearch.indices;
 
 import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.CollectionUtils;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.IndexSettings;
@@ -62,7 +63,7 @@ public class IndexingMemoryControllerIT extends ESSingleNodeTestCase {
                 config.getMergePolicy(),
                 config.getAnalyzer(),
                 config.getSimilarity(),
-                new CodecService(null),
+                new CodecService(null, BigArrays.NON_RECYCLING_INSTANCE),
                 config.getEventListener(),
                 config.getQueryCache(),
                 config.getQueryCachingPolicy(),

+ 3 - 0
server/src/main/java/module-info.java

@@ -224,6 +224,7 @@ module org.elasticsearch.server {
     exports org.elasticsearch.index.cache.query;
     exports org.elasticsearch.index.cache.request;
     exports org.elasticsearch.index.codec;
+    exports org.elasticsearch.index.codec.bloomfilter;
     exports org.elasticsearch.index.engine;
     exports org.elasticsearch.index.fielddata;
     exports org.elasticsearch.index.fielddata.fieldcomparator;
@@ -362,4 +363,6 @@ module org.elasticsearch.server {
             org.elasticsearch.index.shard.ShardToolCliProvider;
 
     uses org.elasticsearch.reservedstate.ReservedClusterStateHandlerProvider;
+
+    provides org.apache.lucene.codecs.PostingsFormat with org.elasticsearch.index.codec.bloomfilter.ES85BloomFilterPostingsFormat;
 }

+ 1 - 0
server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java

@@ -162,6 +162,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
         DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS,
         ShardLimitValidator.INDEX_SETTING_SHARD_LIMIT_GROUP,
         DataTier.TIER_PREFERENCE_SETTING,
+        IndexSettings.BLOOM_FILTER_ID_FIELD_ENABLED_SETTING,
 
         // validate that built-in similarities don't get redefined
         Setting.groupSetting("index.similarity.", (s) -> {

+ 12 - 0
server/src/main/java/org/elasticsearch/index/IndexSettings.java

@@ -455,6 +455,18 @@ public final class IndexSettings {
         Setting.Property.IndexScope
     );
 
+    /**
+     * This index setting is intentionally undocumented and should be used as an escape hatch to disable BloomFilter of the
+     * _id field of non-data-stream indices, which is enabled by default. This setting doesn't affect data-stream indices.
+     */
+    public static final Setting<Boolean> BLOOM_FILTER_ID_FIELD_ENABLED_SETTING = Setting.boolSetting(
+        "index.bloom_filter_for_id_field.enabled",
+        true,
+        Setting.Property.Dynamic,
+        Setting.Property.IndexScope,
+        Property.DeprecatedWarning
+    );
+
     /**
      * Is the {@code index.mode} enabled? It should only be enbaled if you
      * pass a jvm parameter or are running a snapshot build.

+ 4 - 3
server/src/main/java/org/elasticsearch/index/codec/CodecService.java

@@ -10,6 +10,7 @@ package org.elasticsearch.index.codec;
 
 import org.apache.lucene.codecs.Codec;
 import org.apache.lucene.codecs.lucene92.Lucene92Codec;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.index.mapper.MapperService;
 
@@ -31,14 +32,14 @@ public class CodecService {
     /** the raw unfiltered lucene default. useful for testing */
     public static final String LUCENE_DEFAULT_CODEC = "lucene_default";
 
-    public CodecService(@Nullable MapperService mapperService) {
+    public CodecService(@Nullable MapperService mapperService, BigArrays bigArrays) {
         final var codecs = new HashMap<String, Codec>();
         if (mapperService == null) {
             codecs.put(DEFAULT_CODEC, new Lucene92Codec());
             codecs.put(BEST_COMPRESSION_CODEC, new Lucene92Codec(Lucene92Codec.Mode.BEST_COMPRESSION));
         } else {
-            codecs.put(DEFAULT_CODEC, new PerFieldMapperCodec(Lucene92Codec.Mode.BEST_SPEED, mapperService));
-            codecs.put(BEST_COMPRESSION_CODEC, new PerFieldMapperCodec(Lucene92Codec.Mode.BEST_COMPRESSION, mapperService));
+            codecs.put(DEFAULT_CODEC, new PerFieldMapperCodec(Lucene92Codec.Mode.BEST_SPEED, mapperService, bigArrays));
+            codecs.put(BEST_COMPRESSION_CODEC, new PerFieldMapperCodec(Lucene92Codec.Mode.BEST_COMPRESSION, mapperService, bigArrays));
         }
         codecs.put(LUCENE_DEFAULT_CODEC, Codec.getDefault());
         for (String codec : Codec.availableCodecs()) {

+ 24 - 5
server/src/main/java/org/elasticsearch/index/codec/PerFieldMapperCodec.java

@@ -15,6 +15,10 @@ import org.apache.lucene.codecs.PostingsFormat;
 import org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat;
 import org.apache.lucene.codecs.lucene92.Lucene92Codec;
 import org.elasticsearch.common.lucene.Lucene;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.codec.bloomfilter.ES85BloomFilterPostingsFormat;
+import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.Mapper;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.vectors.DenseVectorFieldMapper;
@@ -31,24 +35,39 @@ public class PerFieldMapperCodec extends Lucene92Codec {
     private final MapperService mapperService;
 
     private final DocValuesFormat docValuesFormat = new Lucene90DocValuesFormat();
+    private final ES85BloomFilterPostingsFormat bloomFilterPostingsFormat;
 
     static {
         assert Codec.forName(Lucene.LATEST_CODEC).getClass().isAssignableFrom(PerFieldMapperCodec.class)
             : "PerFieldMapperCodec must subclass the latest " + "lucene codec: " + Lucene.LATEST_CODEC;
     }
 
-    public PerFieldMapperCodec(Mode compressionMode, MapperService mapperService) {
+    public PerFieldMapperCodec(Mode compressionMode, MapperService mapperService, BigArrays bigArrays) {
         super(compressionMode);
         this.mapperService = mapperService;
+        this.bloomFilterPostingsFormat = new ES85BloomFilterPostingsFormat(bigArrays, this::internalGetPostingsFormatForField);
     }
 
     @Override
     public PostingsFormat getPostingsFormatForField(String field) {
-        PostingsFormat format = mapperService.mappingLookup().getPostingsFormat(field);
-        if (format == null) {
-            return super.getPostingsFormatForField(field);
+        if (useBloomFilter(field)) {
+            return bloomFilterPostingsFormat;
         }
-        return format;
+        return internalGetPostingsFormatForField(field);
+    }
+
+    private PostingsFormat internalGetPostingsFormatForField(String field) {
+        final PostingsFormat format = mapperService.mappingLookup().getPostingsFormat(field);
+        if (format != null) {
+            return format;
+        }
+        return super.getPostingsFormatForField(field);
+    }
+
+    private boolean useBloomFilter(String field) {
+        return IdFieldMapper.NAME.equals(field)
+            && mapperService.mappingLookup().isDataStreamTimestampFieldEnabled() == false
+            && IndexSettings.BLOOM_FILTER_ID_FIELD_ENABLED_SETTING.get(mapperService.getIndexSettings().getSettings());
     }
 
     @Override

+ 568 - 0
server/src/main/java/org/elasticsearch/index/codec/bloomfilter/ES85BloomFilterPostingsFormat.java

@@ -0,0 +1,568 @@
+/*
+ * @notice
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Modifications copyright (C) 2022 Elasticsearch B.V.
+ */
+package org.elasticsearch.index.codec.bloomfilter;
+
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.codecs.FieldsConsumer;
+import org.apache.lucene.codecs.FieldsProducer;
+import org.apache.lucene.codecs.NormsProducer;
+import org.apache.lucene.codecs.PostingsFormat;
+import org.apache.lucene.index.BaseTermsEnum;
+import org.apache.lucene.index.FieldInfos;
+import org.apache.lucene.index.Fields;
+import org.apache.lucene.index.FilterLeafReader;
+import org.apache.lucene.index.ImpactsEnum;
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.index.PostingsEnum;
+import org.apache.lucene.index.SegmentInfo;
+import org.apache.lucene.index.SegmentReadState;
+import org.apache.lucene.index.SegmentWriteState;
+import org.apache.lucene.index.TermState;
+import org.apache.lucene.index.Terms;
+import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.store.ChecksumIndexInput;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.RandomAccessInput;
+import org.apache.lucene.util.AttributeSource;
+import org.apache.lucene.util.BitUtil;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.ByteArray;
+import org.elasticsearch.core.IOUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * This implementation is forked from Lucene's BloomFilterPosting to support on-disk bloom filters.
+ * <p>
+ * A {@link PostingsFormat} useful for low doc-frequency fields such as primary keys. Bloom filters
+ * offers "fast-fail" for reads in segments known to have no record of the key.
+ */
+public class ES85BloomFilterPostingsFormat extends PostingsFormat {
+    static final String BLOOM_CODEC_NAME = "ES85BloomFilter";
+    static final int VERSION_START = 0;
+    static final int VERSION_CURRENT = VERSION_START;
+    static final String BLOOM_FILTER_META_FILE = "bfm";
+    static final String BLOOM_FILTER_INDEX_FILE = "bfi";
+
+    private Function<String, PostingsFormat> postingsFormats;
+    private BigArrays bigArrays;
+
+    public ES85BloomFilterPostingsFormat(BigArrays bigArrays, Function<String, PostingsFormat> postingsFormats) {
+        this();
+        this.bigArrays = Objects.requireNonNull(bigArrays);
+        this.postingsFormats = Objects.requireNonNull(postingsFormats);
+    }
+
+    public ES85BloomFilterPostingsFormat() {
+        super(BLOOM_CODEC_NAME);
+    }
+
+    @Override
+    public FieldsConsumer fieldsConsumer(SegmentWriteState state) throws IOException {
+        if (postingsFormats == null || bigArrays == null) {
+            assert false : BLOOM_CODEC_NAME + " was initialized with a wrong constructor";
+            throw new UnsupportedOperationException(BLOOM_CODEC_NAME + " was initialized with a wrong constructor");
+        }
+        return new FieldsWriter(state);
+    }
+
+    @Override
+    public FieldsProducer fieldsProducer(SegmentReadState state) throws IOException {
+        return new FieldsReader(state);
+    }
+
+    @Override
+    public String toString() {
+        return BLOOM_CODEC_NAME;
+    }
+
+    private static String metaFile(SegmentInfo si, String segmentSuffix) {
+        return IndexFileNames.segmentFileName(si.name, segmentSuffix, BLOOM_FILTER_META_FILE);
+    }
+
+    private static String indexFile(SegmentInfo si, String segmentSuffix) {
+        return IndexFileNames.segmentFileName(si.name, segmentSuffix, BLOOM_FILTER_INDEX_FILE);
+    }
+
+    final class FieldsWriter extends FieldsConsumer {
+        private final SegmentWriteState state;
+        private final IndexOutput indexOut;
+        private final List<BloomFilter> bloomFilters = new ArrayList<>();
+        private final List<FieldsGroup> fieldsGroups = new ArrayList<>();
+        private final List<Closeable> toCloses = new ArrayList<>();
+        private boolean closed;
+
+        FieldsWriter(SegmentWriteState state) throws IOException {
+            this.state = state;
+            boolean success = false;
+            try {
+                indexOut = state.directory.createOutput(indexFile(state.segmentInfo, state.segmentSuffix), state.context);
+                toCloses.add(indexOut);
+                CodecUtil.writeIndexHeader(indexOut, BLOOM_CODEC_NAME, VERSION_CURRENT, state.segmentInfo.getId(), state.segmentSuffix);
+                success = true;
+            } finally {
+                if (success == false) {
+                    IOUtils.closeWhileHandlingException(toCloses);
+                }
+            }
+        }
+
+        @Override
+        public void write(Fields fields, NormsProducer norms) throws IOException {
+            writePostings(fields, norms);
+            writeBloomFilters(fields);
+        }
+
+        private void writePostings(Fields fields, NormsProducer norms) throws IOException {
+            final Map<PostingsFormat, FieldsGroup> currentGroups = new HashMap<>();
+            for (String field : fields) {
+                final PostingsFormat postingsFormat = postingsFormats.apply(field);
+                if (postingsFormat == null) {
+                    throw new IllegalStateException("PostingsFormat for field [" + field + "] wasn't specified");
+                }
+                FieldsGroup group = currentGroups.get(postingsFormat);
+                if (group == null) {
+                    group = new FieldsGroup(postingsFormat, Integer.toString(fieldsGroups.size()), new ArrayList<>());
+                    currentGroups.put(postingsFormat, group);
+                    fieldsGroups.add(group);
+                }
+                group.fields.add(field);
+            }
+            for (FieldsGroup group : currentGroups.values()) {
+                final FieldsConsumer writer = group.postingsFormat.fieldsConsumer(new SegmentWriteState(state, group.suffix));
+                toCloses.add(writer);
+                final Fields maskedFields = new FilterLeafReader.FilterFields(fields) {
+                    @Override
+                    public Iterator<String> iterator() {
+                        return group.fields.iterator();
+                    }
+                };
+                writer.write(maskedFields, norms);
+            }
+        }
+
+        private void writeBloomFilters(Fields fields) throws IOException {
+            for (String field : fields) {
+                final Terms terms = fields.terms(field);
+                if (terms == null) {
+                    continue;
+                }
+                final int bloomFilterSize = bloomFilterSize(state.segmentInfo.maxDoc());
+                final int numBytes = numBytesForBloomFilter(bloomFilterSize);
+                try (ByteArray buffer = bigArrays.newByteArray(numBytes)) {
+                    final TermsEnum termsEnum = terms.iterator();
+                    while (true) {
+                        final BytesRef term = termsEnum.next();
+                        if (term == null) {
+                            break;
+                        }
+                        final int hash = hashTerm(term) % bloomFilterSize;
+                        final int pos = hash >> 3;
+                        final int mask = 1 << (hash & 0x7);
+                        final byte val = (byte) (buffer.get(pos) | mask);
+                        buffer.set(pos, val);
+                    }
+                    bloomFilters.add(new BloomFilter(field, indexOut.getFilePointer(), bloomFilterSize));
+                    final BytesReference bytes = BytesReference.fromByteArray(buffer, numBytes);
+                    bytes.writeTo(new IndexOutputOutputStream(indexOut));
+                }
+            }
+        }
+
+        @Override
+        public void close() throws IOException {
+            if (closed) {
+                return;
+            }
+            closed = true;
+            try {
+                CodecUtil.writeFooter(indexOut);
+            } finally {
+                IOUtils.close(toCloses);
+            }
+            try (IndexOutput metaOut = state.directory.createOutput(metaFile(state.segmentInfo, state.segmentSuffix), state.context)) {
+                CodecUtil.writeIndexHeader(metaOut, BLOOM_CODEC_NAME, VERSION_CURRENT, state.segmentInfo.getId(), state.segmentSuffix);
+                // write postings formats
+                metaOut.writeVInt(fieldsGroups.size());
+                for (FieldsGroup group : fieldsGroups) {
+                    group.writeTo(metaOut, state.fieldInfos);
+                }
+                // Write bloom filters
+                metaOut.writeVInt(bloomFilters.size());
+                for (BloomFilter bloomFilter : bloomFilters) {
+                    bloomFilter.writeTo(metaOut, state.fieldInfos);
+                }
+                CodecUtil.writeFooter(metaOut);
+            }
+        }
+    }
+
+    private record BloomFilter(String field, long startFilePointer, int bloomFilterSize) {
+        void writeTo(IndexOutput out, FieldInfos fieldInfos) throws IOException {
+            out.writeVInt(fieldInfos.fieldInfo(field).number);
+            out.writeVLong(startFilePointer);
+            out.writeVInt(bloomFilterSize);
+        }
+
+        static BloomFilter readFrom(IndexInput in, FieldInfos fieldInfos) throws IOException {
+            final String fieldName = fieldInfos.fieldInfo(in.readVInt()).name;
+            final long startFilePointer = in.readVLong();
+            final int bloomFilterSize = in.readVInt();
+            return new BloomFilter(fieldName, startFilePointer, bloomFilterSize);
+        }
+    }
+
+    private record FieldsGroup(PostingsFormat postingsFormat, String suffix, List<String> fields) {
+        void writeTo(IndexOutput out, FieldInfos fieldInfos) throws IOException {
+            out.writeString(postingsFormat.getName());
+            out.writeString(suffix);
+            out.writeVInt(fields.size());
+            for (String field : fields) {
+                out.writeVInt(fieldInfos.fieldInfo(field).number);
+
+            }
+        }
+
+        static FieldsGroup readFrom(IndexInput in, FieldInfos fieldInfos) throws IOException {
+            final PostingsFormat postingsFormat = forName(in.readString());
+            final String suffix = in.readString();
+            final int numFields = in.readVInt();
+            final List<String> fields = new ArrayList<>();
+            for (int i = 0; i < numFields; i++) {
+                fields.add(fieldInfos.fieldInfo(in.readVInt()).name);
+            }
+            return new FieldsGroup(postingsFormat, suffix, fields);
+        }
+    }
+
+    static final class FieldsReader extends FieldsProducer {
+        private final Map<String, BloomFilter> bloomFilters;
+        private final List<Closeable> toCloses = new ArrayList<>();
+        private final Map<String, FieldsProducer> readerMap = new HashMap<>();
+        private final IndexInput indexIn;
+
+        FieldsReader(SegmentReadState state) throws IOException {
+            boolean success = false;
+            try (
+                ChecksumIndexInput metaIn = state.directory.openChecksumInput(
+                    metaFile(state.segmentInfo, state.segmentSuffix),
+                    IOContext.READONCE
+                )
+            ) {
+                CodecUtil.checkIndexHeader(
+                    metaIn,
+                    BLOOM_CODEC_NAME,
+                    VERSION_START,
+                    VERSION_CURRENT,
+                    state.segmentInfo.getId(),
+                    state.segmentSuffix
+                );
+                // read postings formats
+                final int numFieldsGroups = metaIn.readVInt();
+                for (int i = 0; i < numFieldsGroups; i++) {
+                    final FieldsGroup group = FieldsGroup.readFrom(metaIn, state.fieldInfos);
+                    final FieldsProducer reader = group.postingsFormat.fieldsProducer(new SegmentReadState(state, group.suffix));
+                    toCloses.add(reader);
+                    for (String field : group.fields) {
+                        readerMap.put(field, reader);
+                    }
+                }
+                // read bloom filters
+                final int numBloomFilters = metaIn.readVInt();
+                bloomFilters = new HashMap<>(numBloomFilters);
+                for (int i = 0; i < numBloomFilters; i++) {
+                    final BloomFilter bloomFilter = BloomFilter.readFrom(metaIn, state.fieldInfos);
+                    assert bloomFilter.bloomFilterSize == bloomFilterSize(state.segmentInfo.maxDoc())
+                        : "bloom_filter=" + bloomFilter + ", max_docs=" + state.segmentInfo.maxDoc();
+                    bloomFilters.put(bloomFilter.field, bloomFilter);
+                }
+                CodecUtil.checkFooter(metaIn);
+                indexIn = state.directory.openInput(indexFile(state.segmentInfo, state.segmentSuffix), state.context);
+                toCloses.add(indexIn);
+                CodecUtil.checkIndexHeader(
+                    indexIn,
+                    BLOOM_CODEC_NAME,
+                    VERSION_START,
+                    VERSION_CURRENT,
+                    state.segmentInfo.getId(),
+                    state.segmentSuffix
+                );
+                CodecUtil.retrieveChecksum(indexIn);
+                success = true;
+            } finally {
+                if (success == false) {
+                    IOUtils.closeWhileHandlingException(toCloses);
+                }
+            }
+        }
+
+        @Override
+        public Iterator<String> iterator() {
+            return readerMap.keySet().iterator();
+        }
+
+        @Override
+        public void close() throws IOException {
+            IOUtils.close(toCloses);
+        }
+
+        @Override
+        public Terms terms(String field) throws IOException {
+            final FieldsProducer reader = readerMap.get(field);
+            if (reader == null) {
+                return null;
+            }
+            final Terms terms = reader.terms(field);
+            if (terms == null) {
+                return null;
+            }
+            final BloomFilter bloomFilter = bloomFilters.get(field);
+            if (bloomFilter != null) {
+                final RandomAccessInput data = indexIn.randomAccessSlice(
+                    bloomFilter.startFilePointer(),
+                    numBytesForBloomFilter(bloomFilter.bloomFilterSize)
+                );
+                return new BloomFilterTerms(terms, data, bloomFilter.bloomFilterSize);
+            } else {
+                return terms;
+            }
+        }
+
+        @Override
+        public int size() {
+            return readerMap.size();
+        }
+
+        @Override
+        public void checkIntegrity() throws IOException {
+            // already fully checked the meta file; let's fully checked the index file.
+            CodecUtil.checksumEntireFile(indexIn);
+            // multiple fields can share the same reader
+            final Set<FieldsProducer> seenReaders = new HashSet<>();
+            for (FieldsProducer reader : readerMap.values()) {
+                if (seenReaders.add(reader)) {
+                    reader.checkIntegrity();
+                }
+            }
+        }
+    }
+
+    private static class BloomFilterTerms extends FilterLeafReader.FilterTerms {
+        private final RandomAccessInput data;
+        private final int bloomFilterSize;
+
+        BloomFilterTerms(Terms in, RandomAccessInput data, int bloomFilterSize) {
+            super(in);
+            this.data = data;
+            this.bloomFilterSize = bloomFilterSize;
+        }
+
+        private boolean mayContainTerm(BytesRef term) throws IOException {
+            final int hash = hashTerm(term) % bloomFilterSize;
+            final int pos = hash >> 3;
+            final int mask = 1 << (hash & 0x7);
+            final byte bits = data.readByte(pos);
+            return (bits & mask) != 0;
+        }
+
+        @Override
+        public TermsEnum iterator() throws IOException {
+            return new LazyFilterTermsEnum() {
+                private TermsEnum delegate;
+
+                @Override
+                TermsEnum getDelegate() throws IOException {
+                    if (delegate == null) {
+                        delegate = in.iterator();
+                    }
+                    return delegate;
+                }
+
+                @Override
+                public boolean seekExact(BytesRef term) throws IOException {
+                    if (mayContainTerm(term)) {
+                        return getDelegate().seekExact(term);
+                    } else {
+                        return false;
+                    }
+                }
+
+                @Override
+                public void seekExact(BytesRef term, TermState state) throws IOException {
+                    getDelegate().seekExact(term, state);
+                }
+
+                @Override
+                public TermState termState() throws IOException {
+                    // TODO: return TermState that includes BloomFilter and fix _disk_usage API
+                    return getDelegate().termState();
+                }
+            };
+        }
+    }
+
+    private abstract static class LazyFilterTermsEnum extends BaseTermsEnum {
+        abstract TermsEnum getDelegate() throws IOException;
+
+        @Override
+        public SeekStatus seekCeil(BytesRef text) throws IOException {
+            return getDelegate().seekCeil(text);
+        }
+
+        @Override
+        public void seekExact(long ord) throws IOException {
+            getDelegate().seekExact(ord);
+        }
+
+        @Override
+        public BytesRef term() throws IOException {
+            return getDelegate().term();
+        }
+
+        @Override
+        public long ord() throws IOException {
+            return getDelegate().ord();
+        }
+
+        @Override
+        public int docFreq() throws IOException {
+            return getDelegate().docFreq();
+        }
+
+        @Override
+        public long totalTermFreq() throws IOException {
+            return getDelegate().totalTermFreq();
+        }
+
+        @Override
+        public PostingsEnum postings(PostingsEnum reuse, int flags) throws IOException {
+            return getDelegate().postings(reuse, flags);
+        }
+
+        @Override
+        public ImpactsEnum impacts(int flags) throws IOException {
+            return getDelegate().impacts(flags);
+        }
+
+        @Override
+        public BytesRef next() throws IOException {
+            return getDelegate().next();
+        }
+
+        @Override
+        public AttributeSource attributes() {
+            try {
+                return getDelegate().attributes();
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+        }
+    }
+
+    static int bloomFilterSize(int maxDocs) {
+        // 10% saturation (i.e., 10 bits for each term)
+        final long numBits = maxDocs * 10L;
+        if (numBits > Integer.MAX_VALUE) {
+            return Integer.MAX_VALUE;
+        } else {
+            return (int) numBits;
+        }
+    }
+
+    static int numBytesForBloomFilter(int bloomFilterSize) {
+        return Math.toIntExact((bloomFilterSize + 7L) / 8L);
+    }
+
+    static int hashTerm(BytesRef br) {
+        final int hash = murmurhash3_x86_32(br.bytes, br.offset, br.length, 0x9747b28c);
+        return hash & 0x7FFF_FFFF;
+    }
+
+    /**
+     * Forked from Lucene's StringHelper#murmurhash3_x86_32 so that changes to the Lucene implementation
+     * do not break the compatibility of this format.
+     */
+    @SuppressWarnings("fallthrough")
+    private static int murmurhash3_x86_32(byte[] data, int offset, int len, int seed) {
+        final int c1 = 0xcc9e2d51;
+        final int c2 = 0x1b873593;
+
+        int h1 = seed;
+        int roundedEnd = offset + (len & 0xfffffffc); // round down to 4 byte block
+
+        for (int i = offset; i < roundedEnd; i += 4) {
+            // little endian load order
+            int k1 = (int) BitUtil.VH_LE_INT.get(data, i);
+            k1 *= c1;
+            k1 = Integer.rotateLeft(k1, 15);
+            k1 *= c2;
+
+            h1 ^= k1;
+            h1 = Integer.rotateLeft(h1, 13);
+            h1 = h1 * 5 + 0xe6546b64;
+        }
+
+        // tail
+        int k1 = 0;
+
+        switch (len & 0x03) {
+            case 3:
+                k1 = (data[roundedEnd + 2] & 0xff) << 16;
+                // fallthrough
+            case 2:
+                k1 |= (data[roundedEnd + 1] & 0xff) << 8;
+                // fallthrough
+            case 1:
+                k1 |= (data[roundedEnd] & 0xff);
+                k1 *= c1;
+                k1 = Integer.rotateLeft(k1, 15);
+                k1 *= c2;
+                h1 ^= k1;
+        }
+
+        // finalization
+        h1 ^= len;
+
+        // fmix(h1);
+        h1 ^= h1 >>> 16;
+        h1 *= 0x85ebca6b;
+        h1 ^= h1 >>> 13;
+        h1 *= 0xc2b2ae35;
+        h1 ^= h1 >>> 16;
+
+        return h1;
+    }
+}

+ 1 - 1
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -313,7 +313,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         assert shardRouting.initializing();
         this.shardRouting = shardRouting;
         final Settings settings = indexSettings.getSettings();
-        this.codecService = new CodecService(mapperService);
+        this.codecService = new CodecService(mapperService, bigArrays);
         this.warmer = warmer;
         this.similarityService = similarityService;
         Objects.requireNonNull(store, "Store must be provided to the index shard");

+ 3 - 0
server/src/main/java/org/elasticsearch/index/store/LuceneFilesExtensions.java

@@ -18,6 +18,9 @@ import java.util.Objects;
 
 public enum LuceneFilesExtensions {
 
+    // Elasticsearch BloomFilterPostingsFormat
+    BFI("bfi", "BloomFilter Index", false, true),
+    BFM("bfm", "BloomFilter Metadata", true, false),
     CFE("cfe", "Compound Files Entries", true, false),
     // Compound files are tricky because they store all the information for the segment. Benchmarks
     // suggested that not mapping them hurts performance.

+ 1 - 0
server/src/main/resources/META-INF/services/org.apache.lucene.codecs.PostingsFormat

@@ -0,0 +1 @@
+org.elasticsearch.index.codec.bloomfilter.ES85BloomFilterPostingsFormat

+ 2 - 1
server/src/test/java/org/elasticsearch/index/codec/CodecTests.java

@@ -19,6 +19,7 @@ import org.apache.lucene.index.SegmentReader;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.tests.util.LuceneTestCase.SuppressCodecs;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.analysis.IndexAnalyzers;
@@ -93,7 +94,7 @@ public class CodecTests extends ESTestCase {
             settings.getMode().idFieldMapperWithoutFieldData(),
             ScriptCompiler.NONE
         );
-        return new CodecService(service);
+        return new CodecService(service, BigArrays.NON_RECYCLING_INSTANCE);
     }
 
 }

+ 127 - 0
server/src/test/java/org/elasticsearch/index/codec/bloomfilter/ES85BloomFilterPostingsFormatTests.java

@@ -0,0 +1,127 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.index.codec.bloomfilter;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+
+import org.apache.lucene.codecs.Codec;
+import org.apache.lucene.codecs.PostingsFormat;
+import org.apache.lucene.codecs.perfield.PerFieldPostingsFormat;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.tests.index.BasePostingsFormatTestCase;
+import org.apache.lucene.tests.util.TestUtil;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefBuilder;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.GraalVMThreadsFilter;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+
+@ThreadLeakFilters(filters = { GraalVMThreadsFilter.class })
+public class ES85BloomFilterPostingsFormatTests extends BasePostingsFormatTestCase {
+
+    @Override
+    protected Codec getCodec() {
+        return TestUtil.alwaysPostingsFormat(new ES85BloomFilterPostingsFormat(BigArrays.NON_RECYCLING_INSTANCE, field -> {
+            PostingsFormat postingsFormat = Codec.getDefault().postingsFormat();
+            if (postingsFormat instanceof PerFieldPostingsFormat) {
+                postingsFormat = TestUtil.getDefaultPostingsFormat();
+            }
+            return postingsFormat;
+        }));
+    }
+
+    public void testBloomFilterSize() {
+        assertThat(ES85BloomFilterPostingsFormat.bloomFilterSize(1000), equalTo(10_000));
+        assertThat(ES85BloomFilterPostingsFormat.bloomFilterSize(IndexWriter.MAX_DOCS - random().nextInt(10)), equalTo(Integer.MAX_VALUE));
+        assertThat(ES85BloomFilterPostingsFormat.numBytesForBloomFilter(16384), equalTo(2048));
+        assertThat(ES85BloomFilterPostingsFormat.numBytesForBloomFilter(16383), equalTo(2048));
+        assertThat(ES85BloomFilterPostingsFormat.numBytesForBloomFilter(Integer.MAX_VALUE), equalTo(1 << 28));
+    }
+
+    public void testHashTerms() {
+        Map<String, Integer> testStrings = Map.of(
+            "hello",
+            1568626408,
+            "elasticsearch",
+            1410942402,
+            "elastic",
+            255526858,
+            "java",
+            684588044,
+            "lucene",
+            881308315,
+            "bloom_filter",
+            83797118,
+            "",
+            1807139368
+        );
+        for (Map.Entry<String, Integer> e : testStrings.entrySet()) {
+            String term = e.getKey();
+            BytesRef byteRef = randomBytesRef(term.getBytes(StandardCharsets.UTF_8));
+            int hash = ES85BloomFilterPostingsFormat.hashTerm(byteRef);
+            assertThat("term=" + term, hash, equalTo(e.getValue()));
+        }
+
+        Map<byte[], Integer> testBytes = Map.of(
+            new byte[] { 126, 49, -19, -128, 4, -77, 114, -61, 104, -58, -35, 113, 107 },
+            1155258673,
+            new byte[] { -50, 83, -18, 81, -44, -75, -77, 124, -76, 62, -16, 99, 75, -55, 119 },
+            973344634,
+            new byte[] { 110, -26, 71, -17, -113, -83, 58, 31, 13, -32, 38, -61, -97, -104, -9, -38 },
+            1950254802,
+            new byte[] { -20, 20, -88, 12, 5, -38, -50, 33, -21, -13, 90, 37, 28, -35, 107, 93, 30, -32, -76, 38 },
+            1123005351,
+            new byte[] { 88, -112, -11, -59, -103, 5, -107, -56, 14, 31, 2, -5, 67, -108, -125, 42, 28 },
+            1411536425,
+            new byte[] { 114, 82, -59, -103, 0, 7, -77 },
+            1883229848,
+            new byte[] { 34, 91, -26, 90, 21, -64, -72, 0, 101, -12, -33, 27, 119, 77, -13, 39, -60, -53 },
+            603518683,
+            new byte[] { 3, -68, -103, -125, 74, 122, -64, -19 },
+            84707471,
+            new byte[] { 0 },
+            691257000,
+            new byte[] { 1 },
+            955192589
+        );
+        for (Map.Entry<byte[], Integer> e : testBytes.entrySet()) {
+            byte[] term = e.getKey();
+            final BytesRef bytesRef = randomBytesRef(term);
+            int hash = ES85BloomFilterPostingsFormat.hashTerm(bytesRef);
+            assertThat("term=" + Arrays.toString(term), hash, equalTo(e.getValue()));
+        }
+
+        byte[] bytes = ESTestCase.randomByteArrayOfLength(ESTestCase.between(0, 1000));
+        assertThat(ES85BloomFilterPostingsFormat.hashTerm(randomBytesRef(bytes)), greaterThanOrEqualTo(0));
+    }
+
+    private static BytesRef randomBytesRef(byte[] bytes) {
+        if (random().nextBoolean()) {
+            final BytesRefBuilder builder = new BytesRefBuilder();
+            // prefix
+            int offset = ESTestCase.randomIntBetween(0, 10);
+            builder.append(new BytesRef(ESTestCase.randomByteArrayOfLength(offset)));
+            // term
+            builder.append(bytes, 0, bytes.length);
+            // suffix
+            int suffixLength = ESTestCase.between(0, 10);
+            builder.append(new BytesRef(ESTestCase.randomByteArrayOfLength(suffixLength)));
+            return new BytesRef(builder.bytes(), offset, bytes.length);
+        } else {
+            return new BytesRef(bytes);
+        }
+    }
+}

+ 3 - 3
server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -3122,7 +3122,7 @@ public class InternalEngineTests extends EngineTestCase {
     }
 
     public void testSettings() {
-        CodecService codecService = new CodecService(null);
+        CodecService codecService = newCodecService();
         LiveIndexWriterConfig currentIndexWriterConfig = engine.getCurrentIndexWriterConfig();
 
         assertEquals(engine.config().getCodec().getName(), codecService.codec(codecName).getName());
@@ -3560,7 +3560,7 @@ public class InternalEngineTests extends EngineTestCase {
             newMergePolicy(),
             config.getAnalyzer(),
             config.getSimilarity(),
-            new CodecService(null),
+            newCodecService(),
             config.getEventListener(),
             IndexSearcher.getDefaultQueryCache(),
             IndexSearcher.getDefaultQueryCachingPolicy(),
@@ -7228,7 +7228,7 @@ public class InternalEngineTests extends EngineTestCase {
                 config.getMergePolicy(),
                 config.getAnalyzer(),
                 config.getSimilarity(),
-                new CodecService(null),
+                newCodecService(),
                 config.getEventListener(),
                 config.getQueryCache(),
                 config.getQueryCachingPolicy(),

+ 2 - 1
server/src/test/java/org/elasticsearch/index/mapper/CompletionFieldMapperTests.java

@@ -29,6 +29,7 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.unit.Fuzziness;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.analysis.AnalyzerScope;
@@ -136,7 +137,7 @@ public class CompletionFieldMapperTests extends MapperTestCase {
 
     public void testPostingsFormat() throws IOException {
         MapperService mapperService = createMapperService(fieldMapping(this::minimalMapping));
-        CodecService codecService = new CodecService(mapperService);
+        CodecService codecService = new CodecService(mapperService, BigArrays.NON_RECYCLING_INSTANCE);
         Codec codec = codecService.codec("default");
         assertThat(codec, instanceOf(PerFieldMapperCodec.class));
         PerFieldMapperCodec perFieldCodec = (PerFieldMapperCodec) codec;

+ 2 - 1
server/src/test/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldMapperTests.java

@@ -20,6 +20,7 @@ import org.apache.lucene.search.FieldExistsQuery;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.Version;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.index.codec.CodecService;
 import org.elasticsearch.index.codec.PerFieldMapperCodec;
 import org.elasticsearch.index.mapper.DocumentMapper;
@@ -450,7 +451,7 @@ public class DenseVectorFieldMapperTests extends MapperTestCase {
             b.field("ef_construction", efConstruction);
             b.endObject();
         }));
-        CodecService codecService = new CodecService(mapperService);
+        CodecService codecService = new CodecService(mapperService, BigArrays.NON_RECYCLING_INSTANCE);
         Codec codec = codecService.codec("default");
         assertThat(codec, instanceOf(PerFieldMapperCodec.class));
         KnnVectorsFormat knnVectorsFormat = ((PerFieldMapperCodec) codec).getKnnVectorsFormatForField("field");

+ 2 - 1
server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -55,6 +55,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.settings.IndexScopedSettings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@@ -4400,7 +4401,7 @@ public class IndexShardTests extends IndexShardTestCase {
                 config.getMergePolicy(),
                 config.getAnalyzer(),
                 config.getSimilarity(),
-                new CodecService(null),
+                new CodecService(null, BigArrays.NON_RECYCLING_INSTANCE),
                 config.getEventListener(),
                 config.getQueryCache(),
                 config.getQueryCachingPolicy(),

+ 1 - 1
server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java

@@ -137,7 +137,7 @@ public class RefreshListenersTests extends ESTestCase {
             newMergePolicy(),
             iwc.getAnalyzer(),
             iwc.getSimilarity(),
-            new CodecService(null),
+            new CodecService(null, BigArrays.NON_RECYCLING_INSTANCE),
             eventListener,
             IndexSearcher.getDefaultQueryCache(),
             IndexSearcher.getDefaultQueryCachingPolicy(),

+ 2 - 1
server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java

@@ -14,6 +14,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.index.codec.CodecService;
 import org.elasticsearch.index.engine.EngineConfig;
 import org.elasticsearch.index.engine.InternalEngine;
@@ -385,7 +386,7 @@ public class IndexingMemoryControllerTests extends IndexShardTestCase {
             config.getMergePolicy(),
             config.getAnalyzer(),
             config.getSimilarity(),
-            new CodecService(null),
+            new CodecService(null, BigArrays.NON_RECYCLING_INSTANCE),
             config.getEventListener(),
             config.getQueryCache(),
             config.getQueryCachingPolicy(),

+ 10 - 6
test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

@@ -205,7 +205,7 @@ public abstract class EngineTestCase extends ESTestCase {
     public void setUp() throws Exception {
         super.setUp();
         primaryTerm.set(randomLongBetween(1, Long.MAX_VALUE));
-        CodecService codecService = new CodecService(null);
+        CodecService codecService = newCodecService();
         String name = Codec.getDefault().getName();
         if (Arrays.asList(codecService.availableCodecs()).contains(name)) {
             // some codecs are read only so we only take the ones that we have in the service and randomly
@@ -259,7 +259,7 @@ public abstract class EngineTestCase extends ESTestCase {
             config.getMergePolicy(),
             config.getAnalyzer(),
             config.getSimilarity(),
-            new CodecService(null),
+            newCodecService(),
             config.getEventListener(),
             config.getQueryCache(),
             config.getQueryCachingPolicy(),
@@ -287,7 +287,7 @@ public abstract class EngineTestCase extends ESTestCase {
             config.getMergePolicy(),
             analyzer,
             config.getSimilarity(),
-            new CodecService(null),
+            newCodecService(),
             config.getEventListener(),
             config.getQueryCache(),
             config.getQueryCachingPolicy(),
@@ -315,7 +315,7 @@ public abstract class EngineTestCase extends ESTestCase {
             mergePolicy,
             config.getAnalyzer(),
             config.getSimilarity(),
-            new CodecService(null),
+            newCodecService(),
             config.getEventListener(),
             config.getQueryCache(),
             config.getQueryCachingPolicy(),
@@ -833,7 +833,7 @@ public abstract class EngineTestCase extends ESTestCase {
             mergePolicy,
             iwc.getAnalyzer(),
             iwc.getSimilarity(),
-            new CodecService(null),
+            newCodecService(),
             eventListener,
             IndexSearcher.getDefaultQueryCache(),
             IndexSearcher.getDefaultQueryCachingPolicy(),
@@ -869,7 +869,7 @@ public abstract class EngineTestCase extends ESTestCase {
             config.getMergePolicy(),
             config.getAnalyzer(),
             config.getSimilarity(),
-            new CodecService(null),
+            newCodecService(),
             config.getEventListener(),
             config.getQueryCache(),
             config.getQueryCachingPolicy(),
@@ -1614,4 +1614,8 @@ public abstract class EngineTestCase extends ESTestCase {
         // hard fail - we can't get the lazybits
         throw new IllegalStateException("Can not extract lazy bits from given index reader [" + reader + "]");
     }
+
+    static CodecService newCodecService() {
+        return new CodecService(null, BigArrays.NON_RECYCLING_INSTANCE);
+    }
 }

+ 3 - 1
test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

@@ -452,7 +452,9 @@ public abstract class ESIntegTestCase extends ESTestCase {
                 RandomNumbers.randomIntBetween(random, 1, 15) + "ms"
             );
         }
-
+        if (randomBoolean()) {
+            builder.put(IndexSettings.BLOOM_FILTER_ID_FIELD_ENABLED_SETTING.getKey(), randomBoolean());
+        }
         return builder;
     }
 

+ 2 - 1
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java

@@ -509,7 +509,8 @@ public class TransportResumeFollowAction extends AcknowledgedTransportMasterNode
         MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING,
         MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING,
         EngineConfig.INDEX_CODEC_SETTING,
-        DataTier.TIER_PREFERENCE_SETTING
+        DataTier.TIER_PREFERENCE_SETTING,
+        IndexSettings.BLOOM_FILTER_ID_FIELD_ENABLED_SETTING
     );
 
     public static Settings filter(Settings originalSettings) {

+ 1 - 1
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java

@@ -267,7 +267,7 @@ public class FollowingEngineTests extends ESTestCase {
             newMergePolicy(),
             indexWriterConfig.getAnalyzer(),
             indexWriterConfig.getSimilarity(),
-            new CodecService(null),
+            new CodecService(null, BigArrays.NON_RECYCLING_INSTANCE),
             new Engine.EventListener() {
                 @Override
                 public void onFailedEngine(String reason, Exception e) {