Browse Source

Merge pull request #15420 from s1monw/fail_translog_on_tragic_event

Fail and close translog hard if writing to disk fails
Simon Willnauer 9 years ago
parent
commit
bc8745dcc2

+ 6 - 2
core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -781,10 +781,14 @@ public class InternalEngine extends Engine {
             // we need to fail the engine. it might have already been failed before
             // we need to fail the engine. it might have already been failed before
             // but we are double-checking it's failed and closed
             // but we are double-checking it's failed and closed
             if (indexWriter.isOpen() == false && indexWriter.getTragicException() != null) {
             if (indexWriter.isOpen() == false && indexWriter.getTragicException() != null) {
-                failEngine("already closed by tragic event", indexWriter.getTragicException());
+                failEngine("already closed by tragic event on the index writer", indexWriter.getTragicException());
+            } else if (translog.isOpen() == false && translog.getTragicException() != null) {
+                failEngine("already closed by tragic event on the translog", translog.getTragicException());
             }
             }
             return true;
             return true;
-        } else if (t != null && indexWriter.isOpen() == false && indexWriter.getTragicException() == t) {
+        } else if (t != null &&
+            ((indexWriter.isOpen() == false && indexWriter.getTragicException() == t)
+                || (translog.isOpen() == false && translog.getTragicException() == t))) {
             // this spot on - we are handling the tragic event exception here so we have to fail the engine
             // this spot on - we are handling the tragic event exception here so we have to fail the engine
             // right away
             // right away
             failEngine(source, t);
             failEngine(source, t);

+ 37 - 17
core/src/main/java/org/elasticsearch/index/translog/BufferingTranslogWriter.java

@@ -48,22 +48,27 @@ public final class BufferingTranslogWriter extends TranslogWriter {
     public Translog.Location add(BytesReference data) throws IOException {
     public Translog.Location add(BytesReference data) throws IOException {
         try (ReleasableLock lock = writeLock.acquire()) {
         try (ReleasableLock lock = writeLock.acquire()) {
             ensureOpen();
             ensureOpen();
-            operationCounter++;
             final long offset = totalOffset;
             final long offset = totalOffset;
             if (data.length() >= buffer.length) {
             if (data.length() >= buffer.length) {
                 flush();
                 flush();
                 // we use the channel to write, since on windows, writing to the RAF might not be reflected
                 // we use the channel to write, since on windows, writing to the RAF might not be reflected
                 // when reading through the channel
                 // when reading through the channel
-                data.writeTo(channel);
+                try {
+                    data.writeTo(channel);
+                } catch (Throwable ex) {
+                    closeWithTragicEvent(ex);
+                    throw ex;
+                }
                 writtenOffset += data.length();
                 writtenOffset += data.length();
                 totalOffset += data.length();
                 totalOffset += data.length();
-                return new Translog.Location(generation, offset, data.length());
-            }
-            if (data.length() > buffer.length - bufferCount) {
-                flush();
+            } else {
+                if (data.length() > buffer.length - bufferCount) {
+                    flush();
+                }
+                data.writeTo(bufferOs);
+                totalOffset += data.length();
             }
             }
-            data.writeTo(bufferOs);
-            totalOffset += data.length();
+            operationCounter++;
             return new Translog.Location(generation, offset, data.length());
             return new Translog.Location(generation, offset, data.length());
         }
         }
     }
     }
@@ -71,10 +76,17 @@ public final class BufferingTranslogWriter extends TranslogWriter {
     protected final void flush() throws IOException {
     protected final void flush() throws IOException {
         assert writeLock.isHeldByCurrentThread();
         assert writeLock.isHeldByCurrentThread();
         if (bufferCount > 0) {
         if (bufferCount > 0) {
+            ensureOpen();
             // we use the channel to write, since on windows, writing to the RAF might not be reflected
             // we use the channel to write, since on windows, writing to the RAF might not be reflected
             // when reading through the channel
             // when reading through the channel
-            Channels.writeToChannel(buffer, 0, bufferCount, channel);
-            writtenOffset += bufferCount;
+            final int bufferSize = bufferCount;
+            try {
+                Channels.writeToChannel(buffer, 0, bufferSize, channel);
+            } catch (Throwable ex) {
+                closeWithTragicEvent(ex);
+                throw ex;
+            }
+            writtenOffset += bufferSize;
             bufferCount = 0;
             bufferCount = 0;
         }
         }
     }
     }
@@ -102,20 +114,28 @@ public final class BufferingTranslogWriter extends TranslogWriter {
     }
     }
 
 
     @Override
     @Override
-    public void sync() throws IOException {
-        if (!syncNeeded()) {
-            return;
-        }
-        synchronized (this) {
+    public synchronized void sync() throws IOException {
+        if (syncNeeded()) {
+            ensureOpen(); // this call gives a better exception that the incRef if we are closed by a tragic event
             channelReference.incRef();
             channelReference.incRef();
             try {
             try {
+                final long offsetToSync;
+                final int opsCounter;
                 try (ReleasableLock lock = writeLock.acquire()) {
                 try (ReleasableLock lock = writeLock.acquire()) {
                     flush();
                     flush();
-                    lastSyncedOffset = totalOffset;
+                    offsetToSync = totalOffset;
+                    opsCounter = operationCounter;
                 }
                 }
                 // we can do this outside of the write lock but we have to protect from
                 // we can do this outside of the write lock but we have to protect from
                 // concurrent syncs
                 // concurrent syncs
-                checkpoint(lastSyncedOffset, operationCounter, channelReference);
+                ensureOpen(); // just for kicks - the checkpoint happens or not either way
+                try {
+                    checkpoint(offsetToSync, opsCounter, channelReference);
+                } catch (Throwable ex) {
+                    closeWithTragicEvent(ex);
+                    throw ex;
+                }
+                lastSyncedOffset = offsetToSync;
             } finally {
             } finally {
                 channelReference.decRef();
                 channelReference.decRef();
             }
             }

+ 63 - 30
core/src/main/java/org/elasticsearch/index/translog/Translog.java

@@ -115,7 +115,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
     private final Path location;
     private final Path location;
     private TranslogWriter current;
     private TranslogWriter current;
     private volatile ImmutableTranslogReader currentCommittingTranslog;
     private volatile ImmutableTranslogReader currentCommittingTranslog;
-    private long lastCommittedTranslogFileGeneration = -1; // -1 is safe as it will not cause an translog deletion.
+    private volatile long lastCommittedTranslogFileGeneration = -1; // -1 is safe as it will not cause an translog deletion.
     private final AtomicBoolean closed = new AtomicBoolean();
     private final AtomicBoolean closed = new AtomicBoolean();
     private final TranslogConfig config;
     private final TranslogConfig config;
     private final String translogUUID;
     private final String translogUUID;
@@ -279,7 +279,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         }
         }
     }
     }
 
 
-    boolean isOpen() {
+    /** Returns {@code true} if this {@code Translog} is still open. */
+    public boolean isOpen() {
         return closed.get() == false;
         return closed.get() == false;
     }
     }
 
 
@@ -288,10 +289,14 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         if (closed.compareAndSet(false, true)) {
         if (closed.compareAndSet(false, true)) {
             try (ReleasableLock lock = writeLock.acquire()) {
             try (ReleasableLock lock = writeLock.acquire()) {
                 try {
                 try {
-                    IOUtils.close(current, currentCommittingTranslog);
+                    current.sync();
                 } finally {
                 } finally {
-                    IOUtils.close(recoveredTranslogs);
-                    recoveredTranslogs.clear();
+                    try {
+                        IOUtils.close(current, currentCommittingTranslog);
+                    } finally {
+                        IOUtils.close(recoveredTranslogs);
+                        recoveredTranslogs.clear();
+                    }
                 }
                 }
             } finally {
             } finally {
                 FutureUtils.cancel(syncScheduler);
                 FutureUtils.cancel(syncScheduler);
@@ -354,7 +359,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
     TranslogWriter createWriter(long fileGeneration) throws IOException {
     TranslogWriter createWriter(long fileGeneration) throws IOException {
         TranslogWriter newFile;
         TranslogWriter newFile;
         try {
         try {
-            newFile = TranslogWriter.create(config.getType(), shardId, translogUUID, fileGeneration, location.resolve(getFilename(fileGeneration)), new OnCloseRunnable(), config.getBufferSize());
+            newFile = TranslogWriter.create(config.getType(), shardId, translogUUID, fileGeneration, location.resolve(getFilename(fileGeneration)), new OnCloseRunnable(), config.getBufferSize(), getChannelFactory());
         } catch (IOException e) {
         } catch (IOException e) {
             throw new TranslogException(shardId, "failed to create new translog file", e);
             throw new TranslogException(shardId, "failed to create new translog file", e);
         }
         }
@@ -393,7 +398,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
      * @see Index
      * @see Index
      * @see org.elasticsearch.index.translog.Translog.Delete
      * @see org.elasticsearch.index.translog.Translog.Delete
      */
      */
-    public Location add(Operation operation) throws TranslogException {
+    public Location add(Operation operation) throws IOException {
         final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
         final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
         try {
         try {
             final BufferedChecksumStreamOutput checksumStreamOutput = new BufferedChecksumStreamOutput(out);
             final BufferedChecksumStreamOutput checksumStreamOutput = new BufferedChecksumStreamOutput(out);
@@ -415,7 +420,14 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
                 assert current.assertBytesAtLocation(location, bytes);
                 assert current.assertBytesAtLocation(location, bytes);
                 return location;
                 return location;
             }
             }
-        } catch (AlreadyClosedException ex) {
+        } catch (AlreadyClosedException | IOException ex) {
+            if (current.getTragicException() != null) {
+                try {
+                    close();
+                } catch (Exception inner) {
+                    ex.addSuppressed(inner);
+                }
+            }
             throw ex;
             throw ex;
         } catch (Throwable e) {
         } catch (Throwable e) {
             throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
             throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
@@ -429,6 +441,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
      * Snapshots are fixed in time and will not be updated with future operations.
      * Snapshots are fixed in time and will not be updated with future operations.
      */
      */
     public Snapshot newSnapshot() {
     public Snapshot newSnapshot() {
+        ensureOpen();
         try (ReleasableLock lock = readLock.acquire()) {
         try (ReleasableLock lock = readLock.acquire()) {
             ArrayList<TranslogReader> toOpen = new ArrayList<>();
             ArrayList<TranslogReader> toOpen = new ArrayList<>();
             toOpen.addAll(recoveredTranslogs);
             toOpen.addAll(recoveredTranslogs);
@@ -493,6 +506,15 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
             if (closed.get() == false) {
             if (closed.get() == false) {
                 current.sync();
                 current.sync();
             }
             }
+        } catch (AlreadyClosedException | IOException ex) {
+            if (current.getTragicException() != null) {
+                try {
+                    close();
+                } catch (Exception inner) {
+                    ex.addSuppressed(inner);
+                }
+            }
+            throw ex;
         }
         }
     }
     }
 
 
@@ -520,6 +542,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
     public boolean ensureSynced(Location location) throws IOException {
     public boolean ensureSynced(Location location) throws IOException {
         try (ReleasableLock lock = readLock.acquire()) {
         try (ReleasableLock lock = readLock.acquire()) {
             if (location.generation == current.generation) { // if we have a new one it's already synced
             if (location.generation == current.generation) { // if we have a new one it's already synced
+                ensureOpen();
                 return current.syncUpTo(location.translogLocation + location.size);
                 return current.syncUpTo(location.translogLocation + location.size);
             }
             }
         }
         }
@@ -548,31 +571,29 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
     private final class OnCloseRunnable implements Callback<ChannelReference> {
     private final class OnCloseRunnable implements Callback<ChannelReference> {
         @Override
         @Override
         public void handle(ChannelReference channelReference) {
         public void handle(ChannelReference channelReference) {
-            try (ReleasableLock lock = writeLock.acquire()) {
-                if (isReferencedGeneration(channelReference.getGeneration()) == false) {
-                    Path translogPath = channelReference.getPath();
-                    assert channelReference.getPath().getParent().equals(location) : "translog files must be in the location folder: " + location + " but was: " + translogPath;
-                    // if the given translogPath is not the current we can safely delete the file since all references are released
-                    logger.trace("delete translog file - not referenced and not current anymore {}", translogPath);
-                    IOUtils.deleteFilesIgnoringExceptions(translogPath);
-                    IOUtils.deleteFilesIgnoringExceptions(translogPath.resolveSibling(getCommitCheckpointFileName(channelReference.getGeneration())));
+            if (isReferencedGeneration(channelReference.getGeneration()) == false) {
+                Path translogPath = channelReference.getPath();
+                assert channelReference.getPath().getParent().equals(location) : "translog files must be in the location folder: " + location + " but was: " + translogPath;
+                // if the given translogPath is not the current we can safely delete the file since all references are released
+                logger.trace("delete translog file - not referenced and not current anymore {}", translogPath);
+                IOUtils.deleteFilesIgnoringExceptions(translogPath);
+                IOUtils.deleteFilesIgnoringExceptions(translogPath.resolveSibling(getCommitCheckpointFileName(channelReference.getGeneration())));
 
 
-                }
-                try (DirectoryStream<Path> stream = Files.newDirectoryStream(location)) {
-                    for (Path path : stream) {
-                        Matcher matcher = PARSE_STRICT_ID_PATTERN.matcher(path.getFileName().toString());
-                        if (matcher.matches()) {
-                            long generation = Long.parseLong(matcher.group(1));
-                            if (isReferencedGeneration(generation) == false) {
-                                logger.trace("delete translog file - not referenced and not current anymore {}", path);
-                                IOUtils.deleteFilesIgnoringExceptions(path);
-                                IOUtils.deleteFilesIgnoringExceptions(path.resolveSibling(getCommitCheckpointFileName(channelReference.getGeneration())));
-                            }
+            }
+            try (DirectoryStream<Path> stream = Files.newDirectoryStream(location)) {
+                for (Path path : stream) {
+                    Matcher matcher = PARSE_STRICT_ID_PATTERN.matcher(path.getFileName().toString());
+                    if (matcher.matches()) {
+                        long generation = Long.parseLong(matcher.group(1));
+                        if (isReferencedGeneration(generation) == false) {
+                            logger.trace("delete translog file - not referenced and not current anymore {}", path);
+                            IOUtils.deleteFilesIgnoringExceptions(path);
+                            IOUtils.deleteFilesIgnoringExceptions(path.resolveSibling(getCommitCheckpointFileName(channelReference.getGeneration())));
                         }
                         }
                     }
                     }
-                } catch (IOException e) {
-                    logger.warn("failed to delete unreferenced translog files", e);
                 }
                 }
+            } catch (IOException e) {
+                logger.warn("failed to delete unreferenced translog files", e);
             }
             }
         }
         }
     }
     }
@@ -1294,6 +1315,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
                 throw new IllegalStateException("already committing a translog with generation: " + currentCommittingTranslog.getGeneration());
                 throw new IllegalStateException("already committing a translog with generation: " + currentCommittingTranslog.getGeneration());
             }
             }
             final TranslogWriter oldCurrent = current;
             final TranslogWriter oldCurrent = current;
+            oldCurrent.ensureOpen();
             oldCurrent.sync();
             oldCurrent.sync();
             currentCommittingTranslog = current.immutableReader();
             currentCommittingTranslog = current.immutableReader();
             Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME);
             Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME);
@@ -1389,7 +1411,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
 
 
     private void ensureOpen() {
     private void ensureOpen() {
         if (closed.get()) {
         if (closed.get()) {
-            throw new AlreadyClosedException("translog is already closed");
+            throw new AlreadyClosedException("translog is already closed", current.getTragicException());
         }
         }
     }
     }
 
 
@@ -1400,4 +1422,15 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         return outstandingViews.size();
         return outstandingViews.size();
     }
     }
 
 
+    TranslogWriter.ChannelFactory getChannelFactory() {
+        return TranslogWriter.ChannelFactory.DEFAULT;
+    }
+
+    /** If this {@code Translog} was closed as a side-effect of a tragic exception,
+     *  e.g. disk full while flushing a new segment, this returns the root cause exception.
+     *  Otherwise (no tragic exception has occurred) it returns null. */
+    public Throwable getTragicException() {
+        return current.getTragicException();
+    }
+
 }
 }

+ 4 - 4
core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java

@@ -140,16 +140,16 @@ public abstract class TranslogReader implements Closeable, Comparable<TranslogRe
     @Override
     @Override
     public void close() throws IOException {
     public void close() throws IOException {
         if (closed.compareAndSet(false, true)) {
         if (closed.compareAndSet(false, true)) {
-            doClose();
+            channelReference.decRef();
         }
         }
     }
     }
 
 
-    protected void doClose() throws IOException {
-        channelReference.decRef();
+    protected final boolean isClosed() {
+        return closed.get();
     }
     }
 
 
     protected void ensureOpen() {
     protected void ensureOpen() {
-        if (closed.get()) {
+        if (isClosed()) {
             throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed");
             throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed");
         }
         }
     }
     }

+ 48 - 15
core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java

@@ -20,6 +20,7 @@
 package org.elasticsearch.index.translog;
 package org.elasticsearch.index.translog;
 
 
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.OutputStreamDataOutput;
 import org.apache.lucene.store.OutputStreamDataOutput;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.IOUtils;
@@ -54,6 +55,9 @@ public class TranslogWriter extends TranslogReader {
     protected volatile int operationCounter;
     protected volatile int operationCounter;
     /* the offset in bytes written to the file */
     /* the offset in bytes written to the file */
     protected volatile long writtenOffset;
     protected volatile long writtenOffset;
+    /* if we hit an exception that we can't recover from we assign it to this var and ship it with every AlreadyClosedException we throw */
+    private volatile Throwable tragedy;
+
 
 
     public TranslogWriter(ShardId shardId, long generation, ChannelReference channelReference) throws IOException {
     public TranslogWriter(ShardId shardId, long generation, ChannelReference channelReference) throws IOException {
         super(generation, channelReference, channelReference.getChannel().position());
         super(generation, channelReference, channelReference.getChannel().position());
@@ -65,10 +69,10 @@ public class TranslogWriter extends TranslogReader {
         this.lastSyncedOffset = channelReference.getChannel().position();;
         this.lastSyncedOffset = channelReference.getChannel().position();;
     }
     }
 
 
-    public static TranslogWriter create(Type type, ShardId shardId, String translogUUID, long fileGeneration, Path file, Callback<ChannelReference> onClose, int bufferSize) throws IOException {
+    public static TranslogWriter create(Type type, ShardId shardId, String translogUUID, long fileGeneration, Path file, Callback<ChannelReference> onClose, int bufferSize, ChannelFactory channelFactory) throws IOException {
         final BytesRef ref = new BytesRef(translogUUID);
         final BytesRef ref = new BytesRef(translogUUID);
         final int headerLength = CodecUtil.headerLength(TRANSLOG_CODEC) + ref.length + RamUsageEstimator.NUM_BYTES_INT;
         final int headerLength = CodecUtil.headerLength(TRANSLOG_CODEC) + ref.length + RamUsageEstimator.NUM_BYTES_INT;
-        final FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
+        final FileChannel channel = channelFactory.open(file);
         try {
         try {
             // This OutputStreamDataOutput is intentionally not closed because
             // This OutputStreamDataOutput is intentionally not closed because
             // closing it will close the FileChannel
             // closing it will close the FileChannel
@@ -90,6 +94,12 @@ public class TranslogWriter extends TranslogReader {
             throw throwable;
             throw throwable;
         }
         }
     }
     }
+    /** If this {@code TranslogWriter} was closed as a side-effect of a tragic exception,
+     *  e.g. disk full while flushing a new segment, this returns the root cause exception.
+     *  Otherwise (no tragic exception has occurred) it returns null. */
+    public Throwable getTragicException() {
+        return tragedy;
+    }
 
 
     public enum Type {
     public enum Type {
 
 
@@ -118,6 +128,16 @@ public class TranslogWriter extends TranslogReader {
         }
         }
     }
     }
 
 
+    protected final void closeWithTragicEvent(Throwable throwable) throws IOException {
+        try (ReleasableLock lock = writeLock.acquire()) {
+            if (tragedy == null) {
+                tragedy = throwable;
+            } else {
+                tragedy.addSuppressed(throwable);
+            }
+            close();
+        }
+    }
 
 
     /**
     /**
      * add the given bytes to the translog and return the location they were written at
      * add the given bytes to the translog and return the location they were written at
@@ -127,9 +147,14 @@ public class TranslogWriter extends TranslogReader {
         try (ReleasableLock lock = writeLock.acquire()) {
         try (ReleasableLock lock = writeLock.acquire()) {
             ensureOpen();
             ensureOpen();
             position = writtenOffset;
             position = writtenOffset;
-            data.writeTo(channel);
+            try {
+                data.writeTo(channel);
+            } catch (Throwable e) {
+                closeWithTragicEvent(e);
+                throw e;
+            }
             writtenOffset = writtenOffset + data.length();
             writtenOffset = writtenOffset + data.length();
-            operationCounter = operationCounter + 1;
+            operationCounter++;;
         }
         }
         return new Translog.Location(generation, position, data.length());
         return new Translog.Location(generation, position, data.length());
     }
     }
@@ -143,12 +168,13 @@ public class TranslogWriter extends TranslogReader {
     /**
     /**
      * write all buffered ops to disk and fsync file
      * write all buffered ops to disk and fsync file
      */
      */
-    public void sync() throws IOException {
+    public synchronized void sync() throws IOException { // synchronized to ensure only one sync happens a time
         // check if we really need to sync here...
         // check if we really need to sync here...
         if (syncNeeded()) {
         if (syncNeeded()) {
             try (ReleasableLock lock = writeLock.acquire()) {
             try (ReleasableLock lock = writeLock.acquire()) {
+                ensureOpen();
+                checkpoint(writtenOffset, operationCounter, channelReference);
                 lastSyncedOffset = writtenOffset;
                 lastSyncedOffset = writtenOffset;
-                checkpoint(lastSyncedOffset, operationCounter, channelReference);
             }
             }
         }
         }
     }
     }
@@ -262,15 +288,6 @@ public class TranslogWriter extends TranslogReader {
         return false;
         return false;
     }
     }
 
 
-    @Override
-    protected final void doClose() throws IOException {
-        try (ReleasableLock lock = writeLock.acquire()) {
-            sync();
-        } finally {
-            super.doClose();
-        }
-    }
-
     @Override
     @Override
     protected void readBytes(ByteBuffer buffer, long position) throws IOException {
     protected void readBytes(ByteBuffer buffer, long position) throws IOException {
         try (ReleasableLock lock = readLock.acquire()) {
         try (ReleasableLock lock = readLock.acquire()) {
@@ -288,4 +305,20 @@ public class TranslogWriter extends TranslogReader {
         Checkpoint checkpoint = new Checkpoint(syncPosition, numOperations, generation);
         Checkpoint checkpoint = new Checkpoint(syncPosition, numOperations, generation);
         Checkpoint.write(checkpointFile, checkpoint, options);
         Checkpoint.write(checkpointFile, checkpoint, options);
     }
     }
+
+    static class ChannelFactory {
+
+        static final ChannelFactory DEFAULT = new ChannelFactory();
+
+        // only for testing until we have a disk-full FileSystemt
+        public FileChannel open(Path file) throws IOException {
+            return FileChannel.open(file, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
+        }
+    }
+
+    protected final void ensureOpen() {
+        if (isClosed()) {
+            throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed", tragedy);
+        }
+    }
 }
 }

+ 2 - 3
core/src/test/java/org/elasticsearch/index/translog/BufferedTranslogTests.java

@@ -34,13 +34,12 @@ import java.nio.file.Path;
 public class BufferedTranslogTests extends TranslogTests {
 public class BufferedTranslogTests extends TranslogTests {
 
 
     @Override
     @Override
-    protected Translog create(Path path) throws IOException {
+    protected TranslogConfig getTranslogConfig(Path path) {
         Settings build = Settings.settingsBuilder()
         Settings build = Settings.settingsBuilder()
                 .put("index.translog.fs.type", TranslogWriter.Type.BUFFERED.name())
                 .put("index.translog.fs.type", TranslogWriter.Type.BUFFERED.name())
                 .put("index.translog.fs.buffer_size", 10 + randomInt(128 * 1024), ByteSizeUnit.BYTES)
                 .put("index.translog.fs.buffer_size", 10 + randomInt(128 * 1024), ByteSizeUnit.BYTES)
                 .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
                 .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
                 .build();
                 .build();
-        TranslogConfig translogConfig = new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.index(), build), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, null);
-        return new Translog(translogConfig);
+        return new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.index(), build), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, null);
     }
     }
 }
 }

+ 130 - 7
core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java

@@ -22,9 +22,11 @@ package org.elasticsearch.index.translog;
 import com.carrotsearch.randomizedtesting.generators.RandomPicks;
 import com.carrotsearch.randomizedtesting.generators.RandomPicks;
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.index.Term;
+import org.apache.lucene.mockfile.FilterFileChannel;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.ByteArrayDataOutput;
 import org.apache.lucene.store.ByteArrayDataOutput;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.LineFileDocs;
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.LuceneTestCase;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -110,16 +112,19 @@ public class TranslogTests extends ESTestCase {
         }
         }
     }
     }
 
 
