瀏覽代碼

Made possible to dynamically update `discovery.zen.publish_timeout` cluster setting

`discovery.zen.publish_timeout` controls how long the master node is going to try and wait for all the nodes to respond to a cluster state publish before going ahead with the following updates in the queue (default 30s). Up until now changing the settings required restarting each node. The setting is now dynamic and can be changed through the cluster update settings api.

Closes #5063
Luca Cavanna 11 年之前
父節點
當前提交
766134f3c7

+ 2 - 0
src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java

@@ -25,6 +25,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
 import org.elasticsearch.cluster.routing.allocation.decider.*;
 import org.elasticsearch.common.inject.AbstractModule;
+import org.elasticsearch.discovery.DiscoverySettings;
 import org.elasticsearch.discovery.zen.elect.ElectMasterService;
 import org.elasticsearch.indices.cache.filter.IndicesFilterCache;
 import org.elasticsearch.indices.fielddata.breaker.InternalCircuitBreakerService;
@@ -81,6 +82,7 @@ public class ClusterDynamicSettingsModule extends AbstractModule {
         clusterDynamicSettings.addDynamicSetting(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, Validator.MEMORY_SIZE);
         clusterDynamicSettings.addDynamicSetting(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE);
         clusterDynamicSettings.addDynamicSetting(DestructiveOperations.REQUIRES_NAME);
+        clusterDynamicSettings.addDynamicSetting(DiscoverySettings.PUBLISH_TIMEOUT, Validator.TIME_NON_NEGATIVE);
     }
 
     public void addDynamicSettings(String... settings) {

+ 18 - 0
src/main/java/org/elasticsearch/cluster/settings/Validator.java

@@ -55,6 +55,24 @@ public interface Validator {
         }
     };
 
