Browse Source

Simplify and Speed up `ExecutorSelector` (#83514)

This runs on the transport threads and was just needlessly high overhead
for large bulk requests that target many different indices.
This change should makes the method disappear from profiling.
For datastreams the duplicate checks are obviously unnecessary
due to the way the map for those is build now.
For indices the existing checks for no-pattern-overlap should be
goo enough as well to allow for a tight loop without allocations.

closes #82450
Armin Braun 3 years ago
parent
commit
8d3c4a3dfa

+ 6 - 0
docs/changelog/83514.yaml

@@ -0,0 +1,6 @@
+pr: 83514
+summary: Simplify and Speed up `ExecutorSelector`
+area: Indices APIs
+type: bug
+issues:
+ - 82450

+ 19 - 61
server/src/main/java/org/elasticsearch/indices/SystemIndices.java

@@ -45,6 +45,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
+import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -77,6 +78,8 @@ public class SystemIndices {
     private final CharacterRunAutomaton systemDataStreamIndicesRunAutomaton;
     private final Predicate<String> systemDataStreamPredicate;
     private final Map<String, Feature> featureDescriptors;
+    private final SystemIndexDescriptor[] indexDescriptors;
+    private final Map<String, SystemDataStreamDescriptor> dataStreamDescriptors;
     private final Map<String, CharacterRunAutomaton> productToSystemIndicesMatcher;
     private final ExecutorSelector executorSelector;
 
@@ -87,6 +90,14 @@ public class SystemIndices {
      */
     public SystemIndices(Map<String, Feature> pluginAndModulesDescriptors) {
         featureDescriptors = buildSystemIndexDescriptorMap(pluginAndModulesDescriptors);
+        indexDescriptors = featureDescriptors.values()
+            .stream()
+            .flatMap(f -> f.getIndexDescriptors().stream())
+            .toArray(SystemIndexDescriptor[]::new);
+        dataStreamDescriptors = featureDescriptors.values()
+            .stream()
+            .flatMap(f -> f.getDataStreamDescriptors().stream())
+            .collect(Collectors.toUnmodifiableMap(SystemDataStreamDescriptor::getDataStreamName, Function.identity()));
         checkForOverlappingPatterns(featureDescriptors);
         ensurePatternsAllowSuffix(featureDescriptors);
         checkForDuplicateAliases(this.getSystemIndexDescriptors());
@@ -267,78 +278,25 @@ public class SystemIndices {
      * Finds a single matching {@link SystemIndexDescriptor}, if any, for the given index name.
      * @param name the name of the index
      * @return The matching {@link SystemIndexDescriptor} or {@code null} if no descriptor is found
-     * @throws IllegalStateException if multiple descriptors match the name
      */
     public @Nullable SystemIndexDescriptor findMatchingDescriptor(String name) {
-        final List<SystemIndexDescriptor> matchingDescriptors = featureDescriptors.values()
-            .stream()
-            .flatMap(feature -> feature.getIndexDescriptors().stream())
-            .filter(descriptor -> descriptor.matchesIndexPattern(name))
-            .collect(toUnmodifiableList());
-
-        if (matchingDescriptors.isEmpty()) {
-            return null;
-        } else if (matchingDescriptors.size() == 1) {
-            return matchingDescriptors.get(0);
-        } else {
-            // This should be prevented by failing on overlapping patterns at startup time, but is here just in case.
-            StringBuilder errorMessage = new StringBuilder().append("index name [")
-                .append(name)
-                .append("] is claimed as a system index by multiple system index patterns: [")
-                .append(
-                    matchingDescriptors.stream()
-                        .map(
-                            descriptor -> "pattern: ["
-                                + descriptor.getIndexPattern()
-                                + "], description: ["
-                                + descriptor.getDescription()
-                                + "]"
-                        )
-                        .collect(Collectors.joining("; "))
-                );
-            // Throw AssertionError if assertions are enabled, or a regular exception otherwise:
-            assert false : errorMessage.toString();
-            throw new IllegalStateException(errorMessage.toString());
+        SystemIndexDescriptor matchingDescriptor = null;
+        for (SystemIndexDescriptor systemIndexDescriptor : indexDescriptors) {
+            if (systemIndexDescriptor.matchesIndexPattern(name)) {
+                matchingDescriptor = systemIndexDescriptor;
+                break;
+            }
         }
+        return matchingDescriptor;
     }
 
     /**
      * Finds a single matching {@link SystemDataStreamDescriptor}, if any, for the given DataStream name.
      * @param name the name of the DataStream
      * @return The matching {@link SystemDataStreamDescriptor} or {@code null} if no descriptor is found
-     * @throws IllegalStateException if multiple descriptors match the name
      */
     public @Nullable SystemDataStreamDescriptor findMatchingDataStreamDescriptor(String name) {
-        final List<SystemDataStreamDescriptor> matchingDescriptors = featureDescriptors.values()
-            .stream()
-            .flatMap(feature -> feature.getDataStreamDescriptors().stream())
-            .filter(descriptor -> descriptor.getDataStreamName().equals(name))
-            .collect(toUnmodifiableList());
-
-        if (matchingDescriptors.isEmpty()) {
-            return null;
-        } else if (matchingDescriptors.size() == 1) {
-            return matchingDescriptors.get(0);
-        } else {
-            // This should be prevented by failing on overlapping patterns at startup time, but is here just in case.
-            StringBuilder errorMessage = new StringBuilder().append("DataStream name [")
-                .append(name)
-                .append("] is claimed as a system data stream by multiple descriptors: [")
-                .append(
-                    matchingDescriptors.stream()
-                        .map(
-                            descriptor -> "name: ["
-                                + descriptor.getDataStreamName()
-                                + "], description: ["
-                                + descriptor.getDescription()
-                                + "]"
-                        )
-                        .collect(Collectors.joining("; "))
-                );
-            // Throw AssertionError if assertions are enabled, or a regular exception otherwise:
-            assert false : errorMessage.toString();
-            throw new IllegalStateException(errorMessage.toString());
-        }
+        return dataStreamDescriptors.get(name);
     }
 
     /**

+ 1 - 2
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java

@@ -53,8 +53,7 @@ public class TransportBulkShardOperationsAction extends TransportWriteAction<
         final ShardStateAction shardStateAction,
         final ActionFilters actionFilters,
         final IndexingPressure indexingPressure,
-        final SystemIndices systemIndices,
-        final ExecutorSelector executorSelector
+        final SystemIndices systemIndices
     ) {
         super(
             settings,