-    protected Translog create(Path path) throws IOException {
+    private Translog create(Path path) throws IOException {
+        return new Translog(getTranslogConfig(path));
+    }
+
+    protected TranslogConfig getTranslogConfig(Path path) {
         Settings build = Settings.settingsBuilder()
         Settings build = Settings.settingsBuilder()
                 .put(TranslogConfig.INDEX_TRANSLOG_FS_TYPE, TranslogWriter.Type.SIMPLE.name())
                 .put(TranslogConfig.INDEX_TRANSLOG_FS_TYPE, TranslogWriter.Type.SIMPLE.name())
                 .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
                 .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
                 .build();
                 .build();
-        TranslogConfig translogConfig = new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.index(), build), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, null);
-        return new Translog(translogConfig);
+        return new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.index(), build), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, null);
     }
     }
 
 
-    protected void addToTranslogAndList(Translog translog, ArrayList<Translog.Operation> list, Translog.Operation op) {
+    protected void addToTranslogAndList(Translog translog, ArrayList<Translog.Operation> list, Translog.Operation op) throws IOException {
         list.add(op);
         list.add(op);
         translog.add(op);
         translog.add(op);
     }
     }
@@ -330,7 +335,7 @@ public class TranslogTests extends ESTestCase {
         }
         }
     }
     }
 
 
