瀏覽代碼

Fix merge scheduler config settings (#23391)

Change Setting#get(Settings, Settings) to fallback only if the setting is present in the secondary.
This is needed to fix setting that relies on other settings.
Replace IT with uts.
Jim Ferenczi 8 年之前
父節點
當前提交
d27a55866c

+ 1 - 2
core/src/main/java/org/elasticsearch/common/settings/Setting.java

@@ -42,7 +42,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
-import java.util.HashMap;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
@@ -378,7 +377,7 @@ public class Setting<T> extends ToXContentToBytes {
             return get(primary);
         }
         if (fallbackSetting == null) {
-            return get(secondary);
+            return exists(secondary) ? get(secondary) : get(primary);
         }
         if (exists(secondary)) {
             return get(secondary);

+ 3 - 2
core/src/main/java/org/elasticsearch/index/MergeSchedulerConfig.java

@@ -69,8 +69,9 @@ public final class MergeSchedulerConfig {
     private volatile int maxMergeCount;
 
     MergeSchedulerConfig(IndexSettings indexSettings) {
-        setMaxThreadAndMergeCount(indexSettings.getValue(MAX_THREAD_COUNT_SETTING),
-            indexSettings.getValue(MAX_MERGE_COUNT_SETTING));
+        int maxThread = indexSettings.getValue(MAX_THREAD_COUNT_SETTING);
+        int maxMerge = indexSettings.getValue(MAX_MERGE_COUNT_SETTING);
+        setMaxThreadAndMergeCount(maxThread, maxMerge);
         this.autoThrottle = indexSettings.getValue(AUTO_THROTTLE_SETTING);
     }
 

+ 182 - 0
core/src/test/java/org/elasticsearch/index/MergeSchedulerSettingsTests.java

@@ -0,0 +1,182 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.apache.logging.log4j.core.filter.RegexFilter;
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.test.ESTestCase;
+
+import static org.elasticsearch.common.util.concurrent.EsExecutors.PROCESSORS_SETTING;
+import static org.elasticsearch.index.IndexSettingsTests.newIndexMeta;
+import static org.elasticsearch.index.MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING;
+import static org.elasticsearch.index.MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING;
+import static org.hamcrest.core.StringContains.containsString;
+
+public class MergeSchedulerSettingsTests extends ESTestCase {
+    private static class MockAppender extends AbstractAppender {
+        public boolean sawUpdateMaxThreadCount;
+        public boolean sawUpdateAutoThrottle;
+
+        MockAppender(final String name) throws IllegalAccessException {
+            super(name, RegexFilter.createFilter(".*(\n.*)*", new String[0], false, null, null), null);
+        }
+
+        @Override
+        public void append(LogEvent event) {
+            String message = event.getMessage().getFormattedMessage();
+            if (event.getLevel() == Level.TRACE && event.getLoggerName().endsWith("lucene.iw")) {
+            }
+            if (event.getLevel() == Level.INFO
+                && message.contains("updating [index.merge.scheduler.max_thread_count] from [10000] to [1]")) {
+                sawUpdateMaxThreadCount = true;
+            }
+            if (event.getLevel() == Level.INFO
+                && message.contains("updating [index.merge.scheduler.auto_throttle] from [true] to [false]")) {
+                sawUpdateAutoThrottle = true;
+            }
+        }
+
+        @Override
+        public boolean ignoreExceptions() {
+            return false;
+        }
+
+    }
+
+    public void testUpdateAutoThrottleSettings() throws Exception {
+        MockAppender mockAppender = new MockAppender("testUpdateAutoThrottleSettings");
+        mockAppender.start();
+        final Logger settingsLogger = Loggers.getLogger("org.elasticsearch.common.settings.IndexScopedSettings");
+        Loggers.addAppender(settingsLogger, mockAppender);
+        Loggers.setLevel(settingsLogger, Level.TRACE);
+        try {
+            Settings.Builder builder = Settings.builder()
+                .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
+                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
+                .put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2")
+                .put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2")
+                .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "1")
+                .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), "2")
+                .put(MergeSchedulerConfig.AUTO_THROTTLE_SETTING.getKey(), "true");
+            IndexSettings settings = new IndexSettings(newIndexMeta("index", builder.build()), Settings.EMPTY);
+            assertEquals(settings.getMergeSchedulerConfig().isAutoThrottle(), true);
+            builder.put(MergeSchedulerConfig.AUTO_THROTTLE_SETTING.getKey(), "false");
+            settings.updateIndexMetaData(newIndexMeta("index", builder.build()));
+            // Make sure we log the change:
+            assertTrue(mockAppender.sawUpdateAutoThrottle);
+            assertEquals(settings.getMergeSchedulerConfig().isAutoThrottle(), false);
+        } finally {
+            Loggers.removeAppender(settingsLogger, mockAppender);
+            mockAppender.stop();
+            Loggers.setLevel(settingsLogger, (Level) null);
+        }
+    }
+
+    // #6882: make sure we can change index.merge.scheduler.max_thread_count live
+    public void testUpdateMergeMaxThreadCount() throws Exception {
+        MockAppender mockAppender = new MockAppender("testUpdateAutoThrottleSettings");
+        mockAppender.start();
+        final Logger settingsLogger = Loggers.getLogger("org.elasticsearch.common.settings.IndexScopedSettings");
+        Loggers.addAppender(settingsLogger, mockAppender);
+        Loggers.setLevel(settingsLogger, Level.TRACE);
+        try {
+            Settings.Builder builder = Settings.builder()
+                .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
+                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
+                .put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2")
+                .put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2")
+                .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "10000")
+                .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), "10000");
+            IndexSettings settings = new IndexSettings(newIndexMeta("index", builder.build()), Settings.EMPTY);
+            assertEquals(settings.getMergeSchedulerConfig().getMaxMergeCount(), 10000);
+            assertEquals(settings.getMergeSchedulerConfig().getMaxThreadCount(), 10000);
+            settings.updateIndexMetaData(newIndexMeta("index", builder.build()));
+            assertFalse(mockAppender.sawUpdateMaxThreadCount);
+            builder.put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "1");
+            settings.updateIndexMetaData(newIndexMeta("index", builder.build()));
+            // Make sure we log the change:
+            assertTrue(mockAppender.sawUpdateMaxThreadCount);
+        } finally {
+            Loggers.removeAppender(settingsLogger, mockAppender);
+            mockAppender.stop();
+            Loggers.setLevel(settingsLogger, (Level) null);
+        }
+    }
+
+    private static IndexMetaData createMetaData(int maxThreadCount, int maxMergeCount, int numProc) {
+        Settings.Builder builder = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT);
+        if (maxThreadCount != -1) {
+            builder.put(MAX_THREAD_COUNT_SETTING.getKey(), maxThreadCount);
+        }
+        if (maxMergeCount != -1) {
+            builder.put(MAX_MERGE_COUNT_SETTING.getKey(), maxMergeCount);
+        }
+        if (numProc != -1) {
+            builder.put(PROCESSORS_SETTING.getKey(), numProc);
+        }
+        return newIndexMeta("index", builder.build());
+    }
+
+    public void testMaxThreadAndMergeCount() {
+        IllegalArgumentException exc =
+            expectThrows(IllegalArgumentException.class,
+                () -> new MergeSchedulerConfig(new IndexSettings(createMetaData(10, 4, -1), Settings.EMPTY)));
+        assertThat(exc.getMessage(), containsString("maxThreadCount (= 10) should be <= maxMergeCount (= 4)"));
+
+        IndexSettings settings = new IndexSettings(createMetaData(-1, -1, 2), Settings.EMPTY);
+        assertEquals(1, settings.getMergeSchedulerConfig().getMaxThreadCount());
+        assertEquals(6, settings.getMergeSchedulerConfig().getMaxMergeCount());
+
+        settings = new IndexSettings(createMetaData(4, 10, -1), Settings.EMPTY);
+        assertEquals(4, settings.getMergeSchedulerConfig().getMaxThreadCount());
+        assertEquals(10, settings.getMergeSchedulerConfig().getMaxMergeCount());
+        IndexMetaData newMetaData = createMetaData(15, 20, -1);
+
+        settings.updateIndexMetaData(newMetaData);
+        assertEquals(15, settings.getMergeSchedulerConfig().getMaxThreadCount());
+        assertEquals(20, settings.getMergeSchedulerConfig().getMaxMergeCount());
+
+        settings.updateIndexMetaData(createMetaData(40, 50, -1));
+        assertEquals(40, settings.getMergeSchedulerConfig().getMaxThreadCount());
+        assertEquals(50, settings.getMergeSchedulerConfig().getMaxMergeCount());
+
+        settings.updateIndexMetaData(createMetaData(40, -1, -1));
+        assertEquals(40, settings.getMergeSchedulerConfig().getMaxThreadCount());
+        assertEquals(45, settings.getMergeSchedulerConfig().getMaxMergeCount());
+
+        final IndexSettings finalSettings = settings;
+        exc = expectThrows(IllegalArgumentException.class,
+            () -> finalSettings.updateIndexMetaData(createMetaData(40, 30, -1)));
+        assertThat(exc.getMessage(), containsString("maxThreadCount (= 40) should be <= maxMergeCount (= 30)"));
+
+        exc = expectThrows(IllegalArgumentException.class,
+            () -> finalSettings.updateIndexMetaData(createMetaData(-1, 3, -1)));
+        assertThat(exc.getMessage(), containsString("should be <= maxMergeCount (= 3)"));
+    }
+}

