Browse Source

Auto-reload analyzers for specific resource (#96986)

This PR adds a new optional parameter "resource" for ReloadAnalyzersRequest.
If used, only analyzers that use this specific "resource" will be reloaded.
This parameter is not documented, for internal use only.

PR #96886 introduced auto-reload of analyzers on synonyms index change. The problem
was that reloading was applied broadly for all indices that contained reloadable
analyzers. This PR improves this, so when a particular synonyms set changes,
only analyzers that use this synonyms set will be auto-reloaded. Note that shard
requests will still be sent to all indices' shards, as only on a shard we can
decide if analyzers need to be reloaded.
Mayya Sharipova 2 years ago
parent
commit
11a3104a8c
15 changed files with 248 additions and 27 deletions
  1. 12 6
      modules/analysis-common/src/internalClusterTest/java/org/elasticsearch/analysis/common/ReloadAnalyzerTests.java
  2. 4 2
      modules/analysis-common/src/internalClusterTest/java/org/elasticsearch/analysis/common/ReloadSynonymAnalyzerIT.java
  3. 5 0
      modules/analysis-common/src/main/java/org/elasticsearch/analysis/common/SynonymGraphTokenFilterFactory.java
  4. 12 2
      modules/analysis-common/src/main/java/org/elasticsearch/analysis/common/SynonymTokenFilterFactory.java
  5. 134 0
      modules/analysis-common/src/yamlRestTest/resources/rest-api-spec/test/analysis-common/80_synonyms_reloading_for_synset.yml
  6. 4 0
      rest-api-spec/src/main/resources/rest-api-spec/api/indices.reload_search_analyzers.json
  7. 22 4
      server/src/main/java/org/elasticsearch/action/admin/indices/analyze/ReloadAnalyzersRequest.java
  8. 7 1
      server/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportReloadAnalyzersAction.java
  9. 4 3
      server/src/main/java/org/elasticsearch/index/analysis/IndexAnalyzers.java
  10. 23 0
      server/src/main/java/org/elasticsearch/index/analysis/ReloadableCustomAnalyzer.java
  11. 12 0
      server/src/main/java/org/elasticsearch/index/analysis/TokenFilterFactory.java
  12. 2 2
      server/src/main/java/org/elasticsearch/index/mapper/MapperService.java
  13. 1 0
      server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestReloadAnalyzersAction.java
  14. 5 6
      server/src/main/java/org/elasticsearch/synonyms/SynonymsManagementAPIService.java
  15. 1 1
      server/src/test/java/org/elasticsearch/index/mapper/ReloadableAnalyzerTests.java

+ 12 - 6
modules/analysis-common/src/internalClusterTest/java/org/elasticsearch/analysis/common/ReloadAnalyzerTests.java

@@ -96,8 +96,10 @@ public class ReloadAnalyzerTests extends ESSingleNodeTestCase {
         ) {
             out.println("foo, baz, buzz");
         }
-        ReloadAnalyzersResponse reloadResponse = client().execute(ReloadAnalyzerAction.INSTANCE, new ReloadAnalyzersRequest(indexName))
-            .actionGet();
+        ReloadAnalyzersResponse reloadResponse = client().execute(
+            ReloadAnalyzerAction.INSTANCE,
+            new ReloadAnalyzersRequest(null, indexName)
+        ).actionGet();
         assertNoFailures(reloadResponse);
         Set<String> reloadedAnalyzers = reloadResponse.getReloadDetails().get(indexName).getReloadedAnalyzers();
         assertEquals(2, reloadedAnalyzers.size());
@@ -165,8 +167,10 @@ public class ReloadAnalyzerTests extends ESSingleNodeTestCase {
         ) {
             out.println("foo, baz, buzz");
         }
-        ReloadAnalyzersResponse reloadResponse = client().execute(ReloadAnalyzerAction.INSTANCE, new ReloadAnalyzersRequest(indexName))
-            .actionGet();
+        ReloadAnalyzersResponse reloadResponse = client().execute(
+            ReloadAnalyzerAction.INSTANCE,
+            new ReloadAnalyzersRequest(null, indexName)
+        ).actionGet();
         assertNoFailures(reloadResponse);
         Set<String> reloadedAnalyzers = reloadResponse.getReloadDetails().get(indexName).getReloadedAnalyzers();
         assertEquals(1, reloadedAnalyzers.size());
@@ -285,8 +289,10 @@ public class ReloadAnalyzerTests extends ESSingleNodeTestCase {
             out.println("jumping");
         }
 
-        ReloadAnalyzersResponse reloadResponse = client().execute(ReloadAnalyzerAction.INSTANCE, new ReloadAnalyzersRequest(indexName))
-            .actionGet();
+        ReloadAnalyzersResponse reloadResponse = client().execute(
+            ReloadAnalyzerAction.INSTANCE,
+            new ReloadAnalyzersRequest(null, indexName)
+        ).actionGet();
         assertNoFailures(reloadResponse);
         Set<String> reloadedAnalyzers = reloadResponse.getReloadDetails().get(indexName).getReloadedAnalyzers();
         assertEquals(1, reloadedAnalyzers.size());

+ 4 - 2
modules/analysis-common/src/internalClusterTest/java/org/elasticsearch/analysis/common/ReloadSynonymAnalyzerIT.java

@@ -95,8 +95,10 @@ public class ReloadSynonymAnalyzerIT extends ESIntegTestCase {
             ) {
                 out.println("foo, baz, " + testTerm);
             }
-            ReloadAnalyzersResponse reloadResponse = client().execute(ReloadAnalyzerAction.INSTANCE, new ReloadAnalyzersRequest("test"))
-                .actionGet();
+            ReloadAnalyzersResponse reloadResponse = client().execute(
+                ReloadAnalyzerAction.INSTANCE,
+                new ReloadAnalyzersRequest(null, "test")
+            ).actionGet();
             assertNoFailures(reloadResponse);
             assertEquals(cluster().numDataNodes(), reloadResponse.getSuccessfulShards());
             assertTrue(reloadResponse.getReloadDetails().containsKey("test"));

+ 5 - 0
modules/analysis-common/src/main/java/org/elasticsearch/analysis/common/SynonymGraphTokenFilterFactory.java

@@ -69,6 +69,11 @@ public class SynonymGraphTokenFilterFactory extends SynonymTokenFilterFactory {
             public AnalysisMode getAnalysisMode() {
                 return analysisMode;
             }
+
+            @Override
+            public String getResourceName() {
+                return rulesFromSettings.resource();
+            }
         };
     }
 

