Browse Source

randomize flush interval so multiple shards won't flush at the sam time
- also, allow to update interval using update settings on an index

Shay Banon 11 years ago
parent
commit
4aa5ef139e

+ 3 - 0
docs/reference/index-modules/translog.asciidoc

@@ -18,6 +18,9 @@ a flush will happen. Defaults to `200mb`.
 
 |index.translog.flush_threshold_period |The period with no flush
 happening to force a flush. Defaults to `30m`.
+
+|index.translog.interval |How often to check if a flush is needed, randomized
+between the interval value and 2x the interval value. Defaults to `5s`.
 |=======================================================================
 
 Note: these parameters can be updated at runtime using the Index

+ 1 - 0
src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java

@@ -109,6 +109,7 @@ public class IndexDynamicSettingsModule extends AbstractModule {
         indexDynamicSettings.addDynamicSetting(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, Validator.DOUBLE_GTE_2);
         indexDynamicSettings.addDynamicSetting(TieredMergePolicyProvider.INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT, Validator.NON_NEGATIVE_DOUBLE);
         indexDynamicSettings.addDynamicSetting(TieredMergePolicyProvider.INDEX_COMPOUND_FORMAT);
+        indexDynamicSettings.addDynamicSetting(TranslogService.INDEX_TRANSLOG_FLUSH_INTERVAL, Validator.TIME);
         indexDynamicSettings.addDynamicSetting(TranslogService.INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, Validator.INTEGER);
         indexDynamicSettings.addDynamicSetting(TranslogService.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, Validator.BYTES_SIZE);
         indexDynamicSettings.addDynamicSetting(TranslogService.INDEX_TRANSLOG_FLUSH_THRESHOLD_PERIOD, Validator.TIME);

+ 20 - 17
src/main/java/org/elasticsearch/index/translog/TranslogService.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.index.translog;
 
+import jsr166y.ThreadLocalRandom;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -45,24 +46,16 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
 public class TranslogService extends AbstractIndexShardComponent {
 
     private final ThreadPool threadPool;
-
     private final IndexSettingsService indexSettingsService;
-
     private final IndexShard indexShard;
-
     private final Translog translog;
 
-    private int flushThresholdOperations;
-
-    private ByteSizeValue flushThresholdSize;
-
-    private TimeValue flushThresholdPeriod;
-
-    private boolean disableFlush;
-
-    private final TimeValue interval;
-
-    private ScheduledFuture future;
+    private volatile TimeValue interval;
+    private volatile int flushThresholdOperations;
+    private volatile ByteSizeValue flushThresholdSize;
+    private volatile TimeValue flushThresholdPeriod;
+    private volatile boolean disableFlush;
+    private volatile ScheduledFuture future;
 
     private final ApplySettings applySettings = new ApplySettings();
 
@@ -93,6 +86,7 @@ public class TranslogService extends AbstractIndexShardComponent {
         this.future.cancel(true);
     }
 
+    public static final String INDEX_TRANSLOG_FLUSH_INTERVAL = "index.translog.interval";
     public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS = "index.translog.flush_threshold_ops";
     public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE = "index.translog.flush_threshold_size";
     public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_PERIOD = "index.translog.flush_threshold_period";
@@ -116,6 +110,11 @@ public class TranslogService extends AbstractIndexShardComponent {
                 logger.info("updating flush_threshold_period from [{}] to [{}]", TranslogService.this.flushThresholdPeriod, flushThresholdPeriod);
                 TranslogService.this.flushThresholdPeriod = flushThresholdPeriod;
             }
+            TimeValue interval = settings.getAsTime(INDEX_TRANSLOG_FLUSH_INTERVAL, TranslogService.this.interval);
+            if (!interval.equals(TranslogService.this.interval)) {
+                logger.info("updating interval from [{}] to [{}]", TranslogService.this.interval, interval);
+                TranslogService.this.interval = interval;
+            }
             boolean disableFlush = settings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, TranslogService.this.disableFlush);
             if (disableFlush != TranslogService.this.disableFlush) {
                 logger.info("updating disable_flush from [{}] to [{}]", TranslogService.this.disableFlush, disableFlush);
@@ -124,6 +123,10 @@ public class TranslogService extends AbstractIndexShardComponent {
         }
     }
 
+    private TimeValue computeNextInterval() {
+        return new TimeValue(interval.millis() + (ThreadLocalRandom.current().nextLong(interval.millis())));
+    }
+
     private class TranslogBasedFlush implements Runnable {
 
         private volatile long lastFlushTime = System.currentTimeMillis();
@@ -180,7 +183,7 @@ public class TranslogService extends AbstractIndexShardComponent {
         }
 
         private void reschedule() {
-            future = threadPool.schedule(interval, ThreadPool.Names.SAME, this);
+            future = threadPool.schedule(computeNextInterval(), ThreadPool.Names.SAME, this);
         }
 
         private void asyncFlushAndReschedule() {
@@ -193,13 +196,13 @@ public class TranslogService extends AbstractIndexShardComponent {
                         // we are being closed, or in created state, ignore
                     } catch (FlushNotAllowedEngineException e) {
                         // ignore this exception, we are not allowed to perform flush
-                    } catch (Exception e) {
+                    } catch (Throwable e) {
                         logger.warn("failed to flush shard on translog threshold", e);
                     }
                     lastFlushTime = threadPool.estimatedTimeInMillis();
 
                     if (indexShard.state() != IndexShardState.CLOSED) {
-                        future = threadPool.schedule(interval, ThreadPool.Names.SAME, TranslogBasedFlush.this);
+                        future = threadPool.schedule(computeNextInterval(), ThreadPool.Names.SAME, TranslogBasedFlush.this);
                     }
                 }
             });