|
@@ -39,6 +39,7 @@ import java.nio.file.OpenOption;
|
|
import java.nio.file.Path;
|
|
import java.nio.file.Path;
|
|
import java.nio.file.StandardOpenOption;
|
|
import java.nio.file.StandardOpenOption;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
+import java.util.concurrent.locks.ReentrantLock;
|
|
|
|
|
|
public class TranslogWriter extends BaseTranslogReader implements Closeable {
|
|
public class TranslogWriter extends BaseTranslogReader implements Closeable {
|
|
|
|
|
|
@@ -60,7 +61,8 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
|
|
private volatile long totalOffset;
|
|
private volatile long totalOffset;
|
|
|
|
|
|
protected final AtomicBoolean closed = new AtomicBoolean(false);
|
|
protected final AtomicBoolean closed = new AtomicBoolean(false);
|
|
-
|
|
|
|
|
|
+ // lock order synchronized(syncLock) -> synchronized(this)
|
|
|
|
+ private final Object syncLock = new Object();
|
|
|
|
|
|
public TranslogWriter(ShardId shardId, long generation, FileChannel channel, Path path, ByteSizeValue bufferSize) throws IOException {
|
|
public TranslogWriter(ShardId shardId, long generation, FileChannel channel, Path path, ByteSizeValue bufferSize) throws IOException {
|
|
super(generation, channel, path, channel.position());
|
|
super(generation, channel, path, channel.position());
|
|
@@ -146,23 +148,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
|
|
* raising the exception.
|
|
* raising the exception.
|
|
*/
|
|
*/
|
|
public void sync() throws IOException {
|
|
public void sync() throws IOException {
|
|
- if (syncNeeded()) {
|
|
|
|
- synchronized (this) {
|
|
|
|
- ensureOpen();
|
|
|
|
- final long offsetToSync;
|
|
|
|
- final int opsCounter;
|
|
|
|
- try {
|
|
|
|
- outputStream.flush();
|
|
|
|
- offsetToSync = totalOffset;
|
|
|
|
- opsCounter = operationCounter;
|
|
|
|
- checkpoint(offsetToSync, opsCounter, generation, channel, path);
|
|
|
|
- } catch (Throwable ex) {
|
|
|
|
- closeWithTragicEvent(ex);
|
|
|
|
- throw ex;
|
|
|
|
- }
|
|
|
|
- lastSyncedOffset = offsetToSync;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ syncUpTo(Long.MAX_VALUE);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -229,9 +215,38 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
|
|
* @return <code>true</code> if this call caused an actual sync operation
|
|
* @return <code>true</code> if this call caused an actual sync operation
|
|
*/
|
|
*/
|
|
public boolean syncUpTo(long offset) throws IOException {
|
|
public boolean syncUpTo(long offset) throws IOException {
|
|
- if (lastSyncedOffset < offset) {
|
|
|
|
- sync();
|
|
|
|
- return true;
|
|
|
|
|
|
+ if (lastSyncedOffset < offset && syncNeeded()) {
|
|
|
|
+ synchronized (syncLock) { // only one sync/checkpoint should happen concurrently but we wait
|
|
|
|
+ if (lastSyncedOffset < offset && syncNeeded()) {
|
|
|
|
+ // double checked locking - we don't want to fsync unless we have to and now that we have
|
|
|
|
+ // the lock we should check again since if this code is busy we might have fsynced enough already
|
|
|
|
+ final long offsetToSync;
|
|
|
|
+ final int opsCounter;
|
|
|
|
+ synchronized (this) {
|
|
|
|
+ ensureOpen();
|
|
|
|
+ try {
|
|
|
|
+ outputStream.flush();
|
|
|
|
+ offsetToSync = totalOffset;
|
|
|
|
+ opsCounter = operationCounter;
|
|
|
|
+ } catch (Throwable ex) {
|
|
|
|
+ closeWithTragicEvent(ex);
|
|
|
|
+ throw ex;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ // now do the actual fsync outside of the synchronized block such that
|
|
|
|
+ // we can continue writing to the buffer etc.
|
|
|
|
+ try {
|
|
|
|
+ channel.force(false);
|
|
|
|
+ writeCheckpoint(offsetToSync, opsCounter, path.getParent(), generation, StandardOpenOption.WRITE);
|
|
|
|
+ } catch (Throwable ex) {
|
|
|
|
+ closeWithTragicEvent(ex);
|
|
|
|
+ throw ex;
|
|
|
|
+ }
|
|
|
|
+ assert lastSyncedOffset <= offsetToSync : "illegal state: " + lastSyncedOffset + " <= " + offsetToSync;
|
|
|
|
+ lastSyncedOffset = offsetToSync; // write protected by syncLock
|
|
|
|
+ return true;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
@@ -254,11 +269,6 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
|
|
Channels.readFromFileChannelWithEofException(channel, position, targetBuffer);
|
|
Channels.readFromFileChannelWithEofException(channel, position, targetBuffer);
|
|
}
|
|
}
|
|
|
|
|
|
- private synchronized void checkpoint(long lastSyncPosition, int operationCounter, long generation, FileChannel translogFileChannel, Path translogFilePath) throws IOException {
|
|
|
|
- translogFileChannel.force(false);
|
|
|
|
- writeCheckpoint(lastSyncPosition, operationCounter, translogFilePath.getParent(), generation, StandardOpenOption.WRITE);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
private static void writeCheckpoint(long syncPosition, int numOperations, Path translogFile, long generation, OpenOption... options) throws IOException {
|
|
private static void writeCheckpoint(long syncPosition, int numOperations, Path translogFile, long generation, OpenOption... options) throws IOException {
|
|
final Path checkpointFile = translogFile.resolve(Translog.CHECKPOINT_FILE_NAME);
|
|
final Path checkpointFile = translogFile.resolve(Translog.CHECKPOINT_FILE_NAME);
|
|
Checkpoint checkpoint = new Checkpoint(syncPosition, numOperations, generation);
|
|
Checkpoint checkpoint = new Checkpoint(syncPosition, numOperations, generation);
|
|
@@ -269,7 +279,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
|
|
|
|
|
|
static final ChannelFactory DEFAULT = new ChannelFactory();
|
|
static final ChannelFactory DEFAULT = new ChannelFactory();
|
|
|
|
|
|
- // only for testing until we have a disk-full FileSystemt
|
|
|
|
|
|
+ // only for testing until we have a disk-full FileSystem
|
|
public FileChannel open(Path file) throws IOException {
|
|
public FileChannel open(Path file) throws IOException {
|
|
return FileChannel.open(file, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
|
|
return FileChannel.open(file, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
|
|
}
|
|
}
|