Merge pull request #14275 from s1monw/renew_syn_on_merge

Flush big merges automatically if shard is inactive
Simon Willnauer committed 10 years ago
Commit a2e3dc59d8

+ 16 - 0
core/src/main/java/org/elasticsearch/index/engine/Engine.java

@@ -54,6 +54,7 @@ import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -79,6 +80,7 @@ public abstract class Engine implements Closeable {
     protected final ReleasableLock readLock = new ReleasableLock(rwl.readLock());
     protected final ReleasableLock writeLock = new ReleasableLock(rwl.writeLock());
     protected volatile Throwable failedEngine = null;
+    protected volatile long lastWriteNanos;
 
     protected Engine(EngineConfig engineConfig) {
         Objects.requireNonNull(engineConfig.getStore(), "Store must be provided to the engine");
@@ -562,6 +564,9 @@ public abstract class Engine implements Closeable {
 
 
     public interface EventListener {
+        /**
+         * Called when a fatal exception occurred
+         */
         default void onFailedEngine(String reason, @Nullable Throwable t) {}
     }
 
@@ -1040,4 +1045,15 @@ public abstract class Engine implements Closeable {
 
     public void onSettingsChanged() {
     }
+
+    /**
+     * Returns the timestamp of the last write in nanoseconds.
+     * Note: this time might not be absolutely accurate since {@link Operation#startTime()} is used,
+     * which is itself only approximate.
+     * @see System#nanoTime()
+     * @see Operation#startTime()
+     */
+    public long getLastWriteNanos() {
+        return this.lastWriteNanos;
+    }
 }
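
The accessor pairs with System#nanoTime(): both values come from the same monotonic clock, so only their difference is meaningful. A minimal sketch of the idle check that IndexShard#checkIdle now performs against this accessor (the helper class is illustrative, not part of this commit):

    import java.util.concurrent.TimeUnit;

    // Sketch only: mirrors the comparison in IndexShard#checkIdle after this change.
    final class IdleCheckSketch {
        static boolean isIdle(long lastWriteNanos, long inactiveTimeNanos) {
            // Both operands derive from System.nanoTime(); compare deltas, never absolute values.
            return System.nanoTime() - lastWriteNanos >= inactiveTimeNanos;
        }

        public static void main(String[] args) {
            long lastWrite = System.nanoTime() - TimeUnit.MINUTES.toNanos(10); // pretend the last write was 10 minutes ago
            System.out.println(isIdle(lastWrite, TimeUnit.MINUTES.toNanos(5))); // prints: true
        }
    }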

+ 11 - 1
core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

@@ -57,6 +57,7 @@ public final class EngineConfig {
     private volatile boolean compoundOnFlush = true;
     private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();
     private volatile boolean enableGcDeletes = true;
+    private final TimeValue flushMergesAfter;
     private final String codecName;
     private final ThreadPool threadPool;
     private final ShardIndexingService indexingService;
@@ -118,7 +119,8 @@
                         Settings indexSettings, IndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy,
                         MergePolicy mergePolicy, MergeSchedulerConfig mergeSchedulerConfig, Analyzer analyzer,
                         Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
-                        TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, TranslogConfig translogConfig) {
+                        TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, TranslogConfig translogConfig, TimeValue flushMergesAfter) {
         this.shardId = shardId;
         this.indexSettings = indexSettings;
         this.threadPool = threadPool;
+        this.flushMergesAfter = flushMergesAfter;
@@ -399,4 +400,12 @@ public final class EngineConfig {
     public boolean isCreate() {
         return create;
     }
+
+    /**
+     * Returns the {@link TimeValue} interval after the last write to the engine at which finished merges
+     * should be automatically flushed. This is used to free up the transient disk usage of potentially large
+     * segments that are written after the engine has become inactive from an indexing perspective.
+     */
+    public TimeValue getFlushMergesAfter() { return flushMergesAfter; }
+
 }
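
The interval travels as a TimeValue and is consumed in nanosecond form, compared against System.nanoTime() deltas in InternalEngine's post-merge check. A tiny sketch using only the TimeValue calls already visible in this diff (the wrapper class is illustrative):

    import org.elasticsearch.common.unit.TimeValue;

    // Sketch only: the five-minute interval the tests pass, in nanosecond form.
    final class FlushMergesAfterSketch {
        public static void main(String[] args) {
            TimeValue flushMergesAfter = TimeValue.timeValueMinutes(5);
            System.out.println(flushMergesAfter.nanos()); // prints: 300000000000
        }
    }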

+ 51 - 0
core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -353,6 +353,7 @@ public class InternalEngine extends Engine {
 
     private boolean innerIndex(Index index) throws IOException {
         synchronized (dirtyLock(index.uid())) {
+            lastWriteNanos = index.startTime();
             final long currentVersion;
             final boolean deleted;
             VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes());
@@ -464,6 +465,7 @@ public class InternalEngine extends Engine {
 
     private void innerDelete(Delete delete) throws IOException {
         synchronized (dirtyLock(delete.uid())) {
+            lastWriteNanos = delete.startTime();
             final long currentVersion;
             final boolean deleted;
             VersionValue versionValue = versionMap.getUnderLock(delete.uid().bytes());
@@ -569,6 +571,29 @@ public class InternalEngine extends Engine {
         }
     }
 
+    final boolean tryRenewSyncCommit() {
+        boolean renewed = false;
+        try (ReleasableLock lock = writeLock.acquire()) {
+            ensureOpen();
+            String syncId = lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID);
+            if (syncId != null && translog.totalOperations() == 0 && indexWriter.hasUncommittedChanges()) {
+                logger.trace("start renewing sync commit [{}]", syncId);
+                commitIndexWriter(indexWriter, translog, syncId);
+                logger.debug("successfully sync committed. sync id [{}].", syncId);
+                lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
+                renewed = true;
+            }
+        } catch (IOException ex) {
+            maybeFailEngine("renew sync commit", ex);
+            throw new EngineException(shardId, "failed to renew sync commit", ex);
+        }
+        if (renewed) { // refresh outside of the write lock
+            refresh("renew sync commit");
+        }
+
+        return renewed;
+    }
+
     @Override
     public CommitId flush() throws EngineException {
         return flush(false, false);
@@ -1055,6 +1080,32 @@ public class InternalEngine extends Engine {
                     deactivateThrottling();
                 }
             }
+            if (indexWriter.hasPendingMerges() == false && System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) {
+                // NEVER do this on a merge thread since we acquire locks here that can block, and if we concurrently
+                // roll back the writer we can deadlock on engine#close, for instance.
+                engineConfig.getThreadPool().executor(ThreadPool.Names.FLUSH).execute(new AbstractRunnable() {
+                    @Override
+                    public void onFailure(Throwable t) {
+                        if (isClosed.get() == false) {
                            logger.warn("failed to flush after merge has finished", t);
+                        }
+                    }
+
+                    @Override
+                    protected void doRun() throws Exception {
+                        // If we have no pending merges and are supposed to flush once merges have finished, we
+                        // try to renew the sync commit, which is the situation after a big merge on an inactive
+                        // shard. If that fails we do a real flush instead, which is fine since renewal only fails
+                        // when there are operations in the translog or no sync ID exists at all.
+                        // Just as important, flushing once all merges have finished while we are indexing-inactive
+                        // frees up the transient disk usage of the (presumably sizable) segments that were just merged.
+                        if (tryRenewSyncCommit() == false) {
+                            flush();
+                        }
+                    }
+                });
+
+            }
         }
 
         @Override
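
The decision inside doRun reduces to a cheap path and a fallback. A schematic sketch of that control flow, assuming only the conditions named in tryRenewSyncCommit gate the cheap path (EngineOps is a stand-in, not the real engine API):

    // Sketch only: renew-or-flush decision taken on the FLUSH thread pool after
    // merges finish on an indexing-idle shard.
    final class RenewOrFlushSketch {
        interface EngineOps {
            boolean tryRenewSyncCommit(); // cheap: re-commit under the existing sync id (needs an empty translog)
            void flush();                 // fallback: full flush, which drops any sync id
        }

        static void afterMergesFinished(EngineOps engine) {
            // Renewal fails when the translog holds operations or no sync id exists;
            // in both cases a real flush is the right call, and either way the
            // transient disk usage of the freshly merged segments gets freed.
            if (engine.tryRenewSyncCommit() == false) {
                engine.flush();
            }
        }
    }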

+ 8 - 13
core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -192,7 +192,6 @@ public class IndexShard extends AbstractIndexShardComponent {
      *  IndexingMemoryController}). */
     private final AtomicBoolean active = new AtomicBoolean();
 
-    private volatile long lastWriteNS;
     private final IndexingMemoryController indexingMemoryController;
 
     @Inject
@@ -240,12 +239,13 @@ public class IndexShard extends AbstractIndexShardComponent {
         } else {
             cachingPolicy = new UsageTrackingQueryCachingPolicy();
         }
+
+        this.indexingMemoryController = provider.getIndexingMemoryController();
         this.engineConfig = newEngineConfig(translogConfig, cachingPolicy);
         this.flushThresholdOperations = this.indexSettings.getAsInt(INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS, this.indexSettings.getAsInt("index.translog.flush_threshold", Integer.MAX_VALUE));
         this.flushThresholdSize = this.indexSettings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(512, ByteSizeUnit.MB));
         this.disableFlush = this.indexSettings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, false);
         this.indexShardOperationCounter = new IndexShardOperationCounter(logger, shardId);
-        this.indexingMemoryController = provider.getIndexingMemoryController();
 
         this.searcherWrapper = provider.getIndexSearcherWrapper();
         this.percolatorQueriesRegistry = new PercolatorQueriesRegistry(shardId, indexSettings, provider.getQueryParserService(), indexingService, mapperService, indexFieldDataService);
@@ -457,7 +457,7 @@ public class IndexShard extends AbstractIndexShardComponent {
      */
     public boolean index(Engine.Index index) {
         ensureWriteAllowed(index);
-        markLastWrite(index);
+        markLastWrite();
         index = indexingService.preIndex(index);
         final boolean created;
         try {
@@ -482,7 +482,7 @@ public class IndexShard extends AbstractIndexShardComponent {
 
     public void delete(Engine.Delete delete) {
         ensureWriteAllowed(delete);
-        markLastWrite(delete);
+        markLastWrite();
         delete = indexingService.preDelete(delete);
         try {
             if (logger.isTraceEnabled()) {
@@ -902,14 +902,8 @@ public class IndexShard extends AbstractIndexShardComponent {
         }
     }
 
-    /** Returns timestamp of last indexing operation */
-    public long getLastWriteNS() {
-        return lastWriteNS;
-    }
-
     /** Records timestamp of the last write operation, possibly switching {@code active} to true if we were inactive. */
-    private void markLastWrite(Engine.Operation op) {
-        lastWriteNS = op.startTime();
+    private void markLastWrite() {
         if (active.getAndSet(true) == false) {
             // We are currently inactive, but a new write operation just showed up, so we now notify IMC
             // to wake up and fix our indexing buffer.  We could do this async instead, but cost should
@@ -1029,7 +1023,8 @@ public class IndexShard extends AbstractIndexShardComponent {
      *  indexing operation, and become inactive (reducing indexing and translog buffers to tiny values) if so.  This returns true
      *  if the shard is inactive. */
     public boolean checkIdle(long inactiveTimeNS) {
-        if (System.nanoTime() - lastWriteNS >= inactiveTimeNS) {
+        Engine engineOrNull = getEngineOrNull();
+        if (engineOrNull != null && System.nanoTime() - engineOrNull.getLastWriteNanos() >= inactiveTimeNS) {
             boolean wasActive = active.getAndSet(false);
             if (wasActive) {
                 updateBufferSize(IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER);
@@ -1459,7 +1454,7 @@ public class IndexShard extends AbstractIndexShardComponent {
         };
         return new EngineConfig(shardId,
                 threadPool, indexingService, indexSettings, warmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig,
-                mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig);
+                mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig, indexingMemoryController.getInactiveTime());
     }
 
     private static class IndexShardOperationCounter extends AbstractRefCounted {

+ 4 - 0
core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java

@@ -422,4 +422,8 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
     private static enum ShardStatusChangeType {
         ADDED, DELETED, BECAME_ACTIVE, BECAME_INACTIVE
     }
+
+    public TimeValue getInactiveTime() {
+        return inactiveTime;
+    }
 }
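
This getter closes the loop: IndexShard#newEngineConfig passes it as the engine's flushMergesAfter (see the EngineConfig call above), so the post-merge flush fires after the same interval that marks a shard indexing-inactive. A simplified sketch of that coupling (not the actual wiring code):

    import org.elasticsearch.common.unit.TimeValue;

    // Sketch only: the controller's inactive interval doubles as the engine's
    // flush-merges-after threshold, so both behaviors trip after the same delay.
    final class InactiveTimeWiringSketch {
        static TimeValue flushMergesAfterFrom(TimeValue controllerInactiveTime) {
            return controllerInactiveTime; // handed to EngineConfig's new constructor parameter
        }

        public static void main(String[] args) {
            System.out.println(flushMergesAfterFrom(TimeValue.timeValueMinutes(5))); // prints the interval, e.g. "5m"
        }
    }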

+ 63 - 12
core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -28,16 +28,7 @@ import org.apache.lucene.codecs.Codec;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.document.TextField;
-import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
-import org.apache.lucene.index.LiveIndexWriterConfig;
-import org.apache.lucene.index.LogByteSizeMergePolicy;
-import org.apache.lucene.index.MergePolicy;
-import org.apache.lucene.index.NoMergePolicy;
-import org.apache.lucene.index.SnapshotDeletionPolicy;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.index.TieredMergePolicy;
+import org.apache.lucene.index.*;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.TermQuery;
@@ -62,6 +53,7 @@ import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexSettings;
@@ -118,6 +110,7 @@ import java.util.Map;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -276,7 +269,7 @@ public class InternalEngineTests extends ESTestCase {
             public void onFailedEngine(String reason, @Nullable Throwable t) {
                 // we don't need to notify anybody in this test
             }
-        }, new TranslogHandler(shardId.index().getName(), logger), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig);
+        }, new TranslogHandler(shardId.index().getName(), logger), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5));
         try {
             config.setCreate(Lucene.indexExists(store.directory()) == false);
         } catch (IOException e) {
@@ -796,6 +789,64 @@ public class InternalEngineTests extends ESTestCase {
         }
     }
 
+    public void testRenewSyncFlush() throws Exception {
+        final int iters = randomIntBetween(2, 5); // run this a couple of times to get some coverage
+        for (int i = 0; i < iters; i++) {
+            try (Store store = createStore();
+                 InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings),
+                         new LogDocMergePolicy()), false)) {
+                final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
+                ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
+                Engine.Index doc1 = new Engine.Index(newUid("1"), doc);
+                engine.index(doc1);
+                assertEquals(engine.getLastWriteNanos(), doc1.startTime());
+                engine.flush();
+                Engine.Index doc2 = new Engine.Index(newUid("2"), doc);
+                engine.index(doc2);
+                assertEquals(engine.getLastWriteNanos(), doc2.startTime());
+                engine.flush();
+                final boolean forceMergeFlushes = randomBoolean();
+                if (forceMergeFlushes) {
+                    engine.index(new Engine.Index(newUid("3"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime() - engine.engineConfig.getFlushMergesAfter().nanos()));
+                } else {
+                    engine.index(new Engine.Index(newUid("3"), doc));
+                }
+                Engine.CommitId commitID = engine.flush();
+                assertEquals("should succeed to flush commit with right id and no pending doc", engine.syncFlush(syncId, commitID),
+                        Engine.SyncedFlushResult.SUCCESS);
+                assertEquals(3, engine.segments(false).size());
+
+                engine.forceMerge(false, 1, false, false, false);
+                if (forceMergeFlushes == false) {
+                    engine.refresh("make all segments visible");
+                    assertEquals(4, engine.segments(false).size());
+                    assertEquals(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
+                    assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
+                    assertTrue(engine.tryRenewSyncCommit());
+                    assertEquals(1, engine.segments(false).size());
+                } else {
+                    assertBusy(() -> assertEquals(1, engine.segments(false).size()));
+                }
+                assertEquals(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
+                assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
+
+                if (randomBoolean()) {
+                    Engine.Index doc4 = new Engine.Index(newUid("4"), doc);
+                    engine.index(doc4);
+                    assertEquals(engine.getLastWriteNanos(), doc4.startTime());
+                } else {
+                    Engine.Delete delete = new Engine.Delete(doc1.type(), doc1.id(), doc1.uid());
+                    engine.delete(delete);
+                    assertEquals(engine.getLastWriteNanos(), delete.startTime());
+                }
+                assertFalse(engine.tryRenewSyncCommit());
+                engine.flush();
+                assertNull(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID));
+                assertNull(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID));
+            }
+        }
+    }
+
     public void testSycnedFlushSurvivesEngineRestart() throws IOException {
         final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
         ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
@@ -1923,7 +1974,7 @@ public class InternalEngineTests extends ESTestCase {
         EngineConfig brokenConfig = new EngineConfig(shardId, threadPool, config.getIndexingService(), config.getIndexSettings()
                 , null, store, createSnapshotDeletionPolicy(), newMergePolicy(), config.getMergeSchedulerConfig(),
                 config.getAnalyzer(), config.getSimilarity(), new CodecService(INDEX_SETTINGS, null), config.getEventListener()
-        , config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig);
+        , config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5));
 
         try {
             new InternalEngine(brokenConfig, false);

+ 2 - 1
core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java

@@ -43,6 +43,7 @@ import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexSettings;
@@ -232,7 +233,7 @@ public class ShadowEngineTests extends ESTestCase {
             @Override
             public void onFailedEngine(String reason, @Nullable Throwable t) {
                 // we don't need to notify anybody in this test
-        }}, null, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig);
+        }}, null, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5));
         try {
             config.setCreate(Lucene.indexExists(store.directory()) == false);
         } catch (IOException e) {

+ 2 - 1
core/src/test/java/org/elasticsearch/index/shard/IndexSearcherWrapperTests.java

@@ -32,6 +32,7 @@ import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.IOUtils;
 import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.EngineConfig;
 import org.elasticsearch.index.engine.EngineException;
@@ -45,7 +46,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 /**
  */
 public class IndexSearcherWrapperTests extends ESTestCase {
-    private static final EngineConfig ENGINE_CONFIG = new EngineConfig(null, null, null, Settings.EMPTY, null, null, null, null, null, null, new DefaultSimilarity(), null, null, null, null, QueryCachingPolicy.ALWAYS_CACHE, null);
+    private static final EngineConfig ENGINE_CONFIG = new EngineConfig(null, null, null, Settings.EMPTY, null, null, null, null, null, null, new DefaultSimilarity(), null, null, null, null, QueryCachingPolicy.ALWAYS_CACHE, null, TimeValue.timeValueMinutes(5));
 
     public void testReaderCloseListenerIsCalled() throws IOException {
         Directory dir = newDirectory();