浏览代码

Make Watch transport actions project-aware (#129612)

Sam Xiao 4 月之前
父节点
当前提交
706e7f3b6f

+ 13 - 9
x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/TransportGetWatcherSettingsAction.java

@@ -10,12 +10,13 @@ package org.elasticsearch.xpack.watcher.transport.actions;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.ChannelActionListener;
-import org.elasticsearch.action.support.local.TransportLocalClusterStateAction;
-import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.action.support.local.TransportLocalProjectMetadataAction;
+import org.elasticsearch.cluster.ProjectState;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.project.ProjectResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
@@ -32,7 +33,7 @@ import static org.elasticsearch.xpack.core.watcher.transport.actions.put.UpdateW
 import static org.elasticsearch.xpack.watcher.transport.actions.TransportUpdateWatcherSettingsAction.WATCHER_INDEX_NAME;
 import static org.elasticsearch.xpack.watcher.transport.actions.TransportUpdateWatcherSettingsAction.WATCHER_INDEX_REQUEST;
 
-public class TransportGetWatcherSettingsAction extends TransportLocalClusterStateAction<
+public class TransportGetWatcherSettingsAction extends TransportLocalProjectMetadataAction<
     GetWatcherSettingsAction.Request,
     GetWatcherSettingsAction.Response> {
 
@@ -49,14 +50,16 @@ public class TransportGetWatcherSettingsAction extends TransportLocalClusterStat
         TransportService transportService,
         ClusterService clusterService,
         ActionFilters actionFilters,
-        IndexNameExpressionResolver indexNameExpressionResolver
+        IndexNameExpressionResolver indexNameExpressionResolver,
+        ProjectResolver projectResolver
     ) {
         super(
             GetWatcherSettingsAction.NAME,
             actionFilters,
             transportService.getTaskManager(),
             clusterService,
-            EsExecutors.DIRECT_EXECUTOR_SERVICE
+            EsExecutors.DIRECT_EXECUTOR_SERVICE,
+            projectResolver
         );
         this.indexNameExpressionResolver = indexNameExpressionResolver;
 
@@ -74,11 +77,11 @@ public class TransportGetWatcherSettingsAction extends TransportLocalClusterStat
     protected void localClusterStateOperation(
         Task task,
         GetWatcherSettingsAction.Request request,
-        ClusterState state,
+        ProjectState state,
         ActionListener<GetWatcherSettingsAction.Response> listener
     ) {
         ((CancellableTask) task).ensureNotCancelled();
-        IndexMetadata metadata = state.metadata().getProject().index(WATCHER_INDEX_NAME);
+        IndexMetadata metadata = state.metadata().index(WATCHER_INDEX_NAME);
         if (metadata == null) {
             listener.onResponse(new GetWatcherSettingsAction.Response(Settings.EMPTY));
         } else {
@@ -103,15 +106,16 @@ public class TransportGetWatcherSettingsAction extends TransportLocalClusterStat
     }
 
     @Override
-    protected ClusterBlockException checkBlock(GetWatcherSettingsAction.Request request, ClusterState state) {
+    protected ClusterBlockException checkBlock(GetWatcherSettingsAction.Request request, ProjectState state) {
         ClusterBlockException globalBlock = state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
         if (globalBlock != null) {
             return globalBlock;
         }
         return state.blocks()
             .indicesBlockedException(
+                state.projectId(),
                 ClusterBlockLevel.METADATA_READ,
-                indexNameExpressionResolver.concreteIndexNamesWithSystemIndexAccess(state, WATCHER_INDEX_REQUEST)
+                indexNameExpressionResolver.concreteIndexNamesWithSystemIndexAccess(state.metadata(), WATCHER_INDEX_REQUEST)
             );
     }
 }

+ 7 - 2
x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/TransportUpdateWatcherSettingsAction.java

@@ -22,6 +22,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.MetadataUpdateSettingsService;
+import org.elasticsearch.cluster.project.ProjectResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
@@ -55,6 +56,7 @@ public class TransportUpdateWatcherSettingsAction extends TransportMasterNodeAct
     private static final Logger logger = LogManager.getLogger(TransportUpdateWatcherSettingsAction.class);
     private final MetadataUpdateSettingsService updateSettingsService;
     private final IndexNameExpressionResolver indexNameExpressionResolver;
+    private final ProjectResolver projectResolver;
 
     @Inject
     public TransportUpdateWatcherSettingsAction(
@@ -63,7 +65,8 @@ public class TransportUpdateWatcherSettingsAction extends TransportMasterNodeAct
         ThreadPool threadPool,
         ActionFilters actionFilters,
         MetadataUpdateSettingsService updateSettingsService,
-        IndexNameExpressionResolver indexNameExpressionResolver
+        IndexNameExpressionResolver indexNameExpressionResolver,
+        ProjectResolver projectResolver
     ) {
         super(
             UpdateWatcherSettingsAction.NAME,
@@ -77,6 +80,7 @@ public class TransportUpdateWatcherSettingsAction extends TransportMasterNodeAct
         );
         this.updateSettingsService = updateSettingsService;
         this.indexNameExpressionResolver = indexNameExpressionResolver;
+        this.projectResolver = projectResolver;
     }
 
     @FixForMultiProject(description = "Don't use default project id to update settings")
@@ -87,7 +91,7 @@ public class TransportUpdateWatcherSettingsAction extends TransportMasterNodeAct
         ClusterState state,
         ActionListener<AcknowledgedResponse> listener
     ) {
-        final IndexMetadata watcherIndexMd = state.metadata().getProject().index(WATCHER_INDEX_NAME);
+        final IndexMetadata watcherIndexMd = projectResolver.getProjectMetadata(state.metadata()).index(WATCHER_INDEX_NAME);
         if (watcherIndexMd == null) {
             // Index does not exist, so fail fast
             listener.onFailure(new ResourceNotFoundException("no Watches found on which to modify settings"));
@@ -131,6 +135,7 @@ public class TransportUpdateWatcherSettingsAction extends TransportMasterNodeAct
         }
         return state.blocks()
             .indicesBlockedException(
+                projectResolver.getProjectId(),
                 ClusterBlockLevel.METADATA_WRITE,
                 indexNameExpressionResolver.concreteIndexNamesWithSystemIndexAccess(state, WATCHER_INDEX_REQUEST)
             );

+ 6 - 2
x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/TransportWatcherServiceAction.java

@@ -18,6 +18,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.project.ProjectResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.injection.guice.Inject;
@@ -34,13 +35,15 @@ import static org.elasticsearch.core.Strings.format;
 public class TransportWatcherServiceAction extends AcknowledgedTransportMasterNodeAction<WatcherServiceRequest> {
 
     private static final Logger logger = LogManager.getLogger(TransportWatcherServiceAction.class);
+    private final ProjectResolver projectResolver;
 
     @Inject
     public TransportWatcherServiceAction(
         TransportService transportService,
         ClusterService clusterService,
         ThreadPool threadPool,
-        ActionFilters actionFilters
+        ActionFilters actionFilters,
+        ProjectResolver projectResolver
     ) {
         super(
             WatcherServiceAction.NAME,
@@ -51,6 +54,7 @@ public class TransportWatcherServiceAction extends AcknowledgedTransportMasterNo
             WatcherServiceRequest::new,
             threadPool.executor(ThreadPool.Names.MANAGEMENT)
         );
+        this.projectResolver = projectResolver;
     }
 
     @Override
@@ -73,7 +77,7 @@ public class TransportWatcherServiceAction extends AcknowledgedTransportMasterNo
                     XPackPlugin.checkReadyForXPackCustomMetadata(clusterState);
 
                     WatcherMetadata newWatcherMetadata = new WatcherMetadata(manuallyStopped);
-                    final var project = clusterState.metadata().getProject();
+                    final var project = projectResolver.getProjectMetadata(clusterState);
                     WatcherMetadata currentMetadata = project.custom(WatcherMetadata.TYPE);
 
                     // adhere to the contract of returning the original state if nothing has changed

+ 7 - 2
x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/TransportWatcherStatsAction.java

@@ -10,6 +10,7 @@ import org.elasticsearch.action.FailedNodeException;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.nodes.TransportNodesAction;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.project.ProjectResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.injection.guice.Inject;
@@ -42,6 +43,7 @@ public class TransportWatcherStatsAction extends TransportNodesAction<
     private final ExecutionService executionService;
     private final TriggerService triggerService;
     private final WatcherLifeCycleService lifeCycleService;
+    private final ProjectResolver projectResolver;
 
     @Inject
     public TransportWatcherStatsAction(
@@ -51,7 +53,8 @@ public class TransportWatcherStatsAction extends TransportNodesAction<
         ActionFilters actionFilters,
         WatcherLifeCycleService lifeCycleService,
         ExecutionService executionService,
-        TriggerService triggerService
+        TriggerService triggerService,
+        ProjectResolver projectResolver
     ) {
         super(
             WatcherStatsAction.NAME,
@@ -64,6 +67,7 @@ public class TransportWatcherStatsAction extends TransportNodesAction<
         this.lifeCycleService = lifeCycleService;
         this.executionService = executionService;
         this.triggerService = triggerService;
+        this.projectResolver = projectResolver;
     }
 
     @Override
@@ -106,7 +110,8 @@ public class TransportWatcherStatsAction extends TransportNodesAction<
     }
 
     private WatcherMetadata getWatcherMetadata() {
-        WatcherMetadata watcherMetadata = clusterService.state().getMetadata().getProject().custom(WatcherMetadata.TYPE);
+
+        WatcherMetadata watcherMetadata = projectResolver.getProjectMetadata(clusterService.state()).custom(WatcherMetadata.TYPE);
         if (watcherMetadata == null) {
             watcherMetadata = new WatcherMetadata(false);
         }

+ 9 - 2
x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/TransportWatcherStatsActionTests.java

@@ -10,8 +10,11 @@ import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.metadata.ProjectId;
+import org.elasticsearch.cluster.metadata.ProjectMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
+import org.elasticsearch.cluster.project.TestProjectResolvers;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.test.ESTestCase;
@@ -48,6 +51,7 @@ public class TransportWatcherStatsActionTests extends ESTestCase {
 
     @Before
     public void setupTransportAction() {
+        ProjectId projectId = randomProjectIdOrDefault();
         threadPool = new TestThreadPool("TransportWatcherStatsActionTests");
         TransportService transportService = mock(TransportService.class);
         when(transportService.getThreadPool()).thenReturn(threadPool);
@@ -60,8 +64,10 @@ public class TransportWatcherStatsActionTests extends ESTestCase {
         when(clusterService.getClusterName()).thenReturn(clusterName);
 
         ClusterState clusterState = mock(ClusterState.class);
-        when(clusterState.getMetadata()).thenReturn(Metadata.EMPTY_METADATA);
         when(clusterService.state()).thenReturn(clusterState);
+        Metadata metadata = Metadata.builder().put(ProjectMetadata.builder(projectId).build()).build();
+        when(clusterState.getMetadata()).thenReturn(metadata);
+        when(clusterState.metadata()).thenReturn(metadata);
 
         WatcherLifeCycleService watcherLifeCycleService = mock(WatcherLifeCycleService.class);
         when(watcherLifeCycleService.getState()).thenReturn(() -> WatcherState.STARTED);
@@ -91,7 +97,8 @@ public class TransportWatcherStatsActionTests extends ESTestCase {
             new ActionFilters(Collections.emptySet()),
             watcherLifeCycleService,
             executionService,
-            triggerService
+            triggerService,
+            TestProjectResolvers.singleProject(projectId)
         );
     }