Selaa lähdekoodia

Handle fields.with.dots in routing_path (#83148)

Our document parsing code treats:
```
{"foo.bar": "baz"}
```
the same as:
```
{"foo": {"bar": "baz"}}
```
but the code that extracted routing_path didn't! This is bad because
routing_path has to identify a subset of dimensions precisely and in the
same way that our mapping code identifies the dimensions. They have to
line up or documents from the same time series could end up on different shards. Sad times.

This makes `routing_path` interpret dots in field names as though they
were an object. It creates an option for the includes/excludes filters
on `XContentParser` to treat dots as objects. So the filter would see
```
{"foo.bar": "baz"}
```
as though it were:
```
{"foo": {"bar": "baz"}}
```

So the filter `foo.bar` would match both documents.

This is part of #82511.
Nik Everett 3 vuotta sitten
vanhempi
commit
be09f8b0dd

+ 1 - 0
build-tools-internal/src/main/resources/changelog-schema.json

@@ -81,6 +81,7 @@
             "TLS",
             "Task Management",
             "Transform",
+            "TSDB",
             "Watcher"
           ]
         },

+ 5 - 0
docs/changelog/83148.yaml

@@ -0,0 +1,5 @@
+pr: 83148
+summary: Handle `fields.with.dots` in `routing_path`
+area: TSDB
+type: feature
+issues: []

+ 60 - 41
server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java

@@ -8,6 +8,8 @@
 
 package org.elasticsearch.cluster.routing;
 
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.StringHelper;
 import org.elasticsearch.action.RoutingMissingException;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.MappingMetadata;
@@ -23,7 +25,7 @@ import org.elasticsearch.xcontent.XContentType;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.function.IntConsumer;
@@ -224,66 +226,78 @@ public abstract class IndexRouting {
             }
             assert Transports.assertNotTransportThread("parsing the _source can get slow");
 
+            List<NameAndHash> hashes = new ArrayList<>();
             try {
                 try (XContentParser parser = sourceType.xContent().createParser(parserConfig, source.streamInput())) {
                     parser.nextToken(); // Move to first token
                     if (parser.currentToken() == null) {
                         throw new IllegalArgumentException("Error extracting routing: source didn't contain any routing fields");
                     }
-                    int hash = extractObject(parser);
+                    parser.nextToken();
+                    extractObject(hashes, null, parser);
                     ensureExpectedToken(null, parser.nextToken(), parser);
-                    return hashToShardId(hash);
                 }
             } catch (IOException | ParsingException e) {
                 throw new IllegalArgumentException("Error extracting routing: " + e.getMessage(), e);
             }
+            return hashToShardId(hashesToHash(hashes));
         }
 
