瀏覽代碼

Only Allow Enabling Streams If No Conflicting Indices Exist (#132064)

* New logic to check for conflicting indices before enabling streams plus tests

* Update docs/changelog/132064.yaml

* Fix tests

* More YAML test fixes

* Update changelog entry

* Normalize to single quotes in gradle file and switch to enum for stream index prefix

* Speed up index checking by pre-computing stream names and prefix and not using a stream

* Fix race condition in YAML tests

* Fix race condition in YAML tests - Take 2
Luke Whiting 2 月之前
父節點
當前提交
6508617c61

+ 5 - 0
docs/changelog/132064.yaml

@@ -0,0 +1,5 @@
+pr: 132064
+summary: Only Allow Enabling Streams If No Conflicting Indices Exist
+area: Data streams
+type: enhancement
+issues: []

+ 1 - 1
modules/streams/build.gradle

@@ -20,7 +20,7 @@ esplugin {
 
 restResources {
   restApi {
-    include '_common', 'streams', "bulk", "index", "ingest", "indices", "delete_by_query", "search"
+    include '_common', 'streams', 'bulk', 'index', 'ingest', 'indices', 'delete_by_query', 'search'
   }
 }
 

+ 38 - 8
modules/streams/src/main/java/org/elasticsearch/rest/streams/logs/TransportLogsStreamsToggleActivation.java

@@ -11,6 +11,7 @@ package org.elasticsearch.rest.streams.logs;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.master.AcknowledgedRequest;
@@ -22,12 +23,15 @@ import org.elasticsearch.cluster.SequentialAckingBatchedTaskExecutor;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.ProjectId;
+import org.elasticsearch.cluster.metadata.ProjectMetadata;
 import org.elasticsearch.cluster.metadata.StreamsMetadata;
 import org.elasticsearch.cluster.project.ProjectResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
 import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.streams.StreamType;
 import org.elasticsearch.injection.guice.Inject;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
@@ -75,19 +79,45 @@ public class TransportLogsStreamsToggleActivation extends AcknowledgedTransportM
         LogsStreamsActivationToggleAction.Request request,
         ClusterState state,
         ActionListener<AcknowledgedResponse> listener
-    ) throws Exception {
+    ) {
         ProjectId projectId = projectResolver.getProjectId();
-        StreamsMetadata streamsState = state.metadata().getProject(projectId).custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY);
+        ProjectMetadata projectMetadata = state.metadata().getProject(projectId);
+        StreamsMetadata streamsState = projectMetadata.custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY);
         boolean currentlyEnabled = streamsState.isLogsEnabled();
         boolean shouldEnable = request.shouldEnable();
-        if (shouldEnable != currentlyEnabled) {
-            StreamsMetadataUpdateTask updateTask = new StreamsMetadataUpdateTask(request, listener, projectId, shouldEnable);
-            String taskName = String.format(Locale.ROOT, "enable-streams-logs-[%s]", shouldEnable ? "enable" : "disable");
-            taskQueue.submitTask(taskName, updateTask, updateTask.timeout());
-        } else {
+
+        if (shouldEnable == currentlyEnabled) {
             logger.debug("Logs streams are already in the requested state: {}", shouldEnable);
             listener.onResponse(AcknowledgedResponse.TRUE);
+            return;
+        }
+
+        if (shouldEnable && logsIndexExists(projectMetadata)) {
+            listener.onFailure(
+                new ElasticsearchStatusException(
+                    "Cannot enable logs streams: indices named 'logs' or starting with 'logs.' already exist.",
+                    RestStatus.CONFLICT
+                )
+            );
+            return;
         }
+
+        StreamsMetadataUpdateTask updateTask = new StreamsMetadataUpdateTask(request, listener, projectId, shouldEnable);
+        String taskName = String.format(Locale.ROOT, "enable-streams-logs-[%s]", shouldEnable ? "enable" : "disable");
+        taskQueue.submitTask(taskName, updateTask, updateTask.timeout());
+    }
+
+    private boolean logsIndexExists(ProjectMetadata projectMetadata) {
+        String logsStreamName = StreamType.LOGS.getStreamName();
+        String logsStreamPrefix = logsStreamName + ".";
+
+        for (String name : projectMetadata.getConcreteAllIndices()) {
+            if (name.equals(logsStreamName) || name.startsWith(logsStreamPrefix)) {
+                return true;
+            }
+        }
+
+        return false;
     }
 
     @Override
