Sfoglia il codice sorgente

Fixed a bug that prevents pipelines to load that use stored scripts after a restart.

The bug was caused because the ScriptService had no reference to a ClusterState instance,
because it received the ClusterState after the PipelineStore. This only is the case
after a restart.

A bad side effect is that during a restart, any pipeline to be loaded after the pipeline that uses a stored script,
was never loaded, which caused many pipeline to be missing in bulk / index request api calls.
Martijn van Groningen 7 anni fa
parent
commit
766b9d600e

+ 151 - 0
modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java

@@ -0,0 +1,151 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.ingest.common;
+
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.script.MockScriptEngine;
+import org.elasticsearch.script.MockScriptPlugin;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.InternalTestCluster;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.hamcrest.Matchers.equalTo;
+
+// Ideally I like this test to live in the server module, but otherwise a large part of the ScriptProcessor
+// ends up being copied into this test.
+@ESIntegTestCase.ClusterScope(numDataNodes = 0, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST)
+public class IngestRestartIT extends ESIntegTestCase {
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return Arrays.asList(IngestCommonPlugin.class, CustomScriptPlugin.class);
+    }
+
+    @Override
+    protected boolean ignoreExternalCluster() {
+        return true;
+    }
+
+    public static class CustomScriptPlugin extends MockScriptPlugin {
+        @Override
+        protected Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
+            return Collections.singletonMap("my_script", script -> {
+                @SuppressWarnings("unchecked")
+                Map<String, Object> ctx = (Map) script.get("ctx");
+                ctx.put("z", 0);
+                return null;
+            });
+        }
+    }
+
+    public void testPipelineWithScriptProcessorThatHasStoredScript() throws Exception {
+        internalCluster().startNode();
+
+        client().admin().cluster().preparePutStoredScript()
+                .setId("1")
+                .setContent(new BytesArray("{\"script\": {\"lang\": \"" + MockScriptEngine.NAME +
+                        "\", \"source\": \"my_script\"} }"), XContentType.JSON)
+                .get();
+        BytesReference pipeline = new BytesArray("{\n" +
+                "  \"processors\" : [\n" +
+                "      {\"set\" : {\"field\": \"y\", \"value\": 0}},\n" +
+                "      {\"script\" : {\"id\": \"1\"}}\n" +
+                "  ]\n" +
+                "}");
+        client().admin().cluster().preparePutPipeline("_id", pipeline, XContentType.JSON).get();
+
+        client().prepareIndex("index", "doc", "1")
+                .setSource("x", 0)
+                .setPipeline("_id")
+                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+                .get();
+
+        Map<String, Object> source = client().prepareGet("index", "doc", "1").get().getSource();
+        assertThat(source.get("x"), equalTo(0));
+        assertThat(source.get("y"), equalTo(0));
+        assertThat(source.get("z"), equalTo(0));
+
+        // Prior to making this ScriptService implement ClusterStateApplier instead of ClusterStateListener,
+        // pipelines with a script processor failed to load causing these pipelines and pipelines that were
+        // supposed to load after these pipelines to not be available during ingestion, which then causes
+        // the next index request in this test to fail.
+        internalCluster().fullRestart();
+        ensureYellow("index");
+
+        client().prepareIndex("index", "doc", "2")
+                .setSource("x", 0)
+                .setPipeline("_id")
+                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+                .get();
+
+        source = client().prepareGet("index", "doc", "2").get().getSource();
+        assertThat(source.get("x"), equalTo(0));
+        assertThat(source.get("y"), equalTo(0));
+        assertThat(source.get("z"), equalTo(0));
+    }
+
+    public void testWithDedicatedIngestNode() throws Exception {
+        String node = internalCluster().startNode();
+        String ingestNode = internalCluster().startNode(Settings.builder()
+                .put("node.master", false)
+                .put("node.data", false)
+        );
+
+        BytesReference pipeline = new BytesArray("{\n" +
+                "  \"processors\" : [\n" +
+                "      {\"set\" : {\"field\": \"y\", \"value\": 0}}\n" +
+                "  ]\n" +
+                "}");
+        client().admin().cluster().preparePutPipeline("_id", pipeline, XContentType.JSON).get();
+
+        client().prepareIndex("index", "doc", "1")
+                .setSource("x", 0)
+                .setPipeline("_id")
+                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+                .get();
+
+        Map<String, Object> source = client().prepareGet("index", "doc", "1").get().getSource();
+        assertThat(source.get("x"), equalTo(0));
+        assertThat(source.get("y"), equalTo(0));
+
+        logger.info("Stopping");
+        internalCluster().restartNode(node, new InternalTestCluster.RestartCallback());
+
+        client(ingestNode).prepareIndex("index", "doc", "2")
+                .setSource("x", 0)
+                .setPipeline("_id")
+                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+                .get();
+
+        source = client(ingestNode).prepareGet("index", "doc", "2").get().getSource();
+        assertThat(source.get("x"), equalTo(0));
+        assertThat(source.get("y"), equalTo(0));
+    }
+
+}

+ 5 - 1
server/src/main/java/org/elasticsearch/ingest/PipelineStore.java

@@ -35,9 +35,9 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.regex.Regex;
-import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.gateway.GatewayService;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -70,6 +70,10 @@ public class PipelineStore extends AbstractComponent implements ClusterStateAppl
     }
 
     void innerUpdatePipelines(ClusterState previousState, ClusterState state) {
+        if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
+            return;
+        }
+
         IngestMetadata ingestMetadata = state.getMetaData().custom(IngestMetadata.TYPE);
         IngestMetadata previousIngestMetadata = previousState.getMetaData().custom(IngestMetadata.TYPE);
         if (Objects.equals(ingestMetadata, previousIngestMetadata)) {

+ 1 - 1
server/src/main/java/org/elasticsearch/node/Node.java

@@ -342,7 +342,7 @@ public class Node implements Closeable {
             List<ClusterPlugin> clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class);
             final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool,
                ClusterModule.getClusterStateCustomSuppliers(clusterPlugins));
-            clusterService.addListener(scriptModule.getScriptService());
+            clusterService.addStateApplier(scriptModule.getScriptService());
             resourcesToClose.add(clusterService);
             final IngestService ingestService = new IngestService(settings, threadPool, this.environment,
                 scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));

+ 3 - 3
server/src/main/java/org/elasticsearch/script/ScriptService.java

@@ -30,7 +30,7 @@ import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptRespo
 import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ClusterStateListener;
+import org.elasticsearch.cluster.ClusterStateApplier;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
@@ -57,7 +57,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.function.Function;
 
-public class ScriptService extends AbstractComponent implements Closeable, ClusterStateListener {
+public class ScriptService extends AbstractComponent implements Closeable, ClusterStateApplier {
 
     static final String DISABLE_DYNAMIC_SCRIPTING_SETTING = "script.disable_dynamic";
 
@@ -508,7 +508,7 @@ public class ScriptService extends AbstractComponent implements Closeable, Clust
     }
 
     @Override
-    public void clusterChanged(ClusterChangedEvent event) {
+    public void applyClusterState(ClusterChangedEvent event) {
         clusterState = event.state();
     }