1
0
Эх сурвалжийг харах

Refactored indices aliases api to make use of the new recently introduced generic ack mechanism

Closes #4114
Luca Cavanna 12 жил өмнө
parent
commit
b2ad34cf99

+ 49 - 0
src/main/java/org/elasticsearch/action/admin/indices/alias/IndicesAliasesClusterStateUpdateRequest.java

@@ -0,0 +1,49 @@
+/*
+ * Licensed to ElasticSearch and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. ElasticSearch licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.elasticsearch.action.admin.indices.alias;
+
+import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest;
+import org.elasticsearch.cluster.metadata.AliasAction;
+
+/**
+ * Cluster state update request that allows to add or remove aliases
+ */
+public class IndicesAliasesClusterStateUpdateRequest extends ClusterStateUpdateRequest<IndicesAliasesClusterStateUpdateRequest> {
+
+    AliasAction[] actions;
+
+    IndicesAliasesClusterStateUpdateRequest() {
+
+    }
+
+    /**
+     * Returns the alias actions to be performed
+     */
+    public AliasAction[] actions() {
+        return actions;
+    }
+
+    /**
+     * Sets the alias actions to be executed
+     */
+    public IndicesAliasesClusterStateUpdateRequest actions(AliasAction[] actions) {
+        this.actions = actions;
+        return this;
+    }
+}

+ 4 - 33
src/main/java/org/elasticsearch/action/admin/indices/alias/IndicesAliasesRequest.java

@@ -22,12 +22,11 @@ package org.elasticsearch.action.admin.indices.alias;
 import com.google.common.collect.Lists;
 import org.elasticsearch.ElasticSearchGenerationException;
 import org.elasticsearch.action.ActionRequestValidationException;
-import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
+import org.elasticsearch.action.support.master.AcknowledgedRequest;
 import org.elasticsearch.cluster.metadata.AliasAction;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
@@ -41,17 +40,14 @@ import java.util.Map;
 
 import static org.elasticsearch.action.ValidateActions.addValidationError;
 import static org.elasticsearch.cluster.metadata.AliasAction.readAliasAction;
-import static org.elasticsearch.common.unit.TimeValue.readTimeValue;
 
 /**
  * A request to add/remove aliases for one or more indices.
  */
-public class IndicesAliasesRequest extends MasterNodeOperationRequest<IndicesAliasesRequest> {
+public class IndicesAliasesRequest extends AcknowledgedRequest<IndicesAliasesRequest> {
 
     private List<AliasAction> aliasActions = Lists.newArrayList();
 
-    private TimeValue timeout = TimeValue.timeValueSeconds(10);
-
     public IndicesAliasesRequest() {
 
     }
@@ -147,31 +143,6 @@ public class IndicesAliasesRequest extends MasterNodeOperationRequest<IndicesAli
         return aliasActions();
     }
 
-    /**
-     * Timeout to wait till the put mapping gets acknowledged of all current cluster nodes. Defaults to
-     * <tt>10s</tt>.
-     */
-    TimeValue timeout() {
-        return timeout;
-    }
-
-    /**
-     * Timeout to wait till the alias operations get acknowledged of all current cluster nodes. Defaults to
-     * <tt>10s</tt>.
-     */
-    public IndicesAliasesRequest timeout(TimeValue timeout) {
-        this.timeout = timeout;
-        return this;
-    }
-
-    /**
-     * Timeout to wait till the alias operations get acknowledged of all current cluster nodes. Defaults to
-     * <tt>10s</tt>.
-     */
-    public IndicesAliasesRequest timeout(String timeout) {
-        return timeout(TimeValue.parseTimeValue(timeout, TimeValue.timeValueSeconds(10)));
-    }
-
     @Override
     public ActionRequestValidationException validate() {
         ActionRequestValidationException validationException = null;
@@ -196,7 +167,7 @@ public class IndicesAliasesRequest extends MasterNodeOperationRequest<IndicesAli
         for (int i = 0; i < size; i++) {
             aliasActions.add(readAliasAction(in));
         }
-        timeout = readTimeValue(in);
+        readTimeout(in, null);
     }
 
     @Override
@@ -206,6 +177,6 @@ public class IndicesAliasesRequest extends MasterNodeOperationRequest<IndicesAli
         for (AliasAction aliasAction : aliasActions) {
             aliasAction.writeTo(out);
         }
-        timeout.writeTo(out);
+        writeTimeout(out, null);
     }
 }

