|
|
@@ -32,7 +32,7 @@ import java.util.Map;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
|
/** Maps _uid value to its version information. */
|
|
|
-class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
|
|
|
+final class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
|
|
|
|
|
|
/**
|
|
|
* Resets the internal map and adjusts it's capacity as if there were no indexing operations.
|
|
|
@@ -46,22 +46,100 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
|
|
|
maps = new Maps();
|
|
|
}
|
|
|
|
|
|
- private static class Maps {
|
|
|
+ private static final class VersionLookup {
|
|
|
+
|
|
|
+ private static final VersionLookup EMPTY = new VersionLookup(Collections.emptyMap());
|
|
|
+ private final Map<BytesRef,VersionValue> map;
|
|
|
+
|
|
|
+ // each version map has a notion of safe / unsafe which allows us to apply certain optimization in the auto-generated ID usecase
|
|
|
+ // where we know that documents can't have any duplicates so we can skip the version map entirely. This reduces
|
|
|
+ // the memory pressure significantly for this use-case where we often get a massive amount of small document (metrics).
|
|
|
+ // if the version map is in safeAccess mode we track all version in the version map. yet if a document comes in that needs
|
|
|
+ // safe access but we are not in this mode we force a refresh and make the map as safe access required. All subsequent ops will
|
|
|
+ // respect that and fill the version map. The nice part here is that we are only really requiring this for a single ID and since
|
|
|
+ // we hold the ID lock in the engine while we do all this it's safe to do it globally unlocked.
|
|
|
+ // NOTE: these values can both be non-volatile since it's ok to read a stale value per doc ID. We serialize changes in the engine
|
|
|
+ // that will prevent concurrent updates to the same document ID and therefore we can rely on the happens-before guanratee of the
|
|
|
+ // map reference itself.
|
|
|
+ private boolean unsafe;
|
|
|
+
|
|
|
+ private VersionLookup(Map<BytesRef, VersionValue> map) {
|
|
|
+ this.map = map;
|
|
|
+ }
|
|
|
+
|
|
|
+ VersionValue get(BytesRef key) {
|
|
|
+ return map.get(key);
|
|
|
+ }
|
|
|
+
|
|
|
+ VersionValue put(BytesRef key, VersionValue value) {
|
|
|
+ return map.put(key, value);
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean isEmpty() {
|
|
|
+ return map.isEmpty();
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ int size() {
|
|
|
+ return map.size();
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean isUnsafe() {
|
|
|
+ return unsafe;
|
|
|
+ }
|
|
|
+
|
|
|
+ void markAsUnsafe() {
|
|
|
+ unsafe = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static final class Maps {
|
|
|
|
|
|
// All writes (adds and deletes) go into here:
|
|
|
- final Map<BytesRef,VersionValue> current;
|
|
|
+ final VersionLookup current;
|
|
|
|
|
|
// Used while refresh is running, and to hold adds/deletes until refresh finishes. We read from both current and old on lookup:
|
|
|
- final Map<BytesRef,VersionValue> old;
|
|
|
+ final VersionLookup old;
|
|
|
+
|
|
|
+ // this is not volatile since we don't need to maintain a happens before relation ship across doc IDs so it's enough to
|
|
|
+ // have the volatile read of the Maps reference to make it visible even across threads.
|
|
|
+ boolean needsSafeAccess;
|
|
|
+ final boolean previousMapsNeededSafeAccess;
|
|
|
|
|
|
- Maps(Map<BytesRef,VersionValue> current, Map<BytesRef,VersionValue> old) {
|
|
|
- this.current = current;
|
|
|
- this.old = old;
|
|
|
+ Maps(VersionLookup current, VersionLookup old, boolean previousMapsNeededSafeAccess) {
|
|
|
+ this.current = current;
|
|
|
+ this.old = old;
|
|
|
+ this.previousMapsNeededSafeAccess = previousMapsNeededSafeAccess;
|
|
|
}
|
|
|
|
|
|
Maps() {
|
|
|
- this(ConcurrentCollections.<BytesRef,VersionValue>newConcurrentMapWithAggressiveConcurrency(),
|
|
|
- Collections.emptyMap());
|
|
|
+ this(new VersionLookup(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency()), VersionLookup.EMPTY, false);
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean isSafeAccessMode() {
|
|
|
+ return needsSafeAccess || previousMapsNeededSafeAccess;
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean shouldInheritSafeAccess() {
|
|
|
+ final boolean mapHasNotSeenAnyOperations = current.isEmpty() && current.isUnsafe() == false;
|
|
|
+ return needsSafeAccess
|
|
|
+ // we haven't seen any ops and map before needed it so we maintain it
|
|
|
+ || (mapHasNotSeenAnyOperations && previousMapsNeededSafeAccess);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Builds a new map for the refresh transition this should be called in beforeRefresh()
|
|
|
+ */
|
|
|
+ Maps buildTransitionMap() {
|
|
|
+ return new Maps(new VersionLookup(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(current.size())),
|
|
|
+ current, shouldInheritSafeAccess());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * builds a new map that invalidates the old map but maintains the current. This should be called in afterRefresh()
|
|
|
+ */
|
|
|
+ Maps invalidateOldMap() {
|
|
|
+ return new Maps(current, VersionLookup.EMPTY, previousMapsNeededSafeAccess);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -69,6 +147,9 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
|
|
|
private final Map<BytesRef,DeleteVersionValue> tombstones = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
|
|
|
|
|
|
private volatile Maps maps = new Maps();
|
|
|
+ // we maintain a second map that only receives the updates that we skip on the actual map (unsafe ops)
|
|
|
+ // this map is only maintained if assertions are enabled
|
|
|
+ private volatile Maps unsafeKeysMap = new Maps();
|
|
|
|
|
|
/** Bytes consumed for each BytesRef UID:
|
|
|
* In this base value, we account for the {@link BytesRef} object itself as
|
|
|
@@ -113,8 +194,8 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
|
|
|
// map. While reopen is running, any lookup will first
|
|
|
// try this new map, then fallback to old, then to the
|
|
|
// current searcher:
|
|
|
- maps = new Maps(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(maps.current.size()), maps.current);
|
|
|
-
|
|
|
+ maps = maps.buildTransitionMap();
|
|
|
+ assert (unsafeKeysMap = unsafeKeysMap.buildTransitionMap()) != null;
|
|
|
// This is not 100% correct, since concurrent indexing ops can change these counters in between our execution of the previous
|
|
|
// line and this one, but that should be minor, and the error won't accumulate over time:
|
|
|
ramBytesUsedCurrent.set(0);
|
|
|
@@ -128,13 +209,18 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
|
|
|
// case. This is because we assign new maps (in beforeRefresh) slightly before Lucene actually flushes any segments for the
|
|
|
// reopen, and so any concurrent indexing requests can still sneak in a few additions to that current map that are in fact reflected
|
|
|
// in the previous reader. We don't touch tombstones here: they expire on their own index.gc_deletes timeframe:
|
|
|
- maps = new Maps(maps.current, Collections.emptyMap());
|
|
|
+
|
|
|
+ maps = maps.invalidateOldMap();
|
|
|
+ assert (unsafeKeysMap = unsafeKeysMap.invalidateOldMap()) != null;
|
|
|
+
|
|
|
}
|
|
|
|
|
|
/** Returns the live version (add or delete) for this uid. */
|
|
|
VersionValue getUnderLock(final BytesRef uid) {
|
|
|
- Maps currentMaps = maps;
|
|
|
+ return getUnderLock(uid, maps);
|
|
|
+ }
|
|
|
|
|
|
+ private VersionValue getUnderLock(final BytesRef uid, Maps currentMaps) {
|
|
|
// First try to get the "live" value:
|
|
|
VersionValue value = currentMaps.current.get(uid);
|
|
|
if (value != null) {
|
|
|
@@ -149,11 +235,52 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
|
|
|
return tombstones.get(uid);
|
|
|
}
|
|
|
|
|
|
+ VersionValue getVersionForAssert(final BytesRef uid) {
|
|
|
+ VersionValue value = getUnderLock(uid, maps);
|
|
|
+ if (value == null) {
|
|
|
+ value = getUnderLock(uid, unsafeKeysMap);
|
|
|
+ }
|
|
|
+ return value;
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean isUnsafe() {
|
|
|
+ return maps.current.isUnsafe() || maps.old.isUnsafe();
|
|
|
+ }
|
|
|
+
|
|
|
+ void enforceSafeAccess() {
|
|
|
+ maps.needsSafeAccess = true;
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean isSafeAccessRequired() {
|
|
|
+ return maps.isSafeAccessMode();
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Adds this uid/version to the pending adds map iff the map needs safe access. */
|
|
|
+ void maybePutUnderLock(BytesRef uid, VersionValue version) {
|
|
|
+ Maps maps = this.maps;
|
|
|
+ if (maps.isSafeAccessMode()) {
|
|
|
+ putUnderLock(uid, version, maps);
|
|
|
+ } else {
|
|
|
+ maps.current.markAsUnsafe();
|
|
|
+ assert putAssertionMap(uid, version);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean putAssertionMap(BytesRef uid, VersionValue version) {
|
|
|
+ putUnderLock(uid, version, unsafeKeysMap);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
/** Adds this uid/version to the pending adds map. */
|
|
|
void putUnderLock(BytesRef uid, VersionValue version) {
|
|
|
+ Maps maps = this.maps;
|
|
|
+ putUnderLock(uid, version, maps);
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Adds this uid/version to the pending adds map. */
|
|
|
+ private void putUnderLock(BytesRef uid, VersionValue version, Maps maps) {
|
|
|
assert uid.bytes.length == uid.length : "Oversized _uid! UID length: " + uid.length + ", bytes length: " + uid.bytes.length;
|
|
|
long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length;
|
|
|
-
|
|
|
final VersionValue prev = maps.current.put(uid, version);
|
|
|
if (prev != null) {
|
|
|
// Deduct RAM for the version we just replaced:
|
|
|
@@ -264,5 +391,5 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
|
|
|
|
|
|
/** Returns the current internal versions as a point in time snapshot*/
|
|
|
Map<BytesRef, VersionValue> getAllCurrent() {
|
|
|
- return maps.current;
|
|
|
+ return maps.current.map;
|
|
|
}}
|