Procházet zdrojové kódy

Snapshotting and sync could cause a dead lock TranslogWriter (#18481)

 #18360 introduced an extra lock in order to allow writes while syncing the translog. This caused a potential deadlock with snapshotting code where we first acquire the instance lock, followed by a sync (which acquires the syncLock). However, the sync logic acquires the syncLock first, followed by the instance lock.

I considered solving this by not syncing the translog on snapshot - I think we can get away with just flushing it. That however will create subtleties around snapshoting and whether operations in them are persisted. I opted instead to have slightly uglier code with nest synchronized, where the scope of the change is contained to the TranslogWriter class alone.
Boaz Leskes před 9 roky
rodič
revize
34ef5306d2

+ 44 - 28
core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java

@@ -39,7 +39,6 @@ import java.nio.file.OpenOption;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantLock;
 
 public class TranslogWriter extends BaseTranslogReader implements Closeable {
 
@@ -154,7 +153,9 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
     /**
      * returns true if there are buffered ops
      */
-    public boolean syncNeeded() { return totalOffset != lastSyncedOffset; }
+    public boolean syncNeeded() {
+        return totalOffset != lastSyncedOffset;
+    }
 
     @Override
     public int totalOperations() {
@@ -169,40 +170,55 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
     /**
      * closes this writer and transfers it's underlying file channel to a new immutable reader
      */
-    public synchronized TranslogReader closeIntoReader() throws IOException {
-        try {
-            sync(); // sync before we close..
-        } catch (IOException e) {
-            closeWithTragicEvent(e);
-            throw e;
-        }
-        if (closed.compareAndSet(false, true)) {
-            boolean success = false;
-            try {
-                final TranslogReader reader = new TranslogReader(generation, channel, path, firstOperationOffset, getWrittenOffset(), operationCounter);
-                success = true;
-                return reader;
-            } finally {
-                if (success == false) {
-                    // close the channel, as we are closed and failed to create a new reader
-                    IOUtils.closeWhileHandlingException(channel);
+    public TranslogReader closeIntoReader() throws IOException {
+        // make sure to acquire the sync lock first, to prevent dead locks with threads calling
+        // syncUpTo() , where the sync lock is acquired first, following by the synchronize(this)
+        //
+        // Note: While this is not strictly needed as this method is called while blocking all ops on the translog,
+        //       we do this to for correctness and preventing future issues.
+        synchronized (syncLock) {
+            synchronized (this) {
+                try {
+                    sync(); // sync before we close..
+                } catch (IOException e) {
+                    closeWithTragicEvent(e);
+                    throw e;
+                }
+                if (closed.compareAndSet(false, true)) {
+                    boolean success = false;
+                    try {
+                        final TranslogReader reader = new TranslogReader(generation, channel, path, firstOperationOffset, getWrittenOffset(), operationCounter);
+                        success = true;
+                        return reader;
+                    } finally {
+                        if (success == false) {
+                            // close the channel, as we are closed and failed to create a new reader
+                            IOUtils.closeWhileHandlingException(channel);
+                        }
+                    }
+                } else {
+                    throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed (path [" + path + "]", tragedy);
                 }
             }
-        } else {
-            throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed (path [" + path + "]", tragedy);
         }
     }
 
 
     @Override
-    public synchronized Translog.Snapshot newSnapshot() {
-        ensureOpen();
-        try {
-            sync();
-        } catch (IOException e) {
-            throw new TranslogException(shardId, "exception while syncing before creating a snapshot", e);
+    public Translog.Snapshot newSnapshot() {
+        // make sure to acquire the sync lock first, to prevent dead locks with threads calling
+        // syncUpTo() , where the sync lock is acquired first, following by the synchronize(this)
+        synchronized (syncLock) {
+            synchronized (this) {
+                ensureOpen();
+                try {
+                    sync();
+                } catch (IOException e) {
+                    throw new TranslogException(shardId, "exception while syncing before creating a snapshot", e);
+                }
+                return super.newSnapshot();
+            }
         }
-        return super.newSnapshot();
     }
 
     private long getWrittenOffset() throws IOException {

+ 16 - 13
core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java

@@ -42,7 +42,6 @@ import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
-import org.elasticsearch.index.Index;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.ESTestCase;
@@ -138,8 +137,8 @@ public class TranslogTests extends ESTestCase {
 
     private TranslogConfig getTranslogConfig(Path path) {
         Settings build = Settings.builder()
-                .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
-                .build();
+            .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
+            .build();
         ByteSizeValue bufferSize = randomBoolean() ? TranslogConfig.DEFAULT_BUFFER_SIZE : new ByteSizeValue(10 + randomInt(128 * 1024), ByteSizeUnit.BYTES);
         return new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.getIndex(), build), BigArrays.NON_RECYCLING_INSTANCE, bufferSize);
     }
@@ -330,9 +329,9 @@ public class TranslogTests extends ESTestCase {
         assertEquals(6, copy.estimatedNumberOfOperations());
         assertEquals(431, copy.getTranslogSizeInBytes());
         assertEquals("\"translog\"{\n" +
-                "  \"operations\" : 6,\n" +
-                "  \"size_in_bytes\" : 431\n" +
-                "}", copy.toString().trim());
+            "  \"operations\" : 6,\n" +
+            "  \"size_in_bytes\" : 431\n" +
+            "}", copy.toString().trim());
 
         try {
             new TranslogStats(1, -1);
@@ -604,7 +603,8 @@ public class TranslogTests extends ESTestCase {
         final List<Throwable> errors = new CopyOnWriteArrayList<>();
         logger.debug("using [{}] readers. [{}] writers. flushing every ~[{}] ops.", readers.length, writers.length, flushEveryOps);
         for (int i = 0; i < writers.length; i++) {
-            final String threadId = "writer_" + i;
+            final String threadName = "writer_" + i;
+            final int threadId = i;
             writers[i] = new Thread(new AbstractRunnable() {
                 @Override
                 public void doRun() throws BrokenBarrierException, InterruptedException, IOException {
@@ -629,18 +629,21 @@ public class TranslogTests extends ESTestCase {
                         if (existing != null) {
                             fail("duplicate op [" + op + "], old entry at " + location);
                         }
+                        if (id % writers.length == threadId) {
+                            translog.ensureSynced(location);
+                        }
                         writtenOpsLatch.get().countDown();
                         counter++;
                     }
-                    logger.debug("--> [{}] done. wrote [{}] ops.", threadId, counter);
+                    logger.debug("--> [{}] done. wrote [{}] ops.", threadName, counter);
                 }
 
                 @Override
                 public void onFailure(Throwable t) {
-                    logger.error("--> writer [{}] had an error", t, threadId);
+                    logger.error("--> writer [{}] had an error", t, threadName);
                     errors.add(t);
                 }
-            }, threadId);
+            }, threadName);
             writers[i].start();
         }
 
@@ -1262,12 +1265,12 @@ public class TranslogTests extends ESTestCase {
                         case CREATE:
                         case INDEX:
                             op = new Translog.Index("test", threadId + "_" + opCount,
-                                    randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8"));
+                                randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8"));
                             break;
                         case DELETE:
                             op = new Translog.Delete(new Term("_uid", threadId + "_" + opCount),
-                                    1 + randomInt(100000),
-                                    randomFrom(VersionType.values()));
+                                1 + randomInt(100000),
+                                randomFrom(VersionType.values()));
                             break;
                         default:
                             throw new ElasticsearchException("not supported op type");