浏览代码

[Transform] Reduce indexes to query based on checkpoints (#75839)

Continuous transform reduce the amount of data to query for by detecting what has been changed
since the last checkpoint. This information is used to inject queries that narrow the scope.
The query is send to all configured indices. This change reduces the indexes to call
using checkpoint information. The number of network calls go down which in addition to performance
reduces the probability of a failure.

This change mainly helps the transforms of type latest, pivot transform require additional
changes planned for later.
Hendrik Muhs 4 年之前
父节点
当前提交
4974a7cd7b

+ 20 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpoint.java

@@ -21,10 +21,13 @@ import org.elasticsearch.xpack.core.transform.TransformField;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
+import java.util.Set;
 import java.util.TreeMap;
 
 import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
@@ -312,6 +315,23 @@ public class TransformCheckpoint implements Writeable, ToXContentObject {
         return newCheckPointOperationsSum - oldCheckPointOperationsSum;
     }
 
+    public static Collection<String> getChangedIndices(TransformCheckpoint oldCheckpoint, TransformCheckpoint newCheckpoint) {
+        if (oldCheckpoint.isEmpty()) {
+            return newCheckpoint.indicesCheckpoints.keySet();
+        }
+
+        Set<String> indices = new HashSet<>();
+
+        for (Entry<String, long[]> entry : newCheckpoint.indicesCheckpoints.entrySet()) {
+            // compare against the old checkpoint
+            if (Arrays.equals(entry.getValue(), oldCheckpoint.indicesCheckpoints.get(entry.getKey())) == false) {
+                indices.add(entry.getKey());
+            }
+        }
+
+        return indices;
+    }
+
     private static Map<String, long[]> readCheckpoints(Map<String, Object> readMap) {
         Map<String, long[]> checkpoints = new TreeMap<>();
         for (Map.Entry<String, Object> e : readMap.entrySet()) {

+ 72 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointTests.java

@@ -19,9 +19,13 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
+import java.util.stream.Collectors;
 
 import static org.elasticsearch.test.TestMatchers.matchesPattern;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.lessThan;
 
 public class TransformCheckpointTests extends AbstractSerializingTransformTestCase<TransformCheckpoint> {
 
@@ -191,6 +195,74 @@ public class TransformCheckpointTests extends AbstractSerializingTransformTestCa
         assertEquals((indices - 2) * shards * 10L, TransformCheckpoint.getBehind(checkpointOld, checkpointTransientNew));
     }
 
+    public void testGetChangedIndices() {
+        String baseIndexName = randomAlphaOfLength(8);
+        String id = randomAlphaOfLengthBetween(1, 10);
+        long timestamp = randomNonNegativeLong();
+
+        TreeMap<String, long[]> checkpointsByIndexOld = new TreeMap<>();
+        TreeMap<String, long[]> checkpointsByIndexNew = new TreeMap<>();
+
+        int indices = randomIntBetween(5, 20);
+        int shards = randomIntBetween(1, 20);
+
+        for (int i = 0; i < indices; ++i) {
+            List<Long> checkpoints1 = new ArrayList<>();
+            List<Long> checkpoints2 = new ArrayList<>();
+
+            for (int j = 0; j < shards; ++j) {
+                long shardCheckpoint = randomLongBetween(-1, 1_000_000);
+                checkpoints1.add(shardCheckpoint);
+                if (i % 3 == 0) {
+                    checkpoints2.add(shardCheckpoint + 10);
+                } else {
+                    checkpoints2.add(shardCheckpoint);
+                }
+            }
+
+            String indexName = baseIndexName + i;
+
+            if (i < 15) {
+                checkpointsByIndexOld.put(indexName, checkpoints1.stream().mapToLong(l -> l).toArray());
+            }
+            if (i % 5 != 0) {
+                checkpointsByIndexNew.put(indexName, checkpoints2.stream().mapToLong(l -> l).toArray());
+            }
+        }
+        long checkpoint = randomLongBetween(10, 100);
+        TransformCheckpoint checkpointOld = new TransformCheckpoint(id, timestamp, checkpoint, checkpointsByIndexOld, 0L);
+        TransformCheckpoint checkpointNew = new TransformCheckpoint(id, timestamp, checkpoint + 1, checkpointsByIndexNew, 0L);
+
+        Set<Integer> changedIndexes = TransformCheckpoint.getChangedIndices(checkpointOld, checkpointNew)
+            .stream()
+            .map(x -> Integer.parseInt(x.substring(baseIndexName.length())))
+            .collect(Collectors.toSet());
+
+        assertThat(changedIndexes.size(), lessThan(indices));
+
+        for (int i = 0; i < indices; ++i) {
+            if (i >= 15) {
+                if (i % 5 == 0) {
+                    assertFalse(changedIndexes.contains(i));
+                } else {
+                    assertTrue(changedIndexes.contains(i));
+                }
+            } else if (i % 5 == 0) {
+                assertFalse(changedIndexes.contains(i));
+            } else if (i % 3 == 0) {
+                assertTrue(changedIndexes.contains(i));
+            } else {
+                assertFalse(changedIndexes.contains(i));
+            }
+        }
+
+        // check against empty
+        assertThat(
+            TransformCheckpoint.getChangedIndices(TransformCheckpoint.EMPTY, checkpointNew),
+            equalTo(checkpointNew.getIndicesCheckpoints().keySet())
+        );
+    }
+
     private static Map<String, long[]> randomCheckpointsByIndex() {
         Map<String, long[]> checkpointsByIndex = new TreeMap<>();
         int indices = randomIntBetween(1, 10);

+ 51 - 25
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java

@@ -30,6 +30,7 @@ import org.elasticsearch.client.Client;
 import org.elasticsearch.common.logging.LoggerMessageFormat;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.engine.VersionConflictEngineException;
 import org.elasticsearch.index.mapper.MapperParsingException;
 import org.elasticsearch.index.reindex.BulkByScrollResponse;
@@ -59,6 +60,7 @@ import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -71,7 +73,7 @@ class ClientTransformIndexer extends TransformIndexer {
     private final AtomicBoolean oldStatsCleanedUp = new AtomicBoolean(false);
 
     private final AtomicReference<SeqNoPrimaryTermAndIndex> seqNoPrimaryTermAndIndex;
-    private volatile PointInTimeBuilder pit;
+    private final ConcurrentHashMap<String, PointInTimeBuilder> namedPits = new ConcurrentHashMap<>();
     private volatile long pitCheckpoint;
     private volatile boolean disablePit = false;
 
@@ -250,11 +252,7 @@ class ClientTransformIndexer extends TransformIndexer {
 
     @Override
     void doGetFieldMappings(ActionListener<Map<String, String>> fieldMappingsListener) {
-        SchemaUtil.getDestinationFieldMappings(
-            client,
-            getConfig().getDestination().getIndex(),
-            fieldMappingsListener
-        );
+        SchemaUtil.getDestinationFieldMappings(client, getConfig().getDestination().getIndex(), fieldMappingsListener);
     }
 
     /**
@@ -363,12 +361,20 @@ class ClientTransformIndexer extends TransformIndexer {
     }
 
     private void closePointInTime() {
+        for (String name : namedPits.keySet()) {
+            closePointInTime(name);
+        }
+    }
+
+    private void closePointInTime(String name) {
+        PointInTimeBuilder pit = namedPits.remove(name);
+
         if (pit == null) {
             return;
         }
 
         String oldPit = pit.getEncodedId();
-        pit = null;
+
         ClosePointInTimeRequest closePitRequest = new ClosePointInTimeRequest(oldPit);
         ClientHelper.executeWithHeadersAsync(
             transformConfig.getHeaders(),
@@ -383,20 +389,25 @@ class ClientTransformIndexer extends TransformIndexer {
         );
     }
 
-    private void injectPointInTimeIfNeeded(SearchRequest searchRequest, ActionListener<SearchRequest> listener) {
+    private void injectPointInTimeIfNeeded(
+        Tuple<String, SearchRequest> namedSearchRequest,
+        ActionListener<Tuple<String, SearchRequest>> listener
+    ) {
         if (disablePit) {
-            listener.onResponse(searchRequest);
+            listener.onResponse(namedSearchRequest);
             return;
         }
 
+        SearchRequest searchRequest = namedSearchRequest.v2();
+        PointInTimeBuilder pit = namedPits.get(namedSearchRequest.v1());
         if (pit != null) {
             searchRequest.source().pointInTimeBuilder(pit);
-            listener.onResponse(searchRequest);
+            listener.onResponse(namedSearchRequest);
             return;
         }
 
         // no pit, create a new one
-        OpenPointInTimeRequest pitRequest = new OpenPointInTimeRequest(transformConfig.getSource().getIndex()).keepAlive(PIT_KEEP_ALIVE);
+        OpenPointInTimeRequest pitRequest = new OpenPointInTimeRequest(searchRequest.indices()).keepAlive(PIT_KEEP_ALIVE);
 
         ClientHelper.executeWithHeadersAsync(
             transformConfig.getHeaders(),
@@ -405,11 +416,17 @@ class ClientTransformIndexer extends TransformIndexer {
             OpenPointInTimeAction.INSTANCE,
             pitRequest,
             ActionListener.wrap(response -> {
-                pit = new PointInTimeBuilder(response.getPointInTimeId()).setKeepAlive(PIT_KEEP_ALIVE);
-                searchRequest.source().pointInTimeBuilder(pit);
+                PointInTimeBuilder newPit = new PointInTimeBuilder(response.getPointInTimeId()).setKeepAlive(PIT_KEEP_ALIVE);
+                namedPits.put(namedSearchRequest.v1(), newPit);
+                searchRequest.source().pointInTimeBuilder(newPit);
                 pitCheckpoint = getNextCheckpoint().getCheckpoint();
-                logger.trace("[{}] using pit search context with id [{}]", getJobId(), pit.getEncodedId());
-                listener.onResponse(searchRequest);
+                logger.trace(
+                    "[{}] using pit search context with id [{}]; request [{}]",
+                    getJobId(),
+                    newPit.getEncodedId(),
+                    namedSearchRequest.v1()
+                );
+                listener.onResponse(namedSearchRequest);
             }, e -> {
                 Throwable unwrappedException = ExceptionsHelper.findSearchExceptionRootCause(e);
                 // if point in time is not supported, disable it but do not remember forever (stopping and starting will give it another
@@ -433,25 +450,27 @@ class ClientTransformIndexer extends TransformIndexer {
                         e
                     );
                 }
-                listener.onResponse(searchRequest);
+                listener.onResponse(namedSearchRequest);
             })
         );
     }
 
-    private void doSearch(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
-        logger.trace("searchRequest: {}", searchRequest);
+    private void doSearch(Tuple<String, SearchRequest> namedSearchRequest, ActionListener<SearchResponse> listener) {
+        logger.trace(() -> new ParameterizedMessage("searchRequest: [{}]", namedSearchRequest.v2()));
+
+        PointInTimeBuilder pit = namedSearchRequest.v2().pointInTimeBuilder();
 
         ClientHelper.executeWithHeadersAsync(
             transformConfig.getHeaders(),
             ClientHelper.TRANSFORM_ORIGIN,
             client,
             SearchAction.INSTANCE,
-            searchRequest,
+            namedSearchRequest.v2(),
             ActionListener.wrap(response -> {
                 // did the pit change?
                 if (response.pointInTimeId() != null && (pit == null || response.pointInTimeId() != pit.getEncodedId())) {
-                    pit = new PointInTimeBuilder(response.pointInTimeId()).setKeepAlive(PIT_KEEP_ALIVE);
-                    logger.trace("point in time handle has changed");
+                    namedPits.put(namedSearchRequest.v1(), new PointInTimeBuilder(response.pointInTimeId()).setKeepAlive(PIT_KEEP_ALIVE));
+                    logger.trace("point in time handle has changed; request [{}]", namedSearchRequest.v1());
                 }
 
                 listener.onResponse(response);
@@ -461,15 +480,22 @@ class ClientTransformIndexer extends TransformIndexer {
                 // succeeds a new pit gets created at the next run
                 Throwable unwrappedException = ExceptionsHelper.findSearchExceptionRootCause(e);
                 if (unwrappedException instanceof SearchContextMissingException) {
-                    logger.warn(new ParameterizedMessage("[{}] Search context missing, falling back to normal search.", getJobId()), e);
-                    pit = null;
-                    searchRequest.source().pointInTimeBuilder(null);
+                    logger.warn(
+                        new ParameterizedMessage(
+                            "[{}] Search context missing, falling back to normal search; request [{}]",
+                            getJobId(),
+                            namedSearchRequest.v1()
+                        ),
+                        e
+                    );
+                    namedPits.remove(namedSearchRequest.v1());
+                    namedSearchRequest.v2().source().pointInTimeBuilder(null);
                     ClientHelper.executeWithHeadersAsync(
                         transformConfig.getHeaders(),
                         ClientHelper.TRANSFORM_ORIGIN,
                         client,
                         SearchAction.INSTANCE,
-                        searchRequest,
+                        namedSearchRequest.v2(),
                         listener
                     );
                     return;

+ 14 - 5
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/Function.java

@@ -15,9 +15,11 @@ import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
+import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
 import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
 import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
@@ -74,13 +76,20 @@ public interface Function {
         /**
          * Build the filter query to narrow the result set given the previously collected changes.
          *
-         * TODO: it might be useful to have the full checkpoint data.
-         *
-         * @param lastCheckpointTimestamp the timestamp of the last checkpoint
-         * @param nextCheckpointTimestamp the timestamp of the next (in progress) checkpoint
+         * @param lastCheckpoint the last checkpoint
+         * @param nextCheckpoint the next (in progress) checkpoint
          * @return a filter query, null in case of no filter
          */
-        QueryBuilder buildFilterQuery(long lastCheckpointTimestamp, long nextCheckpointTimestamp);
+        QueryBuilder buildFilterQuery(TransformCheckpoint lastCheckpoint, TransformCheckpoint nextCheckpoint);
+
+        /**
+         * Filter indices according to the given checkpoints.
+         *
+         * @param lastCheckpoint the last checkpoint
+         * @param nextCheckpoint the next (in progress) checkpoint
+         * @return set of indices to query
+         */
+        Collection<String> getIndicesToQuery(TransformCheckpoint lastCheckpoint, TransformCheckpoint nextCheckpoint);
 
         /**
          * Clear the internal state to free up memory.

+ 27 - 18
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java

@@ -1091,32 +1091,34 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
         return queryBuilder;
     }
 
-    protected SearchRequest buildSearchRequest() {
+    protected Tuple<String, SearchRequest> buildSearchRequest() {
         assert nextCheckpoint != null;
 
-        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().runtimeMappings(getConfig().getSource().getRuntimeMappings());
         switch (runState) {
             case APPLY_RESULTS:
-                buildUpdateQuery(sourceBuilder);
-                break;
+                return new Tuple<>("apply_results", buildQueryToUpdateDestinationIndex());
             case IDENTIFY_CHANGES:
-                buildChangedBucketsQuery(sourceBuilder);
-                break;
+                return new Tuple<>("identify_changes", buildQueryToFindChanges());
             default:
                 // Any other state is a bug, should not happen
                 logger.warn("Encountered unexpected run state [" + runState + "]");
                 throw new IllegalStateException("Transform indexer job encountered an illegal state [" + runState + "]");
         }
-
-        return new SearchRequest(getConfig().getSource().getIndex()).allowPartialSearchResults(false)
-            .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
-            .source(sourceBuilder);
     }
 
-    private SearchSourceBuilder buildChangedBucketsQuery(SearchSourceBuilder sourceBuilder) {
+    private SearchRequest buildQueryToFindChanges() {
         assert isContinuous();
 
         TransformIndexerPosition position = getPosition();
+        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().runtimeMappings(getConfig().getSource().getRuntimeMappings());
+
+        // reduce the indexes to query to the ones that have changes
+        SearchRequest request = new SearchRequest(
+            TransformCheckpoint.getChangedIndices(getLastCheckpoint(), getNextCheckpoint()).toArray(new String[0])
+        );
+
+        request.allowPartialSearchResults(false) // shard failures should fail the request
+            .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); // TODO: make configurable
 
         changeCollector.buildChangesQuery(sourceBuilder, position != null ? position.getBucketsPosition() : null, pageSize);
 
@@ -1130,16 +1132,18 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
         sourceBuilder.query(filteredQuery);
 
         logger.debug("[{}] Querying for changes: {}", getJobId(), sourceBuilder);
-        return sourceBuilder;
+        return request.source(sourceBuilder);
     }
 
-    private SearchSourceBuilder buildUpdateQuery(SearchSourceBuilder sourceBuilder) {
+    private SearchRequest buildQueryToUpdateDestinationIndex() {
         TransformIndexerPosition position = getPosition();
 
         TransformConfig config = getConfig();
+        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().runtimeMappings(getConfig().getSource().getRuntimeMappings());
 
         function.buildSearchQuery(sourceBuilder, position != null ? position.getIndexerPosition() : null, pageSize);
 
+        SearchRequest request = new SearchRequest();
         QueryBuilder queryBuilder = config.getSource().getQueryConfig().getQuery();
 
         if (isContinuous()) {
@@ -1148,22 +1152,27 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
 
             // Only apply extra filter if it is the subsequent run of the continuous transform
             if (nextCheckpoint.getCheckpoint() > 1 && changeCollector != null) {
-                QueryBuilder filter = changeCollector.buildFilterQuery(
-                    lastCheckpoint.getTimeUpperBound(),
-                    nextCheckpoint.getTimeUpperBound()
-                );
+                QueryBuilder filter = changeCollector.buildFilterQuery(lastCheckpoint, nextCheckpoint);
                 if (filter != null) {
                     filteredQuery.filter(filter);
                 }
+                request.indices(changeCollector.getIndicesToQuery(lastCheckpoint, nextCheckpoint).toArray(new String[0]));
+            } else {
+                request.indices(getConfig().getSource().getIndex());
             }
 
             queryBuilder = filteredQuery;
+
+        } else {
+            request.indices(getConfig().getSource().getIndex());
         }
 
         sourceBuilder.query(queryBuilder);
         logger.debug(() -> new ParameterizedMessage("[{}] Querying for data: {}", getJobId(), sourceBuilder));
 
-        return sourceBuilder;
+        return request.source(sourceBuilder)
+            .allowPartialSearchResults(false) // shard failures should fail the request
+            .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); // TODO: make configurable
     }
 
     /**

+ 11 - 3
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/latest/LatestChangeCollector.java

@@ -11,8 +11,10 @@ import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
 import org.elasticsearch.xpack.transform.transforms.Function;
 
+import java.util.Collection;
 import java.util.Map;
 import java.util.Objects;
 
@@ -40,15 +42,21 @@ class LatestChangeCollector implements Function.ChangeCollector {
     }
 
     @Override
-    public QueryBuilder buildFilterQuery(long lastCheckpointTimestamp, long nextCheckpointTimestamp) {
+    public QueryBuilder buildFilterQuery(TransformCheckpoint lastCheckpoint, TransformCheckpoint nextCheckpoint) {
         // We are only interested in documents that were created in the timeline of the current checkpoint.
         // Older documents cannot influence the transform results as we require the sort field values to change monotonically over time.
         return QueryBuilders.rangeQuery(synchronizationField)
-            .gte(lastCheckpointTimestamp)
-            .lt(nextCheckpointTimestamp)
+            .gte(lastCheckpoint.getTimeUpperBound())
+            .lt(nextCheckpoint.getTimeUpperBound())
             .format("epoch_millis");
     }
 
+    @Override
+    public Collection<String> getIndicesToQuery(TransformCheckpoint lastCheckpoint, TransformCheckpoint nextCheckpoint) {
+        // we can shortcut here, only the changed indices are of interest
+        return TransformCheckpoint.getChangedIndices(lastCheckpoint, nextCheckpoint);
+    }
+
     @Override
     public void clear() {
         // This object is stateless so there is no internal state to clear

+ 13 - 3
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/CompositeBucketsChangeCollector.java

@@ -32,6 +32,7 @@ import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceB
 import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils;
 import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation.SingleValue;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.DateHistogramGroupSource;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.HistogramGroupSource;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource;
@@ -705,16 +706,19 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
     }
 
     @Override
-    public QueryBuilder buildFilterQuery(long lastCheckpointTimestamp, long nextCheckpointTimestamp) {
+    public QueryBuilder buildFilterQuery(TransformCheckpoint lastCheckpoint, TransformCheckpoint nextCheckpoint) {
         // shortcut for only 1 element
         if (fieldCollectors.size() == 1) {
-            return fieldCollectors.values().iterator().next().filterByChanges(lastCheckpointTimestamp, nextCheckpointTimestamp);
+            return fieldCollectors.values()
+                .iterator()
+                .next()
+                .filterByChanges(lastCheckpoint.getTimeUpperBound(), nextCheckpoint.getTimeUpperBound());
         }
 
         BoolQueryBuilder filteredQuery = new BoolQueryBuilder();
 
         for (FieldCollector fieldCollector : fieldCollectors.values()) {
-            QueryBuilder filter = fieldCollector.filterByChanges(lastCheckpointTimestamp, nextCheckpointTimestamp);
+            QueryBuilder filter = fieldCollector.filterByChanges(lastCheckpoint.getTimeUpperBound(), nextCheckpoint.getTimeUpperBound());
             if (filter != null) {
                 filteredQuery.filter(filter);
             }
@@ -723,6 +727,12 @@ public class CompositeBucketsChangeCollector implements ChangeCollector {
         return filteredQuery;
     }
 
+    @Override
+    public Collection<String> getIndicesToQuery(TransformCheckpoint lastCheckpoint, TransformCheckpoint nextCheckpoint) {
+        // for updating the data, all indices have to be queried
+        return TransformCheckpoint.getChangedIndices(TransformCheckpoint.EMPTY, nextCheckpoint);
+    }
+
     @Override
     public Map<String, Object> processSearchResponse(final SearchResponse searchResponse) {
         final Aggregations aggregations = searchResponse.getAggregations();

+ 3 - 6
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java

@@ -21,6 +21,7 @@ import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.search.SearchContextMissingException;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
@@ -52,7 +53,6 @@ import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
 import java.time.Instant;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -143,7 +143,6 @@ public class ClientTransformIndexerTests extends ESTestCase {
                 client,
                 mock(TransformIndexerStats.class),
                 config,
-                Collections.emptyMap(),
                 null,
                 new TransformCheckpoint(
                     "transform",
@@ -241,7 +240,6 @@ public class ClientTransformIndexerTests extends ESTestCase {
                 client,
                 mock(TransformIndexerStats.class),
                 config,
-                Collections.emptyMap(),
                 null,
                 new TransformCheckpoint(
                     "transform",
@@ -309,7 +307,6 @@ public class ClientTransformIndexerTests extends ESTestCase {
             Client client,
             TransformIndexerStats initialStats,
             TransformConfig transformConfig,
-            Map<String, String> fieldMappings,
             TransformProgress transformProgress,
             TransformCheckpoint lastCheckpoint,
             TransformCheckpoint nextCheckpoint,
@@ -336,8 +333,8 @@ public class ClientTransformIndexerTests extends ESTestCase {
         }
 
         @Override
-        protected SearchRequest buildSearchRequest() {
-            return new SearchRequest().source(new SearchSourceBuilder());
+        protected Tuple<String, SearchRequest> buildSearchRequest() {
+            return new Tuple<>("mock", new SearchRequest().source(new SearchSourceBuilder()));
         }
     }
 

+ 1 - 1
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java

@@ -177,7 +177,7 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
             }
 
             try {
-                SearchResponse response = searchFunction.apply(buildSearchRequest());
+                SearchResponse response = searchFunction.apply(buildSearchRequest().v2());
                 nextPhase.onResponse(response);
             } catch (Exception e) {
                 nextPhase.onFailure(e);

+ 221 - 4
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/latest/LatestChangeCollectorTests.java

@@ -9,6 +9,11 @@ package org.elasticsearch.xpack.transform.transforms.latest;
 
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
@@ -19,11 +24,223 @@ public class LatestChangeCollectorTests extends ESTestCase {
         LatestChangeCollector changeCollector = new LatestChangeCollector("timestamp");
 
         assertThat(
-            changeCollector.buildFilterQuery(0, 123456789),
-            is(equalTo(QueryBuilders.rangeQuery("timestamp").gte(0L).lt(123456789L).format("epoch_millis"))));
+            changeCollector.buildFilterQuery(
+                new TransformCheckpoint("t_id", 42L, 42L, Collections.emptyMap(), 0L),
+                new TransformCheckpoint("t_id", 42L, 42L, Collections.emptyMap(), 123456789L)
+            ),
+            is(equalTo(QueryBuilders.rangeQuery("timestamp").gte(0L).lt(123456789L).format("epoch_millis")))
+        );
+
+        assertThat(
+            changeCollector.buildFilterQuery(
+                new TransformCheckpoint("t_id", 42L, 42L, Collections.emptyMap(), 123456789L),
+                new TransformCheckpoint("t_id", 42L, 42L, Collections.emptyMap(), 234567890L)
+            ),
+            is(equalTo(QueryBuilders.rangeQuery("timestamp").gte(123456789L).lt(234567890L).format("epoch_millis")))
+        );
+    }
+
+    public void testGetIndicesToQuery() {
+        LatestChangeCollector changeCollector = new LatestChangeCollector("timestamp");
+
+        long[] indexSequenceIds1 = { 25L, 25L, 25L };
+        long[] indexSequenceIds2 = { 324L, 2425L, 2225L };
+        long[] indexSequenceIds3 = { 244L, 225L, 2425L };
+        long[] indexSequenceIds4 = { 2005L, 2445L, 2425L };
+
+        long[] indexSequenceIds3_1 = { 246L, 255L, 2485L };
+        long[] indexSequenceIds4_1 = { 2105L, 2545L, 2525L };
+
+        // no changes
+        assertThat(
+            changeCollector.getIndicesToQuery(
+                new TransformCheckpoint(
+                    "t_id",
+                    123513L,
+                    42L,
+                    Map.of(
+                        "index-1",
+                        indexSequenceIds1,
+                        "index-2",
+                        indexSequenceIds2,
+                        "index-3",
+                        indexSequenceIds3,
+                        "index-4",
+                        indexSequenceIds4
+                    ),
+                    123543L
+                ),
+                new TransformCheckpoint(
+                    "t_id",
+                    123456759L,
+                    43L,
+                    Map.of(
+                        "index-1",
+                        indexSequenceIds1,
+                        "index-2",
+                        indexSequenceIds2,
+                        "index-3",
+                        indexSequenceIds3,
+                        "index-4",
+                        indexSequenceIds4
+                    ),
+                    123456789L
+                )
+            ),
+            equalTo(Collections.emptySet())
+        );
+
+        // 3 and 4 changed, 1 and 2 not
+        assertThat(
+            changeCollector.getIndicesToQuery(
+                new TransformCheckpoint(
+                    "t_id",
+                    123513L,
+                    42L,
+                    Map.of(
+                        "index-1",
+                        indexSequenceIds1,
+                        "index-2",
+                        indexSequenceIds2,
+                        "index-3",
+                        indexSequenceIds3,
+                        "index-4",
+                        indexSequenceIds4
+                    ),
+                    123543L
+                ),
+                new TransformCheckpoint(
+                    "t_id",
+                    123456759L,
+                    43L,
+                    Map.of(
+                        "index-1",
+                        indexSequenceIds1,
+                        "index-2",
+                        indexSequenceIds2,
+                        "index-3",
+                        indexSequenceIds3_1,
+                        "index-4",
+                        indexSequenceIds4_1
+                    ),
+                    123456789L
+                )
+            ),
+            equalTo(Set.of("index-3", "index-4"))
+        );
+
+        // only 3 changed (no order)
+        assertThat(
+            changeCollector.getIndicesToQuery(
+                new TransformCheckpoint(
+                    "t_id",
+                    123513L,
+                    42L,
+                    Map.of(
+                        "index-1",
+                        indexSequenceIds1,
+                        "index-2",
+                        indexSequenceIds2,
+                        "index-3",
+                        indexSequenceIds3,
+                        "index-4",
+                        indexSequenceIds4
+                    ),
+                    123543L
+                ),
+                new TransformCheckpoint(
+                    "t_id",
+                    123456759L,
+                    43L,
+                    Map.of(
+                        "index-1",
+                        indexSequenceIds1,
+                        "index-2",
+                        indexSequenceIds2,
+                        "index-3",
+                        indexSequenceIds3_1,
+                        "index-4",
+                        indexSequenceIds4
+                    ),
+                    123456789L
+                )
+            ),
+            equalTo(Collections.singleton("index-3"))
+        );
+
+        // all have changed
+        assertThat(
+            changeCollector.getIndicesToQuery(
+                new TransformCheckpoint("t_id", 123513L, 42L, Map.of("index-3", indexSequenceIds3, "index-4", indexSequenceIds4), 123543L),
+                new TransformCheckpoint(
+                    "t_id",
+                    123456759L,
+                    43L,
+                    Map.of("index-3", indexSequenceIds3_1, "index-4", indexSequenceIds4_1),
+                    123456789L
+                )
+            ),
+            equalTo(Set.of("index-3", "index-4"))
+        );
+
+        // a new index appeared
+        assertThat(
+            changeCollector.getIndicesToQuery(
+                new TransformCheckpoint(
+                    "t_id",
+                    123513L,
+                    42L,
+                    Map.of("index-2", indexSequenceIds2, "index-3", indexSequenceIds3, "index-4", indexSequenceIds4),
+                    123543L
+                ),
+                new TransformCheckpoint(
+                    "t_id",
+                    123456759L,
+                    43L,
+                    Map.of(
+                        "index-1",
+                        indexSequenceIds1,
+                        "index-2",
+                        indexSequenceIds2,
+                        "index-3",
+                        indexSequenceIds3_1,
+                        "index-4",
+                        indexSequenceIds4_1
+                    ),
+                    123456789L
+                )
+            ),
+            equalTo(Set.of("index-1", "index-3", "index-4"))
+        );
 
+        // index disappeared
         assertThat(
-            changeCollector.buildFilterQuery(123456789, 234567890),
-            is(equalTo(QueryBuilders.rangeQuery("timestamp").gte(123456789L).lt(234567890L).format("epoch_millis"))));
+            changeCollector.getIndicesToQuery(
+                new TransformCheckpoint(
+                    "t_id",
+                    123513L,
+                    42L,
+                    Map.of(
+                        "index-1",
+                        indexSequenceIds1,
+                        "index-2",
+                        indexSequenceIds2,
+                        "index-3",
+                        indexSequenceIds3,
+                        "index-4",
+                        indexSequenceIds4
+                    ),
+                    123543L
+                ),
+                new TransformCheckpoint(
+                    "t_id",
+                    123456759L,
+                    43L,
+                    Map.of("index-2", indexSequenceIds2, "index-3", indexSequenceIds3_1, "index-4", indexSequenceIds4_1),
+                    123456789L
+                )
+            ),
+            equalTo(Set.of("index-3", "index-4"))
+        );
     }
 }

+ 28 - 6
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/CompositeBucketsChangeCollectorTests.java

@@ -20,6 +20,7 @@ import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInter
 import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation.SingleValue;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.DateHistogramGroupSource;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.GeoTileGroupSourceTests;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfig;
@@ -122,7 +123,10 @@ public class CompositeBucketsChangeCollectorTests extends ESTestCase {
 
         collector.processSearchResponse(response);
 
-        QueryBuilder queryBuilder = collector.buildFilterQuery(0, 0);
+        QueryBuilder queryBuilder = collector.buildFilterQuery(
+            new TransformCheckpoint("t_id", 42L, 42L, Collections.emptyMap(), 0L),
+            new TransformCheckpoint("t_id", 42L, 42L, Collections.emptyMap(), 0L)
+        );
         assertNotNull(queryBuilder);
         assertThat(queryBuilder, instanceOf(TermsQueryBuilder.class));
         assertThat(((TermsQueryBuilder) queryBuilder).values(), containsInAnyOrder("id1", "id2", "id3"));
@@ -142,7 +146,11 @@ public class CompositeBucketsChangeCollectorTests extends ESTestCase {
 
         ChangeCollector collector = CompositeBucketsChangeCollector.buildChangeCollector(groups, "timestamp");
 
-        QueryBuilder queryBuilder = collector.buildFilterQuery(66_666, 200_222);
+        QueryBuilder queryBuilder = collector.buildFilterQuery(
+            new TransformCheckpoint("t_id", 42L, 42L, Collections.emptyMap(), 66_666L),
+            new TransformCheckpoint("t_id", 42L, 42L, Collections.emptyMap(), 200_222L)
+        );
+
         assertNotNull(queryBuilder);
         assertThat(queryBuilder, instanceOf(RangeQueryBuilder.class));
         // rounded down
@@ -168,7 +176,11 @@ public class CompositeBucketsChangeCollectorTests extends ESTestCase {
         collector.processSearchResponse(response);
 
         // provide checkpoints, although they don't matter in this case
-        queryBuilder = collector.buildFilterQuery(66_666, 200_222);
+        queryBuilder = collector.buildFilterQuery(
+            new TransformCheckpoint("t_id", 42L, 42L, Collections.emptyMap(), 66_666L),
+            new TransformCheckpoint("t_id", 42L, 42L, Collections.emptyMap(), 200_222L)
+        );
+
         assertNotNull(queryBuilder);
         assertThat(queryBuilder, instanceOf(RangeQueryBuilder.class));
         // rounded down
@@ -190,7 +202,11 @@ public class CompositeBucketsChangeCollectorTests extends ESTestCase {
 
         // simulate the agg response, that should inject
         collector.processSearchResponse(response);
-        queryBuilder = collector.buildFilterQuery(66_666, 200_222);
+        queryBuilder = collector.buildFilterQuery(
+            new TransformCheckpoint("t_id", 42L, 42L, Collections.emptyMap(), 66_666L),
+            new TransformCheckpoint("t_id", 42L, 42L, Collections.emptyMap(), 200_222L)
+        );
+
         assertNotNull(queryBuilder);
 
         assertThat(queryBuilder, instanceOf(RangeQueryBuilder.class));
@@ -214,12 +230,18 @@ public class CompositeBucketsChangeCollectorTests extends ESTestCase {
 
         collector = CompositeBucketsChangeCollector.buildChangeCollector(groups, "timestamp");
 
-        queryBuilder = collector.buildFilterQuery(66_666, 200_222);
+        queryBuilder = collector.buildFilterQuery(
+            new TransformCheckpoint("t_id", 42L, 42L, Collections.emptyMap(), 66_666L),
+            new TransformCheckpoint("t_id", 42L, 42L, Collections.emptyMap(), 200_222L)
+        );
         assertNull(queryBuilder);
 
         collector = CompositeBucketsChangeCollector.buildChangeCollector(groups, "sync_timestamp");
 
-        queryBuilder = collector.buildFilterQuery(66_666, 200_222);
+        queryBuilder = collector.buildFilterQuery(
+            new TransformCheckpoint("t_id", 42L, 42L, Collections.emptyMap(), 66_666L),
+            new TransformCheckpoint("t_id", 42L, 42L, Collections.emptyMap(), 200_222L)
+        );
         assertNull(queryBuilder);
     }