瀏覽代碼

Remove the ability to fsync on every operation and only schedule fsync task if really needed

This commit limits the `index.translog.sync_interval` to a value not less than `100ms` and
removes the support for fsync on every operation which used to be enabled if `index.translog.sync_interval` was set to `0s`
Now this pr also only schedules an async fsync if the durability is set to `async`. By default not async task is scheduled.

Closes #16152
Simon Willnauer 9 年之前
父節點
當前提交
84ce9f3618

+ 15 - 6
core/src/main/java/org/elasticsearch/index/IndexService.java

@@ -108,7 +108,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
     private final IndexingSlowLog slowLog;
     private final IndexingOperationListener[] listeners;
     private volatile AsyncRefreshTask refreshTask;
-    private final AsyncTranslogFSync fsyncTask;
+    private volatile AsyncTranslogFSync fsyncTask;
     private final SearchSlowLog searchSlowLog;
 
     public IndexService(IndexSettings indexSettings, NodeEnvironment nodeEnv,
@@ -147,11 +147,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
         this.listeners[0] = slowLog;
         System.arraycopy(listenersIn, 0, this.listeners, 1, listenersIn.length);
         // kick off async ops for the first shard in this index
-        if (this.indexSettings.getTranslogSyncInterval().millis() != 0) {
-            this.fsyncTask = new AsyncTranslogFSync(this);
-        } else {
-            this.fsyncTask = null;
-        }
+        this.fsyncTask = indexSettings.getTranslogDurability() == Translog.Durability.REQUEST ? null : new AsyncTranslogFSync(this);
         this.refreshTask = new AsyncRefreshTask(this);
         searchSlowLog = new SearchSlowLog(indexSettings);
     }
