Преглед изворни кода

Expose tragic event to translog, close translog once we hit a tragic even and fail engine if we hit one too

Simon Willnauer пре 10 година
родитељ
комит
6cefdc82f6

+ 5 - 1
core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -782,9 +782,13 @@ public class InternalEngine extends Engine {
             // but we are double-checking it's failed and closed
             if (indexWriter.isOpen() == false && indexWriter.getTragicException() != null) {
                 failEngine("already closed by tragic event", indexWriter.getTragicException());
+            } else if (translog.isOpen() == false && translog.getTragicException() != null) {
+                failEngine("already closed by tragic event", translog.getTragicException());
             }
             return true;
-        } else if (t != null && indexWriter.isOpen() == false && indexWriter.getTragicException() == t) {
+        } else if (t != null &&
+            ((indexWriter.isOpen() == false && indexWriter.getTragicException() == t)
+                || (translog.isOpen() == false && translog.getTragicException() == t))) {
             // this spot on - we are handling the tragic event exception here so we have to fail the engine
             // right away
             failEngine(source, t);

+ 30 - 4
core/src/main/java/org/elasticsearch/index/translog/Translog.java

@@ -279,7 +279,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         }
     }
 
-    boolean isOpen() {
+    /** Returns {@code true} if this {@code Translog} is still open. */
+    public boolean isOpen() {
         return closed.get() == false;
     }
 
@@ -397,7 +398,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
      * @see Index
      * @see org.elasticsearch.index.translog.Translog.Delete
      */
-    public Location add(Operation operation) throws TranslogException {
+    public Location add(Operation operation) throws IOException {
         final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
         try {
             final BufferedChecksumStreamOutput checksumStreamOutput = new BufferedChecksumStreamOutput(out);
@@ -419,7 +420,14 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
                 assert current.assertBytesAtLocation(location, bytes);
                 return location;
             }
-        } catch (AlreadyClosedException ex) {
+        } catch (AlreadyClosedException | IOException ex) {
+            if (current.getTragicException() != null) {
+                try {
+                    close();
+                } catch (Exception inner) {
+                    ex.addSuppressed(inner);
+                }
+            }
             throw ex;
         } catch (Throwable e) {
             throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
@@ -433,6 +441,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
      * Snapshots are fixed in time and will not be updated with future operations.
      */
     public Snapshot newSnapshot() {
+        ensureOpen();
         try (ReleasableLock lock = readLock.acquire()) {
             ArrayList<TranslogReader> toOpen = new ArrayList<>();
             toOpen.addAll(recoveredTranslogs);
@@ -497,6 +506,15 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
             if (closed.get() == false) {
                 current.sync();
             }
+        } catch (AlreadyClosedException | IOException ex) {
+            if (current.getTragicException() != null) {
+                try {
+                    close();
+                } catch (Exception inner) {
+                    ex.addSuppressed(inner);
+                }
+            }
+            throw ex;
         }
     }
 
@@ -1296,6 +1314,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
                 throw new IllegalStateException("already committing a translog with generation: " + currentCommittingTranslog.getGeneration());
             }
             final TranslogWriter oldCurrent = current;
+            oldCurrent.ensureOpen();
             oldCurrent.sync();
             currentCommittingTranslog = current.immutableReader();
             Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME);
@@ -1391,7 +1410,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
 
     private void ensureOpen() {
         if (closed.get()) {
-            throw new AlreadyClosedException("translog is already closed");
+            throw new AlreadyClosedException("translog is already closed", current.getTragicException());
         }
     }
 
@@ -1406,4 +1425,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         return TranslogWriter.ChannelFactory.DEFAULT;
     }
 
+    /** If this {@code Translog} was closed as a side-effect of a tragic exception,
+     *  e.g. disk full while flushing a new segment, this returns the root cause exception.
+     *  Otherwise (no tragic exception has occurred) it returns null. */
+    public Throwable getTragicException() {
+        return current.getTragicException();
+    }
+
 }

+ 11 - 5
core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java

