1
0
Эх сурвалжийг харах

Add ingest cluster state listeners (#46650)

In the case that an ingest processor factory relies on other configuration
in the cluster state in order to construct a processor instance then
it is currently undetermined if a processor facotry can be notified about
a change if multiple cluster state updates are bundled together and
if a processor implement `ClusterStateApplier` interface.
(IngestService implements this interface too)

The idea with ingest cluster state listener is that it is guaranteed to
update the processor factory first before the ingest service creates
a pipeline with their respective processor instances.

Currently this concept is used in the enrich branch:
https://github.com/elastic/elasticsearch/blob/enrich/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java#L21

In this case it a processor factory is interested in enrich indices' _meta
mapping fields.

This is the third PR that merges changes made to server module from
the enrich branch (see #32789) into the master branch.

Changes to the server module are merged separately from the pr that will
merge enrich into master, so that these changes can be reviewed in isolation.
Martijn van Groningen 6 жил өмнө
parent
commit
03e20d184a

+ 19 - 0
server/src/main/java/org/elasticsearch/ingest/IngestService.java

@@ -61,6 +61,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
@@ -84,6 +85,7 @@ public class IngestService implements ClusterStateApplier {
     private volatile Map<String, PipelineHolder> pipelines = Map.of();
     private final ThreadPool threadPool;
     private final IngestMetric totalMetrics = new IngestMetric();
+    private final List<Consumer<ClusterState>> ingestClusterStateListeners = new CopyOnWriteArrayList<>();
 
     public IngestService(ClusterService clusterService, ThreadPool threadPool,
                          Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry,
@@ -385,6 +387,17 @@ public class IngestService implements ClusterStateApplier {
         return statsBuilder.build();
     }
 
+    /**
+     * Adds a listener that gets invoked with the current cluster state before processor factories
+     * get invoked.
+     *
+     * This is useful for components that are used by ingest processors, so that they have the opportunity to update
+     * before these components get used by the ingest processor factory.
+     */
+    public void addIngestClusterStateListener(Consumer<ClusterState> listener) {
+        ingestClusterStateListeners.add(listener);
+    }
+
     //package private for testing
     static String getProcessorName(Processor processor){
         // conditionals are implemented as wrappers around the real processor, so get the real processor for the correct type for the name
@@ -457,6 +470,12 @@ public class IngestService implements ClusterStateApplier {
             return;
         }
 
+        // Publish cluster state to components that are used by processor factories before letting
+        // processor factories create new processor instances.
+        // (Note that this needs to be done also in the case when there is no change to ingest metadata, because in the case
+        // when only the part of the cluster state that a component is interested in, is updated.)
+        ingestClusterStateListeners.forEach(consumer -> consumer.accept(state));
+
         IngestMetadata newIngestMetadata = state.getMetaData().custom(IngestMetadata.TYPE);
         if (newIngestMetadata == null) {
             return;

+ 39 - 0
server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

@@ -70,6 +70,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.LongSupplier;
@@ -1117,6 +1118,44 @@ public class IngestServiceTests extends ESTestCase {
         verify(dropHandler, times(1)).accept(indexRequest);
     }
 
+    public void testIngestClusterStateListeners_orderOfExecution() {
+        final AtomicInteger counter = new AtomicInteger(0);
+
+        // Ingest cluster state listener state should be invoked first:
+        Consumer<ClusterState> ingestClusterStateListener = clusterState -> {
+            assertThat(counter.compareAndSet(0, 1), is(true));
+        };
+
+        // Processor factory should be invoked secondly after ingest cluster state listener:
+        IngestPlugin testPlugin = new IngestPlugin() {
+            @Override
+            public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
+                return Collections.singletonMap("test", (factories, tag, config) -> {
+                    assertThat(counter.compareAndSet(1, 2), is(true));
+                    return new FakeProcessor("test", tag, ingestDocument -> {});
+                });
+            }
+        };
+
+        // Create ingest service:
+        ThreadPool tp = mock(ThreadPool.class);
+        Client client = mock(Client.class);
+        IngestService ingestService =
+            new IngestService(mock(ClusterService.class), tp, null, null, null, List.of(testPlugin), client);
+        ingestService.addIngestClusterStateListener(ingestClusterStateListener);
+
+        // Create pipeline and apply the resulting cluster state, which should update the counter in the right order:
+        PutPipelineRequest putRequest = new PutPipelineRequest("_id",
+            new BytesArray("{\"processors\": [{\"test\" : {}}]}"), XContentType.JSON);
+        ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
+        ClusterState previousClusterState = clusterState;
+        clusterState = IngestService.innerPut(putRequest, clusterState);
+        ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
+
+        // Sanity check that counter has been updated twice:
+        assertThat(counter.get(), equalTo(2));
+    }
+
     private IngestDocument eqIndexTypeId(final Map<String, Object> source) {
         return argThat(new IngestDocumentMatcher("_index", "_type", "_id", source));
     }