Browse Source

Speed up merging field-caps response (#83704)

- Sort the index responses before merging to avoid sorting them for each 
  field name

- Track the number of searchable indices to avoid looping to find the 
  number of non-searchable indices for each field name
Nhat Nguyen 3 years ago
parent
commit
21533cc430

+ 5 - 0
docs/changelog/83704.yaml

@@ -0,0 +1,5 @@
+pr: 83704
+summary: Speed up merging field-caps response
+area: Search
+type: enhancement
+issues: []

+ 67 - 49
server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilities.java

@@ -25,6 +25,7 @@ import org.elasticsearch.xcontent.XContentParser;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -476,22 +477,19 @@ public class FieldCapabilities implements Writeable, ToXContentObject {
         private final String name;
         private final String type;
         private boolean isMetadataField;
-        private boolean isSearchable;
-        private boolean isAggregatable;
-        private boolean isDimension;
+        private int searchableIndices = 0;
+        private int aggregatableIndices = 0;
+        private int dimensionIndices = 0;
         private TimeSeriesParams.MetricType metricType;
-        private boolean metricTypeIsSet;
-        private List<IndexCaps> indiceList;
-        private Map<String, Set<String>> meta;
+        private boolean hasConflictMetricType;
+        private final List<IndexCaps> indiceList;
+        private final Map<String, Set<String>> meta;
 
         Builder(String name, String type) {
             this.name = name;
             this.type = type;
-            this.isSearchable = true;
-            this.isAggregatable = true;
-            this.isDimension = true;
             this.metricType = null;
-            this.metricTypeIsSet = false;
+            this.hasConflictMetricType = false;
             this.indiceList = new ArrayList<>();
             this.meta = new HashMap<>();
         }
@@ -508,81 +506,101 @@ public class FieldCapabilities implements Writeable, ToXContentObject {
             TimeSeriesParams.MetricType metricType,
             Map<String, String> meta
         ) {
-            IndexCaps indexCaps = new IndexCaps(index, search, agg, isDimension, metricType);
-            indiceList.add(indexCaps);
-            this.isSearchable &= search;
-            this.isAggregatable &= agg;
+            assert indiceList.isEmpty() || indiceList.get(indiceList.size() - 1).name.compareTo(index) < 0
+                : "indices aren't sorted; previous [" + indiceList.get(indiceList.size() - 1).name + "], current [" + index + "]";
+            if (search) {
+                searchableIndices++;
+            }
+            if (agg) {
+                aggregatableIndices++;
+            }
+            if (isDimension) {
+                dimensionIndices++;
+            }
             this.isMetadataField |= isMetadataField;
-            this.isDimension &= isDimension;
             // If there is a discrepancy in metric types, or if in some indices this field is not marked as a metric field, we will
             // treat it as a non-metric field and report this discrepancy in metricConflictsIndices
-            if (this.metricTypeIsSet) {
-                if (this.metricType != metricType) {
-                    this.metricType = null;
-                }
-            } else {
-                this.metricTypeIsSet = true;
+            if (indiceList.isEmpty()) {
                 this.metricType = metricType;
+            } else if (this.metricType != metricType) {
+                hasConflictMetricType = true;
+                this.metricType = null;
             }
+            IndexCaps indexCaps = new IndexCaps(index, search, agg, isDimension, metricType);
+            indiceList.add(indexCaps);
             for (Map.Entry<String, String> entry : meta.entrySet()) {
                 this.meta.computeIfAbsent(entry.getKey(), key -> new HashSet<>()).add(entry.getValue());
             }
         }
 
-        List<String> getIndices() {
-            return indiceList.stream().map(c -> c.name).collect(Collectors.toList());
+        void getIndices(Collection<String> indices) {
+            indiceList.forEach(cap -> indices.add(cap.name));
         }
 
         FieldCapabilities build(boolean withIndices) {
             final String[] indices;
-            Collections.sort(indiceList, Comparator.comparing(o -> o.name));
             if (withIndices) {
                 indices = indiceList.stream().map(caps -> caps.name).toArray(String[]::new);
             } else {
                 indices = null;
             }
 
+            // Iff this field is searchable in some indices AND non-searchable in others
+            // we record the list of non-searchable indices
+            final boolean isSearchable = searchableIndices == indiceList.size();
             final String[] nonSearchableIndices;
-            if (isSearchable == false && indiceList.stream().anyMatch((caps) -> caps.isSearchable)) {
-                // Iff this field is searchable in some indices AND non-searchable in others
-                // we record the list of non-searchable indices
-                nonSearchableIndices = indiceList.stream()
-                    .filter((caps) -> caps.isSearchable == false)
-                    .map(caps -> caps.name)
-                    .toArray(String[]::new);
-            } else {
+            if (isSearchable || searchableIndices == 0) {
                 nonSearchableIndices = null;
+            } else {
+                nonSearchableIndices = new String[indiceList.size() - searchableIndices];
+                int index = 0;
+                for (IndexCaps indexCaps : indiceList) {
+                    if (indexCaps.isSearchable == false) {
+                        nonSearchableIndices[index++] = indexCaps.name;
+                    }
+                }
             }
 
+            // Iff this field is aggregatable in some indices AND non-aggregatable in others
+            // we keep the list of non-aggregatable indices
+            final boolean isAggregatable = aggregatableIndices == indiceList.size();
             final String[] nonAggregatableIndices;
-            if (isAggregatable == false && indiceList.stream().anyMatch((caps) -> caps.isAggregatable)) {
-                // Iff this field is aggregatable in some indices AND non-searchable in others
-                // we keep the list of non-aggregatable indices
-                nonAggregatableIndices = indiceList.stream()
-                    .filter((caps) -> caps.isAggregatable == false)
-                    .map(caps -> caps.name)
-                    .toArray(String[]::new);
-            } else {
+            if (isAggregatable || aggregatableIndices == 0) {
                 nonAggregatableIndices = null;
+            } else {
+                nonAggregatableIndices = new String[indiceList.size() - aggregatableIndices];
+                int index = 0;
+                for (IndexCaps indexCaps : indiceList) {
+                    if (indexCaps.isAggregatable == false) {
+                        nonAggregatableIndices[index++] = indexCaps.name;
+                    }
+                }
             }
 
+            // Collect all indices that have dimension == false if this field is marked as a dimension in at least one index
+            final boolean isDimension = dimensionIndices == indiceList.size();
             final String[] nonDimensionIndices;
-            if (isDimension == false && indiceList.stream().anyMatch((caps) -> caps.isDimension)) {
-                // Collect all indices that have dimension == false if this field is marked as a dimension in at least one index
-                nonDimensionIndices = indiceList.stream()
-                    .filter((caps) -> caps.isDimension == false)
-                    .map(caps -> caps.name)
-                    .toArray(String[]::new);
-            } else {
+            if (isDimension || dimensionIndices == 0) {
                 nonDimensionIndices = null;
+            } else {
+                nonDimensionIndices = new String[indiceList.size() - dimensionIndices];
+                int index = 0;
+                for (IndexCaps indexCaps : indiceList) {
+                    if (indexCaps.isDimension == false) {
+                        nonDimensionIndices[index++] = indexCaps.name;
+                    }
+                }
             }
 
             final String[] metricConflictsIndices;
-            if (indiceList.stream().anyMatch((caps) -> caps.metricType != metricType)) {
+            if (hasConflictMetricType) {
                 // Collect all indices that have this field. If it is marked differently in different indices, we cannot really
                 // make a decision about which index is "right" and which index is "wrong", so collecting all indices where this field
                 // is present is probably the only sensible thing to do here
-                metricConflictsIndices = indiceList.stream().map(caps -> caps.name).toArray(String[]::new);
+                metricConflictsIndices = Objects.requireNonNullElseGet(
+                    indices,
+                    () -> indiceList.stream().map(caps -> caps.name).toArray(String[]::new)
+                );
             } else {
                 metricConflictsIndices = null;
             }

+ 17 - 11
server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java

@@ -36,8 +36,8 @@ import org.elasticsearch.transport.TransportRequestHandler;
 import org.elasticsearch.transport.TransportService;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -219,13 +219,17 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
     }
 
     private FieldCapabilitiesResponse merge(
-        Map<String, FieldCapabilitiesIndexResponse> indexResponses,
+        Map<String, FieldCapabilitiesIndexResponse> indexResponsesMap,
         boolean includeUnmapped,
         List<FieldCapabilitiesFailure> failures
     ) {
-        String[] indices = indexResponses.keySet().stream().sorted().toArray(String[]::new);
+        final List<FieldCapabilitiesIndexResponse> indexResponses = indexResponsesMap.values()
+            .stream()
+            .sorted(Comparator.comparing(FieldCapabilitiesIndexResponse::getIndexName))
+            .toList();
+        final String[] indices = indexResponses.stream().map(FieldCapabilitiesIndexResponse::getIndexName).toArray(String[]::new);
         final Map<String, Map<String, FieldCapabilities.Builder>> responseMapBuilder = new HashMap<>();
-        for (FieldCapabilitiesIndexResponse response : indexResponses.values()) {
+        for (FieldCapabilitiesIndexResponse response : indexResponses) {
             innerMerge(responseMapBuilder, response);
         }
         final Map<String, Map<String, FieldCapabilities>> responseMap = new HashMap<>();
@@ -245,14 +249,16 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
     }
 
     private void addUnmappedFields(String[] indices, String field, Map<String, FieldCapabilities.Builder> typeMap) {
-        Set<String> unmappedIndices = new HashSet<>(Arrays.asList(indices));
-        typeMap.values().forEach((b) -> b.getIndices().forEach(unmappedIndices::remove));
-        if (unmappedIndices.isEmpty() == false) {
-            FieldCapabilities.Builder unmapped = new FieldCapabilities.Builder(field, "unmapped");
-            typeMap.put("unmapped", unmapped);
-            for (String index : unmappedIndices) {
-                unmapped.add(index, false, false, false, false, null, Collections.emptyMap());
+        final Set<String> mappedIndices = new HashSet<>();
+        typeMap.values().forEach(t -> t.getIndices(mappedIndices));
+        if (mappedIndices.size() != indices.length) {
+            final FieldCapabilities.Builder unmapped = new FieldCapabilities.Builder(field, "unmapped");
+            for (String index : indices) {
+                if (mappedIndices.contains(index) == false) {
+                    unmapped.add(index, false, false, false, false, null, Collections.emptyMap());
+                }
             }
+            typeMap.put("unmapped", unmapped);
         }
     }
 

+ 142 - 0
server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesTests.java

@@ -9,6 +9,8 @@
 package org.elasticsearch.action.fieldcaps;
 
 import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.util.iterable.Iterables;
+import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.index.mapper.TimeSeriesParams;
 import org.elasticsearch.test.AbstractSerializingTestCase;
 import org.elasticsearch.xcontent.XContentParser;
@@ -16,9 +18,15 @@ import org.elasticsearch.xcontent.XContentParser;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.IntStream;
 
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 
 public class FieldCapabilitiesTests extends AbstractSerializingTestCase<FieldCapabilities> {
@@ -158,6 +166,140 @@ public class FieldCapabilitiesTests extends AbstractSerializingTestCase<FieldCap
         }
     }
 
+    /**
+     * Builds capabilities for one field over a random number of indices added in ascending name order, where the
+     * searchable, aggregatable and dimension flags are each enabled on an independent random subset of those indices.
+     * For each property the merged flag must be true only when every index has it, and the list of indices lacking the
+     * property must be reported only when the property is present in some, but not all, indices.
+     */
+    public void testRandomBuilder() {
+        final int numIndices = randomIntBetween(1, 50);
+        final List<String> indices = IntStream.range(0, numIndices)
+            .mapToObj(n -> String.format(Locale.ROOT, "index_%2d", n))
+            .toList();
+        final Set<String> searchable = new HashSet<>(randomSubsetOf(indices));
+        final Set<String> aggregatable = new HashSet<>(randomSubsetOf(indices));
+        final Set<String> dimension = new HashSet<>(randomSubsetOf(indices));
+        final FieldCapabilities.Builder builder = new FieldCapabilities.Builder("field", "type");
+        for (String index : indices) {
+            final boolean isMetadataField = randomBoolean();
+            builder.add(
+                index,
+                isMetadataField,
+                searchable.contains(index),
+                aggregatable.contains(index),
+                dimension.contains(index),
+                null,
+                Map.of()
+            );
+        }
+        final FieldCapabilities fieldCaps = builder.build(randomBoolean());
+        final Set<String> allIndices = Sets.newHashSet(indices);
+        // search: the flag holds only if every index is searchable; the list exists only for a partial match
+        final boolean allSearchable = searchable.size() == indices.size();
+        assertThat(fieldCaps.isSearchable(), equalTo(allSearchable));
+        if (allSearchable || searchable.isEmpty()) {
+            assertNull(fieldCaps.nonSearchableIndices());
+        } else {
+            assertThat(Sets.newHashSet(fieldCaps.nonSearchableIndices()), equalTo(Sets.difference(allIndices, searchable)));
+        }
+        // aggregate: same contract as search
+        final boolean allAggregatable = aggregatable.size() == indices.size();
+        assertThat(fieldCaps.isAggregatable(), equalTo(allAggregatable));
+        if (allAggregatable || aggregatable.isEmpty()) {
+            assertNull(fieldCaps.nonAggregatableIndices());
+        } else {
+            assertThat(Sets.newHashSet(fieldCaps.nonAggregatableIndices()), equalTo(Sets.difference(allIndices, aggregatable)));
+        }
+        // dimension: same contract as search
+        final boolean allDimension = dimension.size() == indices.size();
+        assertThat(fieldCaps.isDimension(), equalTo(allDimension));
+        if (allDimension || dimension.isEmpty()) {
+            assertNull(fieldCaps.nonDimensionIndices());
+        } else {
+            assertThat(Sets.newHashSet(fieldCaps.nonDimensionIndices()), equalTo(Sets.difference(allIndices, dimension)));
+        }
+    }
+
+    /**
+     * Adds the same metric type (possibly {@code null}) for every index and checks that the built capabilities report
+     * exactly that metric type with no metric conflicts.
+     */
+    public void testBuilderSingleMetricType() {
+        final int numIndices = randomIntBetween(1, 50);
+        final List<String> indices = IntStream.range(0, numIndices)
+            .mapToObj(n -> String.format(Locale.ROOT, "index_%2d", n))
+            .toList();
+        final TimeSeriesParams.MetricType metric;
+        if (randomBoolean()) {
+            metric = null;
+        } else {
+            metric = randomFrom(TimeSeriesParams.MetricType.values());
+        }
+        final FieldCapabilities.Builder builder = new FieldCapabilities.Builder("field", "type");
+        for (int i = 0; i < indices.size(); i++) {
+            builder.add(indices.get(i), randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean(), metric, Map.of());
+        }
+        final boolean withIndices = randomBoolean();
+        final FieldCapabilities fieldCaps = builder.build(withIndices);
+        assertThat(fieldCaps.getMetricType(), equalTo(metric));
+        assertNull(fieldCaps.metricConflictsIndices());
+    }
+
+    /**
+     * Assigns each index either no metric type or a random one, then checks the merged result: a metric type is reported
+     * only when every index carries the same one. Any disagreement, including indices without a metric type next to
+     * indices with one, must clear the merged metric type and list every index as conflicting.
+     */
+    public void testBuilderMixedMetricType() {
+        List<String> indices = IntStream.range(0, randomIntBetween(1, 50))
+            .mapToObj(n -> String.format(Locale.ROOT, "index_%2d", n))
+            .toList();
+        Map<String, TimeSeriesParams.MetricType> metricTypes = new HashMap<>();
+        for (String index : indices) {
+            if (randomBoolean()) {
+                metricTypes.put(index, randomFrom(TimeSeriesParams.MetricType.values()));
+            }
+        }
+        FieldCapabilities.Builder builder = new FieldCapabilities.Builder("field", "type");
+        for (String index : indices) {
+            builder.add(index, randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean(), metricTypes.get(index), Map.of());
+        }
+        FieldCapabilities fieldCaps = builder.build(randomBoolean());
+        // Map#values() keeps duplicates, so its size always equals the number of entries; count distinct metric types
+        // instead, otherwise several indices sharing one metric type are wrongly expected to conflict.
+        final boolean sameMetricInAllIndices = metricTypes.size() == indices.size() && new HashSet<>(metricTypes.values()).size() == 1;
+        if (metricTypes.isEmpty()) {
+            assertNull(fieldCaps.getMetricType());
+            assertNull(fieldCaps.metricConflictsIndices());
+        } else if (sameMetricInAllIndices) {
+            assertThat(fieldCaps.getMetricType(), equalTo(Iterables.get(metricTypes.values(), 0)));
+            assertNull(fieldCaps.metricConflictsIndices());
+        } else {
+            assertNull(fieldCaps.getMetricType());
+            assertThat(fieldCaps.metricConflictsIndices(), equalTo(indices.toArray(String[]::new)));
+        }
+    }
+
+    public void testOutOfOrderIndices() {
+        FieldCapabilities.Builder builder = new FieldCapabilities.Builder("field", "type");
+        int numIndex = randomIntBetween(1, 5);
+        for (int i = 1; i <= numIndex; i++) {
+            builder.add(
+                "index-" + i,
+                randomBoolean(),
+                randomBoolean(),
+                randomBoolean(),
+                randomBoolean(),
+                randomFrom(TimeSeriesParams.MetricType.values()),
+                Map.of()
+            );
+        }
+        final String outOfOrderIndex = randomBoolean() ? "abc" : "index-" + randomIntBetween(1, numIndex);
+        AssertionError error = expectThrows(AssertionError.class, () -> {
+            builder.add(
+                outOfOrderIndex,
+                randomBoolean(),
+                randomBoolean(),
+                randomBoolean(),
+                randomBoolean(),
+                randomFrom(TimeSeriesParams.MetricType.values()),
+                Map.of()
+            );
+        });
+        assertThat(error.getMessage(), containsString("indices aren't sorted"));
+    }
+
     static FieldCapabilities randomFieldCaps(String fieldName) {
         String[] indices = null;
         if (randomBoolean()) {