+    public static final Validator TIME_NON_NEGATIVE = new Validator() {
+        @Override
+        public String validate(String setting, String value) {
+            try {
+                TimeValue timeValue = TimeValue.parseTimeValue(value, null);
+                if (timeValue == null) {
+                    return "cannot parse value [" + value + "] as time";
+                }
+                if (timeValue.millis() < 0) {
+                    return "cannot parse value [" + value + "] as non negative time";
+                }
+            } catch (ElasticsearchParseException ex) {
+                return "cannot parse value [" + value + "] as time";
+            }
+            return null;
+        }
+    };
+
     public static final Validator FLOAT = new Validator() {
         @Override
         public String validate(String setting, String value) {

+ 0 - 3
src/main/java/org/elasticsearch/discovery/Discovery.java

@@ -26,7 +26,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.component.LifecycleComponent;
-import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.node.service.NodeService;
 import org.elasticsearch.rest.RestStatus;
 
@@ -39,8 +38,6 @@ public interface Discovery extends LifecycleComponent<Discovery> {
 
     final ClusterBlock NO_MASTER_BLOCK = new ClusterBlock(2, "no master", true, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL);
 
-    public static final TimeValue DEFAULT_PUBLISH_TIMEOUT = TimeValue.timeValueSeconds(30);
-
     DiscoveryNode localNode();
 
     void addListener(InitialStateDiscoveryListener listener);

+ 64 - 0
src/main/java/org/elasticsearch/discovery/DiscoverySettings.java

@@ -0,0 +1,64 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.discovery;
+
+import org.elasticsearch.common.component.AbstractComponent;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.node.settings.NodeSettingsService;
+
+/**
+ * Exposes common discovery settings that may be supported by all the different discovery implementations
+ */
+public class DiscoverySettings extends AbstractComponent {
+
+    public static final String PUBLISH_TIMEOUT = "discovery.zen.publish_timeout";
+
+    private static final TimeValue DEFAULT_PUBLISH_TIMEOUT = TimeValue.timeValueSeconds(30);
+
+    private volatile TimeValue publishTimeout = DEFAULT_PUBLISH_TIMEOUT;
+
+    @Inject
+    public DiscoverySettings(Settings settings, NodeSettingsService nodeSettingsService) {
+        super(settings);
+        nodeSettingsService.addListener(new ApplySettings());
+    }
+
+    /**
+     * Returns the current publish timeout
+     */
+    public TimeValue getPublishTimeout() {
+        return publishTimeout;
+    }
+
+    private class ApplySettings implements NodeSettingsService.Listener {
+        @Override
+        public void onRefreshSettings(Settings settings) {
+            TimeValue newPublishTimeout = settings.getAsTime(PUBLISH_TIMEOUT, null);
+            if (newPublishTimeout != null) {
+                if (newPublishTimeout.millis() != publishTimeout.millis()) {
+                    logger.info("updating [{}] from [{}] to [{}]", PUBLISH_TIMEOUT, publishTimeout, newPublishTimeout);
+                    publishTimeout = newPublishTimeout;
+                }
+            }
+        }
+    }
+}

+ 4 - 4
src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java

@@ -62,7 +62,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
     private final ClusterName clusterName;
     private final Version version;
 
-    private final TimeValue publishTimeout;
+    private final DiscoverySettings discoverySettings;
 
     private DiscoveryNode localNode;
 
@@ -76,15 +76,14 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
 
     @Inject
     public LocalDiscovery(Settings settings, ClusterName clusterName, TransportService transportService, ClusterService clusterService,
-                          DiscoveryNodeService discoveryNodeService, Version version) {
+                          DiscoveryNodeService discoveryNodeService, Version version, DiscoverySettings discoverySettings) {
         super(settings);
         this.clusterName = clusterName;
         this.clusterService = clusterService;
         this.transportService = transportService;
         this.discoveryNodeService = discoveryNodeService;
         this.version = version;
-
-        this.publishTimeout = settings.getAsTime("discovery.zen.publish_timeout", DEFAULT_PUBLISH_TIMEOUT);
+        this.discoverySettings = discoverySettings;
     }
 
     @Override
@@ -336,6 +335,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
                 }
             }
 
+            TimeValue publishTimeout = discoverySettings.getPublishTimeout();
             if (publishTimeout.millis() > 0) {
                 try {
                     boolean awaited = publishResponseHandler.awaitAllNodes(publishTimeout);

+ 3 - 2
src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

@@ -45,6 +45,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.discovery.Discovery;
 import org.elasticsearch.discovery.DiscoveryService;
+import org.elasticsearch.discovery.DiscoverySettings;
 import org.elasticsearch.discovery.InitialStateDiscoveryListener;
 import org.elasticsearch.discovery.zen.elect.ElectMasterService;
 import org.elasticsearch.discovery.zen.fd.MasterFaultDetection;
@@ -118,7 +119,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
     @Inject
     public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool,
                         TransportService transportService, ClusterService clusterService, NodeSettingsService nodeSettingsService,
-                        DiscoveryNodeService discoveryNodeService, ZenPingService pingService, Version version) {
+                        DiscoveryNodeService discoveryNodeService, ZenPingService pingService, Version version, DiscoverySettings discoverySettings) {
         super(settings);
         this.clusterName = clusterName;
         this.threadPool = threadPool;
@@ -146,7 +147,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
         this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService);
         this.nodesFD.addListener(new NodeFailureListener());
 
-        this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener());
+        this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener(), discoverySettings);
         this.pingService.setNodesProvider(this);
         this.membership = new MembershipAction(settings, transportService, this, new MembershipListener());
 

+ 5 - 6
src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java

@@ -33,6 +33,7 @@ import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.discovery.AckClusterStatePublishResponseHandler;
 import org.elasticsearch.discovery.ClusterStatePublishResponseHandler;
 import org.elasticsearch.discovery.Discovery;
+import org.elasticsearch.discovery.DiscoverySettings;
 import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.*;
@@ -59,18 +60,15 @@ public class PublishClusterStateAction extends AbstractComponent {
     private final TransportService transportService;
     private final DiscoveryNodesProvider nodesProvider;
     private final NewClusterStateListener listener;
-
-    private final TimeValue publishTimeout;
+    private final DiscoverySettings discoverySettings;
 
     public PublishClusterStateAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider,
-                                     NewClusterStateListener listener) {
+                                     NewClusterStateListener listener, DiscoverySettings discoverySettings) {
         super(settings);
         this.transportService = transportService;
         this.nodesProvider = nodesProvider;
         this.listener = listener;
-
-        this.publishTimeout = settings.getAsTime("discovery.zen.publish_timeout", Discovery.DEFAULT_PUBLISH_TIMEOUT);
-
+        this.discoverySettings = discoverySettings;
         transportService.registerHandler(PublishClusterStateRequestHandler.ACTION, new PublishClusterStateRequestHandler());
     }
 