@@ -565,6 +561,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
     }
 
     public synchronized void updateMetaData(final IndexMetaData metadata) {
+        final Translog.Durability oldTranslogDurability = indexSettings.getTranslogDurability();
         if (indexSettings.updateIndexMetaData(metadata)) {
             for (final IndexShard shard : this.shards.values()) {
                 try {
@@ -576,6 +573,18 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
             if (refreshTask.getInterval().equals(indexSettings.getRefreshInterval()) == false) {
                 rescheduleRefreshTasks();
             }
+            final Translog.Durability durability = indexSettings.getTranslogDurability();
+            if (durability != oldTranslogDurability) {
+                rescheduleFsyncTask(durability);
+            }
+        }
+    }
+
+    private void rescheduleFsyncTask(Translog.Durability durability) {
+        try {
+            IOUtils.closeWhileHandlingException(fsyncTask);
+        } finally {
+            fsyncTask = durability == Translog.Durability.REQUEST ? null : new AsyncTranslogFSync(this);
         }
     }
 

+ 1 - 1
core/src/main/java/org/elasticsearch/index/IndexSettings.java

@@ -55,7 +55,7 @@ public final class IndexSettings {
     public static final Setting<Boolean> QUERY_STRING_ANALYZE_WILDCARD = Setting.boolSetting("indices.query.query_string.analyze_wildcard", false, false, Setting.Scope.CLUSTER);
     public static final Setting<Boolean> QUERY_STRING_ALLOW_LEADING_WILDCARD = Setting.boolSetting("indices.query.query_string.allowLeadingWildcard", true, false, Setting.Scope.CLUSTER);
     public static final Setting<Boolean> ALLOW_UNMAPPED = Setting.boolSetting("index.query.parse.allow_unmapped_fields", true, false, Setting.Scope.INDEX);
-    public static final Setting<TimeValue> INDEX_TRANSLOG_SYNC_INTERVAL_SETTING = Setting.timeSetting("index.translog.sync_interval", TimeValue.timeValueSeconds(5), false, Setting.Scope.INDEX);
+    public static final Setting<TimeValue> INDEX_TRANSLOG_SYNC_INTERVAL_SETTING = Setting.timeSetting("index.translog.sync_interval", TimeValue.timeValueSeconds(5), TimeValue.timeValueMillis(100), false, Setting.Scope.INDEX);
     public static final Setting<Translog.Durability> INDEX_TRANSLOG_DURABILITY_SETTING = new Setting<>("index.translog.durability", Translog.Durability.REQUEST.name(), (value) -> Translog.Durability.valueOf(value.toUpperCase(Locale.ROOT)), true, Setting.Scope.INDEX);
     public static final Setting<Boolean> INDEX_WARMER_ENABLED_SETTING = Setting.boolSetting("index.warmer.enabled", true, true, Setting.Scope.INDEX);
     public static final Setting<Boolean> INDEX_TTL_DISABLE_PURGE_SETTING = Setting.boolSetting("index.ttl.disable_purge", false, true, Setting.Scope.INDEX);

+ 0 - 3
core/src/main/java/org/elasticsearch/index/translog/Translog.java

@@ -429,9 +429,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
             try (ReleasableLock lock = readLock.acquire()) {
                 ensureOpen();
                 Location location = current.add(bytes);
-                if (config.isSyncOnEachOperation()) {
-                    current.sync();
-                }
                 assert assertBytesAtLocation(location, bytes);
                 return location;
             }

+ 0 - 7
core/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java

@@ -65,13 +65,6 @@ public final class TranslogConfig {
         this.bigArrays = bigArrays;
     }
 
-    /**
-     * Returns <code>true</code> iff each low level operation shoudl be fsynced
-     */
-    public boolean isSyncOnEachOperation() {
-        return indexSettings.getTranslogSyncInterval().millis() == 0;
-    }
-
     /**
      * Returns the index indexSettings
      */

+ 40 - 4
core/src/test/java/org/elasticsearch/index/IndexServiceTests.java

@@ -39,6 +39,7 @@ import org.elasticsearch.test.ESSingleNodeTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -264,7 +265,7 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
     }
 
     public void testFsyncTaskIsRunning() throws IOException {
-        IndexService indexService = createIndex("test", Settings.EMPTY);
+        IndexService indexService = createIndex("test", Settings.builder().put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC).build());
         IndexService.AsyncTranslogFSync fsyncTask = indexService.getFsyncTask();
         assertNotNull(fsyncTask);
         assertEquals(5000, fsyncTask.getInterval().millis());
@@ -274,6 +275,9 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
         indexService.close("simon says", false);
         assertFalse(fsyncTask.isScheduled());
         assertTrue(fsyncTask.isClosed());
+
+        indexService = createIndex("test1", Settings.EMPTY);
+        assertNull(indexService.getFsyncTask());
     }
 
     public void testRefreshActuallyWorks() throws Exception {
@@ -307,7 +311,7 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
 
     public void testAsyncFsyncActuallyWorks() throws Exception {
         Settings settings = Settings.builder()
-            .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "10ms") // very often :)
+            .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "100ms") // very often :)
             .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC)
             .build();
         IndexService indexService = createIndex("test", settings);
@@ -320,11 +324,43 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
         });
     }
 
-    public void testNoFsyncTaskIfDisabled() {
+    public void testRescheduleAsyncFsync() throws Exception {
         Settings settings = Settings.builder()
-            .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "0ms") // disable
+            .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "100ms") // very often :)
+            .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST)
             .build();
         IndexService indexService = createIndex("test", settings);
+        ensureGreen("test");
+        assertNull(indexService.getFsyncTask());
+        IndexMetaData metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC)).build();
+        indexService.updateMetaData(metaData);
+        assertNotNull(indexService.getFsyncTask());
+        assertTrue(indexService.getRefreshTask().mustReschedule());
+        client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}").get();
+        IndexShard shard = indexService.getShard(0);
+        assertBusy(() -> {
+            assertFalse(shard.getTranslog().syncNeeded());
+        });
+
+        metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST)).build();
+        indexService.updateMetaData(metaData);
         assertNull(indexService.getFsyncTask());
