Browse Source

Check if an analytics event data stream exists before installing pipeline. (#95621)

Aurélien FOUCRET 2 years ago
parent
commit
062542b2c9

+ 5 - 0
docs/changelog/95621.yaml

@@ -0,0 +1,5 @@
+pr: 95621
+summary: Check if an analytics event data stream exists before installing pipeline
+area: Application
+type: bug
+issues: []

+ 18 - 0
x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/analytics/AnalyticsIngestPipelineRegistry.java

@@ -7,6 +7,8 @@
 package org.elasticsearch.xpack.application.analytics;
 
 import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.cluster.ClusterChangedEvent;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.application.utils.ingest.PipelineRegistry;
@@ -14,6 +16,7 @@ import org.elasticsearch.xpack.application.utils.ingest.PipelineTemplateConfigur
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 
 import static org.elasticsearch.xpack.application.analytics.AnalyticsConstants.EVENT_DATA_STREAM_INDEX_PREFIX;
 import static org.elasticsearch.xpack.application.analytics.AnalyticsConstants.ROOT_RESOURCE_PATH;
@@ -50,4 +53,19 @@ public class AnalyticsIngestPipelineRegistry extends PipelineRegistry {
     protected List<PipelineTemplateConfiguration> getIngestPipelineConfigs() {
         return INGEST_PIPELINES;
     }
+
+    @Override
+    protected boolean isClusterReady(ClusterChangedEvent event) {
+        return super.isClusterReady(event) && (isIngestPipelineInstalled(event.state()) || hasAnalyticsEventDataStream(event.state()));
+    }
+
+    private boolean hasAnalyticsEventDataStream(ClusterState state) {
+        Set<String> dataStreamNames = state.metadata().dataStreams().keySet();
+
+        return dataStreamNames.stream().anyMatch(dataStreamName -> dataStreamName.startsWith(EVENT_DATA_STREAM_INDEX_PREFIX));
+    }
+
+    private boolean isIngestPipelineInstalled(ClusterState state) {
+        return ingestPipelineExists(state, EVENT_DATA_STREAM_INGEST_PIPELINE_NAME);
+    }
 }

+ 15 - 8
x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/utils/ingest/PipelineRegistry.java

@@ -62,25 +62,32 @@ public abstract class PipelineRegistry implements ClusterStateListener {
 
     @Override
     public void clusterChanged(ClusterChangedEvent event) {
+
+        if (isClusterReady(event)) {
+            addIngestPipelinesIfMissing(event.state());
+        }
+    }
+
+    protected abstract String getOrigin();
+
+    protected abstract List<PipelineTemplateConfiguration> getIngestPipelineConfigs();
+
+    protected boolean isClusterReady(ClusterChangedEvent event) {
         ClusterState state = event.state();
         if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
             // wait until recovered from disk, so the cluster state view is consistent
-            return;
+            return false;
         }
 
         DiscoveryNode masterNode = event.state().getNodes().getMasterNode();
         if (masterNode == null || state.nodes().isLocalNodeElectedMaster() == false) {
             // no master node elected or current node is not master
-            return;
+            return false;
         }
 
-        addIngestPipelinesIfMissing(state);
+        return true;
     }
 
-    protected abstract String getOrigin();
-
-    protected abstract List<PipelineTemplateConfiguration> getIngestPipelineConfigs();
-
     private void addIngestPipelinesIfMissing(ClusterState state) {
         for (PipelineTemplateConfiguration pipelineTemplateConfig : getIngestPipelineConfigs()) {
             PipelineConfiguration newPipeline = pipelineTemplateConfig.load();
@@ -121,7 +128,7 @@ public abstract class PipelineRegistry implements ClusterStateListener {
         }
     }
 
-    private static boolean ingestPipelineExists(ClusterState state, String pipelineId) {
+    protected boolean ingestPipelineExists(ClusterState state, String pipelineId) {
         Optional<IngestMetadata> maybeMeta = Optional.ofNullable(state.metadata().custom(IngestMetadata.TYPE));
         return maybeMeta.isPresent() && maybeMeta.get().getPipelines().containsKey(pipelineId);
     }

+ 59 - 5
x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/analytics/AnalyticsIngestPipelineRegistryTests.java

@@ -18,6 +18,7 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlocks;
+import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -25,8 +26,11 @@ import org.elasticsearch.cluster.node.TestDiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.TriFunction;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.collect.MapBuilder;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.ingest.IngestMetadata;
 import org.elasticsearch.ingest.PipelineConfiguration;
 import org.elasticsearch.test.ClusterServiceUtils;
@@ -123,6 +127,24 @@ public class AnalyticsIngestPipelineRegistryTests extends ESTestCase {
         assertBusy(() -> assertThat(calledTimes.get(), equalTo(registry.getIngestPipelineConfigs().size())));
     }
 
+    public void testThatPipelinesAreNotInstalledWhenNoAnalyticsCollectionExist() {
+        DiscoveryNode node = TestDiscoveryNode.create("node");
+        DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
+
+        ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyMap(), nodes, false);
+
+        client.setVerifier((action, request, listener) -> {
+            if (action instanceof PutPipelineAction) {
+                fail("no behavioral analytics collection exists, pipeline should not be installed");
+            } else {
+                fail("client called with unexpected request: " + request.toString());
+            }
+            return null;
+        });
+
+        registry.clusterChanged(event);
+    }
+
     public void testThatNewerPipelinesAreNotUpgraded() throws Exception {
         DiscoveryNode node = TestDiscoveryNode.create("node");
         DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
@@ -211,7 +233,15 @@ public class AnalyticsIngestPipelineRegistryTests extends ESTestCase {
     }
 
     private ClusterChangedEvent createClusterChangedEvent(Map<String, Integer> existingIngestPipelines, DiscoveryNodes nodes) {
-        ClusterState cs = createClusterState(existingIngestPipelines, nodes);
+        return createClusterChangedEvent(existingIngestPipelines, nodes, true);
+    }
+
+    private ClusterChangedEvent createClusterChangedEvent(
+        Map<String, Integer> existingIngestPipelines,
+        DiscoveryNodes nodes,
+        boolean withDataStreams
+    ) {
+        ClusterState cs = createClusterState(existingIngestPipelines, nodes, withDataStreams);
         ClusterChangedEvent realEvent = new ClusterChangedEvent(
             "created-from-test",
             cs,
@@ -223,21 +253,45 @@ public class AnalyticsIngestPipelineRegistryTests extends ESTestCase {
         return event;
     }
 
-    private ClusterState createClusterState(Map<String, Integer> existingIngestPipelines, DiscoveryNodes nodes) {
+    private ClusterState createClusterState(Map<String, Integer> existingIngestPipelines, DiscoveryNodes nodes, boolean withDataStreams) {
         Map<String, PipelineConfiguration> pipelines = new HashMap<>();
         for (Map.Entry<String, Integer> e : existingIngestPipelines.entrySet()) {
             pipelines.put(e.getKey(), createMockPipelineConfiguration(e.getKey(), e.getValue()));
         }
 
+        Metadata.Builder metadataBuilder = Metadata.builder()
+            .transientSettings(Settings.EMPTY)
+            .putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines));
+
+        if (withDataStreams) {
+            DataStream dataStream = createDataStream();
+            metadataBuilder.dataStreams(
+                MapBuilder.<String, DataStream>newMapBuilder().put(dataStream.getName(), dataStream).map(),
+                Collections.emptyMap()
+            );
+        }
+
         return ClusterState.builder(new ClusterName("test"))
-            .metadata(
-                Metadata.builder().transientSettings(Settings.EMPTY).putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines)).build()
-            )
+            .metadata(metadataBuilder)
             .blocks(new ClusterBlocks.Builder().build())
             .nodes(nodes)
             .build();
     }
 
+    private DataStream createDataStream() {
+        return new DataStream(
+            AnalyticsConstants.EVENT_DATA_STREAM_INDEX_PREFIX + randomIdentifier(),
+            randomList(1, 10, () -> new Index(randomIdentifier(), randomIdentifier())),
+            0,
+            Collections.emptyMap(),
+            false,
+            false,
+            false,
+            false,
+            IndexMode.STANDARD
+        );
+    }
+
     private PipelineConfiguration createMockPipelineConfiguration(String pipelineId, int version) {
         try (XContentBuilder configBuilder = JsonXContent.contentBuilder().startObject().field("version", version).endObject()) {
             BytesReference config = BytesReference.bytes(configBuilder);

+ 5 - 2
x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/GeoIpUpgradeIT.java

@@ -8,6 +8,7 @@
 package org.elasticsearch.upgrades;
 
 import org.apache.http.util.EntityUtils;
+import org.elasticsearch.Version;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
 import org.hamcrest.Matchers;
@@ -17,6 +18,8 @@ import java.nio.charset.StandardCharsets;
 public class GeoIpUpgradeIT extends AbstractUpgradeTestCase {
 
     public void testGeoIpDownloader() throws Exception {
+        assumeTrue("Disabled until PR #95621 is backported to branch " + Version.V_8_8_0, UPGRADE_FROM_VERSION.onOrBefore(Version.V_8_7_0));
+
         if (CLUSTER_TYPE == ClusterType.UPGRADED) {
             assertBusy(() -> {
                 Response response = client().performRequest(new Request("GET", "_cat/tasks"));
@@ -26,8 +29,8 @@ public class GeoIpUpgradeIT extends AbstractUpgradeTestCase {
             assertBusy(() -> {
                 Response response = client().performRequest(new Request("GET", "_ingest/geoip/stats"));
                 String tasks = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
-                // The geoip downloader should be executed since a geoip processors is present in behavioral analytics default pipeline:
-                assertThat(tasks, Matchers.containsString("failed_downloads\":1"));
+                // The geoip downloader doesn't actually do anything since there are no geoip processors:
+                assertThat(tasks, Matchers.containsString("failed_downloads\":0"));
                 assertThat(tasks, Matchers.containsString("successful_downloads\":0"));
             });
         }