Browse Source

Obey lock order if working with store to get metadata snapshots (#24787)

Today when we get a metadata snapshot from the index shard we ensure
that if there is no engine started on the shard that we lock the index
writer before we go and fetch the store metadata. Yet, if we concurrently
recover that shard, recovery finalization might fail since it can't acquire
the IW lock on the directory. This is mainly due to the wrong order of aquiring
the IW lock and the metadata lock. Fetching store metadata without a started engine
should block on the metadata lock in Store.java but since IndexShard locks the writer
first we get into a failed recovery dance especially in test. In production
this is less of an issue since we rarely get into this siutation if at all.

Closes #24481
Simon Willnauer 8 years ago
parent
commit
b17d23dc99

+ 1 - 5
core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -23,7 +23,6 @@ import org.apache.logging.log4j.Logger;
 import org.apache.lucene.index.CheckIndex;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexOptions;
-import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.index.SnapshotDeletionPolicy;
@@ -32,7 +31,6 @@ import org.apache.lucene.search.QueryCachingPolicy;
 import org.apache.lucene.search.Sort;
 import org.apache.lucene.search.UsageTrackingQueryCachingPolicy;
 import org.apache.lucene.store.AlreadyClosedException;
-import org.apache.lucene.store.Lock;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.ThreadInterruptedException;
 import org.elasticsearch.ElasticsearchException;
@@ -921,9 +919,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                 // That can be done out of mutex, since the engine can be closed half way.
                 Engine engine = getEngineOrNull();
                 if (engine == null) {
-                    try (Lock ignored = store.directory().obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
-                        return store.getMetadata(null);
-                    }
+                    return store.getMetadata(null, true);
                 }
             }
             indexCommit = deletionPolicy.snapshot();

+ 37 - 4
core/src/main/java/org/elasticsearch/index/store/Store.java

@@ -99,6 +99,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
@@ -240,7 +241,8 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
      * {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, Logger)} to read a meta data while locking
      * {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard
      * {@link IndexShard#acquireIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
-     *
+     * @param commit the index commit to read the snapshot from or <code>null</code> if the latest snapshot should be read from the
+     *               directory
      * @throws CorruptIndexException      if the lucene index is corrupted. This can be caused by a checksum mismatch or an
      *                                    unexpected exception when opening the index reading the segments file.
      * @throws IndexFormatTooOldException if the lucene index is too old to be opened.
@@ -250,16 +252,47 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
      * @throws IndexNotFoundException     if the commit point can't be found in this store
      */
     public MetadataSnapshot getMetadata(IndexCommit commit) throws IOException {
+        return getMetadata(commit, false);
+    }
+
+    /**
+     * Returns a new MetadataSnapshot for the given commit. If the given commit is <code>null</code>
+     * the latest commit point is used.
+     *
+     * Note that this method requires the caller verify it has the right to access the store and
+     * no concurrent file changes are happening. If in doubt, you probably want to use one of the following:
+     *
+     * {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, Logger)} to read a meta data while locking
+     * {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard
+     * {@link IndexShard#acquireIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
+     *
+     * @param commit the index commit to read the snapshot from or <code>null</code> if the latest snapshot should be read from the
+     *               directory
+     * @param lockDirectory if <code>true</code> the index writer lock will be obtained before reading the snapshot. This should
+     *                      only be used if there is no started shard using this store.
+     * @throws CorruptIndexException      if the lucene index is corrupted. This can be caused by a checksum mismatch or an
+     *                                    unexpected exception when opening the index reading the segments file.
+     * @throws IndexFormatTooOldException if the lucene index is too old to be opened.
+     * @throws IndexFormatTooNewException if the lucene index is too new to be opened.
+     * @throws FileNotFoundException      if one or more files referenced by a commit are not present.
+     * @throws NoSuchFileException        if one or more files referenced by a commit are not present.
+     * @throws IndexNotFoundException     if the commit point can't be found in this store
+     */
+    public MetadataSnapshot getMetadata(IndexCommit commit, boolean lockDirectory) throws IOException {
         ensureOpen();
         failIfCorrupted();
-        metadataLock.readLock().lock();
-        try {
+        assert lockDirectory ? commit == null : true : "IW lock should not be obtained if there is a commit point available";
+        // if we lock the directory we also acquire the write lock since that makes sure that nobody else tries to lock the IW
+        // on this store at the same time.
+        java.util.concurrent.locks.Lock lock = lockDirectory ? metadataLock.writeLock() : metadataLock.readLock();
+        lock.lock();
+        try (Closeable ignored = lockDirectory ? directory.obtainLock(IndexWriter.WRITE_LOCK_NAME) : () -> {} ) {
             return new MetadataSnapshot(commit, directory, logger);
         } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
             markStoreCorrupted(ex);
             throw ex;
         } finally {
-            metadataLock.readLock().unlock();
+            lock.unlock();
         }
     }
 

+ 46 - 0
core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -1498,6 +1498,52 @@ public class IndexShardTests extends IndexShardTestCase {
         }
     }
 
+    /**
+     * here we are simulating the scenario that happens when we do async shard fetching from GatewaySerivce while we are finishing
+     * a recovery and concurrently clean files. This should always be possible without any exception. Yet there was a bug where IndexShard
+     * acquired the index writer lock before it called into the store that has it's own locking for metadata reads
+     */
+    public void testReadSnapshotConcurrently() throws IOException, InterruptedException {
+        IndexShard indexShard = newStartedShard();
+        indexDoc(indexShard, "doc", "0", "{\"foo\" : \"bar\"}");
+        if (randomBoolean()) {
+            indexShard.refresh("test");
+        }
+        indexDoc(indexShard, "doc", "1", "{\"foo\" : \"bar\"}");
+        indexShard.flush(new FlushRequest());
+        closeShards(indexShard);
+
+        final IndexShard newShard = reinitShard(indexShard);
+        Store.MetadataSnapshot storeFileMetaDatas = newShard.snapshotStoreMetadata();
+        assertTrue("at least 2 files, commit and data: " +storeFileMetaDatas.toString(), storeFileMetaDatas.size() > 1);
+        AtomicBoolean stop = new AtomicBoolean(false);
+        CountDownLatch latch = new CountDownLatch(1);
+        expectThrows(AlreadyClosedException.class, () -> newShard.getEngine()); // no engine
+        Thread thread = new Thread(() -> {
+            latch.countDown();
+            while(stop.get() == false){
+                try {
+                    Store.MetadataSnapshot readMeta = newShard.snapshotStoreMetadata();
+                    assertEquals(0, storeFileMetaDatas.recoveryDiff(readMeta).different.size());
+                    assertEquals(0, storeFileMetaDatas.recoveryDiff(readMeta).missing.size());
+                    assertEquals(storeFileMetaDatas.size(), storeFileMetaDatas.recoveryDiff(readMeta).identical.size());
+                } catch (IOException e) {
+                    throw new AssertionError(e);
+                }
+            }
+        });
+        thread.start();
+        latch.await();
+
+        int iters = iterations(10, 100);
+        for (int i = 0; i < iters; i++) {
+            newShard.store().cleanupAndVerify("test", storeFileMetaDatas);
+        }
+        assertTrue(stop.compareAndSet(false, true));
+        thread.join();
+        closeShards(newShard);
+    }
+
     /** A dummy repository for testing which just needs restore overridden */
     private abstract static class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository {
         private final String indexName;