+ 12 - 2
modules/analysis-common/src/main/java/org/elasticsearch/analysis/common/SynonymTokenFilterFactory.java

@@ -120,6 +120,11 @@ public class SynonymTokenFilterFactory extends AbstractTokenFilterFactory {
             public AnalysisMode getAnalysisMode() {
                 return analysisMode;
             }
+
+            @Override
+            public String getResourceName() {
+                return rulesFromSettings.resource();
+            }
         };
     }
 
@@ -175,7 +180,8 @@ public class SynonymTokenFilterFactory extends AbstractTokenFilterFactory {
             }
             return new ReaderWithOrigin(
                 Analysis.getReaderFromIndex(synonymsSet, threadPool, synonymsManagementAPIService),
-                "[" + synonymsSet + "] synonyms_set in .synonyms index"
+                "[" + synonymsSet + "] synonyms_set in .synonyms index",
+                synonymsSet
             );
         } else if (settings.get("synonyms_path") != null) {
             String synonyms_path = settings.get("synonyms_path", null);
@@ -186,5 +192,9 @@ public class SynonymTokenFilterFactory extends AbstractTokenFilterFactory {
         }
     }
 
-    record ReaderWithOrigin(Reader reader, String origin) {};
+    record ReaderWithOrigin(Reader reader, String origin, String resource) {
+        ReaderWithOrigin(Reader reader, String origin) {
+            this(reader, origin, null);
+        }
+    }
 }

+ 134 - 0
modules/analysis-common/src/yamlRestTest/resources/rest-api-spec/test/analysis-common/80_synonyms_reloading_for_synset.yml

