|
@@ -36,18 +36,6 @@ public class TransportWatcherServiceAction extends AcknowledgedTransportMasterNo
|
|
|
|
|
|
private static final Logger logger = LogManager.getLogger(TransportWatcherServiceAction.class);
|
|
|
|
|
|
- private static final AckedRequest ackedRequest = new AckedRequest() {
|
|
|
- @Override
|
|
|
- public TimeValue ackTimeout() {
|
|
|
- return AcknowledgedRequest.DEFAULT_ACK_TIMEOUT;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public TimeValue masterNodeTimeout() {
|
|
|
- return AcknowledgedRequest.DEFAULT_ACK_TIMEOUT;
|
|
|
- }
|
|
|
- };
|
|
|
-
|
|
|
@Inject
|
|
|
public TransportWatcherServiceAction(TransportService transportService, ClusterService clusterService,
|
|
|
ThreadPool threadPool, ActionFilters actionFilters,
|
|
@@ -59,20 +47,22 @@ public class TransportWatcherServiceAction extends AcknowledgedTransportMasterNo
|
|
|
@Override
|
|
|
protected void masterOperation(Task task, WatcherServiceRequest request, ClusterState state,
|
|
|
ActionListener<AcknowledgedResponse> listener) {
|
|
|
- switch (request.getCommand()) {
|
|
|
- case STOP:
|
|
|
- setWatcherMetadataAndWait(true, listener);
|
|
|
- break;
|
|
|
- case START:
|
|
|
- setWatcherMetadataAndWait(false, listener);
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
+ final boolean manuallyStopped = request.getCommand() == WatcherServiceRequest.Command.STOP;
|
|
|
+ final String source = manuallyStopped ? "update_watcher_manually_stopped" : "update_watcher_manually_started";
|
|
|
|
|
|
- private void setWatcherMetadataAndWait(boolean manuallyStopped, final ActionListener<AcknowledgedResponse> listener) {
|
|
|
- String source = manuallyStopped ? "update_watcher_manually_stopped" : "update_watcher_manually_started";
|
|
|
+ // TODO: make WatcherServiceRequest a real AckedRequest so that we have both a configurable timeout and master node timeout like
|
|
|
+ // we do elsewhere
|
|
|
+ clusterService.submitStateUpdateTask(source, new AckedClusterStateUpdateTask(new AckedRequest() {
|
|
|
+ @Override
|
|
|
+ public TimeValue ackTimeout() {
|
|
|
+ return AcknowledgedRequest.DEFAULT_ACK_TIMEOUT;
|
|
|
+ }
|
|
|
|
|
|
- clusterService.submitStateUpdateTask(source, new AckedClusterStateUpdateTask(ackedRequest, listener) {
|
|
|
+ @Override
|
|
|
+ public TimeValue masterNodeTimeout() {
|
|
|
+ return request.masterNodeTimeout();
|
|
|
+ }
|
|
|
+ }, listener) {
|
|
|
@Override
|
|
|
public ClusterState execute(ClusterState clusterState) {
|
|
|
XPackPlugin.checkReadyForXPackCustomMetadata(clusterState);
|