+ 0 - 195
core/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsIT.java

@@ -19,26 +19,14 @@
 
 package org.elasticsearch.indices.settings;
 
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.core.LogEvent;
-import org.apache.logging.log4j.core.appender.AbstractAppender;
-import org.apache.logging.log4j.core.filter.RegexFilter;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
 import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
-import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequestBuilder;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Priority;
-import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.IndexModule;
 import org.elasticsearch.index.IndexService;
-import org.elasticsearch.index.MergePolicyConfig;
-import org.elasticsearch.index.MergeSchedulerConfig;
 import org.elasticsearch.index.engine.VersionConflictEngineException;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.plugins.Plugin;
@@ -237,189 +225,6 @@ public class UpdateSettingsIT extends ESIntegTestCase {
 
     }
 
-    private static class MockAppender extends AbstractAppender {
-        public boolean sawUpdateMaxThreadCount;
-        public boolean sawUpdateAutoThrottle;
-
-        MockAppender(final String name) throws IllegalAccessException {
-            super(name, RegexFilter.createFilter(".*(\n.*)*", new String[0], false, null, null), null);
-        }
-
-        @Override
-        public void append(LogEvent event) {
-            String message = event.getMessage().getFormattedMessage();
-            if (event.getLevel() == Level.TRACE && event.getLoggerName().endsWith("lucene.iw")) {
-            }
-            if (event.getLevel() == Level.INFO
-                && message.contains("updating [index.merge.scheduler.max_thread_count] from [10000] to [1]")) {
-                sawUpdateMaxThreadCount = true;
-            }
-            if (event.getLevel() == Level.INFO
-                && message.contains("updating [index.merge.scheduler.auto_throttle] from [true] to [false]")) {
-                sawUpdateAutoThrottle = true;
-            }
-        }
-
-        @Override
-        public boolean ignoreExceptions() {
-            return false;
-        }
-
-    }
-
-    public void testUpdateAutoThrottleSettings() throws Exception {
-        MockAppender mockAppender = new MockAppender("testUpdateAutoThrottleSettings");
-        mockAppender.start();
-        Logger rootLogger = LogManager.getRootLogger();
-        Loggers.addAppender(rootLogger, mockAppender);
-        Level savedLevel = rootLogger.getLevel();
-        Loggers.setLevel(rootLogger, Level.TRACE);
-
-        try {
-            // No throttling at first, only 1 non-replicated shard, force lots of merging:
-            assertAcked(prepareCreate("test")
-                        .setSettings(Settings.builder()
-                                     .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
-                                     .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
-                                     .put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2")
-                                     .put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2")
-                                     .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "1")
-                                     .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), "2")
-                                     .put(MergeSchedulerConfig.AUTO_THROTTLE_SETTING.getKey(), "true")));
-
-            // Disable auto throttle:
-            client()
-                .admin()
-                .indices()
-                .prepareUpdateSettings("test")
-                .setSettings(Settings.builder().put(MergeSchedulerConfig.AUTO_THROTTLE_SETTING.getKey(), "false"))
-                .get();
-
-            // if a node has processed the cluster state update but not yet returned from the update task, it might still log messages;
-            // these log messages will race with the stopping of the appender so we wait to ensure these tasks are done processing
-            assertBusy(() -> {
-                for (final ClusterService service : internalCluster().getInstances(ClusterService.class)) {
-                    assertThat(service.numberOfPendingTasks(), equalTo(0));
-                }
-            });
-
-            // Make sure we log the change:
-            assertTrue(mockAppender.sawUpdateAutoThrottle);
-
-            // Make sure setting says it is in fact changed:
-            GetSettingsResponse getSettingsResponse = client().admin().indices().prepareGetSettings("test").get();
-            assertThat(getSettingsResponse.getSetting("test", MergeSchedulerConfig.AUTO_THROTTLE_SETTING.getKey()), equalTo("false"));
-        } finally {
-            Loggers.setLevel(rootLogger, savedLevel);
-            Loggers.removeAppender(rootLogger, mockAppender);
-            // don't call stop here some node might still use this reference at this point causing tests to fail.
-            // this is only relevant in integ tests, unittest can control what uses a logger and what doesn't
-            // mockAppender.stop();
-        }
-    }
-
-    public void testInvalidMergeMaxThreadCount() throws IllegalAccessException {
-        CreateIndexRequestBuilder createBuilder = prepareCreate("test")
-            .setSettings(Settings.builder()
-                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
-                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
-                .put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2")
-                .put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2")
-                .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "100")
-                .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), "10")
-            );
-        IllegalArgumentException exc = expectThrows(IllegalArgumentException.class,
-            () -> createBuilder.get());
-        assertThat(exc.getMessage(), equalTo("maxThreadCount (= 100) should be <= maxMergeCount (= 10)"));
-
-        assertAcked(prepareCreate("test")
-            .setSettings(Settings.builder()
-                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
-                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
-                .put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2")
-                .put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2")
-                .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "100")
-                .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), "100")
-            ));
-
-        {
-            UpdateSettingsRequestBuilder updateBuilder =
-                client()
-                    .admin()
-                    .indices()
-                    .prepareUpdateSettings("test")
-                    .setSettings(Settings.builder().put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "1000"));
-            exc = expectThrows(IllegalArgumentException.class,
-                () -> updateBuilder.get());
-            assertThat(exc.getMessage(), equalTo("maxThreadCount (= 1000) should be <= maxMergeCount (= 100)"));
-        }
-
-        {
-            UpdateSettingsRequestBuilder updateBuilder =
-                client()
-                    .admin()
-                    .indices()
-                    .prepareUpdateSettings("test")
-                    .setSettings(Settings.builder().put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), "10"));
-            exc = expectThrows(IllegalArgumentException.class,
-                () -> updateBuilder.get());
-            assertThat(exc.getMessage(), equalTo("maxThreadCount (= 100) should be <= maxMergeCount (= 10)"));
-        }
-    }
-
-    // #6882: make sure we can change index.merge.scheduler.max_thread_count live
-    public void testUpdateMergeMaxThreadCount() throws Exception {
-        MockAppender mockAppender = new MockAppender("testUpdateMergeMaxThreadCount");
-        mockAppender.start();
-        Logger rootLogger = LogManager.getRootLogger();
-        Level savedLevel = rootLogger.getLevel();
-        Loggers.addAppender(rootLogger, mockAppender);
-        Loggers.setLevel(rootLogger, Level.TRACE);
-
-        try {
-
-            assertAcked(prepareCreate("test")
-                        .setSettings(Settings.builder()
-                                     .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
-                                     .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
-                                     .put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2")
-                                     .put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2")
-                                     .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "10000")
-                                     .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), "10000")));
-
-            assertFalse(mockAppender.sawUpdateMaxThreadCount);
-            // Now make a live change to reduce allowed merge threads:
-            client()
-                .admin()
-                .indices()
-                .prepareUpdateSettings("test")
-                .setSettings(Settings.builder().put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "1"))
-                .get();
-
-            // if a node has processed the cluster state update but not yet returned from the update task, it might still log messages;
-            // these log messages will race with the stopping of the appender so we wait to ensure these tasks are done processing
-            assertBusy(() -> {
-                for (final ClusterService service : internalCluster().getInstances(ClusterService.class)) {
-                    assertThat(service.numberOfPendingTasks(), equalTo(0));
-                }
-            });
-
-            // Make sure we log the change:
-            assertTrue(mockAppender.sawUpdateMaxThreadCount);
-
-            // Make sure setting says it is in fact changed:
-            GetSettingsResponse getSettingsResponse = client().admin().indices().prepareGetSettings("test").get();
-            assertThat(getSettingsResponse.getSetting("test", MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey()), equalTo("1"));
-
-        } finally {
-            Loggers.setLevel(rootLogger, savedLevel);
-            Loggers.removeAppender(rootLogger, mockAppender);
-            // don't call stop here some node might still use this reference at this point causing tests to fail.
-            // this is only relevant in integ tests, unittest can control what uses a logger and what doesn't
-            // mockAppender.stop();
-        }
-    }
-
     public void testUpdateSettingsWithBlocks() {
         createIndex("test");
         ensureGreen("test");