Просмотр исходного кода

Add one protected method to InternalEngine as a hook point (#105113)

This PR adds one new protected method to InternalEngine to allow further
customization for flush behaviour. 

Relates: ES-7759
Yang Wang 1 год назад
Родитель
Сommit
4eaff75ed8

+ 13 - 6
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -110,7 +110,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
@@ -139,7 +138,7 @@ public class InternalEngine extends Engine {
     private final ExternalReaderManager externalReaderManager;
     private final ElasticsearchReaderManager internalReaderManager;
 
-    private final Lock flushLock = new ReentrantLock();
+    private final ReentrantLock flushLock = new ReentrantLock();
     private final ReentrantLock optimizeLock = new ReentrantLock();
 
     // A uid (in the form of BytesRef) to the version map
@@ -2177,10 +2176,9 @@ public class InternalEngine extends Engine {
     protected void flushHoldingLock(boolean force, boolean waitIfOngoing, ActionListener<FlushResult> listener) throws EngineException {
         ensureOpen(); // best-effort, a concurrent failEngine() can still happen but that's ok
         if (force && waitIfOngoing == false) {
-            assert false : "wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing;
-            throw new IllegalArgumentException(
-                "wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing
-            );
+            final String message = "wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing;
+            assert false : message;
+            throw new IllegalArgumentException(message);
         }
         final long generation;
         if (flushLock.tryLock() == false) {
@@ -2252,6 +2250,8 @@ public class InternalEngine extends Engine {
             logger.trace("released flush lock");
         }
 
+        afterFlush(generation);
+
         // We don't have to do this here; we do it defensively to make sure that even if wall clock time is misbehaving
         // (e.g., moves backwards) we will at least still sometimes prune deleted tombstones:
         if (engineConfig.isEnableGcDeletes()) {
@@ -2261,6 +2261,10 @@ public class InternalEngine extends Engine {
         waitForCommitDurability(generation, listener.map(v -> new FlushResult(true, generation)));
     }
 
+    protected final boolean isFlushLockIsHeldByCurrentThread() {
+        return flushLock.isHeldByCurrentThread();
+    }
+
     protected boolean hasUncommittedChanges() {
         return indexWriter.hasUncommittedChanges();
     }
@@ -2288,6 +2292,8 @@ public class InternalEngine extends Engine {
         }
     }
 
+    protected void afterFlush(long generation) {}
+
     @Override
     public void rollTranslogGeneration() throws EngineException {
         try (var ignored = acquireEnsureOpenRef()) {
@@ -2865,6 +2871,7 @@ public class InternalEngine extends Engine {
      * @param translog the translog
      */
     protected void commitIndexWriter(final IndexWriter writer, final Translog translog) throws IOException {
+        assert isFlushLockIsHeldByCurrentThread();
         ensureCanFlush();
         try {
             final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint();