@@ -22,32 +22,42 @@ package org.elasticsearch.index.translog;
import com.carrotsearch.hppc.LongArrayList;
import com.carrotsearch.hppc.procedures.LongProcedure;
import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefIterator;
import org.elasticsearch.Assertions;
+import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Channels;
+import org.elasticsearch.common.io.DiskIoBufferPool;
+import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;

-import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.IOException;
-import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

public class TranslogWriter extends BaseTranslogReader implements Closeable {
+
    private final ShardId shardId;
    private final FileChannel checkpointChannel;
    private final Path checkpointPath;
@@ -57,8 +67,6 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
    private volatile int operationCounter;
    /* if we hit an exception that we can't recover from we assign it to this var and ship it with every AlreadyClosedException we throw */
    private final TragicExceptionHolder tragedy;
-    /* A buffered outputstream what writes to the writers channel */
-    private final OutputStream outputStream;
    /* the total offset of this file including the bytes written to the file as well as into the buffer */
    private volatile long totalOffset;

@@ -72,10 +80,14 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
    private final LongConsumer persistedSequenceNumberConsumer;

    protected final AtomicBoolean closed = new AtomicBoolean(false);
-    // lock order synchronized(syncLock) -> synchronized(this)
+    // lock order try(Releasable lock = writeLock.acquire()) -> synchronized(this)
+    private final ReleasableLock writeLock = new ReleasableLock(new ReentrantLock());
+    // lock order synchronized(syncLock) -> try(Releasable lock = writeLock.acquire()) -> synchronized(this)
    private final Object syncLock = new Object();

-    private LongArrayList nonFsyncedSequenceNumbers;
+    private final int forceWriteThreshold;
+    private final ArrayList<Operation> bufferedOps = new ArrayList<>();
+    private long bufferedBytes = 0L;

    private final Map<Long, Tuple<BytesReference, Exception>> seenSequenceNumbers;

@@ -96,11 +108,11 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
        assert initialCheckpoint.offset == channel.position() :
            "initial checkpoint offset [" + initialCheckpoint.offset + "] is different than current channel position ["
                + channel.position() + "]";
+        this.forceWriteThreshold = Math.toIntExact(bufferSize.getBytes());
        this.shardId = shardId;
        this.checkpointChannel = checkpointChannel;
        this.checkpointPath = checkpointPath;
        this.minTranslogGenerationSupplier = minTranslogGenerationSupplier;
-        this.outputStream = new BufferedChannelOutputStream(java.nio.channels.Channels.newOutputStream(channel), bufferSize.bytesAsInt());
        this.lastSyncedCheckpoint = initialCheckpoint;
        this.totalOffset = initialCheckpoint.offset;
        assert initialCheckpoint.minSeqNo == SequenceNumbers.NO_OPS_PERFORMED : initialCheckpoint.minSeqNo;
@@ -109,7 +121,6 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
        this.maxSeqNo = initialCheckpoint.maxSeqNo;
        assert initialCheckpoint.trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : initialCheckpoint.trimmedAboveSeqNo;
        this.globalCheckpointSupplier = globalCheckpointSupplier;
-        this.nonFsyncedSequenceNumbers = new LongArrayList(64);
        this.persistedSequenceNumberConsumer = persistedSequenceNumberConsumer;
        this.seenSequenceNumbers = Assertions.ENABLED ? new HashMap<>() : null;
        this.tragedy = tragedy;
@@ -162,10 +173,6 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
        }
    }

-    /**
-     * add the given bytes to the translog and return the location they were written at
-     */
-
    /**
     * Add the given bytes to the translog with the specified sequence number; returns the location the bytes were written to.
     *
@@ -174,34 +181,35 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
     * @return the location the bytes were written to
     * @throws IOException if writing to the translog resulted in an I/O exception
     */