-    public void testSnapshot() {
+    public void testSnapshot() throws IOException {
         ArrayList<Translog.Operation> ops = new ArrayList<>();
         ArrayList<Translog.Operation> ops = new ArrayList<>();
         Translog.Snapshot snapshot = translog.newSnapshot();
         Translog.Snapshot snapshot = translog.newSnapshot();
         assertThat(snapshot, SnapshotMatchers.size(0));
         assertThat(snapshot, SnapshotMatchers.size(0));
@@ -389,7 +394,7 @@ public class TranslogTests extends ESTestCase {
             Translog.Snapshot snapshot = translog.newSnapshot();
             Translog.Snapshot snapshot = translog.newSnapshot();
             fail("translog is closed");
             fail("translog is closed");
         } catch (AlreadyClosedException ex) {
         } catch (AlreadyClosedException ex) {
-            assertThat(ex.getMessage(), containsString("translog-1.tlog is already closed can't increment"));
+            assertEquals(ex.getMessage(), "translog is already closed");
         }
         }
     }
     }
 
 
@@ -634,7 +639,7 @@ public class TranslogTests extends ESTestCase {
             final String threadId = "writer_" + i;
             final String threadId = "writer_" + i;
             writers[i] = new Thread(new AbstractRunnable() {
             writers[i] = new Thread(new AbstractRunnable() {
                 @Override
                 @Override
-                public void doRun() throws BrokenBarrierException, InterruptedException {
+                public void doRun() throws BrokenBarrierException, InterruptedException, IOException {
                     barrier.await();
                     barrier.await();
                     int counter = 0;
                     int counter = 0;
                     while (run.get()) {
                     while (run.get()) {
@@ -1279,4 +1284,122 @@ public class TranslogTests extends ESTestCase {
             }
             }
         }
         }
     }
     }
+
+    public void testFailFlush() throws IOException {
+        Path tempDir = createTempDir();
+        final AtomicBoolean simulateDiskFull = new AtomicBoolean();
+        TranslogConfig config = getTranslogConfig(tempDir);
+        Translog translog = new Translog(config) {
+            @Override
+            TranslogWriter.ChannelFactory getChannelFactory() {
+                final TranslogWriter.ChannelFactory factory = super.getChannelFactory();
+
+                return new TranslogWriter.ChannelFactory() {
+                    @Override
+                    public FileChannel open(Path file) throws IOException {
+                        FileChannel channel = factory.open(file);
+                        return new FilterFileChannel(channel) {
+
+                            @Override
+                            public int write(ByteBuffer src) throws IOException {
+                                if (simulateDiskFull.get()) {
+                                    if (src.limit() > 1) {
+                                        final int pos = src.position();
+                                        final int limit = src.limit();
+                                        src.limit(limit / 2);
+                                        super.write(src);
+                                        src.position(pos);
+                                        src.limit(limit);
+                                        throw new IOException("__FAKE__ no space left on device");
+                                    }
+                                }
+                                return super.write(src);
+                            }
+                        };
+                    }
+                };
+            }
+        };
+
+        List<Translog.Location> locations = new ArrayList<>();
+        int opsSynced = 0;
+        int opsAdded = 0;
+        boolean failed = false;
+        while(failed == false) {
+            try {
+                locations.add(translog.add(new Translog.Index("test", "" + opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8")))));
+                opsAdded++;
+                translog.sync();
+                opsSynced++;
+            } catch (IOException ex) {
+                failed = true;
+                assertFalse(translog.isOpen());
+                assertEquals("__FAKE__ no space left on device", ex.getMessage());
+             }
+            simulateDiskFull.set(randomBoolean());
+        }
+        simulateDiskFull.set(false);
+        if (randomBoolean()) {
+            try {
+                locations.add(translog.add(new Translog.Index("test", "" + opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8")))));
+                fail("we are already closed");
+            } catch (AlreadyClosedException ex) {
+                assertNotNull(ex.getCause());
+                assertEquals(ex.getCause().getMessage(), "__FAKE__ no space left on device");
+            }
+
+        }
+        Translog.TranslogGeneration translogGeneration = translog.getGeneration();
+        try {
+            translog.newSnapshot();
+            fail("already closed");
+        } catch (AlreadyClosedException ex) {
+            // all is well
+            assertNotNull(ex.getCause());
+            assertSame(translog.getTragicException(), ex.getCause());
+        }
+
+        try {
+            translog.commit();
+            fail("already closed");
+        } catch (AlreadyClosedException ex) {
+            assertNotNull(ex.getCause());
+            assertSame(translog.getTragicException(), ex.getCause());
+        }
+
+        assertFalse(translog.isOpen());
+        translog.close(); // we are closed
+        config.setTranslogGeneration(translogGeneration);
+        try (Translog tlog = new Translog(config)){
+            assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, tlog.currentFileGeneration());
+            assertFalse(tlog.syncNeeded());
+
+            try (Translog.Snapshot snapshot = tlog.newSnapshot()) {
+                assertEquals(opsSynced, snapshot.estimatedTotalOperations());
+                for (int i = 0; i < opsSynced; i++) {
+                    assertEquals("expected operation" + i + " to be in the previous translog but wasn't", tlog.currentFileGeneration() - 1, locations.get(i).generation);
+                    Translog.Operation next = snapshot.next();
+                    assertNotNull("operation " + i + " must be non-null", next);
+                    assertEquals(i, Integer.parseInt(next.getSource().source.toUtf8()));
+                }
+            }
+        }
+    }
+
+    public void testTranslogOpsCountIsCorrect() throws IOException {
+        List<Translog.Location> locations = new ArrayList<>();
+        int numOps = randomIntBetween(100, 200);
+        LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly
+        for (int opsAdded = 0; opsAdded < numOps; opsAdded++) {
+            locations.add(translog.add(new Translog.Index("test", "" + opsAdded, lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8")))));
+            try (Translog.Snapshot snapshot = translog.newSnapshot()) {
+                assertEquals(opsAdded+1, snapshot.estimatedTotalOperations());
+                for (int i = 0; i < opsAdded; i++) {
+                    assertEquals("expected operation" + i + " to be in the current translog but wasn't", translog.currentFileGeneration(), locations.get(i).generation);
+                    Translog.Operation next = snapshot.next();
+                    assertNotNull("operation " + i + " must be non-null", next);
+                }
+            }
+        }
+    }
 }
 }