|
@@ -314,7 +314,7 @@ public class InternalEngine extends Engine {
|
|
|
try (ReleasableLock lock = readLock.acquire()) {
|
|
|
ensureOpen();
|
|
|
if (get.realtime()) {
|
|
|
- VersionValue versionValue = versionMap.getUnderLock(get.uid().bytes());
|
|
|
+ VersionValue versionValue = versionMap.getUnderLock(get.uid());
|
|
|
if (versionValue != null) {
|
|
|
if (versionValue.delete()) {
|
|
|
return GetResult.NOT_EXISTS;
|
|
@@ -336,6 +336,59 @@ public class InternalEngine extends Engine {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private boolean checkVersionConflict(
|
|
|
+ final Operation op,
|
|
|
+ final long currentVersion,
|
|
|
+ final long expectedVersion,
|
|
|
+ final boolean deleted) {
|
|
|
+ if (op.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) {
|
|
|
+ if (op.origin().isRecovery()) {
|
|
|
+ // version conflict, but okay
|
|
|
+ return true;
|
|
|
+ } else {
|
|
|
+ // fatal version conflict
|
|
|
+ throw new VersionConflictEngineException(shardId, op.type(), op.id(),
|
|
|
+ op.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ private long checkDeletedAndGCed(VersionValue versionValue) {
|
|
|
+ long currentVersion;
|
|
|
+ if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > getGcDeletesInMillis()) {
|
|
|
+ currentVersion = Versions.NOT_FOUND; // deleted, and GC
|
|
|
+ } else {
|
|
|
+ currentVersion = versionValue.version();
|
|
|
+ }
|
|
|
+ return currentVersion;
|
|
|
+ }
|
|
|
+
|
|
|
+ private static VersionValueSupplier NEW_VERSION_VALUE = (u, t, l) -> new VersionValue(u, l);
|
|
|
+
|
|
|
+ @FunctionalInterface
|
|
|
+ private interface VersionValueSupplier {
|
|
|
+ VersionValue apply(long updatedVersion, long time, Translog.Location location);
|
|
|
+ }
|
|
|
+
|
|
|
+ private <T extends Engine.Operation> void maybeAddToTranslog(
|
|
|
+ final T op,
|
|
|
+ final long updatedVersion,
|
|
|
+ final Function<T, Translog.Operation> toTranslogOp,
|
|
|
+ final VersionValueSupplier toVersionValue) throws IOException {
|
|
|
+ if (op.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
|
|
|
+ final Translog.Location translogLocation = translog.add(toTranslogOp.apply(op));
|
|
|
+ op.setTranslogLocation(translogLocation);
|
|
|
+ versionMap.putUnderLock(op.uid().bytes(), toVersionValue.apply(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis(), op.getTranslogLocation()));
|
|
|
+ } else {
|
|
|
+ // we do not replay in to the translog, so there is no
|
|
|
+ // translog location; that is okay because real-time
|
|
|
+ // gets are not possible during recovery and we will
|
|
|
+ // flush when the recovery is complete
|
|
|
+ versionMap.putUnderLock(op.uid().bytes(), toVersionValue.apply(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis(), null));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public boolean index(Index index) {
|
|
|
final boolean created;
|
|
@@ -361,72 +414,47 @@ public class InternalEngine extends Engine {
|
|
|
lastWriteNanos = index.startTime();
|
|
|
final long currentVersion;
|
|
|
final boolean deleted;
|
|
|
- VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes());
|
|
|
+ final VersionValue versionValue = versionMap.getUnderLock(index.uid());
|
|
|
if (versionValue == null) {
|
|
|
currentVersion = loadCurrentVersionFromIndex(index.uid());
|
|
|
deleted = currentVersion == Versions.NOT_FOUND;
|
|
|
} else {
|
|
|
+ currentVersion = checkDeletedAndGCed(versionValue);
|
|
|
deleted = versionValue.delete();
|
|
|
- if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > getGcDeletesInMillis()) {
|
|
|
- currentVersion = Versions.NOT_FOUND; // deleted, and GC
|
|
|
- } else {
|
|
|
- currentVersion = versionValue.version();
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
- long expectedVersion = index.version();
|
|
|
- if (isVersionConflictForWrites(index, currentVersion, deleted, expectedVersion)) {
|
|
|
- if (!index.origin().isRecovery()) {
|
|
|
- throw new VersionConflictEngineException(shardId, index.type(), index.id(),
|
|
|
- index.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted));
|
|
|
- }
|
|
|
- return false;
|
|
|
- }
|
|
|
- long updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
|
|
|
+ final long expectedVersion = index.version();
|
|
|
+ if (checkVersionConflict(index, currentVersion, expectedVersion, deleted)) return false;
|
|
|
|
|
|
- final boolean created;
|
|
|
- index.updateVersion(updatedVersion);
|
|
|
+ final long updatedVersion = updateVersion(index, currentVersion, expectedVersion);
|
|
|
|
|
|
- if (currentVersion == Versions.NOT_FOUND) {
|
|
|
- // document does not exists, we can optimize for create
|
|
|
- created = true;
|
|
|
- index(index, indexWriter);
|
|
|
- } else {
|
|
|
- created = update(index, versionValue, indexWriter);
|
|
|
- }
|
|
|
+ final boolean created = indexOrUpdate(index, currentVersion, versionValue);
|
|
|
|
|
|
- if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
|
|
|
- final Translog.Location translogLocation = translog.add(new Translog.Index(index));
|
|
|
- index.setTranslogLocation(translogLocation);
|
|
|
- versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, index.getTranslogLocation()));
|
|
|
- } else {
|
|
|
- // we do not replay in to the translog, so there is no
|
|
|
- // translog location; that is okay because real-time
|
|
|
- // gets are not possible during recovery and we will
|
|
|
- // flush when the recovery is complete
|
|
|
- versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, null));
|
|
|
- }
|
|
|
+ maybeAddToTranslog(index, updatedVersion, Translog.Index::new, NEW_VERSION_VALUE);
|
|
|
|
|
|
return created;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static boolean update(Index index, VersionValue versionValue, IndexWriter indexWriter) throws IOException {
|
|
|
- boolean created;
|
|
|
- if (versionValue != null) {
|
|
|
- created = versionValue.delete(); // we have a delete which is not GC'ed...
|
|
|
- } else {
|
|
|
- created = false;
|
|
|
- }
|
|
|
- if (index.docs().size() > 1) {
|
|
|
- indexWriter.updateDocuments(index.uid(), index.docs());
|
|
|
+ private long updateVersion(Engine.Operation op, long currentVersion, long expectedVersion) {
|
|
|
+ final long updatedVersion = op.versionType().updateVersion(currentVersion, expectedVersion);
|
|
|
+ op.updateVersion(updatedVersion);
|
|
|
+ return updatedVersion;
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean indexOrUpdate(final Index index, final long currentVersion, final VersionValue versionValue) throws IOException {
|
|
|
+ final boolean created;
|
|
|
+ if (currentVersion == Versions.NOT_FOUND) {
|
|
|
+ // document does not exists, we can optimize for create
|
|
|
+ created = true;
|
|
|
+ index(index, indexWriter);
|
|
|
} else {
|
|
|
- indexWriter.updateDocument(index.uid(), index.docs().get(0));
|
|
|
+ created = update(index, versionValue, indexWriter);
|
|
|
}
|
|
|
return created;
|
|
|
}
|
|
|
|
|
|
- private static void index(Index index, IndexWriter indexWriter) throws IOException {
|
|
|
+ private static void index(final Index index, final IndexWriter indexWriter) throws IOException {
|
|
|
if (index.docs().size() > 1) {
|
|
|
indexWriter.addDocuments(index.docs());
|
|
|
} else {
|
|
@@ -434,8 +462,19 @@ public class InternalEngine extends Engine {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private boolean isVersionConflictForWrites(Index index, long currentVersion, boolean deleted, long expectedVersion) {
|
|
|
- return index.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted);
|
|
|
+ private static boolean update(final Index index, final VersionValue versionValue, final IndexWriter indexWriter) throws IOException {
|
|
|
+ final boolean created;
|
|
|
+ if (versionValue != null) {
|
|
|
+ created = versionValue.delete(); // we have a delete which is not GC'ed...
|
|
|
+ } else {
|
|
|
+ created = false;
|
|
|
+ }
|
|
|
+ if (index.docs().size() > 1) {
|
|
|
+ indexWriter.updateDocuments(index.uid(), index.docs());
|
|
|
+ } else {
|
|
|
+ indexWriter.updateDocument(index.uid(), index.docs().get(0));
|
|
|
+ }
|
|
|
+ return created;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -465,57 +504,42 @@ public class InternalEngine extends Engine {
|
|
|
lastWriteNanos = delete.startTime();
|
|
|
final long currentVersion;
|
|
|
final boolean deleted;
|
|
|
- VersionValue versionValue = versionMap.getUnderLock(delete.uid().bytes());
|
|
|
+ final VersionValue versionValue = versionMap.getUnderLock(delete.uid());
|
|
|
if (versionValue == null) {
|
|
|
currentVersion = loadCurrentVersionFromIndex(delete.uid());
|
|
|
deleted = currentVersion == Versions.NOT_FOUND;
|
|
|
} else {
|
|
|
+ currentVersion = checkDeletedAndGCed(versionValue);
|
|
|
deleted = versionValue.delete();
|
|
|
- if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > getGcDeletesInMillis()) {
|
|
|
- currentVersion = Versions.NOT_FOUND; // deleted, and GC
|
|
|
- } else {
|
|
|
- currentVersion = versionValue.version();
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
- long updatedVersion;
|
|
|
- long expectedVersion = delete.version();
|
|
|
- if (delete.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) {
|
|
|
- if (delete.origin().isRecovery()) {
|
|
|
- return;
|
|
|
- } else {
|
|
|
- throw new VersionConflictEngineException(shardId, delete.type(), delete.id(),
|
|
|
- delete.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted));
|
|
|
- }
|
|
|
- }
|
|
|
- updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion);
|
|
|
- final boolean found;
|
|
|
- if (currentVersion == Versions.NOT_FOUND) {
|
|
|
- // doc does not exist and no prior deletes
|
|
|
- found = false;
|
|
|
- } else if (versionValue != null && versionValue.delete()) {
|
|
|
- // a "delete on delete", in this case, we still increment the version, log it, and return that version
|
|
|
- found = false;
|
|
|
- } else {
|
|
|
- // we deleted a currently existing document
|
|
|
- indexWriter.deleteDocuments(delete.uid());
|
|
|
- found = true;
|
|
|
- }
|
|
|
+ final long expectedVersion = delete.version();
|
|
|
+ if (checkVersionConflict(delete, currentVersion, expectedVersion, deleted)) return;
|
|
|
+
|
|
|
+ final long updatedVersion = updateVersion(delete, currentVersion, expectedVersion);
|
|
|
+
|
|
|
+ final boolean found = deleteIfFound(delete, currentVersion, deleted, versionValue);
|
|
|
|
|
|
delete.updateVersion(updatedVersion, found);
|
|
|
|
|
|
- if (delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
|
|
|
- final Translog.Location translogLocation = translog.add(new Translog.Delete(delete));
|
|
|
- delete.setTranslogLocation(translogLocation);
|
|
|
- versionMap.putUnderLock(delete.uid().bytes(), new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis(), delete.getTranslogLocation()));
|
|
|
- } else {
|
|
|
- // we do not replay in to the translog, so there is no
|
|
|
- // translog location; that is okay because real-time
|
|
|
- // gets are not possible during recovery and we will
|
|
|
- // flush when the recovery is complete
|
|
|
- versionMap.putUnderLock(delete.uid().bytes(), new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis(), null));
|
|
|
- }
|
|
|
+ maybeAddToTranslog(delete, updatedVersion, Translog.Delete::new, DeleteVersionValue::new);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean deleteIfFound(Delete delete, long currentVersion, boolean deleted, VersionValue versionValue) throws IOException {
|
|
|
+ final boolean found;
|
|
|
+ if (currentVersion == Versions.NOT_FOUND) {
|
|
|
+ // doc does not exist and no prior deletes
|
|
|
+ found = false;
|
|
|
+ } else if (versionValue != null && deleted) {
|
|
|
+ // a "delete on delete", in this case, we still increment the version, log it, and return that version
|
|
|
+ found = false;
|
|
|
+ } else {
|
|
|
+ // we deleted a currently existing document
|
|
|
+ indexWriter.deleteDocuments(delete.uid());
|
|
|
+ found = true;
|
|
|
}
|
|
|
+ return found;
|
|
|
}
|
|
|
|
|
|
@Override
|