+
+        metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC)).build();
+        indexService.updateMetaData(metaData);
+        assertNotNull(indexService.getFsyncTask());
+
+    }
+
+    public void testIllegalFsyncInterval() {
+        Settings settings = Settings.builder()
+            .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "0ms") // disable
+            .build();
+        try {
+            createIndex("test", settings);
+            fail();
+        } catch (IllegalArgumentException ex) {
+            assertEquals("Failed to parse value [0ms] for setting [index.translog.sync_interval] must be >= 100ms", ex.getMessage());
+        }
     }
 }

+ 0 - 1
core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java

@@ -1393,7 +1393,6 @@ public class TranslogTests extends ESTestCase {
         Path tempDir = createTempDir();
         final FailSwitch fail = new FailSwitch();
         TranslogConfig config = getTranslogConfig(tempDir);
-        assumeFalse("this won't work if we sync on any op", config.isSyncOnEachOperation());
         Translog translog = getFailableTranslog(fail, config, false, true);
         LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly
         translog.add(new Translog.Index("test", "1", lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))));

+ 3 - 3
docs/reference/index-modules/translog.asciidoc

@@ -37,8 +37,8 @@ The data in the transaction log is only persisted to disk when the translog is
 ++fsync++ed and committed.  In the event of hardware failure, any data written
 since the previous translog commit will be lost.
 
-By default, Elasticsearch ++fsync++s and commits the translog every 5 seconds
-and at the end of every <<docs-index_,index>>, <<docs-delete,delete>>,
+By default, Elasticsearch ++fsync++s and commits the translog every 5 seconds if `index.translog.durability` is set
+to `async` or if set to `request` (default) at the end of every <<docs-index_,index>>, <<docs-delete,delete>>,
 <<docs-update,update>>, or  <<docs-bulk,bulk>> request.  In fact, Elasticsearch
 will only report success of an index, delete, update, or bulk request to the
 client after the transaction log has been successfully ++fsync++ed and committed
@@ -50,7 +50,7 @@ control the behaviour of the transaction log:
 `index.translog.sync_interval`::
 
 How often the translog is ++fsync++ed to disk and committed, regardless of
-write operations. Defaults to `5s`.
+write operations. Defaults to `5s`. Values less than `100ms` are not allowed.
 
 `index.translog.durability`::
 +

+ 4 - 0
docs/reference/migration/migrate_3_0.asciidoc

@@ -232,6 +232,10 @@ The `index.translog.flush_threshold_ops` setting is not supported anymore. In or
 growth use `index.translog.flush_threshold_size` instead. Changing the translog type with `index.translog.fs.type` is not supported
 anymore, the `buffered` implementation is now the only available option and uses a fixed `8kb` buffer.
 
+The translog by default is fsynced on a request basis such that the ability to fsync on every operation is not necessary anymore. In-fact it can
+be a performance bottleneck and it's trappy since it enabled by a special value set on `index.translog.sync_interval`. `index.translog.sync_interval`
+now doesn't accept a value less than `100ms` which prevents fsyncing too often if async durability is enabled. The special value `0` is not supported anymore.
+
 ==== Request Cache Settings
 
 The deprecated settings `index.cache.query.enable` and `indices.cache.query.size` have been removed and are replaced with

+ 1 - 5
test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

@@ -527,11 +527,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
         }
 
         if (random.nextBoolean()) {
-            if (rarely(random)) {
-                builder.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), 0); // 0 has special meaning to sync each op
-            } else {
-                builder.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), RandomInts.randomIntBetween(random, 100, 5000), TimeUnit.MILLISECONDS);
-            }
+            builder.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), RandomInts.randomIntBetween(random, 100, 5000), TimeUnit.MILLISECONDS);
         }
 
         return builder;