Просмотр исходного кода

TSDB: Build `_id` without reparsing (#88789)

This replaces the code that build `_id` in tsid indices that used to
re-parse the entire json object with one that reuses the parsed values.
It speed up writes by about 4%. Here's the rally output:

```
|    Min Throughput |  8164.67 |  8547.24 | docs/s | +4.69% |
|   Mean Throughput |  8891.11 |  9256.75 | docs/s | +4.11% |
| Median Throughput |  8774.52 |  9134.15 | docs/s | +4.10% |
|    Max Throughput | 10246.7  | 10482.3  | docs/s | +2.30% |
```
Nik Everett 3 лет назад
Родитель
Сommit
82ad45f411

+ 5 - 0
docs/changelog/88789.yaml

@@ -0,0 +1,5 @@
+pr: 88789
+summary: "TSDB: Build `_id` without reparsing"
+area: "TSDB"
+type: enhancement
+issues: []

+ 81 - 66
server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java

@@ -35,6 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.IntConsumer;
+import java.util.function.IntSupplier;
 
 import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
 
@@ -251,26 +252,33 @@ public abstract class IndexRouting {
         public int indexShard(String id, @Nullable String routing, XContentType sourceType, BytesReference source) {
             assert Transports.assertNotTransportThread("parsing the _source can get slow");
             checkNoRouting(routing);
-            return hashToShardId(hashSource(sourceType, source));
+            return hashToShardId(hashSource(sourceType, source).buildHash(IndexRouting.ExtractFromSource::defaultOnEmpty));
         }
 
         public String createId(XContentType sourceType, BytesReference source, byte[] suffix) {
-            return createId(hashSource(sourceType, source), suffix);
+            return hashSource(sourceType, source).createId(suffix, IndexRouting.ExtractFromSource::defaultOnEmpty);
         }
 
         public String createId(Map<String, Object> flat, byte[] suffix) {
-            return createId(hashSource(flat), suffix);
+            Builder b = builder();
+            for (Map.Entry<String, Object> e : flat.entrySet()) {
+                if (Regex.simpleMatch(routingPaths, e.getKey())) {
+                    b.hashes.add(new NameAndHash(new BytesRef(e.getKey()), hash(new BytesRef(e.getValue().toString()))));
+                }
+            }
+            return b.createId(suffix, IndexRouting.ExtractFromSource::defaultOnEmpty);
         }
 
-        private static String createId(int routingHash, byte[] suffix) {
-            byte[] idBytes = new byte[4 + suffix.length];
-            ByteUtils.writeIntLE(routingHash, idBytes, 0);
-            System.arraycopy(suffix, 0, idBytes, 4, suffix.length);
-            return Base64.getUrlEncoder().withoutPadding().encodeToString(idBytes);
+        private static int defaultOnEmpty() {
+            throw new IllegalArgumentException("Error extracting routing: source didn't contain any routing fields");
         }
 
-        private int hashSource(XContentType sourceType, BytesReference source) {
-            List<NameAndHash> hashes = new ArrayList<>();
+        public Builder builder() {
+            return new Builder();
+        }
+
+        private Builder hashSource(XContentType sourceType, BytesReference source) {
+            Builder b = builder();
             try {
                 try (XContentParser parser = sourceType.xContent().createParser(parserConfig, source.streamInput())) {
                     parser.nextToken(); // Move to first token
@@ -278,82 +286,89 @@ public abstract class IndexRouting {
                         throw new IllegalArgumentException("Error extracting routing: source didn't contain any routing fields");
                     }
                     parser.nextToken();
-                    extractObject(hashes, null, parser);
+                    b.extractObject(null, parser);
                     ensureExpectedToken(null, parser.nextToken(), parser);
                 }
             } catch (IOException | ParsingException e) {
                 throw new IllegalArgumentException("Error extracting routing: " + e.getMessage(), e);
             }
-            return hashesToHash(hashes);
+            return b;
         }
 
-        private static void extractObject(List<NameAndHash> hashes, @Nullable String path, XContentParser source) throws IOException {
-            while (source.currentToken() != Token.END_OBJECT) {
-                ensureExpectedToken(Token.FIELD_NAME, source.currentToken(), source);
-                String fieldName = source.currentName();
-                String subPath = path == null ? fieldName : path + "." + fieldName;
-                source.nextToken();
-                extractItem(hashes, subPath, source);
+        public class Builder {
+            private final List<NameAndHash> hashes = new ArrayList<>();
+
+            public void addMatching(String fieldName, BytesRef string) {
+                if (Regex.simpleMatch(routingPaths, fieldName)) {
+                    hashes.add(new NameAndHash(new BytesRef(fieldName), hash(string)));
+                }
             }
-        }
 
-        private static void extractItem(List<NameAndHash> hashes, String path, XContentParser source) throws IOException {
-            switch (source.currentToken()) {
-                case START_OBJECT:
-                    source.nextToken();
-                    extractObject(hashes, path, source);
-                    source.nextToken();
-                    break;
-                case VALUE_STRING:
-                    hashes.add(new NameAndHash(new BytesRef(path), hash(new BytesRef(source.text()))));
-                    source.nextToken();
-                    break;
-                case VALUE_NULL:
+            public String createId(byte[] suffix, IntSupplier onEmpty) {
+                byte[] idBytes = new byte[4 + suffix.length];
+                ByteUtils.writeIntLE(buildHash(onEmpty), idBytes, 0);
+                System.arraycopy(suffix, 0, idBytes, 4, suffix.length);
+                return Base64.getUrlEncoder().withoutPadding().encodeToString(idBytes);
+            }
+
+            private void extractObject(@Nullable String path, XContentParser source) throws IOException {
+                while (source.currentToken() != Token.END_OBJECT) {
+                    ensureExpectedToken(Token.FIELD_NAME, source.currentToken(), source);
+                    String fieldName = source.currentName();
+                    String subPath = path == null ? fieldName : path + "." + fieldName;
                     source.nextToken();
-                    break;
-                default:
-                    throw new ParsingException(
-                        source.getTokenLocation(),
-                        "Routing values must be strings but found [{}]",
-                        source.currentToken()
-                    );
+                    extractItem(subPath, source);
+                }
             }
-        }
 
-        private int hashSource(Map<String, Object> flat) {
-            List<NameAndHash> hashes = new ArrayList<>();
-            for (Map.Entry<String, Object> e : flat.entrySet()) {
-                if (Regex.simpleMatch(routingPaths, e.getKey())) {
-                    hashes.add(new NameAndHash(new BytesRef(e.getKey()), hash(new BytesRef(e.getValue().toString()))));
+            private void extractItem(String path, XContentParser source) throws IOException {
+                switch (source.currentToken()) {
+                    case START_OBJECT:
+                        source.nextToken();
+                        extractObject(path, source);
+                        source.nextToken();
+                        break;
+                    case VALUE_STRING:
+                        hashes.add(new NameAndHash(new BytesRef(path), hash(new BytesRef(source.text()))));
+                        source.nextToken();
+                        break;
+                    case VALUE_NULL:
+                        source.nextToken();
+                        break;
+                    default:
+                        throw new ParsingException(
+                            source.getTokenLocation(),
+                            "Routing values must be strings but found [{}]",
+                            source.currentToken()
+                        );
+                }
+            }
+
+            private int buildHash(IntSupplier onEmpty) {
+                Collections.sort(hashes);
+                Iterator<NameAndHash> itr = hashes.iterator();
+                if (itr.hasNext() == false) {
+                    return onEmpty.getAsInt();
                 }
+                NameAndHash prev = itr.next();
+                int hash = hash(prev.name) ^ prev.hash;
+                while (itr.hasNext()) {
+                    NameAndHash next = itr.next();
+                    if (prev.name.equals(next.name)) {
+                        throw new IllegalArgumentException("Duplicate routing dimension for [" + next.name + "]");
+                    }
+                    int thisHash = hash(next.name) ^ next.hash;
+                    hash = 31 * hash + thisHash;
+                    prev = next;
+                }
+                return hash;
             }
-            return hashesToHash(hashes);
         }
 
         private static int hash(BytesRef ref) {
             return StringHelper.murmurhash3_x86_32(ref, 0);
         }
 
-        private static int hashesToHash(List<NameAndHash> hashes) {
-            Collections.sort(hashes);
-            Iterator<NameAndHash> itr = hashes.iterator();
-            if (itr.hasNext() == false) {
-                throw new IllegalArgumentException("Error extracting routing: source didn't contain any routing fields");
-            }
-            NameAndHash prev = itr.next();
-            int hash = hash(prev.name) ^ prev.hash;
-            while (itr.hasNext()) {
-                NameAndHash next = itr.next();
-                if (prev.name.equals(next.name)) {
-                    throw new IllegalArgumentException("Duplicate routing dimension for [" + next.name + "]");
-                }
-                int thisHash = hash(next.name) ^ next.hash;
-                hash = 31 * hash + thisHash;
-                prev = next;
-            }
-            return hash;
-        }
-
         @Override
         public int updateShard(String id, @Nullable String routing) {
             throw new IllegalArgumentException(error("update"));

+ 6 - 4
server/src/main/java/org/elasticsearch/index/IndexMode.java

@@ -10,6 +10,7 @@ package org.elasticsearch.index;
 
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService;
+import org.elasticsearch.cluster.routing.IndexRouting;
 import org.elasticsearch.common.compress.CompressedXContent;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
@@ -101,7 +102,7 @@ public enum IndexMode {
         }
 
         @Override
-        public DocumentDimensions buildDocumentDimensions() {
+        public DocumentDimensions buildDocumentDimensions(IndexSettings settings) {
             return new DocumentDimensions.OnlySingleValueAllowed();
         }
 
@@ -186,8 +187,9 @@ public enum IndexMode {
         }
 
         @Override
-        public DocumentDimensions buildDocumentDimensions() {
-            return new TimeSeriesIdFieldMapper.TimeSeriesIdBuilder();
+        public DocumentDimensions buildDocumentDimensions(IndexSettings settings) {
+            IndexRouting.ExtractFromSource routing = (IndexRouting.ExtractFromSource) settings.getIndexRouting();
+            return new TimeSeriesIdFieldMapper.TimeSeriesIdBuilder(routing.builder());
         }
 
         @Override
@@ -301,7 +303,7 @@ public enum IndexMode {
     /**
      * How {@code time_series_dimension} fields are handled by indices in this mode.
      */
-    public abstract DocumentDimensions buildDocumentDimensions();
+    public abstract DocumentDimensions buildDocumentDimensions(IndexSettings settings);
 
     /**
      * @return Whether timestamps should be validated for being withing the time range of an index.

+ 1 - 1
server/src/main/java/org/elasticsearch/index/mapper/DocumentParserContext.java

@@ -127,7 +127,7 @@ public abstract class DocumentParserContext {
         this.newFieldsSeen = new HashSet<>();
         this.dynamicObjectMappers = new HashMap<>();
         this.dynamicRuntimeFields = new ArrayList<>();
-        this.dimensions = indexSettings.getMode().buildDocumentDimensions();
+        this.dimensions = indexSettings.getMode().buildDocumentDimensions(indexSettings);
     }
 
     public final IndexSettings indexSettings() {

+ 17 - 2
server/src/main/java/org/elasticsearch/index/mapper/TimeSeriesIdFieldMapper.java

@@ -12,11 +12,13 @@ import org.apache.lucene.document.SortedDocValuesField;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.util.ByteBlockPool;
 import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.cluster.routing.IndexRouting;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.network.NetworkAddress;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.fielddata.FieldData;
 import org.elasticsearch.index.fielddata.FieldDataContext;
@@ -145,7 +147,7 @@ public class TimeSeriesIdFieldMapper extends MetadataFieldMapper {
         TimeSeriesIdBuilder timeSeriesIdBuilder = (TimeSeriesIdBuilder) context.getDimensions();
         BytesRef timeSeriesId = timeSeriesIdBuilder.build().toBytesRef();
         context.doc().add(new SortedDocValuesField(fieldType().name(), timeSeriesId));
-        TsidExtractingIdFieldMapper.createField(context, timeSeriesId);
+        TsidExtractingIdFieldMapper.createField(context, timeSeriesIdBuilder.routingBuilder, timeSeriesId);
     }
 
     @Override
@@ -190,6 +192,15 @@ public class TimeSeriesIdFieldMapper extends MetadataFieldMapper {
          * to build the _tsid field for the document.
          */
         private final SortedMap<BytesRef, BytesReference> dimensions = new TreeMap<>();
+        /**
+         * Builds the routing. Used for building {@code _id}. If null then skipped.
+         */
+        @Nullable
+        private final IndexRouting.ExtractFromSource.Builder routingBuilder;
+
+        public TimeSeriesIdBuilder(@Nullable IndexRouting.ExtractFromSource.Builder routingBuilder) {
+            this.routingBuilder = routingBuilder;
+        }
 
         public BytesReference build() throws IOException {
             if (dimensions.isEmpty()) {
@@ -228,7 +239,7 @@ public class TimeSeriesIdFieldMapper extends MetadataFieldMapper {
                 out.write((byte) 's');
                 /*
                  * Write in utf8 instead of StreamOutput#writeString which is utf-16-ish
-                 * so its easier for folks to reason about the space taken up. Mostly
+                 * so it's easier for folks to reason about the space taken up. Mostly
                  * it'll be smaller too.
                  */
                 BytesRef bytes = new BytesRef(value);
@@ -239,6 +250,10 @@ public class TimeSeriesIdFieldMapper extends MetadataFieldMapper {
                 }
                 out.writeBytesRef(bytes);
                 add(fieldName, out.bytes());
+
+                if (routingBuilder != null) {
+                    routingBuilder.addMatching(fieldName, bytes);
+                }
             } catch (IOException e) {
                 throw new IllegalArgumentException("Dimension field cannot be serialized.", e);
             }

+ 13 - 3
server/src/main/java/org/elasticsearch/index/mapper/TsidExtractingIdFieldMapper.java

@@ -108,7 +108,7 @@ public class TsidExtractingIdFieldMapper extends IdFieldMapper {
 
     private static final long SEED = 0;
 
-    public static void createField(DocumentParserContext context, BytesRef tsid) {
+    public static void createField(DocumentParserContext context, IndexRouting.ExtractFromSource.Builder routingBuilder, BytesRef tsid) {
         IndexableField[] timestampFields = context.rootDoc().getFields(DataStreamTimestampFieldMapper.DEFAULT_PATH);
         if (timestampFields.length == 0) {
             throw new IllegalArgumentException(
@@ -125,8 +125,15 @@ public class TsidExtractingIdFieldMapper extends IdFieldMapper {
         ByteUtils.writeLongBE(timestamp, suffix, 8);   // Big Ending shrinks the inverted index by ~37%
 
         IndexRouting.ExtractFromSource indexRouting = (IndexRouting.ExtractFromSource) context.indexSettings().getIndexRouting();
-        // TODO it'd be way faster to use the fields that we've extract here rather than the source or parse the tsid
-        String id = indexRouting.createId(context.sourceToParse().getXContentType(), context.sourceToParse().source(), suffix);
+        String id = routingBuilder.createId(suffix, () -> {
+            if (context.getDynamicMappers().isEmpty() == false) {
+                throw new IllegalStateException(
+                    "Didn't find any fields to include in the routing which would be fine if there are"
+                        + " dynamic mapping waiting but we couldn't find any of those either!"
+                );
+            }
+            return 0;
+        });
         assert Uid.isURLBase64WithoutPadding(id); // Make sure we get to use Uid's nice optimizations
         /*
          * Make sure that _id from extracting the tsid matches that _id
@@ -140,6 +147,9 @@ public class TsidExtractingIdFieldMapper extends IdFieldMapper {
         assert context.getDynamicMappers().isEmpty() == false
             || context.getDynamicRuntimeFields().isEmpty() == false
             || id.equals(indexRouting.createId(TimeSeriesIdFieldMapper.decodeTsid(tsid), suffix));
+        assert context.getDynamicMappers().isEmpty() == false
+            || context.getDynamicRuntimeFields().isEmpty() == false
+            || id.equals(indexRouting.createId(context.sourceToParse().getXContentType(), context.sourceToParse().source(), suffix));
 
         if (context.sourceToParse().id() != null && false == context.sourceToParse().id().equals(id)) {
             throw new IllegalArgumentException(

+ 1 - 1
server/src/main/java/org/elasticsearch/search/DocValueFormat.java

@@ -704,7 +704,7 @@ public interface DocValueFormat extends NamedWriteable {
             }
 
             Map<?, ?> m = (Map<?, ?>) value;
-            TimeSeriesIdBuilder builder = new TimeSeriesIdBuilder();
+            TimeSeriesIdBuilder builder = new TimeSeriesIdBuilder(null);
             for (Map.Entry<?, ?> entry : m.entrySet()) {
                 String f = entry.getKey().toString();
                 Object v = entry.getValue();

+ 9 - 1
server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTests.java

@@ -644,8 +644,16 @@ public class IndexRoutingTests extends ESTestCase {
         IndexRouting.ExtractFromSource r = (IndexRouting.ExtractFromSource) routing;
         String idFromSource = r.createId(XContentType.JSON, sourceBytes, suffix);
         assertThat(shardIdForReadFromSourceExtracting(routing, idFromSource), equalTo(expectedShard));
-        String idFromFlattened = r.createId(flatten(source), suffix);
+        Map<String, Object> flattened = flatten(source);
+        String idFromFlattened = r.createId(flattened, suffix);
         assertThat(idFromFlattened, equalTo(idFromSource));
+
+        IndexRouting.ExtractFromSource.Builder b = r.builder();
+        for (Map.Entry<String, Object> e : flattened.entrySet()) {
+            b.addMatching(e.getKey(), new BytesRef(e.getValue().toString()));
+        }
+        String idFromBuilder = b.createId(suffix, () -> { throw new AssertionError(); });
+        assertThat(idFromBuilder, equalTo(idFromSource));
     }
 
     private byte[] randomSuffix() {

+ 1 - 1
server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregatorTests.java

@@ -77,7 +77,7 @@ public class TimeSeriesAggregatorTests extends AggregatorTestCase {
     public static void writeTS(RandomIndexWriter iw, long timestamp, Object[] dimensions, Object[] metrics) throws IOException {
         final List<IndexableField> fields = new ArrayList<>();
         fields.add(new SortedNumericDocValuesField(DataStreamTimestampFieldMapper.DEFAULT_PATH, timestamp));
-        final TimeSeriesIdBuilder builder = new TimeSeriesIdBuilder();
+        final TimeSeriesIdBuilder builder = new TimeSeriesIdBuilder(null);
         for (int i = 0; i < dimensions.length; i += 2) {
             if (dimensions[i + 1]instanceof Number n) {
                 builder.addLong(dimensions[i].toString(), n.longValue());