Quellcode durchsuchen

Closed shard should never open new engine (#47186)

We should not open new engines if a shard is closed. We break this
assumption in #45263 where we stop verifying the shard state before
creating an engine but only before swapping the engine reference.
We can fail to snapshot the store metadata or checkIndex a closed shard
if there's some IndexWriter holding the index lock.

Closes #47060
Nhat Nguyen vor 6 Jahren
Ursprung
Commit
d029e18c72

+ 25 - 49
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -1158,11 +1158,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
             synchronized (engineMutex) {
                 // if the engine is not running, we can access the store directly, but we need to make sure no one starts
                 // the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine.
-                synchronized (mutex) {
-                    final Engine engine = getEngineOrNull();
-                    if (engine != null) {
-                        indexCommit = engine.acquireLastIndexCommit(false);
-                    }
+                final Engine engine = getEngineOrNull();
+                if (engine != null) {
+                    indexCommit = engine.acquireLastIndexCommit(false);
                 }
                 if (indexCommit == null) {
                     return store.getMetadata(null, true);
@@ -1286,9 +1284,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     }
 
     public void close(String reason, boolean flushEngine) throws IOException {
-        synchronized (mutex) {
+        synchronized (engineMutex) {
             try {
-                changeState(IndexShardState.CLOSED, reason);
+                synchronized (mutex) {
+                    changeState(IndexShardState.CLOSED, reason);
+                }
             } finally {
                 final Engine engine = this.currentEngineReference.getAndSet(null);
                 try {
@@ -1343,6 +1343,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
      * This is the first operation after the local checkpoint of the safe commit if exists.
      */
     public long recoverLocallyUpToGlobalCheckpoint() {
+        assert Thread.holdsLock(mutex) == false : "recover locally under mutex";
         if (state != IndexShardState.RECOVERING) {
             throw new IndexShardNotRecoveringException(shardId, state);
         }
@@ -1394,7 +1395,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                 getEngine().recoverFromTranslog(translogRecoveryRunner, globalCheckpoint);
                 logger.trace("shard locally recovered up to {}", getEngine().getSeqNoStats(globalCheckpoint));
             } finally {
-                synchronized (mutex) {
+                synchronized (engineMutex) {
                     IOUtils.close(currentEngineReference.getAndSet(null));
                 }
             }
@@ -1569,23 +1570,15 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
             : "expected empty set of retention leases with recovery source [" + recoveryState.getRecoverySource()
             + "] but got " + getRetentionLeases();
         synchronized (engineMutex) {
+            assert currentEngineReference.get() == null : "engine is running";
+            verifyNotClosed();
             // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
             final Engine newEngine = engineFactory.newReadWriteEngine(config);
-            synchronized (mutex) {
-                try {
-                    verifyNotClosed();
-                    assert currentEngineReference.get() == null : "engine is running";
-                    onNewEngine(newEngine);
-                    currentEngineReference.set(newEngine);
-                    // We set active because we are now writing operations to the engine; this way,
-                    // if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
-                    active.set(true);
-                } finally {
-                    if (currentEngineReference.get() != newEngine) {
-                        newEngine.close();
-                    }
-                }
-            }
+            onNewEngine(newEngine);
+            currentEngineReference.set(newEngine);
+            // We set active because we are now writing operations to the engine; this way,
+            // if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
+            active.set(true);
         }
         // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
         // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
@@ -1616,7 +1609,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
      * called if recovery has to be restarted after network error / delay **
      */
     public void performRecoveryRestart() throws IOException {
-        synchronized (mutex) {
+        assert Thread.holdsLock(mutex) == false : "restart recovery under mutex";
+        synchronized (engineMutex) {
             assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners";
             IOUtils.close(currentEngineReference.getAndSet(null));
             resetRecoveryStage();
@@ -3288,7 +3282,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
      * Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint.
      */
     void resetEngineToGlobalCheckpoint() throws IOException {
-        assert Thread.holdsLock(engineMutex) == false : "resetting engine under mutex";
+        assert Thread.holdsLock(mutex) == false : "resetting engine under mutex";
         assert getActiveOperationsCount() == OPERATIONS_BLOCKED
             : "resetting engine without blocking operations; active operations are [" + getActiveOperations() + ']';
         sync(); // persist the global checkpoint to disk
@@ -3301,6 +3295,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         final long globalCheckpoint = getLastKnownGlobalCheckpoint();
         assert globalCheckpoint == getLastSyncedGlobalCheckpoint();
         synchronized (engineMutex) {
+            verifyNotClosed();
             // we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata,
             // acquireXXXCommit and close works.
             final Engine readOnlyEngine =
@@ -3328,7 +3323,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
 
                     @Override
                     public void close() throws IOException {
-                        assert Thread.holdsLock(mutex);
+                        assert Thread.holdsLock(engineMutex);
 
                         Engine newEngine = newEngineReference.get();
                         if (newEngine == currentEngineReference.get()) {
@@ -3338,28 +3333,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                         IOUtils.close(super::close, newEngine);
                     }
                 };
-            synchronized (mutex) {
-                try {
-                    verifyNotClosed();
-                    IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
-                } finally {
-                    if (currentEngineReference.get() != readOnlyEngine) {
-                        readOnlyEngine.close();
-                    }
-                }
-            }
-            final Engine newReadWriteEngine = engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker));
-            synchronized (mutex) {
-                try {
-                    verifyNotClosed();
-                    newEngineReference.set(newReadWriteEngine);
-                    onNewEngine(newReadWriteEngine);
-                } finally {
-                    if (newEngineReference.get() != newReadWriteEngine) {
-                        newReadWriteEngine.close(); // shard was closed
-                    }
-                }
-            }
+            IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
+            newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
+            onNewEngine(newEngineReference.get());
         }
         final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery(
             engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
@@ -3367,7 +3343,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
             });
         newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint);
         newEngineReference.get().refresh("reset_engine");
-        synchronized (mutex) {
+        synchronized (engineMutex) {
             verifyNotClosed();
             IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get()));
             // We set active because we are now writing operations to the engine; this way,

+ 10 - 3
test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java

@@ -133,6 +133,7 @@ import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
 import java.util.TimeZone;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -511,15 +512,21 @@ public abstract class ESTestCase extends LuceneTestCase {
     // TODO: can we do this cleaner???
 
     /** MockFSDirectoryService sets this: */
-    public static boolean checkIndexFailed;
+    public static final List<Exception> checkIndexFailures = new CopyOnWriteArrayList<>();
 
     @Before
     public final void resetCheckIndexStatus() throws Exception {
-        checkIndexFailed = false;
+        checkIndexFailures.clear();
     }
 
     public final void ensureCheckIndexPassed() {
-        assertFalse("at least one shard failed CheckIndex", checkIndexFailed);
+        if (checkIndexFailures.isEmpty() == false) {
+            final AssertionError e = new AssertionError("at least one shard failed CheckIndex");
+            for (Exception failure : checkIndexFailures) {
+                e.addSuppressed(failure);
+            }
+            throw e;
+        }
     }
 
     // -----------------------------------------------------------------

+ 7 - 5
test/framework/src/main/java/org/elasticsearch/test/store/MockFSDirectoryFactory.java

@@ -83,17 +83,19 @@ public class MockFSDirectoryFactory implements IndexStorePlugin.DirectoryFactory
                     CheckIndex.Status status = store.checkIndex(out);
                     out.flush();
                     if (!status.clean) {
-                        ESTestCase.checkIndexFailed = true;
-                        logger.warn("check index [failure] index files={}\n{}", Arrays.toString(dir.listAll()), os.bytes().utf8ToString());
-                        throw new IOException("index check failure");
+                        IOException failure = new IOException("failed to check index for shard " + shardId +
+                            ";index files [" + Arrays.toString(dir.listAll()) + "] os [" + os.bytes().utf8ToString() + "]");
+                        ESTestCase.checkIndexFailures.add(failure);
+                        throw failure;
                     } else {
                         if (logger.isDebugEnabled()) {
                             logger.debug("check index [success]\n{}", os.bytes().utf8ToString());
                         }
                     }
                 } catch (LockObtainFailedException e) {
-                    ESTestCase.checkIndexFailed = true;
-                    throw new IllegalStateException("IndexWriter is still open on shard " + shardId, e);
+                    IllegalStateException failure = new IllegalStateException("IndexWriter is still open on shard " + shardId, e);
+                    ESTestCase.checkIndexFailures.add(failure);
+                    throw failure;
                 }
             } catch (Exception e) {
                 logger.warn("failed to check index", e);