Browse Source

Allow executing multiple periodic flushes while they are being made durable (#102571)

Closes ES-7200
Francisco Fernández Castaño 1 year ago
parent
commit
000ffaae05

+ 5 - 0
docs/changelog/102571.yaml

@@ -0,0 +1,5 @@
+pr: 102571
+summary: Allow executing multiple periodic flushes while they are being made durable
+area: Store
+type: enhancement
+issues: []

+ 4 - 6
server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java

@@ -79,20 +79,18 @@ public class TransportShardFlushAction extends TransportReplicationAction<ShardF
         IndexShard primary,
         ActionListener<PrimaryResult<ShardFlushRequest, ReplicationResponse>> listener
     ) {
-        ActionListener.completeWith(listener, () -> {
-            primary.flush(shardRequest.getRequest());
+        primary.flush(shardRequest.getRequest(), listener.map(flushed -> {
             logger.trace("{} flush request executed on primary", primary.shardId());
             return new PrimaryResult<>(shardRequest, new ReplicationResponse());
-        });
+        }));
     }
 
     @Override
     protected void shardOperationOnReplica(ShardFlushRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
-        ActionListener.completeWith(listener, () -> {
-            replica.flush(request.getRequest());
+        replica.flush(request.getRequest(), listener.map(flushed -> {
             logger.trace("{} flush request executed on replica", replica.shardId());
             return new ReplicaResult();
-        });
+        }));
     }
 
     // TODO: Remove this transition in 9.0

+ 61 - 48
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -1393,22 +1393,38 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
      *         If <code>false</code> is returned, no flush happened.
      */
     public boolean flush(FlushRequest request) {
+        PlainActionFuture<Boolean> future = new PlainActionFuture<>();
+        flush(request, future);
+        return future.actionGet();
+    }
+
+    /**
+     * Executes the given flush request against the engine.
+     *
+     * @param request the flush request
+     * @param listener to notify after full durability has been achieved.
+     *                 <code>false</code> if <code>waitIfOngoing==false</code>
+     *                 and an ongoing request is detected, else <code>true</code>.
+     *                 If <code>false</code> is returned, no flush happened.
+     */
+    public void flush(FlushRequest request, ActionListener<Boolean> listener) {
         final boolean waitIfOngoing = request.waitIfOngoing();
         final boolean force = request.force();
         logger.trace("flush with {}", request);
-        /*
-         * We allow flushes while recovery since we allow operations to happen while recovering and we want to keep the translog under
-         * control (up to deletes, which we do not GC). Yet, we do not use flush internally to clear deletes and flush the index writer
-         * since we use Engine#writeIndexingBuffer for this now.
-         */
-        verifyNotClosed();
-        final long time = System.nanoTime();
-        // TODO: Transition this method to async to support async flush
-        PlainActionFuture<Engine.FlushResult> future = new PlainActionFuture<>();
-        getEngine().flush(force, waitIfOngoing, future);
-        Engine.FlushResult flushResult = future.actionGet();
-        flushMetric.inc(System.nanoTime() - time);
-        return flushResult.flushPerformed();
+        ActionListener.run(listener, l -> {
+            /*
+             * We allow flushes while recovery since we allow operations to happen while recovering and we want to keep the translog under
+             * control (up to deletes, which we do not GC). Yet, we do not use flush internally to clear deletes and flush the index writer
+             * since we use Engine#writeIndexingBuffer for this now.
+             */
+            verifyNotClosed();
+            final long startTime = System.nanoTime();
+            getEngine().flush(
+                force,
+                waitIfOngoing,
+                ActionListener.runBefore(l.map(Engine.FlushResult::flushPerformed), () -> flushMetric.inc(System.nanoTime() - startTime))
+            );
+        });
     }
 
     /**
@@ -2291,25 +2307,26 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
             boolean wasActive = active.getAndSet(false);
             if (wasActive) {
                 logger.debug("flushing shard on inactive");
-                threadPool.executor(ThreadPool.Names.FLUSH).execute(new AbstractRunnable() {
-                    @Override
-                    public void onFailure(Exception e) {
-                        if (state != IndexShardState.CLOSED) {
-                            active.set(true);
-                            logger.warn("failed to flush shard on inactive", e);
+                threadPool.executor(ThreadPool.Names.FLUSH)
+                    .execute(() -> flush(new FlushRequest().waitIfOngoing(false).force(false), new ActionListener<>() {
+                        @Override
+                        public void onResponse(Boolean flushed) {
+                            if (flushed == false) {
+                                // In case an ongoing flush was detected, revert active flag so that a next flushOnIdle request
+                                // will retry (#87888)
+                                active.set(true);
+                            }
+                            periodicFlushMetric.inc();
                         }
-                    }
 
-                    @Override
-                    protected void doRun() {
-                        if (flush(new FlushRequest().waitIfOngoing(false).force(false)) == false) {
-                            // In case an ongoing flush was detected, revert active flag so that a next flushOnIdle request
-                            // will retry (#87888)
-                            active.set(true);
+                        @Override
+                        public void onFailure(Exception e) {
+                            if (state != IndexShardState.CLOSED) {
+                                active.set(true);
+                                logger.warn("failed to flush shard on inactive", e);
+                            }
                         }
-                        periodicFlushMetric.inc();
-                    }
-                });
+                    }));
             }
         }
     }
@@ -3756,27 +3773,23 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                  */
                 if (shouldPeriodicallyFlush()) {
                     logger.debug("submitting async flush request");
-                    final AbstractRunnable flush = new AbstractRunnable() {
-                        @Override
-                        public void onFailure(final Exception e) {
-                            if (state != IndexShardState.CLOSED) {
-                                logger.warn("failed to flush index", e);
+                    threadPool.executor(ThreadPool.Names.FLUSH).execute(() -> {
+                        flush(new FlushRequest(), new ActionListener<>() {
+                            @Override
+                            public void onResponse(Boolean flushed) {
+                                periodicFlushMetric.inc();
                             }
-                        }
-
-                        @Override
-                        protected void doRun() {
-                            flush(new FlushRequest());
-                            periodicFlushMetric.inc();
-                        }
 
-                        @Override
-                        public void onAfter() {
-                            flushOrRollRunning.compareAndSet(true, false);
-                            afterWriteOperation();
-                        }
-                    };
-                    threadPool.executor(ThreadPool.Names.FLUSH).execute(flush);
+                            @Override
+                            public void onFailure(Exception e) {
+                                if (state != IndexShardState.CLOSED) {
+                                    logger.warn("failed to flush index", e);
+                                }
+                            }
+                        });
+                        flushOrRollRunning.compareAndSet(true, false);
+                        afterWriteOperation();
+                    });
                 } else if (shouldRollTranslogGeneration()) {
                     logger.debug("submitting async roll translog generation request");
                     final AbstractRunnable roll = new AbstractRunnable() {

+ 41 - 0
server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -56,6 +56,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.settings.IndexScopedSettings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
@@ -77,6 +78,7 @@ import org.elasticsearch.index.engine.CommitStats;
 import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.EngineConfig;
+import org.elasticsearch.index.engine.EngineException;
 import org.elasticsearch.index.engine.EngineTestCase;
 import org.elasticsearch.index.engine.InternalEngine;
 import org.elasticsearch.index.engine.InternalEngineFactory;
@@ -173,6 +175,7 @@ import java.util.stream.Stream;
 import static java.util.Collections.emptySet;
 import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
 import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex;
+import static org.elasticsearch.index.IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING;
 import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
 import static org.elasticsearch.test.hamcrest.RegexMatcher.matches;
 import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS;
@@ -4152,6 +4155,44 @@ public class IndexShardTests extends IndexShardTestCase {
         closeShards(shard);
     }
 
+    public void testMultiplePeriodicFlushesCanBeTriggeredBeforeTheyAreDurable() throws Exception {
+        List<ActionListener<Engine.FlushResult>> pendingListeners = Collections.synchronizedList(new ArrayList<>());
+        // Ensure that a single document forces a flush after each write
+        var indexSettings = Settings.builder()
+            .put(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), ByteSizeValue.ofBytes(Translog.DEFAULT_HEADER_SIZE_IN_BYTES + 1))
+            .build();
+        var shardStarted = new AtomicBoolean();
+        var flushExecutedBarrier = new CyclicBarrier(2);
+        var shard = newStartedShard(true, indexSettings, config -> new InternalEngine(config) {
+            @Override
+            public void flush(boolean force, boolean waitIfOngoing, ActionListener<FlushResult> listener) throws EngineException {
+                if (shardStarted.get()) {
+                    super.flush(force, waitIfOngoing, ActionListener.noop());
+                    pendingListeners.add(listener);
+                    safeAwait(flushExecutedBarrier);
+                } else {
+                    super.flush(force, waitIfOngoing, listener);
+                }
+            }
+        });
+        shardStarted.set(true);
+
+        int numberOfFlushes = randomIntBetween(5, 10);
+        for (int i = 0; i < numberOfFlushes; i++) {
+            indexDoc(shard, "_doc", Integer.toString(i));
+            shard.afterWriteOperation();
+            safeAwait(flushExecutedBarrier);
+        }
+
+        assertThat(pendingListeners.size(), is(numberOfFlushes));
+        assertThat(shard.flushStats().getPeriodic(), is(equalTo(0L)));
+
+        pendingListeners.forEach(l -> l.onResponse(new Engine.FlushResult(true, 1)));
+        assertThat(shard.flushStats().getPeriodic(), is(equalTo((long) numberOfFlushes)));
+
+        closeShards(shard);
+    }
+
     public void testOnCloseStats() throws IOException {
         final IndexShard indexShard = newStartedShard(true);