-    public synchronized Translog.Location add(final BytesReference data, final long seqNo) throws IOException {
-        ensureOpen();
-        final long offset = totalOffset;
-        try {
-            data.writeTo(outputStream);
-        } catch (final Exception ex) {
-            closeWithTragicEvent(ex);
-            throw ex;
-        }
-        totalOffset += data.length();
+    public Translog.Location add(final ReleasableBytesReference data, final long seqNo) throws IOException {
+        final Translog.Location location;
+        final long bytesBufferedAfterAdd;
+        synchronized (this) {
+            ensureOpen();
+            final long offset = totalOffset;
+            totalOffset += data.length();
+            bufferedBytes += data.length();
+            bufferedOps.add(new Operation(seqNo, data.retain()));

-        if (minSeqNo == SequenceNumbers.NO_OPS_PERFORMED) {
-            assert operationCounter == 0;
-        }
-        if (maxSeqNo == SequenceNumbers.NO_OPS_PERFORMED) {
-            assert operationCounter == 0;
-        }
+            assert minSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0;
+            assert maxSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0;

-        minSeqNo = SequenceNumbers.min(minSeqNo, seqNo);
-        maxSeqNo = SequenceNumbers.max(maxSeqNo, seqNo);
+            minSeqNo = SequenceNumbers.min(minSeqNo, seqNo);
+            maxSeqNo = SequenceNumbers.max(maxSeqNo, seqNo);

-        nonFsyncedSequenceNumbers.add(seqNo);
+            operationCounter++;

-        operationCounter++;
+            assert assertNoSeqNumberConflict(seqNo, data);

-        assert assertNoSeqNumberConflict(seqNo, data);
+            location = new Translog.Location(generation, offset, data.length());
+            bytesBufferedAfterAdd = bufferedBytes;
+        }
+
+        if (bytesBufferedAfterAdd >= forceWriteThreshold) {
+            writeBufferedOps(Long.MAX_VALUE, bytesBufferedAfterAdd >= forceWriteThreshold * 4);
+        }

-        return new Translog.Location(generation, offset, data.length());
+        return location;
    }

    private synchronized boolean assertNoSeqNumberConflict(long seqNo, BytesReference data) throws IOException {
@@ -211,9 +219,9 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
            final Tuple<BytesReference, Exception> previous = seenSequenceNumbers.get(seqNo);
            if (previous.v1().equals(data) == false) {
                Translog.Operation newOp = Translog.readOperation(
-                    new BufferedChecksumStreamInput(data.streamInput(), "assertion"));
+                        new BufferedChecksumStreamInput(data.streamInput(), "assertion"));
                Translog.Operation prvOp = Translog.readOperation(
-                    new BufferedChecksumStreamInput(previous.v1().streamInput(), "assertion"));
+                        new BufferedChecksumStreamInput(previous.v1().streamInput(), "assertion"));
                // TODO: We haven't had timestamp for Index operations in Lucene yet, we need to loosen this check without timestamp.
                final boolean sameOp;
                if (newOp instanceof Translog.Index && prvOp instanceof Translog.Index) {
@@ -250,7 +258,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
                final Translog.Operation op;
                try {
                    op = Translog.readOperation(
-                        new BufferedChecksumStreamInput(e.getValue().v1().streamInput(), "assertion"));
+                            new BufferedChecksumStreamInput(e.getValue().v1().streamInput(), "assertion"));
                } catch (IOException ex) {
                    throw new RuntimeException(ex);
                }
@@ -309,28 +317,36 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
    public TranslogReader closeIntoReader() throws IOException {
        // make sure to acquire the sync lock first, to prevent dead locks with threads calling
        // syncUpTo() , where the sync lock is acquired first, following by the synchronize(this)
+        // After the sync lock we acquire the write lock to avoid deadlocks with threads writing where
+        // the write lock is acquired first followed by synchronize(this).
        //
        // Note: While this is not strictly needed as this method is called while blocking all ops on the translog,
        // we do this to for correctness and preventing future issues.
        synchronized (syncLock) {
-            synchronized (this) {
-                try {
-                    sync(); // sync before we close..
-                } catch (final Exception ex) {
-                    closeWithTragicEvent(ex);
-                    throw ex;
-                }
-                if (closed.compareAndSet(false, true)) {
+            try (ReleasableLock toClose = writeLock.acquire()) {
+                synchronized (this) {
                    try {
-                        checkpointChannel.close();
+                        sync(); // sync before we close..
                    } catch (final Exception ex) {
                        closeWithTragicEvent(ex);
                        throw ex;
                    }
-                    return new TranslogReader(getLastSyncedCheckpoint(), channel, path, header);
-                } else {
-                    throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed (path [" + path + "]",
+                    // If we reached this point, all of the buffered ops should have been flushed successfully.
+                    assert bufferedOps.size() == 0;
+                    assert checkChannelPositionWhileHandlingException(totalOffset);
+                    assert totalOffset == lastSyncedCheckpoint.offset;
+                    if (closed.compareAndSet(false, true)) {
+                        try {
+                            checkpointChannel.close();
+                        } catch (final Exception ex) {
+                            closeWithTragicEvent(ex);
+                            throw ex;
+                        }
+                        return new TranslogReader(getLastSyncedCheckpoint(), channel, path, header);
+                    } else {
+                        throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed (path [" + path + "]",
                            tragedy.get());
+                    }
                }
            }
        }
@@ -341,15 +357,23 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
    public TranslogSnapshot newSnapshot() {
        // make sure to acquire the sync lock first, to prevent dead locks with threads calling
        // syncUpTo() , where the sync lock is acquired first, following by the synchronize(this)
+        // After the sync lock we acquire the write lock to avoid deadlocks with threads writing where
+        // the write lock is acquired first followed by synchronize(this).
        synchronized (syncLock) {
-            synchronized (this) {
-                ensureOpen();
-                try {
-                    sync();
-                } catch (IOException e) {
-                    throw new TranslogException(shardId, "exception while syncing before creating a snapshot", e);
+            try (ReleasableLock toClose = writeLock.acquire()) {
+                synchronized (this) {
+                    ensureOpen();
+                    try {
+                        sync();
+                    } catch (IOException e) {
+                        throw new TranslogException(shardId, "exception while syncing before creating a snapshot", e);
+                    }
+                    // If we reached this point, all of the buffered ops should have been flushed successfully.
+                    assert bufferedOps.size() == 0;
+                    assert checkChannelPositionWhileHandlingException(totalOffset);
+                    assert totalOffset == lastSyncedCheckpoint.offset;
+                    return super.newSnapshot();
                }
-                return super.newSnapshot();
            }
        }
    }
@@ -371,13 +395,21 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
                    // the lock we should check again since if this code is busy we might have fsynced enough already
                    final Checkpoint checkpointToSync;
                    final LongArrayList flushedSequenceNumbers;
-                    synchronized (this) {
-                        ensureOpen();
-                        try {
-                            outputStream.flush();
+                    final ArrayDeque<Operation> toWrite;
+                    try (ReleasableLock toClose = writeLock.acquire()) {
+                        synchronized (this) {
+                            ensureOpen();
                            checkpointToSync = getCheckpoint();
-                            flushedSequenceNumbers = nonFsyncedSequenceNumbers;
-                            nonFsyncedSequenceNumbers = new LongArrayList(64);
+                            toWrite = pollOpsToWrite();
+                        }
+                        flushedSequenceNumbers = new LongArrayList(toWrite.size());
+                        for (Operation operation : toWrite) {
+                            flushedSequenceNumbers.add(operation.seqNo);
+                        }
+
+                        try {
+                            // Write ops will release operations.
+                            writeAndReleaseOps(toWrite);
                        } catch (final Exception ex) {
                            closeWithTragicEvent(ex);
                            throw ex;
@@ -403,19 +435,76 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
        return false;
    }

+    private void writeBufferedOps(long offset, boolean blockOnExistingWriter) throws IOException {
+        try (ReleasableLock locked = blockOnExistingWriter ? writeLock.acquire() : writeLock.tryAcquire()) {
+            try {
+                if (locked != null && offset > getWrittenOffset()) {
+                    writeAndReleaseOps(pollOpsToWrite());
+                }
+            } catch (IOException e){
+                closeWithTragicEvent(e);
+                throw e;
+            }
+        }
+    }
+
+    private synchronized ArrayDeque<Operation> pollOpsToWrite() {
+        ensureOpen();
+        final ArrayDeque<Operation> operationsToWrite = new ArrayDeque<>(bufferedOps.size());
+        operationsToWrite.addAll(bufferedOps);
+        bufferedOps.clear();
+        bufferedBytes = 0;
+        return operationsToWrite;
+    }
+
+    private void writeAndReleaseOps(final ArrayDeque<Operation> operationsToWrite) throws IOException {
+        try {
+            assert writeLock.isHeldByCurrentThread();
+            ByteBuffer ioBuffer = DiskIoBufferPool.getIoBuffer();
+
+            Operation operation;
+            while ((operation = operationsToWrite.pollFirst()) != null) {
+                try (Releasable toClose = operation) {
+                    BytesRefIterator iterator = operation.bytesReference.iterator();
+                    BytesRef current;
+                    while ((current = iterator.next()) != null) {
+                        int currentBytesConsumed = 0;
+                        while (currentBytesConsumed != current.length) {
+                            int nBytesToWrite = Math.min(current.length - currentBytesConsumed, ioBuffer.remaining());
+                            ioBuffer.put(current.bytes, current.offset + currentBytesConsumed, nBytesToWrite);
+                            currentBytesConsumed += nBytesToWrite;
+                            if (ioBuffer.hasRemaining() == false) {
+                                ioBuffer.flip();
+                                writeToFile(ioBuffer);
+                                ioBuffer.clear();
+                            }
+                        }
+                    }
+                }
+            }
+            ioBuffer.flip();
+            writeToFile(ioBuffer);
+        } finally {
+            Releasables.close(operationsToWrite);
+        }
+    }
+
+    @SuppressForbidden(reason = "Channel#write")
+    private void writeToFile(ByteBuffer ioBuffer) throws IOException {
+        while (ioBuffer.remaining() > 0) {
+            channel.write(ioBuffer);
+        }
+    }
+
    @Override
    protected void readBytes(ByteBuffer targetBuffer, long position) throws IOException {
        try {
            if (position + targetBuffer.remaining() > getWrittenOffset()) {
-                synchronized (this) {
-                    // we only flush here if it's really really needed - try to minimize the impact of the read operation
-                    // in some cases ie. a tragic event we might still be able to read the relevant value
-                    // which is not really important in production but some test can make most strict assumptions
-                    // if we don't fail in this call unless absolutely necessary.
-                    if (position + targetBuffer.remaining() > getWrittenOffset()) {
-                        outputStream.flush();
-                    }
-                }
+                // we only flush here if it's really really needed - try to minimize the impact of the read operation
+                // in some cases ie. a tragic event we might still be able to read the relevant value
+                // which is not really important in production but some test can make most strict assumptions
+                // if we don't fail in this call unless absolutely necessary.
+                writeBufferedOps(position + targetBuffer.remaining(), true);
            }
        } catch (final Exception ex) {
            closeWithTragicEvent(ex);
@@ -448,9 +537,21 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
        }
    }

+    private boolean checkChannelPositionWhileHandlingException(long expectedOffset) {
+        try {
+            return expectedOffset == channel.position();
+        } catch (IOException e) {
+            return true;
+        }
+    }
+
    @Override
    public final void close() throws IOException {
        if (closed.compareAndSet(false, true)) {
+            synchronized (this) {
+                Releasables.closeWhileHandlingException(bufferedOps);
+                bufferedOps.clear();
+            }
            IOUtils.close(checkpointChannel, channel);
        }
    }
@@ -459,32 +560,19 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
        return closed.get();
    }

+    private static class Operation implements Releasable {

-    private final class BufferedChannelOutputStream extends BufferedOutputStream {
+        private final long seqNo;
+        private final ReleasableBytesReference bytesReference;

-        BufferedChannelOutputStream(OutputStream out, int size) throws IOException {
-            super(out, size);
+        private Operation(long seqNo, ReleasableBytesReference bytesReference) {
+            this.seqNo = seqNo;
+            this.bytesReference = bytesReference;
        }

        @Override
-        public synchronized void flush() throws IOException {
-            if (count > 0) {
-                try {
-                    ensureOpen();
-                    super.flush();
-                } catch (final Exception ex) {
-                    closeWithTragicEvent(ex);
-                    throw ex;
-                }
-            }
-        }
-
-        @Override
-        public void close() throws IOException {
-            // the stream is intentionally not closed because
-            // closing it will close the FileChannel
-            throw new IllegalStateException("never close this stream");
+        public void close() {
+            Releasables.closeWhileHandlingException(bytesReference);
        }
    }
-
}