|
@@ -23,6 +23,7 @@ import org.elasticsearch.common.lease.Releasable;
|
|
|
import org.elasticsearch.common.lease.Releasables;
|
|
|
import org.elasticsearch.common.util.BigArrays;
|
|
|
import org.elasticsearch.common.util.IntArray;
|
|
|
+import org.elasticsearch.common.util.ObjectArray;
|
|
|
|
|
|
/**
|
|
|
* AbstractHyperLogLogPlusPlus instance that only supports linear counting. The maximum number of hashes supported
|
|
@@ -33,15 +34,16 @@ import org.elasticsearch.common.util.IntArray;
|
|
|
*/
|
|
|
final class HyperLogLogPlusPlusSparse extends AbstractHyperLogLogPlusPlus implements Releasable {
|
|
|
|
|
|
+ // TODO: consider a hll sparse structure
|
|
|
private final LinearCounting lc;
|
|
|
|
|
|
/**
|
|
|
* Create an sparse HLL++ algorithm where capacity is the maximum number of hashes this structure can hold
|
|
|
* per bucket.
|
|
|
*/
|
|
|
- HyperLogLogPlusPlusSparse(int precision, BigArrays bigArrays, int capacity, int initialSize) {
|
|
|
+ HyperLogLogPlusPlusSparse(int precision, BigArrays bigArrays, long initialBuckets) {
|
|
|
super(precision);
|
|
|
- this.lc = new LinearCounting(precision, bigArrays, capacity, initialSize);
|
|
|
+ this.lc = new LinearCounting(precision, bigArrays, initialBuckets);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -49,6 +51,11 @@ final class HyperLogLogPlusPlusSparse extends AbstractHyperLogLogPlusPlus implem
|
|
|
return lc.sizes.size();
|
|
|
}
|
|
|
|
|
|
+ /** Needs to be called before adding elements into a bucket */
|
|
|
+ protected void ensureCapacity(long bucketOrd, long size) {
|
|
|
+ lc.ensureCapacity(bucketOrd, size);
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public long cardinality(long bucketOrd) {
|
|
|
return lc.cardinality(bucketOrd);
|
|
@@ -85,24 +92,22 @@ final class HyperLogLogPlusPlusSparse extends AbstractHyperLogLogPlusPlus implem
|
|
|
|
|
|
private static class LinearCounting extends AbstractLinearCounting implements Releasable {
|
|
|
|
|
|
- private final int capacity;
|
|
|
private final BigArrays bigArrays;
|
|
|
private final LinearCountingIterator iterator;
|
|
|
// We are actually using HyperLogLog's runLens array but interpreting it as a hash set for linear counting.
|
|
|
// Number of elements stored.
|
|
|
- private IntArray values;
|
|
|
+ private ObjectArray<IntArray> values;
|
|
|
private IntArray sizes;
|
|
|
|
|
|
- LinearCounting(int p, BigArrays bigArrays, int capacity, int initialSize) {
|
|
|
+ LinearCounting(int p, BigArrays bigArrays, long initialBuckets) {
|
|
|
super(p);
|
|
|
this.bigArrays = bigArrays;
|
|
|
- this.capacity = capacity;
|
|
|
- IntArray values = null;
|
|
|
+ ObjectArray<IntArray> values = null;
|
|
|
IntArray sizes = null;
|
|
|
boolean success = false;
|
|
|
try {
|
|
|
- values = bigArrays.newIntArray(initialSize * capacity);
|
|
|
- sizes = bigArrays.newIntArray(initialSize);
|
|
|
+ values = bigArrays.newObjectArray(initialBuckets);
|
|
|
+ sizes = bigArrays.newIntArray(initialBuckets);
|
|
|
success = true;
|
|
|
} finally {
|
|
|
if (success == false) {
|
|
@@ -111,7 +116,7 @@ final class HyperLogLogPlusPlusSparse extends AbstractHyperLogLogPlusPlus implem
|
|
|
}
|
|
|
this.values = values;
|
|
|
this.sizes = sizes;
|
|
|
- iterator = new LinearCountingIterator(this, capacity);
|
|
|
+ iterator = new LinearCountingIterator();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -120,6 +125,18 @@ final class HyperLogLogPlusPlusSparse extends AbstractHyperLogLogPlusPlus implem
|
|
|
return set(bucketOrd, encoded);
|
|
|
}
|
|
|
|
|
|
+ protected void ensureCapacity(long bucketOrd, long size) {
|
|
|
+ values = bigArrays.grow(values, bucketOrd + 1);
|
|
|
+ sizes = bigArrays.grow(sizes, bucketOrd + 1);
|
|
|
+ IntArray value = values.get(bucketOrd);
|
|
|
+ if (value == null) {
|
|
|
+ value = bigArrays.newIntArray(size);
|
|
|
+ } else {
|
|
|
+ value = bigArrays.grow(value, size);
|
|
|
+ }
|
|
|
+ values.set(bucketOrd, value);
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
protected int size(long bucketOrd) {
|
|
|
if (bucketOrd >= sizes.size()) {
|
|
@@ -132,67 +149,55 @@ final class HyperLogLogPlusPlusSparse extends AbstractHyperLogLogPlusPlus implem
|
|
|
|
|
|
@Override
|
|
|
protected HashesIterator values(long bucketOrd) {
|
|
|
- iterator.reset(bucketOrd, size(bucketOrd));
|
|
|
+ iterator.reset(values.get(bucketOrd), size(bucketOrd));
|
|
|
return iterator;
|
|
|
}
|
|
|
|
|
|
- private long index(long bucketOrd, int index) {
|
|
|
- return (bucketOrd * capacity) + index;
|
|
|
- }
|
|
|
-
|
|
|
- private int get(long bucketOrd, int index) {
|
|
|
- long globalIndex = index(bucketOrd, index);
|
|
|
- if (values.size() < globalIndex) {
|
|
|
- return 0;
|
|
|
- }
|
|
|
- return values.get(globalIndex);
|
|
|
- }
|
|
|
-
|
|
|
private int set(long bucketOrd, int value) {
|
|
|
- int size = size(bucketOrd);
|
|
|
- if (size == 0) {
|
|
|
- sizes = bigArrays.grow(sizes, bucketOrd + 1);
|
|
|
- values = bigArrays.grow(values, (bucketOrd + 1) * capacity);
|
|
|
- }
|
|
|
- values.set(index(bucketOrd, size), value);
|
|
|
+ // This assumes that ensureCapacity has been called before
|
|
|
+ assert values.get(bucketOrd) != null : "Added a value without calling ensureCapacity";
|
|
|
+ IntArray array = values.get(bucketOrd);
|
|
|
+ int size = sizes.get(bucketOrd);
|
|
|
+ array.set(size, value);
|
|
|
return sizes.increment(bucketOrd, 1);
|
|
|
}
|
|
|
|
|
|
private int recomputedSize(long bucketOrd) {
|
|
|
- for (int i = 0; i < capacity; ++i) {
|
|
|
- final int v = get(bucketOrd, i);
|
|
|
+ IntArray array = values.get(bucketOrd);
|
|
|
+ if (array == null) {
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+ for (int i = 0; i < array.size(); ++i) {
|
|
|
+ final int v = array.get(i);
|
|
|
if (v == 0) {
|
|
|
return i;
|
|
|
}
|
|
|
}
|
|
|
- return capacity;
|
|
|
+ return Math.toIntExact(array.size());
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void close() {
|
|
|
+ for (int i = 0; i < values.size(); i++) {
|
|
|
+ Releasables.close(values.get(i));
|
|
|
+ }
|
|
|
Releasables.close(values, sizes);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private static class LinearCountingIterator implements AbstractLinearCounting.HashesIterator {
|
|
|
|
|
|
- private final LinearCounting lc;
|
|
|
- private final int capacity;
|
|
|
- long start;
|
|
|
- long end;
|
|
|
- private int value, size;
|
|
|
+ IntArray values;
|
|
|
+ int size, value;
|
|
|
private long pos;
|
|
|
|
|
|
- LinearCountingIterator(LinearCounting lc, int capacity) {
|
|
|
- this.lc = lc;
|
|
|
- this.capacity = capacity;
|
|
|
+ LinearCountingIterator() {
|
|
|
}
|
|
|
|
|
|
- void reset(long bucketOrd, int size) {
|
|
|
- this.start = bucketOrd * capacity;
|
|
|
+ void reset(IntArray values, int size) {
|
|
|
+ this.values = values;
|
|
|
this.size = size;
|
|
|
- this.end = start + size;
|
|
|
- this.pos = start;
|
|
|
+ this.pos = 0;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -202,9 +207,9 @@ final class HyperLogLogPlusPlusSparse extends AbstractHyperLogLogPlusPlus implem
|
|
|
|
|
|
@Override
|
|
|
public boolean next() {
|
|
|
- if (pos < end) {
|
|
|
- value = lc.values.get(pos++);
|
|
|
- return true;
|
|
|
+ if (pos < size) {
|
|
|
+ value = values.get(pos++);
|
|
|
+ return true;
|
|
|
}
|
|
|
return false;
|
|
|
}
|