|
@@ -28,11 +28,14 @@ import org.apache.lucene.util.RamUsageEstimator;
|
|
|
import org.elasticsearch.common.bytes.BytesArray;
|
|
|
import org.elasticsearch.common.bytes.BytesReference;
|
|
|
import org.elasticsearch.common.io.Channels;
|
|
|
+import org.elasticsearch.common.unit.ByteSizeValue;
|
|
|
import org.elasticsearch.common.util.Callback;
|
|
|
import org.elasticsearch.common.util.concurrent.ReleasableLock;
|
|
|
import org.elasticsearch.index.shard.ShardId;
|
|
|
|
|
|
+import java.io.BufferedOutputStream;
|
|
|
import java.io.IOException;
|
|
|
+import java.io.OutputStream;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.nio.channels.FileChannel;
|
|
|
import java.nio.file.Files;
|
|
@@ -49,30 +52,39 @@ public class TranslogWriter extends TranslogReader {
|
|
|
public static final int VERSION_CHECKPOINTS = 2; // since 2.0 we have checkpoints?
|
|
|
public static final int VERSION = VERSION_CHECKPOINTS;
|
|
|
|
|
|
- protected final ShardId shardId;
|
|
|
- protected final ReleasableLock readLock;
|
|
|
- protected final ReleasableLock writeLock;
|
|
|
+ private final ShardId shardId;
|
|
|
+ private final ReleasableLock readLock;
|
|
|
+ private final ReleasableLock writeLock;
|
|
|
/* the offset in bytes that was written when the file was last synced*/
|
|
|
- protected volatile long lastSyncedOffset;
|
|
|
+ private volatile long lastSyncedOffset;
|
|
|
/* the number of translog operations written to this file */
|
|
|
- protected volatile int operationCounter;
|
|
|
+ private volatile int operationCounter;
|
|
|
/* the offset in bytes written to the file */
|
|
|
- protected volatile long writtenOffset;
|
|
|
+ private volatile long writtenOffset;
|
|
|
/* if we hit an exception that we can't recover from we assign it to this var and ship it with every AlreadyClosedException we throw */
|
|
|
private volatile Throwable tragedy;
|
|
|
|
|
|
+ private final byte[] buffer;
|
|
|
+ private int bufferCount;
|
|
|
+ private WrapperOutputStream bufferOs = new WrapperOutputStream();
|
|
|
|
|
|
- public TranslogWriter(ShardId shardId, long generation, ChannelReference channelReference) throws IOException {
|
|
|
+ /* the total offset of this file including the bytes written to the file as well as into the buffer */
|
|
|
+ private volatile long totalOffset;
|
|
|
+
|
|
|
+
|
|
|
+ public TranslogWriter(ShardId shardId, long generation, ChannelReference channelReference, ByteSizeValue bufferSize) throws IOException {
|
|
|
super(generation, channelReference, channelReference.getChannel().position());
|
|
|
this.shardId = shardId;
|
|
|
ReadWriteLock rwl = new ReentrantReadWriteLock();
|
|
|
readLock = new ReleasableLock(rwl.readLock());
|
|
|
writeLock = new ReleasableLock(rwl.writeLock());
|
|
|
this.writtenOffset = channelReference.getChannel().position();
|
|
|
- this.lastSyncedOffset = channelReference.getChannel().position();;
|
|
|
+ this.totalOffset = writtenOffset;
|
|
|
+ this.buffer = new byte[bufferSize.bytesAsInt()];
|
|
|
+ this.lastSyncedOffset = channelReference.getChannel().position();
|
|
|
}
|
|
|
|
|
|
- public static TranslogWriter create(Type type, ShardId shardId, String translogUUID, long fileGeneration, Path file, Callback<ChannelReference> onClose, int bufferSize, ChannelFactory channelFactory) throws IOException {
|
|
|
+ public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, Callback<ChannelReference> onClose, ChannelFactory channelFactory, ByteSizeValue bufferSize) throws IOException {
|
|
|
final BytesRef ref = new BytesRef(translogUUID);
|
|
|
final int headerLength = CodecUtil.headerLength(TRANSLOG_CODEC) + ref.length + RamUsageEstimator.NUM_BYTES_INT;
|
|
|
final FileChannel channel = channelFactory.open(file);
|
|
@@ -85,7 +97,7 @@ public class TranslogWriter extends TranslogReader {
|
|
|
out.writeBytes(ref.bytes, ref.offset, ref.length);
|
|
|
channel.force(false);
|
|
|
writeCheckpoint(headerLength, 0, file.getParent(), fileGeneration, StandardOpenOption.WRITE);
|
|
|
- final TranslogWriter writer = type.create(shardId, fileGeneration, new ChannelReference(file, fileGeneration, channel, onClose), bufferSize);
|
|
|
+ final TranslogWriter writer = new TranslogWriter(shardId, fileGeneration, new ChannelReference(file, fileGeneration, channel, onClose), bufferSize);
|
|
|
return writer;
|
|
|
} catch (Throwable throwable){
|
|
|
IOUtils.closeWhileHandlingException(channel);
|
|
@@ -104,34 +116,7 @@ public class TranslogWriter extends TranslogReader {
|
|
|
return tragedy;
|
|
|
}
|
|
|
|
|
|
- public enum Type {
|
|
|
-
|
|
|
- SIMPLE() {
|
|
|
- @Override
|
|
|
- public TranslogWriter create(ShardId shardId, long generation, ChannelReference channelReference, int bufferSize) throws IOException {
|
|
|
- return new TranslogWriter(shardId, generation, channelReference);
|
|
|
- }
|
|
|
- },
|
|
|
- BUFFERED() {
|
|
|
- @Override
|
|
|
- public TranslogWriter create(ShardId shardId, long generation, ChannelReference channelReference, int bufferSize) throws IOException {
|
|
|
- return new BufferingTranslogWriter(shardId, generation, channelReference, bufferSize);
|
|
|
- }
|
|
|
- };
|
|
|
-
|
|
|
- public abstract TranslogWriter create(ShardId shardId, long generation, ChannelReference raf, int bufferSize) throws IOException;
|
|
|
-
|
|
|
- public static Type fromString(String type) {
|
|
|
- if (SIMPLE.name().equalsIgnoreCase(type)) {
|
|
|
- return SIMPLE;
|
|
|
- } else if (BUFFERED.name().equalsIgnoreCase(type)) {
|
|
|
- return BUFFERED;
|
|
|
- }
|
|
|
- throw new IllegalArgumentException("No translog fs type [" + type + "]");
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- protected final void closeWithTragicEvent(Throwable throwable) throws IOException {
|
|
|
+ private final void closeWithTragicEvent(Throwable throwable) throws IOException {
|
|
|
try (ReleasableLock lock = writeLock.acquire()) {
|
|
|
if (tragedy == null) {
|
|
|
tragedy = throwable;
|
|
@@ -146,38 +131,60 @@ public class TranslogWriter extends TranslogReader {
|
|
|
* add the given bytes to the translog and return the location they were written at
|
|
|
*/
|
|
|
public Translog.Location add(BytesReference data) throws IOException {
|
|
|
- final long position;
|
|
|
try (ReleasableLock lock = writeLock.acquire()) {
|
|
|
ensureOpen();
|
|
|
- position = writtenOffset;
|
|
|
- try {
|
|
|
- data.writeTo(channel);
|
|
|
- } catch (Throwable e) {
|
|
|
- closeWithTragicEvent(e);
|
|
|
- throw e;
|
|
|
+ final long offset = totalOffset;
|
|
|
+ if (data.length() >= buffer.length) {
|
|
|
+ flush();
|
|
|
+ // we use the channel to write, since on windows, writing to the RAF might not be reflected
|
|
|
+ // when reading through the channel
|
|
|
+ try {
|
|
|
+ data.writeTo(channel);
|
|
|
+ } catch (Throwable ex) {
|
|
|
+ closeWithTragicEvent(ex);
|
|
|
+ throw ex;
|
|
|
+ }
|
|
|
+ writtenOffset += data.length();
|
|
|
+ totalOffset += data.length();
|
|
|
+ } else {
|
|
|
+ if (data.length() > buffer.length - bufferCount) {
|
|
|
+ flush();
|
|
|
+ }
|
|
|
+ data.writeTo(bufferOs);
|
|
|
+ totalOffset += data.length();
|
|
|
}
|
|
|
- writtenOffset = writtenOffset + data.length();
|
|
|
- operationCounter++;;
|
|
|
+ operationCounter++;
|
|
|
+ return new Translog.Location(generation, offset, data.length());
|
|
|
}
|
|
|
- return new Translog.Location(generation, position, data.length());
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * change the size of the internal buffer if relevant
|
|
|
- */
|
|
|
- public void updateBufferSize(int bufferSize) throws TranslogException {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* write all buffered ops to disk and fsync file
|
|
|
*/
|
|
|
- public synchronized void sync() throws IOException { // synchronized to ensure only one sync happens a time
|
|
|
- // check if we really need to sync here...
|
|
|
+ public synchronized void sync() throws IOException {
|
|
|
if (syncNeeded()) {
|
|
|
- try (ReleasableLock lock = writeLock.acquire()) {
|
|
|
- ensureOpen();
|
|
|
- checkpoint(writtenOffset, operationCounter, channelReference);
|
|
|
- lastSyncedOffset = writtenOffset;
|
|
|
+ ensureOpen(); // this call gives a better exception that the incRef if we are closed by a tragic event
|
|
|
+ channelReference.incRef();
|
|
|
+ try {
|
|
|
+ final long offsetToSync;
|
|
|
+ final int opsCounter;
|
|
|
+ try (ReleasableLock lock = writeLock.acquire()) {
|
|
|
+ flush();
|
|
|
+ offsetToSync = totalOffset;
|
|
|
+ opsCounter = operationCounter;
|
|
|
+ }
|
|
|
+ // we can do this outside of the write lock but we have to protect from
|
|
|
+ // concurrent syncs
|
|
|
+ ensureOpen(); // just for kicks - the checkpoint happens or not either way
|
|
|
+ try {
|
|
|
+ checkpoint(offsetToSync, opsCounter, channelReference);
|
|
|
+ } catch (Throwable ex) {
|
|
|
+ closeWithTragicEvent(ex);
|
|
|
+ throw ex;
|
|
|
+ }
|
|
|
+ lastSyncedOffset = offsetToSync;
|
|
|
+ } finally {
|
|
|
+ channelReference.decRef();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -185,9 +192,7 @@ public class TranslogWriter extends TranslogReader {
|
|
|
/**
|
|
|
* returns true if there are buffered ops
|
|
|
*/
|
|
|
- public boolean syncNeeded() {
|
|
|
- return writtenOffset != lastSyncedOffset; // by default nothing is buffered
|
|
|
- }
|
|
|
+ public boolean syncNeeded() { return totalOffset != lastSyncedOffset; }
|
|
|
|
|
|
@Override
|
|
|
public int totalOperations() {
|
|
@@ -196,14 +201,29 @@ public class TranslogWriter extends TranslogReader {
|
|
|
|
|
|
@Override
|
|
|
public long sizeInBytes() {
|
|
|
- return writtenOffset;
|
|
|
+ return totalOffset;
|
|
|
}
|
|
|
|
|
|
|
|
|
/**
|
|
|
* Flushes the buffer if the translog is buffered.
|
|
|
*/
|
|
|
- protected void flush() throws IOException {
|
|
|
+ private final void flush() throws IOException {
|
|
|
+ assert writeLock.isHeldByCurrentThread();
|
|
|
+ if (bufferCount > 0) {
|
|
|
+ ensureOpen();
|
|
|
+ // we use the channel to write, since on windows, writing to the RAF might not be reflected
|
|
|
+ // when reading through the channel
|
|
|
+ final int bufferSize = bufferCount;
|
|
|
+ try {
|
|
|
+ Channels.writeToChannel(buffer, 0, bufferSize, channel);
|
|
|
+ } catch (Throwable ex) {
|
|
|
+ closeWithTragicEvent(ex);
|
|
|
+ throw ex;
|
|
|
+ }
|
|
|
+ writtenOffset += bufferSize;
|
|
|
+ bufferCount = 0;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -292,13 +312,23 @@ public class TranslogWriter extends TranslogReader {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- protected void readBytes(ByteBuffer buffer, long position) throws IOException {
|
|
|
+ protected void readBytes(ByteBuffer targetBuffer, long position) throws IOException {
|
|
|
try (ReleasableLock lock = readLock.acquire()) {
|
|
|
- Channels.readFromFileChannelWithEofException(channel, position, buffer);
|
|
|
+ if (position >= writtenOffset) {
|
|
|
+ assert targetBuffer.hasArray() : "buffer must have array";
|
|
|
+ final int sourcePosition = (int) (position - writtenOffset);
|
|
|
+ System.arraycopy(buffer, sourcePosition,
|
|
|
+ targetBuffer.array(), targetBuffer.position(), targetBuffer.limit());
|
|
|
+ targetBuffer.position(targetBuffer.limit());
|
|
|
+ return;
|
|
|
+ }
|
|
|
}
|
|
|
+ // we don't have to have a read lock here because we only write ahead to the file, so all writes has been complete
|
|
|
+ // for the requested location.
|
|
|
+ Channels.readFromFileChannelWithEofException(channel, position, targetBuffer);
|
|
|
}
|
|
|
|
|
|
- protected synchronized void checkpoint(long lastSyncPosition, int operationCounter, ChannelReference channelReference) throws IOException {
|
|
|
+ private synchronized void checkpoint(long lastSyncPosition, int operationCounter, ChannelReference channelReference) throws IOException {
|
|
|
channelReference.getChannel().force(false);
|
|
|
writeCheckpoint(lastSyncPosition, operationCounter, channelReference.getPath().getParent(), channelReference.getGeneration(), StandardOpenOption.WRITE);
|
|
|
}
|
|
@@ -324,4 +354,19 @@ public class TranslogWriter extends TranslogReader {
|
|
|
throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed", tragedy);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ class WrapperOutputStream extends OutputStream {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void write(int b) throws IOException {
|
|
|
+ buffer[bufferCount++] = (byte) b;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void write(byte[] b, int off, int len) throws IOException {
|
|
|
+ // we do safety checked when we decide to use this stream...
|
|
|
+ System.arraycopy(b, off, buffer, bufferCount, len);
|
|
|
+ bufferCount += len;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|