|
@@ -13,6 +13,7 @@ import org.elasticsearch.common.Rounding;
|
|
|
import org.elasticsearch.common.geo.GeoPoint;
|
|
|
import org.elasticsearch.geometry.Rectangle;
|
|
|
import org.elasticsearch.index.query.BoolQueryBuilder;
|
|
|
+import org.elasticsearch.index.query.ExistsQueryBuilder;
|
|
|
import org.elasticsearch.index.query.GeoBoundingBoxQueryBuilder;
|
|
|
import org.elasticsearch.index.query.QueryBuilder;
|
|
|
import org.elasticsearch.index.query.QueryBuilders;
|
|
@@ -90,18 +91,30 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
|
|
|
* Clear the field collector, e.g. the changes to free up memory.
|
|
|
*/
|
|
|
void clear();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Whether the collector optimizes change detection by narrowing the required query.
|
|
|
+ *
|
|
|
+ * @return true if the collector optimizes change detection
|
|
|
+ */
|
|
|
+ boolean isOptimized();
|
|
|
}
|
|
|
|
|
|
static class TermsFieldCollector implements FieldCollector {
|
|
|
|
|
|
private final String sourceFieldName;
|
|
|
private final String targetFieldName;
|
|
|
+ private final boolean missingBucket;
|
|
|
private final Set<String> changedTerms;
|
|
|
+ // although we could add null to the hash set, its easier to handle null separately
|
|
|
+ private boolean foundNullBucket;
|
|
|
|
|
|
- TermsFieldCollector(final String sourceFieldName, final String targetFieldName) {
|
|
|
+ TermsFieldCollector(final String sourceFieldName, final String targetFieldName, final boolean missingBucket) {
|
|
|
this.sourceFieldName = sourceFieldName;
|
|
|
this.targetFieldName = targetFieldName;
|
|
|
+ this.missingBucket = missingBucket;
|
|
|
this.changedTerms = new HashSet<>();
|
|
|
+ this.foundNullBucket = false;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -114,11 +127,16 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
|
|
|
@Override
|
|
|
public boolean collectChanges(Collection<? extends Bucket> buckets) {
|
|
|
changedTerms.clear();
|
|
|
+ foundNullBucket = false;
|
|
|
|
|
|
for (Bucket b : buckets) {
|
|
|
Object term = b.getKey().get(targetFieldName);
|
|
|
if (term != null) {
|
|
|
changedTerms.add(term.toString());
|
|
|
+ } else {
|
|
|
+ // we should not find a null bucket if missing bucket is false
|
|
|
+ assert missingBucket;
|
|
|
+ foundNullBucket = true;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -127,7 +145,44 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
|
|
|
|
|
|
@Override
|
|
|
public QueryBuilder filterByChanges(long lastCheckpointTimestamp, long nextcheckpointTimestamp) {
|
|
|
- if (changedTerms.isEmpty() == false) {
|
|
|
+ if (missingBucket && foundNullBucket) {
|
|
|
+ QueryBuilder missingBucketQuery = new BoolQueryBuilder().mustNot(new ExistsQueryBuilder(sourceFieldName));
|
|
|
+
|
|
|
+ if (changedTerms.isEmpty()) {
|
|
|
+ return missingBucketQuery;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Combined query with terms and missing bucket:
|
|
|
+ *
|
|
|
+ * "bool": {
|
|
|
+ * "should": [
|
|
|
+ * {
|
|
|
+ * "terms": {
|
|
|
+ * "source_field": [
|
|
|
+ * "term1",
|
|
|
+ * "term2",
|
|
|
+ * ...
|
|
|
+ * ]
|
|
|
+ * }
|
|
|
+ * },
|
|
|
+ * {
|
|
|
+ * "bool": {
|
|
|
+ * "must_not": [
|
|
|
+ * {
|
|
|
+ * "exists": {
|
|
|
+ * "field": "source_field"
|
|
|
+ * }
|
|
|
+ * }
|
|
|
+ * ]
|
|
|
+ * }
|
|
|
+ * }
|
|
|
+ * ]
|
|
|
+ * }
|
|
|
+ */
|
|
|
+ return new BoolQueryBuilder().should(new TermsQueryBuilder(sourceFieldName, changedTerms)).should(missingBucketQuery);
|
|
|
+
|
|
|
+ } else if (changedTerms.isEmpty() == false) {
|
|
|
return new TermsQueryBuilder(sourceFieldName, changedTerms);
|
|
|
}
|
|
|
return null;
|
|
@@ -136,31 +191,43 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
|
|
|
@Override
|
|
|
public void clear() {
|
|
|
changedTerms.clear();
|
|
|
+ foundNullBucket = false;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public AggregationBuilder aggregateChanges() {
|
|
|
return null;
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean isOptimized() {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
static class DateHistogramFieldCollector implements FieldCollector {
|
|
|
|
|
|
private final String sourceFieldName;
|
|
|
private final String targetFieldName;
|
|
|
- private final boolean isSynchronizationField;
|
|
|
+ private final boolean missingBucket;
|
|
|
+ private final boolean applyOptimizationForSyncField;
|
|
|
private final Rounding.Prepared rounding;
|
|
|
|
|
|
DateHistogramFieldCollector(
|
|
|
final String sourceFieldName,
|
|
|
final String targetFieldName,
|
|
|
+ final boolean missingBucket,
|
|
|
final Rounding.Prepared rounding,
|
|
|
final boolean isSynchronizationField
|
|
|
) {
|
|
|
this.sourceFieldName = sourceFieldName;
|
|
|
this.targetFieldName = targetFieldName;
|
|
|
+ this.missingBucket = missingBucket;
|
|
|
this.rounding = rounding;
|
|
|
- this.isSynchronizationField = isSynchronizationField;
|
|
|
+
|
|
|
+ // if missing_bucket is set to true, we can't apply the optimization, note: this combination
|
|
|
+ // is illogical, because the sync field should be steady
|
|
|
+ this.applyOptimizationForSyncField = isSynchronizationField && (missingBucket == false);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -176,7 +243,9 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
|
|
|
|
|
|
@Override
|
|
|
public QueryBuilder filterByChanges(long lastCheckpointTimestamp, long nextcheckpointTimestamp) {
|
|
|
- if (isSynchronizationField && lastCheckpointTimestamp > 0) {
|
|
|
+
|
|
|
+ if (applyOptimizationForSyncField && lastCheckpointTimestamp > 0) {
|
|
|
+ assert missingBucket == false;
|
|
|
return new RangeQueryBuilder(sourceFieldName).gte(rounding.round(lastCheckpointTimestamp)).format("epoch_millis");
|
|
|
}
|
|
|
|
|
@@ -192,16 +261,24 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
|
|
|
public AggregationBuilder aggregateChanges() {
|
|
|
return null;
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean isOptimized() {
|
|
|
+ // we only have 1 optimization
|
|
|
+ return applyOptimizationForSyncField;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
static class HistogramFieldCollector implements FieldCollector {
|
|
|
|
|
|
private final String sourceFieldName;
|
|
|
private final String targetFieldName;
|
|
|
+ private final boolean missingBucket;
|
|
|
|
|
|
- HistogramFieldCollector(final String sourceFieldName, final String targetFieldName) {
|
|
|
+ HistogramFieldCollector(final String sourceFieldName, final String targetFieldName, final boolean missingBucket) {
|
|
|
this.sourceFieldName = sourceFieldName;
|
|
|
this.targetFieldName = targetFieldName;
|
|
|
+ this.missingBucket = missingBucket;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -226,18 +303,28 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
|
|
|
public AggregationBuilder aggregateChanges() {
|
|
|
return null;
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean isOptimized() {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
static class GeoTileFieldCollector implements FieldCollector {
|
|
|
|
|
|
private final String sourceFieldName;
|
|
|
private final String targetFieldName;
|
|
|
+ private final boolean missingBucket;
|
|
|
private final Set<String> changedBuckets;
|
|
|
+ // although we could add null to the hash set, its easier to handle null separately
|
|
|
+ private boolean foundNullBucket;
|
|
|
|
|
|
- GeoTileFieldCollector(final String sourceFieldName, final String targetFieldName) {
|
|
|
+ GeoTileFieldCollector(final String sourceFieldName, final String targetFieldName, final boolean missingBucket) {
|
|
|
this.sourceFieldName = sourceFieldName;
|
|
|
this.targetFieldName = targetFieldName;
|
|
|
+ this.missingBucket = missingBucket;
|
|
|
this.changedBuckets = new HashSet<>();
|
|
|
+ this.foundNullBucket = false;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -249,11 +336,16 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
|
|
|
@Override
|
|
|
public boolean collectChanges(Collection<? extends Bucket> buckets) {
|
|
|
changedBuckets.clear();
|
|
|
+ foundNullBucket = false;
|
|
|
|
|
|
for (Bucket b : buckets) {
|
|
|
Object bucket = b.getKey().get(targetFieldName);
|
|
|
if (bucket != null) {
|
|
|
changedBuckets.add(bucket.toString());
|
|
|
+ } else {
|
|
|
+ // we should not find a null bucket if missing bucket is false
|
|
|
+ assert missingBucket;
|
|
|
+ foundNullBucket = true;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -262,16 +354,69 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
|
|
|
|
|
|
@Override
|
|
|
public QueryBuilder filterByChanges(long lastCheckpointTimestamp, long nextcheckpointTimestamp) {
|
|
|
- if (changedBuckets != null && changedBuckets.isEmpty() == false) {
|
|
|
- BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
|
|
|
- changedBuckets.stream().map(GeoTileUtils::toBoundingBox).map(this::toGeoQuery).forEach(boolQueryBuilder::should);
|
|
|
- return boolQueryBuilder;
|
|
|
+ BoolQueryBuilder boundingBoxesQueryBuilder = null;
|
|
|
+
|
|
|
+ if (changedBuckets.isEmpty() == false) {
|
|
|
+ boundingBoxesQueryBuilder = QueryBuilders.boolQuery();
|
|
|
+ changedBuckets.stream().map(GeoTileUtils::toBoundingBox).map(this::toGeoQuery).forEach(boundingBoxesQueryBuilder::should);
|
|
|
}
|
|
|
- return null;
|
|
|
+
|
|
|
+ if (missingBucket && foundNullBucket) {
|
|
|
+ QueryBuilder missingBucketQuery = new BoolQueryBuilder().mustNot(new ExistsQueryBuilder(sourceFieldName));
|
|
|
+
|
|
|
+ if (boundingBoxesQueryBuilder == null) {
|
|
|
+ return missingBucketQuery;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Combined query with geo bounding boxes and missing bucket:
|
|
|
+ *
|
|
|
+ * "bool": {
|
|
|
+ * "should": [
|
|
|
+ * {
|
|
|
+ * "geo_bounding_box": {
|
|
|
+ * "source_field": {
|
|
|
+ * "top_left": {
|
|
|
+ * "lat": x1,
|
|
|
+ * "lon": y1
|
|
|
+ * },
|
|
|
+ * "bottom_right": {
|
|
|
+ * "lat": x2,
|
|
|
+ * "lon": y2
|
|
|
+ * }
|
|
|
+ * }
|
|
|
+ * }
|
|
|
+ * },
|
|
|
+ * {
|
|
|
+ * "geo_bounding_box": {
|
|
|
+ * ...
|
|
|
+ * }
|
|
|
+ * },
|
|
|
+ * {
|
|
|
+ * "bool": {
|
|
|
+ * "must_not": [
|
|
|
+ * {
|
|
|
+ * "exists": {
|
|
|
+ * "field": "source_field"
|
|
|
+ * }
|
|
|
+ * }
|
|
|
+ * ]
|
|
|
+ * }
|
|
|
+ * }
|
|
|
+ * ]
|
|
|
+ * }
|
|
|
+ */
|
|
|
+ return boundingBoxesQueryBuilder.should(missingBucketQuery);
|
|
|
+ }
|
|
|
+
|
|
|
+ return boundingBoxesQueryBuilder;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void clear() {}
|
|
|
+ public void clear() {
|
|
|
+ changedBuckets.clear();
|
|
|
+ foundNullBucket = false;
|
|
|
+ }
|
|
|
|
|
|
@Override
|
|
|
public AggregationBuilder aggregateChanges() {
|
|
@@ -285,9 +430,14 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
|
|
|
new GeoPoint(rectangle.getMinLat(), rectangle.getMaxLon())
|
|
|
);
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean isOptimized() {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- public CompositeBucketsChangeCollector(
|
|
|
+ private CompositeBucketsChangeCollector(
|
|
|
@Nullable CompositeAggregationBuilder compositeAggregation,
|
|
|
Map<String, FieldCollector> fieldCollectors
|
|
|
) {
|
|
@@ -368,6 +518,11 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
|
|
|
return afterKey;
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public boolean isOptimized() {
|
|
|
+ return fieldCollectors.values().stream().anyMatch(FieldCollector::isOptimized);
|
|
|
+ }
|
|
|
+
|
|
|
public static ChangeCollector buildChangeCollector(
|
|
|
@Nullable CompositeAggregationBuilder compositeAggregationBuilder,
|
|
|
Map<String, SingleGroupSource> groups,
|
|
@@ -385,13 +540,21 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
|
|
|
case TERMS:
|
|
|
fieldCollectors.put(
|
|
|
entry.getKey(),
|
|
|
- new CompositeBucketsChangeCollector.TermsFieldCollector(entry.getValue().getField(), entry.getKey())
|
|
|
+ new CompositeBucketsChangeCollector.TermsFieldCollector(
|
|
|
+ entry.getValue().getField(),
|
|
|
+ entry.getKey(),
|
|
|
+ entry.getValue().getMissingBucket()
|
|
|
+ )
|
|
|
);
|
|
|
break;
|
|
|
case HISTOGRAM:
|
|
|
fieldCollectors.put(
|
|
|
entry.getKey(),
|
|
|
- new CompositeBucketsChangeCollector.HistogramFieldCollector(entry.getValue().getField(), entry.getKey())
|
|
|
+ new CompositeBucketsChangeCollector.HistogramFieldCollector(
|
|
|
+ entry.getValue().getField(),
|
|
|
+ entry.getKey(),
|
|
|
+ entry.getValue().getMissingBucket()
|
|
|
+ )
|
|
|
);
|
|
|
break;
|
|
|
case DATE_HISTOGRAM:
|
|
@@ -400,6 +563,7 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
|
|
|
new CompositeBucketsChangeCollector.DateHistogramFieldCollector(
|
|
|
entry.getValue().getField(),
|
|
|
entry.getKey(),
|
|
|
+ entry.getValue().getMissingBucket(),
|
|
|
((DateHistogramGroupSource) entry.getValue()).getRounding(),
|
|
|
entry.getKey().equals(synchronizationField)
|
|
|
)
|
|
@@ -408,7 +572,11 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
|
|
|
case GEOTILE_GRID:
|
|
|
fieldCollectors.put(
|
|
|
entry.getKey(),
|
|
|
- new CompositeBucketsChangeCollector.GeoTileFieldCollector(entry.getValue().getField(), entry.getKey())
|
|
|
+ new CompositeBucketsChangeCollector.GeoTileFieldCollector(
|
|
|
+ entry.getValue().getField(),
|
|
|
+ entry.getKey(),
|
|
|
+ entry.getValue().getMissingBucket()
|
|
|
+ )
|
|
|
);
|
|
|
break;
|
|
|
default:
|