ソースを参照

Add checksumming and versions to the Translog's Checkpoint files (#19797)

This prepares the infrastructure to be able to extend the checkpoint file to store more information.
Boaz Leskes 9 年 前
コミット
7010082112

+ 82 - 31
core/src/main/java/org/elasticsearch/index/translog/Checkpoint.java

@@ -18,16 +18,19 @@
  */
 package org.elasticsearch.index.translog;
 
-import org.apache.lucene.store.ByteArrayDataOutput;
+import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.store.DataInput;
 import org.apache.lucene.store.DataOutput;
-import org.apache.lucene.store.InputStreamDataInput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.OutputStreamIndexOutput;
+import org.apache.lucene.store.SimpleFSDirectory;
 import org.elasticsearch.common.io.Channels;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.nio.channels.FileChannel;
-import java.nio.file.Files;
 import java.nio.file.OpenOption;
 import java.nio.file.Path;
 
@@ -35,69 +38,117 @@ import java.nio.file.Path;
  */
 class Checkpoint {
 
-   static final int BUFFER_SIZE = Integer.BYTES  // ops
-            + Long.BYTES // offset
-            + Long.BYTES;// generation
     final long offset;
     final int numOps;
     final long generation;
 
+    private static final int INITIAL_VERSION = 1; // start with 1, just to recognize there was some magic serialization logic before
+
+    private static final String CHECKPOINT_CODEC = "ckp";
+
+    static final int FILE_SIZE = CodecUtil.headerLength(CHECKPOINT_CODEC)
+        + Integer.BYTES  // ops
+        + Long.BYTES // offset
+        + Long.BYTES // generation
+        + CodecUtil.footerLength();
+
+    static final int LEGACY_NON_CHECKSUMMED_FILE_LENGTH = Integer.BYTES  // ops
+            + Long.BYTES // offset
+            + Long.BYTES; // generation
+
     Checkpoint(long offset, int numOps, long generation) {
         this.offset = offset;
         this.numOps = numOps;
         this.generation = generation;
     }
 
-    Checkpoint(DataInput in) throws IOException {
-        offset = in.readLong();
-        numOps = in.readInt();
-        generation = in.readLong();
+    private void write(DataOutput out) throws IOException {
+        out.writeLong(offset);
+        out.writeInt(numOps);
+        out.writeLong(generation);
     }
 
-    private void write(FileChannel channel) throws IOException {
-        byte[] buffer = new byte[BUFFER_SIZE];
-        final ByteArrayDataOutput out = new ByteArrayDataOutput(buffer);
-        write(out);
-        Channels.writeToChannel(buffer, channel);
+    // reads a checksummed checkpoint introduced in ES 5.0.0
+    static Checkpoint readChecksummedV1(DataInput in) throws IOException {
+        return new Checkpoint(in.readLong(), in.readInt(), in.readLong());
     }
 
-    void write(DataOutput out) throws IOException {
-        out.writeLong(offset);
-        out.writeInt(numOps);
-        out.writeLong(generation);
+    // reads checkpoint from ES < 5.0.0
+    static Checkpoint readNonChecksummed(DataInput in) throws IOException {
+        return new Checkpoint(in.readLong(), in.readInt(), in.readLong());
     }
 
     @Override
     public String toString() {
         return "Checkpoint{" +
-                "offset=" + offset +
-                ", numOps=" + numOps +
-                ", translogFileGeneration= " + generation +
-                '}';
+            "offset=" + offset +
+            ", numOps=" + numOps +
+            ", translogFileGeneration= " + generation +
+            '}';
     }
 
     public static Checkpoint read(Path path) throws IOException {
-        try (InputStream in = Files.newInputStream(path)) {
-            return new Checkpoint(new InputStreamDataInput(in));
+        try (Directory dir = new SimpleFSDirectory(path.getParent())) {
+            try (final IndexInput indexInput = dir.openInput(path.getFileName().toString(), IOContext.DEFAULT)) {
+                if (indexInput.length() == LEGACY_NON_CHECKSUMMED_FILE_LENGTH) {
+                    // legacy non-checksummed file, written by ES versions before 5.0.0
+                    return Checkpoint.readNonChecksummed(indexInput);
+                }
+                // We checksum the entire file before we even go and parse it. If it's corrupted we barf right here.
+                CodecUtil.checksumEntireFile(indexInput);
+                final int fileVersion = CodecUtil.checkHeader(indexInput, CHECKPOINT_CODEC, INITIAL_VERSION, INITIAL_VERSION);
+                return Checkpoint.readChecksummedV1(indexInput);
+            }
         }
     }
 
     public static void write(ChannelFactory factory, Path checkpointFile, Checkpoint checkpoint, OpenOption... options) throws IOException {
+        final ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(FILE_SIZE) {
+            @Override
+            public synchronized byte[] toByteArray() {
+                // don't clone
+                return buf;
+            }
+        };
+        final String resourceDesc = "checkpoint(path=\"" + checkpointFile + "\", gen=" + checkpoint + ")";
+        try (final OutputStreamIndexOutput indexOutput =
+                 new OutputStreamIndexOutput(resourceDesc, checkpointFile.toString(), byteOutputStream, FILE_SIZE)) {
+            CodecUtil.writeHeader(indexOutput, CHECKPOINT_CODEC, INITIAL_VERSION);
+            checkpoint.write(indexOutput);
+            CodecUtil.writeFooter(indexOutput);
+
+            assert indexOutput.getFilePointer() == FILE_SIZE :
+                "get you number straights. Bytes written: " + indexOutput.getFilePointer() + " buffer size: " + FILE_SIZE;
+            assert indexOutput.getFilePointer() < 512 :
+                "checkpoint files have to be smaller 512b for atomic writes. size: " + indexOutput.getFilePointer();
+
+        }
+        // now go and write to the channel, in one go.
         try (FileChannel channel = factory.open(checkpointFile, options)) {
-            checkpoint.write(channel);
+            Channels.writeToChannel(byteOutputStream.toByteArray(), channel);
+            // no need to force metadata, file size stays the same and we did the full fsync
+            // when we first created the file, so the directory entry doesn't change either
             channel.force(false);
         }
     }
 
     @Override
     public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
 
         Checkpoint that = (Checkpoint) o;
 
-        if (offset != that.offset) return false;
-        if (numOps != that.numOps) return false;
+        if (offset != that.offset) {
+            return false;
+        }
+        if (numOps != that.numOps) {
+            return false;
+        }
         return generation == that.generation;
 
     }

+ 3 - 1
core/src/main/java/org/elasticsearch/index/translog/Translog.java

@@ -200,7 +200,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
                 Files.createDirectories(location);
                 final long generation = 1;
                 Checkpoint checkpoint = new Checkpoint(0, 0, generation);
-                Checkpoint.write(getChannelFactory(), location.resolve(CHECKPOINT_FILE_NAME), checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
+                final Path checkpointFile = location.resolve(CHECKPOINT_FILE_NAME);
+                Checkpoint.write(getChannelFactory(), checkpointFile, checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
+                IOUtils.fsync(checkpointFile, false);
                 current = createWriter(generation);
                 this.lastCommittedTranslogFileGeneration = NOT_SET_GENERATION;
 

+ 5 - 8
core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java

@@ -36,11 +36,9 @@ import org.apache.lucene.util.IOUtils;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.cli.SettingCommand;
 import org.elasticsearch.cli.Terminal;
-import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.SuppressForbidden;
 import org.elasticsearch.common.io.PathUtils;
 import org.elasticsearch.index.IndexNotFoundException;
-import org.elasticsearch.index.translog.Checkpoint;
 
 import java.io.IOException;
 import java.nio.channels.Channels;
@@ -168,12 +166,11 @@ public class TruncateTranslogCommand extends SettingCommand {
 
     /** Write a checkpoint file to the given location with the given generation */
     public static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration) throws IOException {
-        try (FileChannel fc = FileChannel.open(filename, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
-                OutputStreamDataOutput out = new OutputStreamDataOutput(Channels.newOutputStream(fc))) {
-            Checkpoint emptyCheckpoint = new Checkpoint(translogLength, 0, translogGeneration);
-            emptyCheckpoint.write(out);
-            fc.force(true);
-        }
+        Checkpoint emptyCheckpoint = new Checkpoint(translogLength, 0, translogGeneration);
+        Checkpoint.write(FileChannel::open, filename, emptyCheckpoint,
+            StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
+        // Checkpoint.write only calls channel.force(false); this file was just created, so fsync it again with metadata.
+        IOUtils.fsync(filename, false);
     }
 
     /**