@@ -0,0 +1,134 @@
+---
+"Reload analyzers for specific synonym set":
+  - skip:
+      version: " - 8.9.99"
+      reason: Reloading analyzers for specific synonym set is introduced in 8.10.0
+
+  # Create synonyms_set1
+  - do:
+      synonyms.put:
+        synonyms_set: synonyms_set1
+        body:
+          synonyms_set:
+            - synonyms: "hello, hi"
+              id: "synonym-rule-1"
+            - synonyms: "bye => goodbye"
+              id: "synonym-rule-2"
+
+  # Create synonyms_set2
+  - do:
+      synonyms.put:
+        synonyms_set: synonyms_set2
+        body:
+          synonyms_set:
+            - synonyms: "hello, hi"
+              id: "synonym-rule-1"
+            - synonyms: "bye => goodbye"
+              id: "synonym-rule-2"
+
+  # Create my_index1 with synonym_filter that uses synonyms_set1
+  - do:
+      indices.create:
+        index: my_index1
+        body:
+          settings:
+            index:
+              number_of_shards: 1
+              number_of_replicas: 0
+            analysis:
+              filter:
+                my_synonym_filter:
+                  type: synonym_graph
+                  synonyms_set: synonyms_set1
+                  updateable: true
+              analyzer:
+                my_analyzer1:
+                  type: custom
+                  tokenizer: standard
+                  filter: [ lowercase, my_synonym_filter ]
+          mappings:
+            properties:
+              my_field:
+                type: text
+                search_analyzer: my_analyzer1
+  - do:
+      bulk:
+        refresh: true
+        body:
+          - '{"index": {"_index": "my_index1", "_id": "1"}}'
+          - '{"my_field": "hello"}'
+          - '{"index": {"_index": "my_index1", "_id": "2"}}'
+          - '{"my_field": "goodbye"}'
+
+  # Create my_index2 with synonym_filter that uses synonyms_set2
+  - do:
+      indices.create:
+        index: my_index2
+        body:
+          settings:
+            index:
+              number_of_shards: 1
+              number_of_replicas: 0
+            analysis:
+              filter:
+                my_synonym_filter:
+                  type: synonym_graph
+                  synonyms_set: synonyms_set2
+                  updateable: true
+              analyzer:
+                my_analyzer2:
+                  type: custom
+                  tokenizer: standard
+                  filter: [ lowercase, my_synonym_filter ]
+          mappings:
+            properties:
+              my_field:
+                type: text
+                search_analyzer: my_analyzer2
+
+  - do:
+      bulk:
+        refresh: true
+        body:
+          - '{"index": {"_index": "my_index2", "_id": "1"}}'
+          - '{"my_field": "hello"}'
+          - '{"index": {"_index": "my_index2", "_id": "2"}}'
+          - '{"my_field": "goodbye"}'
+
+
+  # An update of synonyms_set1 must trigger auto-reloading of analyzers only for synonyms_set1
+  - do:
+      synonyms.put:
+        synonyms_set: synonyms_set1
+        body:
+          synonyms_set:
+            - synonyms: "hello, salute"
+            - synonyms: "ciao => goodbye"
+  - match: { result: "updated" }
+  - match: { reload_analyzers_details._shards.total: 2 } # shard requests are still sent to 2 indices
+  - match: { reload_analyzers_details._shards.successful: 2 }
+  - length: { reload_analyzers_details.reload_details: 1 } # reload details contain only a single index
+  - match: { reload_analyzers_details.reload_details.0.index: "my_index1" }
+  - match: { reload_analyzers_details.reload_details.0.reloaded_analyzers.0: "my_analyzer1" }
+
+  # Confirm that the index analyzers are reloaded for my_index1
+  - do:
+      search:
+        index: my_index1
+        body:
+          query:
+            match:
+              my_field:
+                query: salute
+  - match: { hits.total.value: 1 }
+
+  # Confirm that the index analyzers are still the same for my_index2
+  - do:
+      search:
+        index: my_index2
+        body:
+          query:
+            match:
+              my_field:
+                query: salute
+  - match: { hits.total.value: 0 }