@@ -111,7 +141,7 @@ public class TransportLogsStreamsToggleActivation extends AcknowledgedTransportM
         }
 
         @Override
-        public ClusterState execute(ClusterState currentState) throws Exception {
+        public ClusterState execute(ClusterState currentState) {
             return currentState.copyAndUpdateProject(
                 projectId,
                 builder -> builder.putCustom(StreamsMetadata.TYPE, new StreamsMetadata(enabled))

+ 41 - 0
modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/10_basic.yml

@@ -1,3 +1,8 @@
+---
+teardown:
+  - do:
+      streams.logs_disable: { }
+
 ---
 "Basic toggle of logs state enable to disable and back":
   - do:
@@ -24,6 +29,10 @@
       streams.status: { }
   - is_true: logs.enabled
 
+  - do:
+      streams.logs_disable: { }
+  - is_true: acknowledged
+
 ---
 "Check for repeated toggle to same state":
   - do:
@@ -41,3 +50,35 @@
   - do:
       streams.status: { }
   - is_true: logs.enabled
+
+  - do:
+      streams.logs_disable: { }
+  - is_true: acknowledged
+
+---
+"Check streams can't be enabled with existing logs indices":
+  - do:
+      indices.create:
+        index: logs
+
+  - do:
+      catch: conflict
+      streams.logs_enable: { }
+  - match: { error.reason: "Cannot enable logs streams: indices named 'logs' or starting with 'logs.' already exist." }
+
+  - do:
+      indices.delete:
+        index: logs
+
+  - do:
+      indices.create:
+        index: logs.test
+
+  - do:
+      catch: conflict
+      streams.logs_enable: { }
+  - match: { error.reason: "Cannot enable logs streams: indices named 'logs' or starting with 'logs.' already exist." }
+
+  - do:
+      indices.delete:
+        index: logs.test

+ 17 - 0
modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml

@@ -1,3 +1,8 @@
+---
+teardown:
+  - do:
+      streams.logs_disable: { }
+
 ---
 "Check User Can't Write To Substream Directly":
   - do:
@@ -71,6 +76,10 @@
   - match: { items.0.index.error.type: "illegal_argument_exception" }
   - match: { items.0.index.error.reason: "Pipeline [reroute-to-logs-foo] can't change the target index (from [bad-index] to [logs] child stream [logs.foo]) History: [bad-index]" }
 
+  - do:
+      indices.delete:
+        index: bad-index
+
 ---
 "Check Bulk Index With Script Processor To Substream Is Rejected":
   - do:
@@ -104,6 +113,10 @@
   - match: { items.0.index.error.type: "illegal_argument_exception" }
   - match: { items.0.index.error.reason: "Pipeline [script-to-logs-foo] can't change the target index (from [bad-index-script] to [logs] child stream [logs.foo]) History: [bad-index-script]" }
 
+  - do:
+      indices.delete:
+        index: bad-index-script
+
 ---
 "Check Delete By Query Directly On Substream After Reroute Succeeds":
   - do:
@@ -154,3 +167,7 @@
           query:
             match_all: {}
   - match: { hits.total.value: 0 }
+
+  - do:
+      indices.delete:
+        index: logs

+ 1 - 0
server/src/main/java/module-info.java

@@ -213,6 +213,7 @@ module org.elasticsearch.server {
     exports org.elasticsearch.common.regex;
     exports org.elasticsearch.common.scheduler;
     exports org.elasticsearch.common.settings;
+    exports org.elasticsearch.common.streams;
     exports org.elasticsearch.common.text;
     exports org.elasticsearch.common.time;
     exports org.elasticsearch.common.transport;