Преглед изворни кода

Keep checkpoint file channel open across fsyncs (#61744)

Currently we open and close the checkpoint file channel for every fsync.
This file channel can be kept open for the lifecycle of a translog
writer. This avoids the overhead of opening the file, checking file
permissions, and closing the file on every fsync.
Tim Brooks пре 5 година
родитељ
комит
1a31f9e332

+ 33 - 1
server/src/main/java/org/elasticsearch/common/io/Channels.java

@@ -172,7 +172,6 @@ public final class Channels {
         writeToChannel(source, 0, source.length, channel);
     }
 
-
     /**
      * Writes part of a byte array to a {@link java.nio.channels.WritableByteChannel}
      *
@@ -195,6 +194,39 @@ public final class Channels {
         assert length == 0 : "wrote more then expected bytes (length=" + length + ")";
     }
 
+    /**
+     * Writes an entire byte array to a {@link java.nio.channels.FileChannel}, starting at the provided
+     * position.
+     *
+     * @param source          byte array to copy from
+     * @param channel         target FileChannel
+     * @param channelPosition position to write at
+     */
+    public static void writeToChannel(byte[] source, FileChannel channel, long channelPosition) throws IOException {
+        writeToChannel(source, 0, source.length, channel, channelPosition);
+    }
+
+    /**
+     * Writes part of a byte array to a {@link java.nio.channels.FileChannel} at the provided
+     * position, issuing further positional writes until all {@code length} bytes are written.
+     *
+     * @param source          byte array to copy from
+     * @param offset          start copying from this offset
+     * @param length          how many bytes to copy
+     * @param channel         target FileChannel
+     * @param channelPosition file position at which the first copied byte is written
+     * @throws IOException if a positional write on the channel fails
+     */
+    public static void writeToChannel(byte[] source, int offset, int length, FileChannel channel, long channelPosition) throws IOException {
+        ByteBuffer buffer = ByteBuffer.wrap(source, offset, length);
+        do {
+            // a positional write may be partial; wrap() starts the buffer at position == offset, so
+            // (buffer.position() - offset) is the number of bytes already written to the file
+            length -= channel.write(buffer, channelPosition + buffer.position() - offset);
+        } while (length > 0);
+        assert length == 0 : "wrote more then expected bytes (length=" + length + ")";
+    }
+
     /**
      * Writes a {@link java.nio.ByteBuffer} to a {@link java.nio.channels.WritableByteChannel}
      *

+ 21 - 8
server/src/main/java/org/elasticsearch/index/translog/Checkpoint.java

@@ -167,6 +167,26 @@ final class Checkpoint {
     }
 
     public static void write(ChannelFactory factory, Path checkpointFile, Checkpoint checkpoint, OpenOption... options) throws IOException {
+        byte[] bytes = createCheckpointBytes(checkpointFile, checkpoint);
+
+        // open the checkpoint file with the given options and write all bytes in one go; the channel is closed on exit
+        try (FileChannel channel = factory.open(checkpointFile, options)) {
+            Channels.writeToChannel(bytes, channel);
+            // no need to force metadata: the file size stays the same and we did the full fsync
+            // when we first created the file, so the directory entry does not change either
+            channel.force(false);
+        }
+    }
+
+    public static void write(FileChannel fileChannel, Path checkpointFile, Checkpoint checkpoint) throws IOException {
+        byte[] bytes = createCheckpointBytes(checkpointFile, checkpoint);
+        Channels.writeToChannel(bytes, fileChannel, 0);
+        // the bytes were written at position 0 and the channel is left open here. No need to force metadata: the file
+        // size stays the same and we did the full fsync when we first created the file, so the directory entry does not change
+        fileChannel.force(false);
+    }
+
+    private static byte[] createCheckpointBytes(Path checkpointFile, Checkpoint checkpoint) throws IOException {
         final ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(V3_FILE_SIZE) {
             @Override
             public synchronized byte[] toByteArray() {
@@ -185,15 +205,8 @@ final class Checkpoint {
                 "get you numbers straight; bytes written: " + indexOutput.getFilePointer() + ", buffer size: " + V3_FILE_SIZE;
             assert indexOutput.getFilePointer() < 512 :
                 "checkpoint files have to be smaller than 512 bytes for atomic writes; size: " + indexOutput.getFilePointer();
-
-        }
-        // now go and write to the channel, in one go.
-        try (FileChannel channel = factory.open(checkpointFile, options)) {
-            Channels.writeToChannel(byteOutputStream.toByteArray(), channel);
-            // no need to force metadata, file size stays the same and we did the full fsync
-            // when we first created the file, so the directory entry doesn't change as well
-            channel.force(false);
         }
+        return byteOutputStream.toByteArray();
     }
 
     @Override

+ 3 - 3
server/src/main/java/org/elasticsearch/index/translog/Translog.java

@@ -498,9 +498,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
      */
     TranslogWriter createWriter(long fileGeneration, long initialMinTranslogGen, long initialGlobalCheckpoint,
                                 LongConsumer persistedSequenceNumberConsumer) throws IOException {
-        final TranslogWriter newFile;
+        final TranslogWriter newWriter;
         try {
-            newFile = TranslogWriter.create(
+            newWriter = TranslogWriter.create(
                 shardId,
                 translogUUID,
                 fileGeneration,
@@ -513,7 +513,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         } catch (final IOException e) {
             throw new TranslogException(shardId, "failed to create new translog file", e);
         }
-        return newFile;
+        return newWriter;
     }
 
     /**

+ 26 - 13
server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java

@@ -22,13 +22,13 @@ package org.elasticsearch.index.translog;
 import com.carrotsearch.hppc.LongArrayList;
 import com.carrotsearch.hppc.procedures.LongProcedure;
 import org.apache.lucene.store.AlreadyClosedException;
-import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.Assertions;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.Channels;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.ShardId;
 
@@ -49,7 +49,8 @@ import java.util.function.LongSupplier;
 
 public class TranslogWriter extends BaseTranslogReader implements Closeable {
     private final ShardId shardId;
-    private final ChannelFactory channelFactory;
+    private final FileChannel checkpointChannel;
+    private final Path checkpointPath;
     // the last checkpoint that was written when the translog was last synced
     private volatile Checkpoint lastSyncedCheckpoint;
     /* the number of translog operations written to this file */
@@ -79,11 +80,12 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
     private final Map<Long, Tuple<BytesReference, Exception>> seenSequenceNumbers;
 
     private TranslogWriter(
-        final ChannelFactory channelFactory,
         final ShardId shardId,
         final Checkpoint initialCheckpoint,
         final FileChannel channel,
+        final FileChannel checkpointChannel,
         final Path path,
+        final Path checkpointPath,
         final ByteSizeValue bufferSize,
         final LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier, TranslogHeader header,
         TragicExceptionHolder tragedy,
@@ -95,7 +97,8 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
             "initial checkpoint offset [" + initialCheckpoint.offset + "] is different than current channel position ["
                 + channel.position() + "]";
         this.shardId = shardId;
-        this.channelFactory = channelFactory;
+        this.checkpointChannel = checkpointChannel;
+        this.checkpointPath = checkpointPath;
         this.minTranslogGenerationSupplier = minTranslogGenerationSupplier;
         this.outputStream = new BufferedChannelOutputStream(java.nio.channels.Channels.newOutputStream(channel), bufferSize.bytesAsInt());
         this.lastSyncedCheckpoint = initialCheckpoint;
@@ -117,13 +120,17 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
                                         final LongSupplier globalCheckpointSupplier, final LongSupplier minTranslogGenerationSupplier,
                                         final long primaryTerm, TragicExceptionHolder tragedy, LongConsumer persistedSequenceNumberConsumer)
         throws IOException {
+        final Path checkpointFile = file.getParent().resolve(Translog.CHECKPOINT_FILE_NAME);
+
         final FileChannel channel = channelFactory.open(file);
+        FileChannel checkpointChannel = null;
         try {
+            checkpointChannel = channelFactory.open(checkpointFile, StandardOpenOption.WRITE);
             final TranslogHeader header = new TranslogHeader(translogUUID, primaryTerm);
             header.write(channel);
             final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(header.sizeInBytes(), fileGeneration,
                 initialGlobalCheckpoint, initialMinTranslogGen);
-            writeCheckpoint(channelFactory, file.getParent(), checkpoint);
+            writeCheckpoint(checkpointChannel, checkpointFile, checkpoint);
             final LongSupplier writerGlobalCheckpointSupplier;
             if (Assertions.ENABLED) {
                 writerGlobalCheckpointSupplier = () -> {
@@ -135,13 +142,13 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
             } else {
                 writerGlobalCheckpointSupplier = globalCheckpointSupplier;
             }
-            return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize,
+            return new TranslogWriter(shardId, checkpoint, channel, checkpointChannel, file, checkpointFile, bufferSize,
                 writerGlobalCheckpointSupplier, minTranslogGenerationSupplier, header, tragedy, persistedSequenceNumberConsumer);
         } catch (Exception exception) {
             // if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that
             // file exists we remove it. We only apply this logic to the checkpoint.generation+1 any other file with a higher generation
             // is an error condition
-            IOUtils.closeWhileHandlingException(channel);
+            IOUtils.closeWhileHandlingException(channel, checkpointChannel);
             throw exception;
         }
     }
@@ -314,6 +321,12 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
                     throw ex;
                 }
                 if (closed.compareAndSet(false, true)) {
+                    try {
+                        checkpointChannel.close();
+                    } catch (final Exception ex) {
+                        closeWithTragicEvent(ex);
+                        throw ex;
+                    }
                     return new TranslogReader(getLastSyncedCheckpoint(), channel, path, header);
                 } else {
                     throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed (path [" + path + "]",
@@ -374,7 +387,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
                     // we can continue writing to the buffer etc.
                     try {
                         channel.force(false);
-                        writeCheckpoint(channelFactory, path.getParent(), checkpointToSync);
+                        writeCheckpoint(checkpointChannel, checkpointPath, checkpointToSync);
                     } catch (final Exception ex) {
                         closeWithTragicEvent(ex);
                         throw ex;
@@ -414,10 +427,10 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
     }
 
     private static void writeCheckpoint(
-            final ChannelFactory channelFactory,
-            final Path translogFile,
-            final Checkpoint checkpoint) throws IOException {
-        Checkpoint.write(channelFactory, translogFile.resolve(Translog.CHECKPOINT_FILE_NAME), checkpoint, StandardOpenOption.WRITE);
+        final FileChannel fileChannel,
+        final Path checkpointFile,
+        final Checkpoint checkpoint) throws IOException {
+        Checkpoint.write(fileChannel, checkpointFile, checkpoint);
     }
 
     /**
@@ -438,7 +451,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
     @Override
     public final void close() throws IOException {
         if (closed.compareAndSet(false, true)) {
-            channel.close();
+            IOUtils.close(checkpointChannel, channel);
         }
     }
 

+ 20 - 4
server/src/test/java/org/elasticsearch/common/ChannelsTests.java

@@ -63,7 +63,7 @@ public class ChannelsTests extends ESTestCase {
     }
 
     public void testReadWriteThoughArrays() throws Exception {
-        Channels.writeToChannel(randomBytes, fileChannel);
+        writeToChannel(randomBytes, fileChannel);
         byte[] readBytes = Channels.readFromFileChannel(fileChannel, 0, randomBytes.length);
         assertThat("read bytes didn't match written bytes", randomBytes, Matchers.equalTo(readBytes));
     }
@@ -72,7 +72,7 @@ public class ChannelsTests extends ESTestCase {
     public void testPartialReadWriteThroughArrays() throws Exception {
         int length = randomIntBetween(1, randomBytes.length / 2);
         int offset = randomIntBetween(0, randomBytes.length - length);
-        Channels.writeToChannel(randomBytes, offset, length, fileChannel);
+        writeToChannel(randomBytes, offset, length, fileChannel);
 
         int lengthToRead = randomIntBetween(1, length);
         int offsetToRead = randomIntBetween(0, length - lengthToRead);
@@ -87,7 +87,7 @@ public class ChannelsTests extends ESTestCase {
 
     public void testBufferReadPastEOFWithException() throws Exception {
         int bytesToWrite = randomIntBetween(0, randomBytes.length - 1);
-        Channels.writeToChannel(randomBytes, 0, bytesToWrite, fileChannel);
+        writeToChannel(randomBytes, 0, bytesToWrite, fileChannel);
         try {
             Channels.readFromFileChannel(fileChannel, 0, bytesToWrite + 1 + randomInt(1000));
             fail("Expected an EOFException");
@@ -98,7 +98,7 @@ public class ChannelsTests extends ESTestCase {
 
     public void testBufferReadPastEOFWithoutException() throws Exception {
         int bytesToWrite = randomIntBetween(0, randomBytes.length - 1);
-        Channels.writeToChannel(randomBytes, 0, bytesToWrite, fileChannel);
+        writeToChannel(randomBytes, 0, bytesToWrite, fileChannel);
         byte[] bytes = new byte[bytesToWrite + 1 + randomInt(1000)];
         int read = Channels.readFromFileChannel(fileChannel, 0, bytes, 0, bytes.length);
         assertThat(read, Matchers.lessThan(0));
@@ -161,6 +161,22 @@ public class ChannelsTests extends ESTestCase {
         assertTrue("read bytes didn't match written bytes", sourceRef.equals(copyRef));
     }
 
+    private static void writeToChannel(byte[] source, int offset, int length, FileChannel channel) throws IOException {
+        if (randomBoolean()) {
+            Channels.writeToChannel(source, offset, length, channel, channel.position());
+        } else {
+            Channels.writeToChannel(source, offset, length, channel);
+        }
+    }
+
+    private static void writeToChannel(byte[] source, FileChannel channel) throws IOException {
+        if (randomBoolean()) {
+            Channels.writeToChannel(source, channel, channel.position());
+        } else {
+            Channels.writeToChannel(source, channel);
+        }
+    }
+
     class MockFileChannel extends FileChannel {
 
         FileChannel delegate;

+ 19 - 1
server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java

@@ -2416,7 +2416,25 @@ public class TranslogTests extends ESTestCase {
 
         @Override
         public int write(ByteBuffer src, long position) throws IOException {
-            throw new UnsupportedOperationException();
+            if (fail.fail()) {
+                if (partialWrite) {
+                    if (src.hasRemaining()) {
+                        final int pos = src.position();
+                        final int limit = src.limit();
+                        src.limit(randomIntBetween(pos, limit));
+                        super.write(src, position);
+                        src.limit(limit);
+                        src.position(pos);
+                        throw new IOException("__FAKE__ no space left on device");
+                    }
+                }
+                if (throwUnknownException) {
+                    throw new UnknownException();
+                } else {
+                    throw new MockDirectoryWrapper.FakeIOException();
+                }
+            }
+            return super.write(src, position);
         }