+ 4 - 0
rest-api-spec/src/main/resources/rest-api-spec/api/indices.reload_search_analyzers.json

@@ -46,6 +46,10 @@
         ],
         "default":"open",
         "description":"Whether to expand wildcard expression to concrete indices that are open, closed or both."
+      },
+      "resource" : {
+        "type" : "string",
+        "description" : "changed resource to reload analyzers from if applicable"
       }
     }
   }

+ 22 - 4
server/src/main/java/org/elasticsearch/action/admin/indices/analyze/ReloadAnalyzersRequest.java

@@ -10,6 +10,7 @@ package org.elasticsearch.action.admin.indices.analyze;
 
 import org.elasticsearch.action.support.broadcast.BroadcastRequest;
 import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -19,16 +20,31 @@ import java.util.Objects;
  * Request for reloading index search analyzers
  */
 public class ReloadAnalyzersRequest extends BroadcastRequest<ReloadAnalyzersRequest> {
+    private final String resource;
 
     /**
-     * Constructs a new request for reloading index search analyzers for one or more indices
+     * Constructs a request for reloading index search analyzers
+     * @param resource changed resource to reload analyzers from, @null if not applicable
+     * @param indices the indices to reload analyzers for
      */
-    public ReloadAnalyzersRequest(String... indices) {
+    public ReloadAnalyzersRequest(String resource, String... indices) {
         super(indices);
+        this.resource = resource;
     }
 
     public ReloadAnalyzersRequest(StreamInput in) throws IOException {
         super(in);
+        this.resource = in.readOptionalString();
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+        out.writeOptionalString(resource);
+    }
+
+    public String resource() {
+        return resource;
     }
 
     @Override
@@ -40,12 +56,14 @@ public class ReloadAnalyzersRequest extends BroadcastRequest<ReloadAnalyzersRequ
             return false;
         }
         ReloadAnalyzersRequest that = (ReloadAnalyzersRequest) o;
-        return Objects.equals(indicesOptions(), that.indicesOptions()) && Arrays.equals(indices, that.indices);
+        return Objects.equals(indicesOptions(), that.indicesOptions())
+            && Arrays.equals(indices, that.indices)
+            && Objects.equals(resource, that.resource);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(indicesOptions(), Arrays.hashCode(indices));
+        return Objects.hash(indicesOptions(), Arrays.hashCode(indices), resource);
     }
 
 }

+ 7 - 1
server/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportReloadAnalyzersAction.java