@@ -137,6 +135,7 @@ public class PublishClusterStateAction extends AbstractComponent {
             }
         }
 
+        TimeValue publishTimeout = discoverySettings.getPublishTimeout();
         if (publishTimeout.millis() > 0) {
             // only wait if the publish timeout is configured...
             try {

+ 2 - 1
src/test/java/org/elasticsearch/cluster/ack/AckClusterUpdateSettingsTests.java

@@ -29,6 +29,7 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.discovery.DiscoverySettings;
 import org.elasticsearch.test.ElasticsearchIntegrationTest;
 import org.junit.Test;
 
@@ -45,7 +46,7 @@ public class AckClusterUpdateSettingsTests extends ElasticsearchIntegrationTest
     protected Settings nodeSettings(int nodeOrdinal) {
         //to test that the acknowledgement mechanism is working we better disable the wait for publish
         //otherwise the operation is most likely acknowledged even if it doesn't support ack
-        return ImmutableSettings.builder().put("discovery.zen.publish_timeout", 0).build();
+        return ImmutableSettings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT, 0).build();
     }
 
     @Test

+ 2 - 1
src/test/java/org/elasticsearch/cluster/ack/AckTests.java

@@ -43,6 +43,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.discovery.DiscoverySettings;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.warmer.IndexWarmersMetaData;
 import org.elasticsearch.test.ElasticsearchIntegrationTest;
@@ -61,7 +62,7 @@ public class AckTests extends ElasticsearchIntegrationTest {
     protected Settings nodeSettings(int nodeOrdinal) {
         //to test that the acknowledgement mechanism is working we better disable the wait for publish
         //otherwise the operation is most likely acknowledged even if it doesn't support ack
-        return ImmutableSettings.builder().put("discovery.zen.publish_timeout", 0).build();
+        return ImmutableSettings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT, 0).build();
     }
 
     @Test

+ 26 - 2
src/test/java/org/elasticsearch/cluster/settings/ClusterSettingsTests.java

@@ -23,12 +23,12 @@ import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResp
 import org.elasticsearch.cluster.routing.allocation.decider.DisableAllocationDecider;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.discovery.DiscoverySettings;
 import org.elasticsearch.test.ElasticsearchIntegrationTest;
 import org.hamcrest.Matchers;
 import org.junit.Test;
 
-import static org.hamcrest.Matchers.notNullValue;
-import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.*;
 
 @ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numNodes = 1)
 public class ClusterSettingsTests extends ElasticsearchIntegrationTest {
@@ -99,4 +99,28 @@ public class ClusterSettingsTests extends ElasticsearchIntegrationTest {
         assertThat(response3.getPersistentSettings().get(key1), notNullValue());
         assertThat(response3.getPersistentSettings().get(key2), notNullValue());
     }
+
+    @Test
+    public void testUpdateDiscoveryPublishTimeout() {
+        ClusterUpdateSettingsResponse response = client().admin().cluster()
+                .prepareUpdateSettings()
+                .setTransientSettings(ImmutableSettings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT, "1s").build())
+                .get();
+
+        assertThat(response.getTransientSettings().getAsMap().get(DiscoverySettings.PUBLISH_TIMEOUT), equalTo("1s"));
+
+        response = client().admin().cluster()
+                .prepareUpdateSettings()
+                .setTransientSettings(ImmutableSettings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT, "whatever").build())
+                .get();
+
+        assertThat(response.getTransientSettings().getAsMap().entrySet(), Matchers.emptyIterable());
+
+        response = client().admin().cluster()
+                .prepareUpdateSettings()
+                .setTransientSettings(ImmutableSettings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT, -1).build())
+                .get();
+
+        assertThat(response.getTransientSettings().getAsMap().entrySet(), Matchers.emptyIterable());
+    }
 }