+ 2 - 13
src/main/java/org/elasticsearch/action/admin/indices/alias/IndicesAliasesRequestBuilder.java

@@ -20,11 +20,10 @@
 package org.elasticsearch.action.admin.indices.alias;
 
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
+import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
 import org.elasticsearch.client.IndicesAdminClient;
 import org.elasticsearch.client.internal.InternalIndicesAdminClient;
 import org.elasticsearch.cluster.metadata.AliasAction;
-import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.query.FilterBuilder;
 
 import java.util.Map;
@@ -32,7 +31,7 @@ import java.util.Map;
 /**
  *
  */
-public class IndicesAliasesRequestBuilder extends MasterNodeOperationRequestBuilder<IndicesAliasesRequest, IndicesAliasesResponse, IndicesAliasesRequestBuilder> {
+public class IndicesAliasesRequestBuilder extends AcknowledgedRequestBuilder<IndicesAliasesRequest, IndicesAliasesResponse, IndicesAliasesRequestBuilder> {
 
     public IndicesAliasesRequestBuilder(IndicesAdminClient indicesClient) {
         super((InternalIndicesAdminClient) indicesClient, new IndicesAliasesRequest());
@@ -106,16 +105,6 @@ public class IndicesAliasesRequestBuilder extends MasterNodeOperationRequestBuil
         return this;
     }
 
-    /**
-     * Sets operation timeout.
-     *
-     * @param timeout
-     */
-    public IndicesAliasesRequestBuilder setTimeout(TimeValue timeout) {
-        request.timeout(timeout);
-        return this;
-    }
-
     @Override
     protected void doExecute(ActionListener<IndicesAliasesResponse> listener) {
         ((IndicesAdminClient) client).aliases(request, listener);

+ 5 - 11
src/main/java/org/elasticsearch/action/admin/indices/alias/IndicesAliasesResponse.java

@@ -19,7 +19,7 @@
 
 package org.elasticsearch.action.admin.indices.alias;
 
-import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 
@@ -28,31 +28,25 @@ import java.io.IOException;
 /**
  * A response for a add/remove alias action.
  */
-public class IndicesAliasesResponse extends ActionResponse {
-
-    private boolean acknowledged;
+public class IndicesAliasesResponse extends AcknowledgedResponse {
 
     IndicesAliasesResponse() {
 
     }
 
     IndicesAliasesResponse(boolean acknowledged) {
-        this.acknowledged = acknowledged;
-    }
-
-    public boolean isAcknowledged() {
-        return acknowledged;
+        super(acknowledged);
     }
 
     @Override
     public void readFrom(StreamInput in) throws IOException {
         super.readFrom(in);
-        acknowledged = in.readBoolean();
+        readAcknowledged(in, null);
     }
 
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);
-        out.writeBoolean(acknowledged);
+        writeAcknowledged(out, null);
     }
 }

+ 11 - 4
src/main/java/org/elasticsearch/action/admin/indices/alias/TransportIndicesAliasesAction.java

@@ -25,6 +25,8 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
 import org.elasticsearch.cluster.ClusterService;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ack.ClusterStateUpdateListener;
+import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.AliasAction;
@@ -37,7 +39,7 @@ import org.elasticsearch.transport.TransportService;
 import java.util.Set;
 
 /**
- *
+ * Add/remove aliases action
  */
 public class TransportIndicesAliasesAction extends TransportMasterNodeOperationAction<IndicesAliasesRequest, IndicesAliasesResponse> {
 
@@ -82,10 +84,15 @@ public class TransportIndicesAliasesAction extends TransportMasterNodeOperationA
 
     @Override
     protected void masterOperation(final IndicesAliasesRequest request, final ClusterState state, final ActionListener<IndicesAliasesResponse> listener) throws ElasticSearchException {
-        indexAliasesService.indicesAliases(new MetaDataIndexAliasesService.Request(request.aliasActions().toArray(new AliasAction[request.aliasActions().size()]), request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataIndexAliasesService.Listener() {
+
+        IndicesAliasesClusterStateUpdateRequest updateRequest = new IndicesAliasesClusterStateUpdateRequest()
+                .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
+                .actions(request.aliasActions().toArray(new AliasAction[request.aliasActions().size()]));
+
+        indexAliasesService.indicesAliases(updateRequest, new ClusterStateUpdateListener() {
             @Override
-            public void onResponse(MetaDataIndexAliasesService.Response response) {
-                listener.onResponse(new IndicesAliasesResponse(response.acknowledged()));
+            public void onResponse(ClusterStateUpdateResponse response) {
+                listener.onResponse(new IndicesAliasesResponse(response.isAcknowledged()));
             }
 
             @Override

+ 0 - 1
src/main/java/org/elasticsearch/cluster/ClusterModule.java

@@ -76,7 +76,6 @@ public class ClusterModule extends AbstractModule implements SpawnModules {
         bind(NodeMappingCreatedAction.class).asEagerSingleton();
         bind(NodeMappingRefreshAction.class).asEagerSingleton();
         bind(MappingUpdatedAction.class).asEagerSingleton();
-        bind(NodeAliasesUpdatedAction.class).asEagerSingleton();
         bind(NodeIndicesStateUpdatedAction.class).asEagerSingleton();
 
         bind(ClusterInfoService.class).to(InternalClusterInfoService.class).asEagerSingleton();

+ 0 - 156
src/main/java/org/elasticsearch/cluster/action/index/NodeAliasesUpdatedAction.java

@@ -1,156 +0,0 @@
-/*
- * Licensed to ElasticSearch and Shay Banon under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. ElasticSearch licenses this
- * file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.cluster.action.index;
-
-import org.elasticsearch.ElasticSearchException;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.node.DiscoveryNodes;
-import org.elasticsearch.common.component.AbstractComponent;
-import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.transport.*;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-/**
- *
- */
-public class NodeAliasesUpdatedAction extends AbstractComponent {
-
-    private final ThreadPool threadPool;
-    private final TransportService transportService;
-    private final List<Listener> listeners = new CopyOnWriteArrayList<Listener>();
-
-    @Inject
-    public NodeAliasesUpdatedAction(Settings settings, ThreadPool threadPool, TransportService transportService) {
-        super(settings);
-        this.threadPool = threadPool;
-        this.transportService = transportService;
-        transportService.registerHandler(NodeAliasesUpdatedTransportHandler.ACTION, new NodeAliasesUpdatedTransportHandler());
-    }
-
-    public void add(final Listener listener, TimeValue timeout) {
-        listeners.add(listener);
-        threadPool.schedule(timeout, ThreadPool.Names.GENERIC, new Runnable() {
-            @Override
-            public void run() {
-                boolean removed = listeners.remove(listener);
-                if (removed) {
-                    listener.onTimeout();
-                }
-            }
-        });
-    }
-
-    public void remove(Listener listener) {
-        listeners.remove(listener);
-    }
-
-    public void nodeAliasesUpdated(final ClusterState clusterState, final NodeAliasesUpdatedResponse response) throws ElasticSearchException {
-        DiscoveryNodes nodes = clusterState.nodes();
-        if (nodes.localNodeMaster()) {
-            threadPool.generic().execute(new Runnable() {
-                @Override
-                public void run() {
-                    innerNodeAliasesUpdated(response);
-                }
-            });
-        } else {
-            transportService.sendRequest(clusterState.nodes().masterNode(),
-                    NodeAliasesUpdatedTransportHandler.ACTION, response, EmptyTransportResponseHandler.INSTANCE_SAME);
-        }
-    }
-
-    private void innerNodeAliasesUpdated(NodeAliasesUpdatedResponse response) {
-        for (Listener listener : listeners) {
-            listener.onAliasesUpdated(response);
-        }
-    }
-
-
-    public static interface Listener {
-        void onAliasesUpdated(NodeAliasesUpdatedResponse response);
-
-        void onTimeout();
-    }
-
-    private class NodeAliasesUpdatedTransportHandler extends BaseTransportRequestHandler<NodeAliasesUpdatedResponse> {
-
-        static final String ACTION = "cluster/nodeAliasesUpdated";
-
-        @Override
-        public NodeAliasesUpdatedResponse newInstance() {
-            return new NodeAliasesUpdatedResponse();
-        }
-
-        @Override
-        public void messageReceived(NodeAliasesUpdatedResponse response, TransportChannel channel) throws Exception {
-            innerNodeAliasesUpdated(response);
-            channel.sendResponse(TransportResponse.Empty.INSTANCE);
-        }
-
-        @Override
-        public String executor() {
-            return ThreadPool.Names.SAME;
-        }
-    }
-
-    public static class NodeAliasesUpdatedResponse extends TransportRequest {
-
-        private String nodeId;
-        private long version;
-
-        NodeAliasesUpdatedResponse() {
-        }
-
-        public NodeAliasesUpdatedResponse(String nodeId, long version) {
-            this.nodeId = nodeId;
-            this.version = version;
-        }
-
-        public String nodeId() {
-            return nodeId;
-        }
-
-        public long version() {
-            return version;
-        }
-
-        @Override
-        public void writeTo(StreamOutput out) throws IOException {
-            super.writeTo(out);
-            out.writeString(nodeId);
-            out.writeLong(version);
-        }
-
-        @Override
-        public void readFrom(StreamInput in) throws IOException {
-            super.readFrom(in);
-            nodeId = in.readString();
-            version = in.readLong();
-        }
-    }
-}

+ 37 - 100
src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesService.java

@@ -22,18 +22,20 @@ package org.elasticsearch.cluster.metadata;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.elasticsearch.ElasticSearchIllegalArgumentException;
-import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
+import org.elasticsearch.action.admin.indices.alias.IndicesAliasesClusterStateUpdateRequest;
+import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
 import org.elasticsearch.cluster.ClusterService;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
-import org.elasticsearch.cluster.action.index.NodeAliasesUpdatedAction;
+import org.elasticsearch.cluster.ack.ClusterStateUpdateListener;
+import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.concurrent.CountDown;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.index.Index;
@@ -48,7 +50,7 @@ import java.util.List;
 import java.util.Map;
 
 /**
- *
+ * Service responsible for submitting add and remove aliases requests
  */
 public class MetaDataIndexAliasesService extends AbstractComponent {
 
@@ -56,22 +58,39 @@ public class MetaDataIndexAliasesService extends AbstractComponent {
 
     private final IndicesService indicesService;
 
-    private final NodeAliasesUpdatedAction aliasOperationPerformedAction;
-
     @Inject
-    public MetaDataIndexAliasesService(Settings settings, ClusterService clusterService, IndicesService indicesService, NodeAliasesUpdatedAction aliasOperationPerformedAction) {
+    public MetaDataIndexAliasesService(Settings settings, ClusterService clusterService, IndicesService indicesService) {
         super(settings);
         this.clusterService = clusterService;
         this.indicesService = indicesService;
-        this.aliasOperationPerformedAction = aliasOperationPerformedAction;
     }
 
-    public void indicesAliases(final Request request, final Listener listener) {
-        clusterService.submitStateUpdateTask("index-aliases", Priority.URGENT, new TimeoutClusterStateUpdateTask() {
+    public void indicesAliases(final IndicesAliasesClusterStateUpdateRequest request, final ClusterStateUpdateListener listener) {
+        clusterService.submitStateUpdateTask("index-aliases", Priority.URGENT, new AckedClusterStateUpdateTask() {
+
+            @Override
+            public boolean mustAck(DiscoveryNode discoveryNode) {
+                return true;
+            }
+
+            @Override
+            public void onAllNodesAcked(@Nullable Throwable t) {
+                listener.onResponse(new ClusterStateUpdateResponse(true));
+            }
+
+            @Override
+            public void onAckTimeout() {
+                listener.onResponse(new ClusterStateUpdateResponse(false));
+            }
+
+            @Override
+            public TimeValue ackTimeout() {
+                return request.ackTimeout();
+            }
 
             @Override
             public TimeValue timeout() {
-                return request.masterTimeout;
+                return request.masterNodeTimeout();
             }
 
             @Override
@@ -84,7 +103,7 @@ public class MetaDataIndexAliasesService extends AbstractComponent {
                 List<String> indicesToClose = Lists.newArrayList();
                 Map<String, IndexService> indices = Maps.newHashMap();
                 try {
-                    for (AliasAction aliasAction : request.actions) {
+                    for (AliasAction aliasAction : request.actions()) {
                         if (!Strings.hasText(aliasAction.alias()) || !Strings.hasText(aliasAction.index())) {
                             throw new ElasticSearchIllegalArgumentException("Index name and alias name are required");
                         }
@@ -101,7 +120,7 @@ public class MetaDataIndexAliasesService extends AbstractComponent {
 
                     boolean changed = false;
                     MetaData.Builder builder = MetaData.builder(currentState.metaData());
-                    for (AliasAction aliasAction : request.actions) {
+                    for (AliasAction aliasAction : request.actions()) {
                         IndexMetaData indexMetaData = builder.get(aliasAction.index());
                         if (indexMetaData == null) {
                             throw new IndexMissingException(new Index(aliasAction.index()));
@@ -175,20 +194,11 @@ public class MetaDataIndexAliasesService extends AbstractComponent {
                         ClusterState updatedState = ClusterState.builder(currentState).metaData(builder).build();
                         // even though changes happened, they resulted in 0 actual changes to metadata
                         // i.e. remove and add the same alias to the same index
-                        if (updatedState.metaData().aliases().equals(currentState.metaData().aliases())) {
-                            return currentState;
+                        if (!updatedState.metaData().aliases().equals(currentState.metaData().aliases())) {
+                            return updatedState;
                         }
-                        // wait for responses from other nodes if needed
-                        int responseCount = updatedState.nodes().size();
-                        long version = updatedState.version() + 1;
-                        logger.trace("waiting for [{}] notifications with version [{}]", responseCount, version);
-                        aliasOperationPerformedAction.add(new CountDownListener(responseCount, listener, version), request.timeout);
-
-                        return updatedState;
-                    } else {
-                        // Nothing to do
-                        return currentState;
                     }
+                    return currentState;
                 } finally {
                     for (String index : indicesToClose) {
                         indicesService.removeIndex(index, "created for alias processing");
@@ -198,81 +208,8 @@ public class MetaDataIndexAliasesService extends AbstractComponent {
 
             @Override
             public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                if (oldState == newState) {
-                    // we didn't do anything, callback
-                    listener.onResponse(new Response(true));
-                }
-            }
-        });
-    }
-
-    public static interface Listener {
 
-        void onResponse(Response response);
-
-        void onFailure(Throwable t);
-    }
-
-    public static class Request {
-
-        final AliasAction[] actions;
-
-        final TimeValue timeout;
-        TimeValue masterTimeout = MasterNodeOperationRequest.DEFAULT_MASTER_NODE_TIMEOUT;
-
-        public Request(AliasAction[] actions, TimeValue timeout) {
-            this.actions = actions;
-            this.timeout = timeout;
-        }
-
-        public Request masterTimeout(TimeValue masterTimeout) {
-            this.masterTimeout = masterTimeout;
-            return this;
-        }
-    }
-
-    public static class Response {
-        private final boolean acknowledged;
-
-        public Response(boolean acknowledged) {
-            this.acknowledged = acknowledged;
-        }
-
-        public boolean acknowledged() {
-            return acknowledged;
-        }
-    }
-
-    private class CountDownListener implements NodeAliasesUpdatedAction.Listener {
-
-        private final CountDown countDown;
-        private final Listener listener;
-        private final long version;
-
-        public CountDownListener(int countDown, Listener listener, long version) {
-            this.countDown = new CountDown(countDown);
-            this.listener = listener;
-            this.version = version;
-        }
-
-        @Override
-        public void onAliasesUpdated(NodeAliasesUpdatedAction.NodeAliasesUpdatedResponse response) {
-            if (version <= response.version()) {
-                logger.trace("Received NodeAliasesUpdatedResponse with version [{}] from [{}]", response.version(), response.nodeId());
-                if (countDown.countDown()) {
-                    aliasOperationPerformedAction.remove(this);
-                    logger.trace("NodeAliasUpdated was acknowledged by all expected nodes, returning");
-                    listener.onResponse(new Response(true));
-                }
             }
-        }
-
-        @Override
-        public void onTimeout() {
-            if (countDown.fastForward()) {
-                aliasOperationPerformedAction.remove(this);
-                listener.onResponse(new Response(false));
-            }
-        }
+        });
     }
 }

+ 1 - 6
src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java

@@ -83,7 +83,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
     private final NodeIndexDeletedAction nodeIndexDeletedAction;
     private final NodeMappingCreatedAction nodeMappingCreatedAction;
     private final NodeMappingRefreshAction nodeMappingRefreshAction;
-    private final NodeAliasesUpdatedAction nodeAliasesUpdatedAction;
     private final NodeIndicesStateUpdatedAction nodeIndicesStateUpdatedAction;
 
     // a map of mappings type we have seen per index due to cluster state
@@ -114,7 +113,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
                                       ShardStateAction shardStateAction,
                                       NodeIndexCreatedAction nodeIndexCreatedAction, NodeIndexDeletedAction nodeIndexDeletedAction,
                                       NodeMappingCreatedAction nodeMappingCreatedAction, NodeMappingRefreshAction nodeMappingRefreshAction,
-                                      NodeAliasesUpdatedAction nodeAliasesUpdatedAction, NodeIndicesStateUpdatedAction nodeIndicesStateUpdatedAction) {
+                                      NodeIndicesStateUpdatedAction nodeIndicesStateUpdatedAction) {
         super(settings);
         this.indicesService = indicesService;
         this.clusterService = clusterService;
@@ -125,7 +124,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
         this.nodeIndexDeletedAction = nodeIndexDeletedAction;
         this.nodeMappingCreatedAction = nodeMappingCreatedAction;
         this.nodeMappingRefreshAction = nodeMappingRefreshAction;
-        this.nodeAliasesUpdatedAction = nodeAliasesUpdatedAction;
         this.nodeIndicesStateUpdatedAction = nodeIndicesStateUpdatedAction;
     }
 
@@ -471,9 +469,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
                     }
                 }
             }
-            // Notify client that alias changes were applied
-            nodeAliasesUpdatedAction.nodeAliasesUpdated(event.state(),
-                    new NodeAliasesUpdatedAction.NodeAliasesUpdatedResponse(event.state().nodes().localNodeId(), event.state().version()));
         }
     }
 

+ 2 - 30
src/main/java/org/elasticsearch/rest/action/admin/indices/alias/RestIndicesAliasesAction.java

@@ -20,14 +20,12 @@
 package org.elasticsearch.rest.action.admin.indices.alias;
 
 import org.elasticsearch.ElasticSearchIllegalArgumentException;
-import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
 import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.metadata.AliasAction;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.rest.*;
@@ -36,10 +34,7 @@ import java.io.IOException;
 import java.util.Map;
 
 import static org.elasticsearch.cluster.metadata.AliasAction.newAddAliasAction;
-import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
 import static org.elasticsearch.rest.RestRequest.Method.POST;
-import static org.elasticsearch.rest.RestStatus.OK;
-import static org.elasticsearch.rest.action.support.RestXContentBuilder.restContentBuilder;
 
 /**
  *
@@ -65,7 +60,7 @@ public class RestIndicesAliasesAction extends BaseRestHandler {
             //         { remove : { index : "test1", alias : "alias1" } }
             //     ]
             // }
-            indicesAliasesRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10)));
+            indicesAliasesRequest.timeout(request.paramAsTime("timeout", indicesAliasesRequest.timeout()));
             parser = XContentFactory.xContent(request.content()).createParser(request.content());
             XContentParser.Token token = parser.nextToken();
             if (token == null) {
@@ -148,29 +143,6 @@ public class RestIndicesAliasesAction extends BaseRestHandler {
         } finally {
             parser.close();
         }
-        client.admin().indices().aliases(indicesAliasesRequest, new ActionListener<IndicesAliasesResponse>() {
-            @Override
-            public void onResponse(IndicesAliasesResponse response) {
-                try {
-                    XContentBuilder builder = restContentBuilder(request);
-                    builder.startObject()
-                            .field("ok", true)
-                            .field("acknowledged", response.isAcknowledged())
-                            .endObject();
-                    channel.sendResponse(new XContentRestResponse(request, OK, builder));
-                } catch (Throwable e) {
-                    onFailure(e);
-                }
-            }
-
-            @Override
-            public void onFailure(Throwable e) {
-                try {
-                    channel.sendResponse(new XContentThrowableRestResponse(request, e));
-                } catch (IOException e1) {
-                    logger.error("Failed to send failure response", e1);
-                }
-            }
-        });
+        client.admin().indices().aliases(indicesAliasesRequest, new AcknowledgedRestResponseActionListener<IndicesAliasesResponse>(request, channel, logger));
     }
 }

+ 29 - 0
src/test/java/org/elasticsearch/cluster/ack/AckTests.java

@@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
 import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
 import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
 import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse;
 import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
 import org.elasticsearch.action.admin.indices.settings.UpdateSettingsResponse;
@@ -33,6 +34,7 @@ import org.elasticsearch.action.admin.indices.warmer.get.GetWarmersResponse;
 import org.elasticsearch.action.admin.indices.warmer.put.PutWarmerResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.AliasMetaData;
 import org.elasticsearch.cluster.routing.*;
 import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
 import org.elasticsearch.common.settings.ImmutableSettings;
@@ -47,6 +49,7 @@ import java.util.Map;
 import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
 import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
 import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.SUITE;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.*;
 
 @ClusterScope(scope = SUITE)
@@ -383,4 +386,30 @@ public class AckTests extends ElasticsearchIntegrationTest {
         //removes the allocation exclude settings
         client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("cluster.routing.allocation.exclude._id", "")).get();
     }
+
+    @Test
+    public void testIndicesAliasesAcknowledgement() {
+        createIndex("test");
+
+        //testing acknowledgement when trying to submit an existing alias too
+        //in that case it would not make any change, but we are sure about the cluster state
+        //as the previous operation was acknowledged
+        for (int i = 0; i < 2; i++) {
+            assertAcked(client().admin().indices().prepareAliases().addAlias("test", "alias"));
+
+            for (Client client : clients()) {
+                ClusterState clusterState = client.admin().cluster().prepareState().setLocal(true).get().getState();
+                AliasMetaData aliasMetaData = clusterState.metaData().aliases().get("alias").get("test");
+                assertThat(aliasMetaData.alias(), equalTo("alias"));
+            }
+        }
+    }
+
+    @Test
+    public void testIndicesAliasesNoAcknowledgement() {
+        createIndex("test");
+
+        IndicesAliasesResponse indicesAliasesResponse = client().admin().indices().prepareAliases().addAlias("test", "alias").setTimeout("0s").get();
+        assertThat(indicesAliasesResponse.isAcknowledged(), equalTo(false));
+    }
 }

+ 5 - 0
src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java

@@ -36,6 +36,7 @@ import org.elasticsearch.action.count.CountResponse;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
+import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.BytesStreamInput;
@@ -64,6 +65,10 @@ import static org.junit.Assert.fail;
  */
 public class ElasticsearchAssertions {
 
+    public static void assertAcked(AcknowledgedRequestBuilder<?, ?, ?> builder) {
+        assertAcked(builder.get());
+    }
+
     public static void assertAcked(AcknowledgedResponse response) {
         assertThat(response.getClass().getSimpleName() + " failed - not acked", response.isAcknowledged(), equalTo(true));
         assertVersionSerializable(response);