Преглед изворни кода

Make setting index.translog.sync_interval be dynamic (#37382)

Currently, we cannot update index setting index.translog.sync_interval if index is open, because it's
not dynamic which can be updated for closed index only.

Closes #32763
Like пре 6 година
родитељ
комит
3c352a8596

+ 21 - 13
server/src/main/java/org/elasticsearch/index/IndexService.java

@@ -199,7 +199,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
         this.trimTranslogTask = new AsyncTrimTranslogTask(this);
         this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
         this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
-        rescheduleFsyncTask(indexSettings.getTranslogDurability());
+        updateFsyncTaskIfNecessary();
     }
 
     public int numberOfShards() {
@@ -640,8 +640,6 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
 
     @Override
     public synchronized void updateMetaData(final IndexMetaData currentIndexMetaData, final IndexMetaData newIndexMetaData) {
-        final Translog.Durability oldTranslogDurability = indexSettings.getTranslogDurability();
-
         final boolean updateIndexMetaData = indexSettings.updateIndexMetaData(newIndexMetaData);
 
         if (Assertions.ENABLED
@@ -693,20 +691,23 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
                 });
                 rescheduleRefreshTasks();
             }
-            final Translog.Durability durability = indexSettings.getTranslogDurability();
-            if (durability != oldTranslogDurability) {
-                rescheduleFsyncTask(durability);
-            }
+            updateFsyncTaskIfNecessary();
         }
     }
 
-    private void rescheduleFsyncTask(Translog.Durability durability) {
-        try {
-            if (fsyncTask != null) {
-                fsyncTask.close();
+    private void updateFsyncTaskIfNecessary() {
+        if (indexSettings.getTranslogDurability() == Translog.Durability.REQUEST) {
+            try {
+                if (fsyncTask != null) {
+                    fsyncTask.close();
+                }
+            } finally {
+                fsyncTask = null;
             }
-        } finally {
-            fsyncTask = durability == Translog.Durability.REQUEST ? null : new AsyncTranslogFSync(this);
+        } else if (fsyncTask == null) {
+            fsyncTask = new AsyncTranslogFSync(this);
+        } else {
+            fsyncTask.updateIfNeeded();
         }
     }
 
@@ -868,6 +869,13 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
             indexService.maybeFSyncTranslogs();
         }
 
+        void updateIfNeeded() {
+            final TimeValue newInterval = indexService.getIndexSettings().getTranslogSyncInterval();
+            if (newInterval.equals(getInterval()) == false) {
+                setInterval(newInterval);
+            }
+        }
+
         @Override
         public String toString() {
             return "translog_sync";

+ 7 - 2
server/src/main/java/org/elasticsearch/index/IndexSettings.java

@@ -62,7 +62,7 @@ public final class IndexSettings {
         Setting.boolSetting("index.query.parse.allow_unmapped_fields", true, Property.IndexScope);
     public static final Setting<TimeValue> INDEX_TRANSLOG_SYNC_INTERVAL_SETTING =
         Setting.timeSetting("index.translog.sync_interval", TimeValue.timeValueSeconds(5), TimeValue.timeValueMillis(100),
-            Property.IndexScope);
+            Property.Dynamic, Property.IndexScope);
     public static final Setting<TimeValue> INDEX_SEARCH_IDLE_AFTER =
         Setting.timeSetting("index.search.idle.after", TimeValue.timeValueSeconds(30),
             TimeValue.timeValueMinutes(0), Property.IndexScope, Property.Dynamic);
@@ -316,7 +316,7 @@ public final class IndexSettings {
     private final boolean queryStringAllowLeadingWildcard;
     private final boolean defaultAllowUnmappedFields;
     private volatile Translog.Durability durability;
-    private final TimeValue syncInterval;
+    private volatile TimeValue syncInterval;
     private volatile TimeValue refreshInterval;
     private volatile ByteSizeValue flushThresholdSize;
     private volatile TimeValue translogRetentionAge;
@@ -501,6 +501,7 @@ public final class IndexSettings {
             MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING, mergeSchedulerConfig::setMaxThreadAndMergeCount);
         scopedSettings.addSettingsUpdateConsumer(MergeSchedulerConfig.AUTO_THROTTLE_SETTING, mergeSchedulerConfig::setAutoThrottle);
         scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_DURABILITY_SETTING, this::setTranslogDurability);
+        scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_SYNC_INTERVAL_SETTING, this::setTranslogSyncInterval);
         scopedSettings.addSettingsUpdateConsumer(MAX_RESULT_WINDOW_SETTING, this::setMaxResultWindow);
         scopedSettings.addSettingsUpdateConsumer(MAX_INNER_RESULT_WINDOW_SETTING, this::setMaxInnerResultWindow);
         scopedSettings.addSettingsUpdateConsumer(MAX_ADJACENCY_MATRIX_FILTERS_SETTING, this::setMaxAdjacencyMatrixFilters);
@@ -701,6 +702,10 @@ public final class IndexSettings {
         return syncInterval;
     }
 
+    public void setTranslogSyncInterval(TimeValue translogSyncInterval) {
+        this.syncInterval = translogSyncInterval;
+    }
+    
     /**
      * Returns this interval in which the shards of this index are asynchronously refreshed. {@code -1} means async refresh is disabled.
      */

+ 37 - 3
server/src/test/java/org/elasticsearch/index/IndexServiceTests.java

@@ -21,6 +21,7 @@ package org.elasticsearch.index;
 
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.TopDocs;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.compress.CompressedXContent;
 import org.elasticsearch.common.settings.Settings;
@@ -320,9 +321,7 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
         assertTrue(indexService.getRefreshTask().mustReschedule());
         client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}", XContentType.JSON).get();
         IndexShard shard = indexService.getShard(0);
-        assertBusy(() -> {
-            assertFalse(shard.isSyncNeeded());
-        });
+        assertBusy(() -> assertFalse(shard.isSyncNeeded()));
     }
 
     public void testRescheduleAsyncFsync() throws Exception {
@@ -394,4 +393,39 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
             assertEquals("failed to parse value [0ms] for setting [index.translog.sync_interval], must be >= [100ms]", ex.getMessage());
         }
     }