-        private static int extractObject(XContentParser source) throws IOException {
-            ensureExpectedToken(Token.FIELD_NAME, source.nextToken(), source);
-            String firstFieldName = source.currentName();
-            source.nextToken();
-            int firstHash = extractItem(source);
-            if (source.currentToken() == Token.END_OBJECT) {
-                // Just one routing key in this object
-                // Use ^ like Map.Entry's hashcode
-                return Murmur3HashFunction.hash(firstFieldName) ^ firstHash;
-            }
-            List<NameAndHash> hashes = new ArrayList<>();
-            hashes.add(new NameAndHash(firstFieldName, firstHash));
-            do {
+        private static void extractObject(List<NameAndHash> hashes, @Nullable String path, XContentParser source) throws IOException {
+            while (source.currentToken() != Token.END_OBJECT) {
                 ensureExpectedToken(Token.FIELD_NAME, source.currentToken(), source);
                 String fieldName = source.currentName();
+                String subPath = path == null ? fieldName : path + "." + fieldName;
                 source.nextToken();
-                hashes.add(new NameAndHash(fieldName, extractItem(source)));
-            } while (source.currentToken() != Token.END_OBJECT);
-            Collections.sort(hashes, Comparator.comparing(nameAndHash -> nameAndHash.name));
-            /*
-             * This is the same as Arrays.hash(Map.Entry<fieldName, hash>) but we're
-             * writing it out so for extra paranoia. Changing this will change how
-             * documents are routed and we don't want a jdk update that modifies Arrays
-             * or Map.Entry to sneak up on us.
-             */
-            int hash = 0;
-            for (NameAndHash nameAndHash : hashes) {
-                int thisHash = Murmur3HashFunction.hash(nameAndHash.name) ^ nameAndHash.hash;
-                hash = 31 * hash + thisHash;
+                extractItem(hashes, subPath, source);
             }
-            return hash;
         }
 
-        private static int extractItem(XContentParser source) throws IOException {
-            if (source.currentToken() == Token.START_OBJECT) {
-                int hash = extractObject(source);
-                source.nextToken();
-                return hash;
+        private static void extractItem(List<NameAndHash> hashes, String path, XContentParser source) throws IOException {
+            switch (source.currentToken()) {
+                case START_OBJECT:
+                    source.nextToken();
+                    extractObject(hashes, path, source);
+                    source.nextToken();
+                    break;
+                case VALUE_STRING:
+                    hashes.add(new NameAndHash(new BytesRef(path), hash(new BytesRef(source.text()))));
+                    source.nextToken();
+                    break;
+                case VALUE_NULL:
+                    source.nextToken();
+                    break;
+                default:
+                    throw new ParsingException(
+                        source.getTokenLocation(),
+                        "Routing values must be strings but found [{}]",
+                        source.currentToken()
+                    );
             }
-            if (source.currentToken() == Token.VALUE_STRING) {
-                int hash = Murmur3HashFunction.hash(source.text());
-                source.nextToken();
-                return hash;
+        }
+
+        private static int hash(BytesRef ref) {
+            return StringHelper.murmurhash3_x86_32(ref, 0);
+        }
+
+        private static int hashesToHash(List<NameAndHash> hashes) {
+            Collections.sort(hashes);
+            Iterator<NameAndHash> itr = hashes.iterator();
+            if (itr.hasNext() == false) {
+                throw new IllegalArgumentException("Error extracting routing: source didn't contain any routing fields");
+            }
+            NameAndHash prev = itr.next();
+            int hash = hash(prev.name) ^ prev.hash;
+            while (itr.hasNext()) {
+                NameAndHash next = itr.next();
+                if (prev.name.equals(next.name)) {
+                    throw new IllegalArgumentException("Duplicate routing dimension for [" + next.name + "]");
+                }
+                int thisHash = hash(next.name) ^ next.hash;
+                hash = 31 * hash + thisHash;
+                prev = next;
             }
-            throw new ParsingException(source.getTokenLocation(), "Routing values must be strings but found [{}]", source.currentToken());
+            return hash;
         }
 
         @Override
@@ -316,5 +330,10 @@ public abstract class IndexRouting {
         }
     }
 
-    private record NameAndHash(String name, int hash) {}
+    private static record NameAndHash(BytesRef name, int hash) implements Comparable<NameAndHash> {
+        @Override
+        public int compareTo(NameAndHash o) {
+            return name.compareTo(o.name);
+        }
+    }
 }

+ 21 - 22
server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTests.java

@@ -7,6 +7,8 @@
  */
 package org.elasticsearch.cluster.routing;
 
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.StringHelper;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.RoutingMissingException;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -513,7 +515,7 @@ public class IndexRoutingTests extends ESTestCase {
         assertIndexShard(
             routing,
             Map.of("foo", Map.of("bar", "cat"), "baz", "dog"),
-            Math.floorMod(hash(List.of("foo", List.of("bar", "cat"))), shards)
+            Math.floorMod(hash(List.of("foo.bar", "cat")), shards)
         );
     }
 
@@ -523,10 +525,16 @@ public class IndexRoutingTests extends ESTestCase {
         assertIndexShard(
             routing,
             Map.of("foo", Map.of("a", "cat"), "bar", Map.of("thing", "yay", "this", "too")),
-            Math.floorMod(hash(List.of("bar", List.of("thing", "yay", "this", "too"), "foo", List.of("a", "cat"))), shards)
+            Math.floorMod(hash(List.of("bar.thing", "yay", "bar.this", "too", "foo.a", "cat")), shards)
         );
     }
 
+    public void testRoutingPathDotInName() throws IOException {
+        int shards = between(2, 1000);
+        IndexRouting routing = indexRoutingForPath(shards, "foo.bar");
+        assertIndexShard(routing, Map.of("foo.bar", "cat", "baz", "dog"), Math.floorMod(hash(List.of("foo.bar", "cat")), shards));
+    }
+
     public void testRoutingPathBwc() throws IOException {
         Version version = VersionUtils.randomIndexCompatibleVersion(random());
         IndexRouting routing = indexRoutingForPath(version, 8, "dim.*,other.*,top");
@@ -538,12 +546,13 @@ public class IndexRoutingTests extends ESTestCase {
          * versions of Elasticsearch must continue to route based on the
          * version on the index.
          */
-        assertIndexShard(routing, Map.of("dim", Map.of("a", "a")), 0);
+        assertIndexShard(routing, Map.of("dim", Map.of("a", "a")), 4);
         assertIndexShard(routing, Map.of("dim", Map.of("a", "b")), 5);
         assertIndexShard(routing, Map.of("dim", Map.of("c", "d")), 4);
-        assertIndexShard(routing, Map.of("other", Map.of("a", "a")), 5);
-        assertIndexShard(routing, Map.of("top", "a"), 3);
-        assertIndexShard(routing, Map.of("dim", Map.of("c", "d"), "top", "b"), 2);
+        assertIndexShard(routing, Map.of("other", Map.of("a", "a")), 7);
+        assertIndexShard(routing, Map.of("top", "a"), 5);
+        assertIndexShard(routing, Map.of("dim", Map.of("c", "d"), "top", "b"), 0);
+        assertIndexShard(routing, Map.of("dim.a", "a"), 4);
     }
 
     private IndexRouting indexRoutingForPath(int shards, String path) {
@@ -560,8 +569,8 @@ public class IndexRoutingTests extends ESTestCase {
         );
     }
 
-    private void assertIndexShard(IndexRouting routing, Map<String, Object> source, int id) throws IOException {
-        assertThat(routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source(source)), equalTo(id));
+    private void assertIndexShard(IndexRouting routing, Map<String, Object> source, int expected) throws IOException {
+        assertThat(routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source(source)), equalTo(expected));
     }
 
     private BytesReference source(Map<String, Object> doc) throws IOException {
@@ -581,24 +590,14 @@ public class IndexRoutingTests extends ESTestCase {
     /**
      * Build the hash we expect from the extracter.
      */
-    private int hash(List<?> keysAndValues) {
+    private int hash(List<String> keysAndValues) {
         assertThat(keysAndValues.size() % 2, equalTo(0));
         int hash = 0;
         for (int i = 0; i < keysAndValues.size(); i += 2) {
-            int thisHash = Murmur3HashFunction.hash(keysAndValues.get(i).toString()) ^ expectedValueHash(keysAndValues.get(i + 1));
-            hash = hash * 31 + thisHash;
+            int keyHash = StringHelper.murmurhash3_x86_32(new BytesRef(keysAndValues.get(i)), 0);
+            int valueHash = StringHelper.murmurhash3_x86_32(new BytesRef(keysAndValues.get(i + 1)), 0);
+            hash = hash * 31 + (keyHash ^ valueHash);
         }
         return hash;
     }
-
-    private int expectedValueHash(Object value) {
-        if (value instanceof List) {
-            return hash((List<?>) value);
-        }
-        if (value instanceof String) {
-            return Murmur3HashFunction.hash((String) value);
-        }
-        throw new IllegalArgumentException("Unsupported value: " + value);
-    }
-
 }