@@ -86,10 +86,15 @@ public class TransportReloadAnalyzersAction extends TransportBroadcastByNodeActi
     ) {
         return (totalShards, successfulShards, failedShards, responses, shardFailures) -> {
             Map<String, ReloadAnalyzersResponse.ReloadDetails> reloadedIndicesDetails = new HashMap<>();
+            // if the request was to reload for a specific resource, we only want to return the details for that resource
+            boolean includeEmptyReloadDetails = request.resource() == null;
             for (ReloadResult result : responses) {
                 if (reloadedIndicesDetails.containsKey(result.index)) {
                     reloadedIndicesDetails.get(result.index).merge(result);
                 } else {
+                    if (result.reloadedSearchAnalyzers.isEmpty() && includeEmptyReloadDetails == false) {
+                        continue;
+                    }
                     HashSet<String> nodeIds = new HashSet<>();
                     nodeIds.add(result.nodeId);
                     ReloadAnalyzersResponse.ReloadDetails details = new ReloadAnalyzersResponse.ReloadDetails(
@@ -119,7 +124,8 @@ public class TransportReloadAnalyzersAction extends TransportBroadcastByNodeActi
         ActionListener.completeWith(listener, () -> {
             logger.info("reloading analyzers for index shard " + shardRouting);
             IndexService indexService = indicesService.indexService(shardRouting.index());
-            List<String> reloadedSearchAnalyzers = indexService.mapperService().reloadSearchAnalyzers(indicesService.getAnalysis());
+            List<String> reloadedSearchAnalyzers = indexService.mapperService()
+                .reloadSearchAnalyzers(indicesService.getAnalysis(), request.resource());
             return new ReloadResult(shardRouting.index().getName(), shardRouting.currentNodeId(), reloadedSearchAnalyzers);
         });
     }

+ 4 - 3
server/src/main/java/org/elasticsearch/index/analysis/IndexAnalyzers.java

@@ -95,7 +95,7 @@ public interface IndexAnalyzers extends Closeable {
     /**
      * Reload any analyzers that have reloadable components
      */
-    default List<String> reload(AnalysisRegistry analysisRegistry, IndexSettings indexSettings) throws IOException {
+    default List<String> reload(AnalysisRegistry analysisRegistry, IndexSettings indexSettings, String resource) throws IOException {
         return List.of();
     }
 
@@ -135,12 +135,13 @@ public interface IndexAnalyzers extends Closeable {
             }
 
             @Override
-            public List<String> reload(AnalysisRegistry registry, IndexSettings indexSettings) throws IOException {
+            public List<String> reload(AnalysisRegistry registry, IndexSettings indexSettings, String resource) throws IOException {
 
                 List<NamedAnalyzer> reloadableAnalyzers = analyzers.values()
                     .stream()
-                    .filter(a -> a.analyzer() instanceof ReloadableCustomAnalyzer)
+                    .filter(a -> a.analyzer() instanceof ReloadableCustomAnalyzer ra && ra.usesResource(resource))
                     .toList();
+
                 if (reloadableAnalyzers.isEmpty()) {
                     return List.of();
                 }

+ 23 - 0
server/src/main/java/org/elasticsearch/index/analysis/ReloadableCustomAnalyzer.java

@@ -16,7 +16,9 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.CollectionUtils;
 
 import java.io.Reader;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 public final class ReloadableCustomAnalyzer extends Analyzer implements AnalyzerComponentsProvider {
 
@@ -24,6 +26,9 @@ public final class ReloadableCustomAnalyzer extends Analyzer implements Analyzer
 
     private CloseableThreadLocal<AnalyzerComponents> storedComponents = new CloseableThreadLocal<>();
 
+    // external resources that this analyzer is based on
+    private final Set<String> resources;
+
     private final int positionIncrementGap;
 
     private final int offsetGap;
@@ -63,6 +68,14 @@ public final class ReloadableCustomAnalyzer extends Analyzer implements Analyzer
         this.components = components;
         this.positionIncrementGap = positionIncrementGap;
         this.offsetGap = offsetGap;
+
+        Set<String> resourcesTemp = new HashSet<>();
+        for (TokenFilterFactory tokenFilter : components.getTokenFilters()) {
+            if (tokenFilter.getResourceName() != null) {
+                resourcesTemp.add(tokenFilter.getResourceName());
+            }
+        }
+        resources = resourcesTemp.isEmpty() ? null : Set.copyOf(resourcesTemp);
     }
 
     @Override
@@ -70,6 +83,16 @@ public final class ReloadableCustomAnalyzer extends Analyzer implements Analyzer
         return this.components;
     }
 
+    public boolean usesResource(String resourceName) {
+        if (resourceName == null) {
+            return true;
+        }
+        if (resources == null) {
+            return false;
+        }
+        return resources.contains(resourceName);
+    }
+
     @Override
     public int getPositionIncrementGap(String fieldName) {
         return this.positionIncrementGap;

+ 12 - 0
server/src/main/java/org/elasticsearch/index/analysis/TokenFilterFactory.java

@@ -75,6 +75,18 @@ public interface TokenFilterFactory {
         return AnalysisMode.ALL;
     }
 
+    /**
+     * Get the name of the resource that this filter is based on.
+     * Used to reload analyzers on this resource changes.
+     *
+     * For an example, see @SynonymGraphTokenFilterFactory#getResourceName()
+     *
+     * @return the name of the resource that this filter was loaded from if any
+     */
+    default String getResourceName() {
+        return null;
+    }
+
     /**
      * A TokenFilterFactory that does no filtering to its TokenStream
      */

+ 2 - 2
server/src/main/java/org/elasticsearch/index/mapper/MapperService.java

@@ -525,10 +525,10 @@ public class MapperService extends AbstractIndexComponent implements Closeable {
         return mappingLookup().isMultiField(field);
     }
 
-    public synchronized List<String> reloadSearchAnalyzers(AnalysisRegistry registry) throws IOException {
+    public synchronized List<String> reloadSearchAnalyzers(AnalysisRegistry registry, String resource) throws IOException {
         logger.info("reloading search analyzers");
         // TODO this should bust the cache somehow. Tracked in https://github.com/elastic/elasticsearch/issues/66722
-        return indexAnalyzers.reload(registry, indexSettings);
+        return indexAnalyzers.reload(registry, indexSettings, resource);
     }
 
     /**

+ 1 - 0
server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestReloadAnalyzersAction.java

@@ -38,6 +38,7 @@ public class RestReloadAnalyzersAction extends BaseRestHandler {
     @Override
     public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
         ReloadAnalyzersRequest reloadAnalyzersRequest = new ReloadAnalyzersRequest(
+            request.param("resource"),
             Strings.splitStringByCommaToArray(request.param("index"))
         );
         reloadAnalyzersRequest.indicesOptions(IndicesOptions.fromRequest(request, reloadAnalyzersRequest.indicesOptions()));

+ 5 - 6
server/src/main/java/org/elasticsearch/synonyms/SynonymsManagementAPIService.java

@@ -234,7 +234,7 @@ public class SynonymsManagementAPIService {
                         ? UpdateSynonymsResultStatus.CREATED
                         : UpdateSynonymsResultStatus.UPDATED;
 
-                    reloadAnalyzers(bulkInsertResponseListener, updateSynonymsResultStatus);
+                    reloadAnalyzers(resourceName, bulkInsertResponseListener, updateSynonymsResultStatus);
                 }));
         }));
     }
@@ -254,7 +254,7 @@ public class SynonymsManagementAPIService {
                         ? UpdateSynonymsResultStatus.CREATED
                         : UpdateSynonymsResultStatus.UPDATED;
 
-                    reloadAnalyzers(l2, updateStatus);
+                    reloadAnalyzers(synonymsSetId, l2, updateStatus);
                 }));
             } catch (IOException e) {
                 l1.onFailure(e);
@@ -337,14 +337,13 @@ public class SynonymsManagementAPIService {
                 return;
             }
 
-            reloadAnalyzers(l, AcknowledgedResponse.of(true));
+            reloadAnalyzers(resourceName, l, AcknowledgedResponse.of(true));
         }));
     }
 
-    private <T> void reloadAnalyzers(ActionListener<SynonymsReloadResult<T>> listener, T synonymsOperationResult) {
+    private <T> void reloadAnalyzers(String resourceName, ActionListener<SynonymsReloadResult<T>> listener, T synonymsOperationResult) {
         // auto-reload all reloadable analyzers (currently only those that use updateable synonym or keyword_marker filters)
-        // TODO: reload only those analyzers that use this synonymsSet
-        ReloadAnalyzersRequest reloadAnalyzersRequest = new ReloadAnalyzersRequest("*");
+        ReloadAnalyzersRequest reloadAnalyzersRequest = new ReloadAnalyzersRequest(resourceName, "*");
         client.execute(
             ReloadAnalyzerAction.INSTANCE,
             reloadAnalyzersRequest,

+ 1 - 1
server/src/test/java/org/elasticsearch/index/mapper/ReloadableAnalyzerTests.java

@@ -77,7 +77,7 @@ public class ReloadableAnalyzerTests extends ESSingleNodeTestCase {
         assertEquals("myReloadableFilter", originalTokenFilters[0].name());
 
         // now reload, this should change the tokenfilterFactory inside the analyzer
-        mapperService.reloadSearchAnalyzers(getInstanceFromNode(AnalysisRegistry.class));
+        mapperService.reloadSearchAnalyzers(getInstanceFromNode(AnalysisRegistry.class), null);
         IndexAnalyzers updatedAnalyzers = mapperService.getIndexAnalyzers();
         assertSame(current, updatedAnalyzers);
         assertSame(current.getDefaultIndexAnalyzer(), updatedAnalyzers.getDefaultIndexAnalyzer());