@@ -56,7 +56,7 @@ public class TranslogWriter extends TranslogReader {
     /* the offset in bytes written to the file */
     protected volatile long writtenOffset;
     /* if we hit an exception that we can't recover from we assign it to this var and ship it with every AlreadyClosedException we throw */
-    private volatile Throwable tragicEvent;
+    private volatile Throwable tragedy;
 
 
     public TranslogWriter(ShardId shardId, long generation, ChannelReference channelReference) throws IOException {
@@ -94,6 +94,12 @@ public class TranslogWriter extends TranslogReader {
             throw throwable;
         }
     }
+    /** If this {@code TranslogWriter} was closed as a side-effect of a tragic exception,
+     *  e.g. disk full while flushing a new segment, this returns the root cause exception.
+     *  Otherwise (no tragic exception has occurred) it returns null. */
+    public Throwable getTragicException() {
+        return tragedy;
+    }
 
     public enum Type {
 
@@ -125,10 +131,10 @@ public class TranslogWriter extends TranslogReader {
     protected final void closeWithTragicEvent(Throwable throwable) throws IOException {
         try (ReleasableLock lock = writeLock.acquire()) {
             if (throwable != null) {
-                if (tragicEvent == null) {
-                    tragicEvent = throwable;
+                if (tragedy == null) {
+                    tragedy = throwable;
                 } else {
-                    tragicEvent.addSuppressed(throwable);
+                    tragedy.addSuppressed(throwable);
                 }
             }
             close();
@@ -316,7 +322,7 @@ public class TranslogWriter extends TranslogReader {
 
     protected final void ensureOpen() {
         if (isClosed()) {
-            throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed", tragicEvent);
+            throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed", tragedy);
         }
     }
 }

+ 13 - 16
core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java

@@ -124,7 +124,7 @@ public class TranslogTests extends ESTestCase {
         return new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.index(), build), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, null);
     }
 
-    protected void addToTranslogAndList(Translog translog, ArrayList<Translog.Operation> list, Translog.Operation op) {
+    protected void addToTranslogAndList(Translog translog, ArrayList<Translog.Operation> list, Translog.Operation op) throws IOException {
         list.add(op);
         translog.add(op);
     }
@@ -335,7 +335,7 @@ public class TranslogTests extends ESTestCase {
         }
     }
 
-    public void testSnapshot() {
+    public void testSnapshot() throws IOException {
         ArrayList<Translog.Operation> ops = new ArrayList<>();
         Translog.Snapshot snapshot = translog.newSnapshot();
         assertThat(snapshot, SnapshotMatchers.size(0));
@@ -394,7 +394,7 @@ public class TranslogTests extends ESTestCase {
             Translog.Snapshot snapshot = translog.newSnapshot();
             fail("translog is closed");
         } catch (AlreadyClosedException ex) {
-            assertThat(ex.getMessage(), containsString("translog-1.tlog is already closed can't increment"));
+            assertEquals(ex.getMessage(), "translog is already closed");
         }
     }
 
@@ -639,7 +639,7 @@ public class TranslogTests extends ESTestCase {
             final String threadId = "writer_" + i;
             writers[i] = new Thread(new AbstractRunnable() {
                 @Override
-                public void doRun() throws BrokenBarrierException, InterruptedException {
+                public void doRun() throws BrokenBarrierException, InterruptedException, IOException {
                     barrier.await();
                     int counter = 0;
                     while (run.get()) {
@@ -1287,7 +1287,6 @@ public class TranslogTests extends ESTestCase {
 
     public void testFailFlush() throws IOException {
         Path tempDir = createTempDir();
-        final AtomicBoolean failWrite = new AtomicBoolean();
         final AtomicBoolean simulateDiskFull = new AtomicBoolean();
         TranslogConfig config = getTranslogConfig(tempDir);
         Translog translog = new Translog(config) {
@@ -1303,9 +1302,6 @@ public class TranslogTests extends ESTestCase {
 
                             @Override
                             public int write(ByteBuffer src) throws IOException {
-                                if (failWrite.get()) {
-                                    throw new IOException("boom");
-                                }
                                 if (simulateDiskFull.get()) {
                                     if (src.limit() > 1) {
                                         final int pos = src.position();
@@ -1337,11 +1333,8 @@ public class TranslogTests extends ESTestCase {
                 opsSynced++;
             } catch (IOException ex) {
                 failed = true;
+                assertFalse(translog.isOpen());
                 assertEquals("no space left on device", ex.getMessage());
-            } catch (TranslogException ex) {
-                // we catch IOExceptions in Translog#add -- that's how we got here
-                failed = true;
-                assertTrue(ex.toString(), ex.getMessage().startsWith("Failed to write operation"));
              }
             simulateDiskFull.set(randomBoolean());
         }
@@ -1362,16 +1355,20 @@ public class TranslogTests extends ESTestCase {
             fail("already closed");
         } catch (AlreadyClosedException ex) {
             // all is well
+            assertNotNull(ex.getCause());
+            assertSame(translog.getTragicException(), ex.getCause());
         }
 
         try {
-            translog.close();
-            if (opsAdded != opsSynced) {
-                fail("already closed");
-            }
+            translog.commit();
+            fail("already closed");
         } catch (AlreadyClosedException ex) {
             assertNotNull(ex.getCause());
+            assertSame(translog.getTragicException(), ex.getCause());
         }
+
+        assertFalse(translog.isOpen());
+        translog.close(); // we are closed
         config.setTranslogGeneration(translogGeneration);
         try (Translog tlog = new Translog(config)){
             assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, tlog.currentFileGeneration());