+
+    public void testUpdateSyncIntervalDynamically() {
+        Settings settings = Settings.builder()
+            .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "10s") // very often :)
+            .build();
+        IndexService indexService = createIndex("test", settings);
+        ensureGreen("test");
+        assertNull(indexService.getFsyncTask());
+
+        Settings.Builder builder = Settings.builder().put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "5s")
+            .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC.name());
+
+        client()
+            .admin()
+            .indices()
+            .prepareUpdateSettings("test")
+            .setSettings(builder)
+            .get();
+
+        assertNotNull(indexService.getFsyncTask());
+        assertTrue(indexService.getFsyncTask().mustReschedule());
+
+        IndexMetaData indexMetaData = client().admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test");
+        assertEquals("5s", indexMetaData.getSettings().get(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey()));
+
+        client().admin().indices().prepareClose("test").get();
+        client()
+            .admin()
+            .indices()
+            .prepareUpdateSettings("test")
+            .setSettings(Settings.builder().put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "20s"))
+            .get();
+        indexMetaData = client().admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test");
+        assertEquals("20s", indexMetaData.getSettings().get(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey()));
+    }
 }

+ 1 - 0
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java

@@ -402,6 +402,7 @@ public class TransportResumeFollowAction extends TransportMasterNodeAction<Resum
         nonReplicatedSettings.add(IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING);
         nonReplicatedSettings.add(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING);
         nonReplicatedSettings.add(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING);
+        nonReplicatedSettings.add(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING);
         nonReplicatedSettings.add(IndexSettings.INDEX_GC_DELETES_SETTING);
         nonReplicatedSettings.add(IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD);