Explorar o código

Generalize AsyncTwoPhaseIndexer first phase (#61739)

Current implementations of the indexer are using aggregations.
Thus each search step executes a search action. However,
we can generalize that to allow for any action that returns a `SearchResponse`.
This commit abstracts the search phase from the search action.
Dimitris Athanasiou %!s(int64=5) %!d(string=hai) anos
pai
achega
513117ae6a

+ 8 - 20
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java

@@ -12,7 +12,6 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.RunOnce;
@@ -287,17 +286,6 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
      */
     protected abstract IterationResult<JobPosition> doProcess(SearchResponse searchResponse);
 
-    /**
-     * Called to build the next search request.
-     *
-     * In case the indexer is throttled waitTimeInNanos can be used as hint for building a less resource hungry
-     * search request.
-     *
-     * @param waitTimeInNanos duration in nanoseconds the indexer has waited due to throttling.
-     * @return SearchRequest to be passed to the search phase.
-     */
-    protected abstract SearchRequest buildSearchRequest(long waitTimeInNanos);
-
     /**
      * Called at startup after job has been triggered using {@link #maybeTriggerAsyncJob(long)} and the
      * internal state is {@link IndexerState#STARTED}.
@@ -310,15 +298,18 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
     protected abstract void onStart(long now, ActionListener<Boolean> listener);
 
     /**
-     * Executes the {@link SearchRequest} and calls <code>nextPhase</code> with the
+     * Executes the next search and calls <code>nextPhase</code> with the
      * response or the exception if an error occurs.
      *
-     * @param request
-     *            The search request to execute
+     * In case the indexer is throttled waitTimeInNanos can be used as hint for doing a less resource hungry
+     * search.
+     *
+     * @param waitTimeInNanos
+     *            Duration in nanoseconds the indexer has waited due to throttling
      * @param nextPhase
      *            Listener for the next phase
      */
-    protected abstract void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase);
+    protected abstract void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse> nextPhase);
 
     /**
      * Executes the {@link BulkRequest} and calls <code>nextPhase</code> with the
@@ -575,10 +566,7 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
         stats.markStartSearch();
         lastSearchStartTimeNanos = getTimeNanos();
 
-        // ensure that partial results are not accepted and cause a search failure
-        SearchRequest searchRequest = buildSearchRequest(waitTimeInNanos).allowPartialSearchResults(false);
-
-        doNextSearch(searchRequest, searchResponseListener);
+        doNextSearch(waitTimeInNanos, searchResponseListener);
     }
 
     /**

+ 10 - 33
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java

@@ -12,7 +12,6 @@ import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchResponseSections;
 import org.elasticsearch.action.search.ShardSearchFailure;
@@ -72,7 +71,7 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
         @Override
         protected IterationResult<Integer> doProcess(SearchResponse searchResponse) {
             assertFalse("should not be called as stoppedBeforeFinished is false", stoppedBeforeFinished);
-            assertThat(step, equalTo(3));
+            assertThat(step, equalTo(2));
             ++step;
             return new IterationResult<>(Collections.emptyList(), 3, true);
         }
@@ -85,13 +84,6 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
             }
         }
 
-        @Override
-        protected SearchRequest buildSearchRequest(long waitTimeInNanos) {
-            assertThat(step, equalTo(1));
-            ++step;
-            return new SearchRequest();
-        }
-
         @Override
         protected void onStart(long now, ActionListener<Boolean> listener) {
             assertThat(step, equalTo(0));
@@ -100,8 +92,8 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
         }
 
         @Override
-        protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
-            assertThat(step, equalTo(2));
+        protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse> nextPhase) {
+            assertThat(step, equalTo(1));
             ++step;
             final SearchResponseSections sections = new SearchResponseSections(
                 new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0), null,
@@ -121,7 +113,7 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
         protected void doSaveState(IndexerState state, Integer position, Runnable next) {
             // for stop before finished we do not know if its stopped before are after the search
             if (stoppedBeforeFinished == false) {
-                assertThat(step, equalTo(5));
+                assertThat(step, equalTo(4));
             }
             ++step;
             next.run();
@@ -134,7 +126,7 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
 
         @Override
         protected void onFinish(ActionListener<Void> listener) {
-            assertThat(step, equalTo(4));
+            assertThat(step, equalTo(3));
             ++step;
             listener.onResponse(null);
             assertTrue(isFinished.compareAndSet(false, true));
@@ -164,7 +156,6 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
         // counters
         private volatile boolean started = false;
         private volatile boolean waitingForLatch = false;
-        private volatile int searchRequests = 0;
         private volatile int searchOps = 0;
         private volatile int processOps = 0;
         private volatile int bulkOps = 0;
@@ -208,12 +199,6 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
             return new IterationResult<>(Collections.singletonList(new IndexRequest()), processOps, false);
         }
 
-        @Override
-        protected SearchRequest buildSearchRequest(long waitTimeInNanos) {
-            ++searchRequests;
-            return new SearchRequest();
-        }
-
         @Override
         protected void onStart(long now, ActionListener<Boolean> listener) {
             started = true;
@@ -238,7 +223,7 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
         }
 
         @Override
-        protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
+        protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse> nextPhase) {
             ++searchOps;
             final SearchResponseSections sections = new SearchResponseSections(
                 new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0), null,
@@ -289,7 +274,6 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
 
         public void assertCounters() {
             assertTrue(started);
-            assertEquals(5L, searchRequests);
             assertEquals(5L, searchOps);
             assertEquals(5L, processOps);
             assertEquals(2L, bulkOps);
@@ -318,13 +302,6 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
             return null;
         }
 
-        @Override
-        protected SearchRequest buildSearchRequest(long waitTimeInNanos) {
-            assertThat(step, equalTo(1));
-            ++step;
-            return new SearchRequest();
-        }
-
         @Override
         protected void onStart(long now, ActionListener<Boolean> listener) {
             assertThat(step, equalTo(0));
@@ -333,7 +310,7 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
         }
 
         @Override
-        protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
+        protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse> nextPhase) {
             throw new RuntimeException("Failed to build search request");
         }
 
@@ -349,7 +326,7 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
 
         @Override
         protected void onFailure(Exception exc) {
-            assertThat(step, equalTo(2));
+            assertThat(step, equalTo(1));
             ++step;
             assertTrue(isFinished.compareAndSet(false, true));
         }
@@ -414,7 +391,7 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
             assertThat(indexer.getPosition(), equalTo(3));
 
             assertFalse(isStopped.get());
-            assertThat(indexer.getStep(), equalTo(6));
+            assertThat(indexer.getStep(), equalTo(5));
             assertThat(indexer.getStats().getNumInvocations(), equalTo(1L));
             assertThat(indexer.getStats().getNumPages(), equalTo(1L));
             assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L));
@@ -434,7 +411,7 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
             assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
             assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
             assertBusy(() -> assertTrue(isFinished.get()), 10000, TimeUnit.SECONDS);
-            assertThat(indexer.getStep(), equalTo(3));
+            assertThat(indexer.getStep(), equalTo(2));
         } finally {
             ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
         }

+ 4 - 3
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java

@@ -112,8 +112,7 @@ public abstract class RollupIndexer extends AsyncTwoPhaseIndexer<Map<String, Obj
         }
     }
 
-    @Override
-    protected SearchRequest buildSearchRequest(long waitTimeInNanos) {
+    protected SearchRequest buildSearchRequest() {
         final Map<String, Object> position = getPosition();
         SearchSourceBuilder searchSource = new SearchSourceBuilder()
                 .size(0)
@@ -121,7 +120,9 @@ public abstract class RollupIndexer extends AsyncTwoPhaseIndexer<Map<String, Obj
                 // make sure we always compute complete buckets that appears before the configured delay
                 .query(createBoundaryQuery(position))
                 .aggregation(compositeBuilder.aggregateAfter(position));
-        return new SearchRequest(job.getConfig().getIndexPattern()).source(searchSource);
+        return new SearchRequest(job.getConfig().getIndexPattern())
+                .allowPartialSearchResults(false)
+                .source(searchSource);
     }
 
     @Override

+ 3 - 4
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java

@@ -13,7 +13,6 @@ import org.elasticsearch.action.bulk.BulkAction;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.search.SearchAction;
-import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.ParentTaskAssigningClient;
@@ -108,9 +107,9 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE
         }
 
         @Override
-        protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
-            ClientHelper.executeWithHeadersAsync(job.getHeaders(), ClientHelper.ROLLUP_ORIGIN, client, SearchAction.INSTANCE, request,
-                    nextPhase);
+        protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse> nextPhase) {
+            ClientHelper.executeWithHeadersAsync(job.getHeaders(), ClientHelper.ROLLUP_ORIGIN, client, SearchAction.INSTANCE,
+                    buildSearchRequest(), nextPhase);
         }
 
         @Override

+ 2 - 1
x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java

@@ -634,7 +634,8 @@ public class RollupIndexerIndexingTests extends AggregatorTestCase {
         }
 
         @Override
-        protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> listener) {
+        protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse> listener) {
+            SearchRequest request = buildSearchRequest();
             assertNotNull(request.source());
 
             // extract query

+ 8 - 8
x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java

@@ -63,7 +63,7 @@ public class RollupIndexerStateTests extends ESTestCase {
         }
 
         @Override
-        protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
+        protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse> nextPhase) {
             // TODO Should use InternalComposite constructor but it is package protected in core.
             Aggregations aggs = new Aggregations(Collections.singletonList(new CompositeAggregation() {
                 @Override
@@ -149,14 +149,14 @@ public class RollupIndexerStateTests extends ESTestCase {
         }
 
         @Override
-        protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
+        protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse> nextPhase) {
             assert latch != null;
             try {
                 latch.await();
             } catch (InterruptedException e) {
                 throw new IllegalStateException(e);
             }
-            super.doNextSearch(request, nextPhase);
+            super.doNextSearch(waitTimeInNanos, nextPhase);
         }
     }
 
@@ -189,7 +189,7 @@ public class RollupIndexerStateTests extends ESTestCase {
         }
 
         @Override
-        protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
+        protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse> nextPhase) {
             assert latch != null;
             try {
                 latch.await();
@@ -198,7 +198,7 @@ public class RollupIndexerStateTests extends ESTestCase {
             }
 
             try {
-                SearchResponse response = searchFunction.apply(request);
+                SearchResponse response = searchFunction.apply(buildSearchRequest());
                 nextPhase.onResponse(response);
             } catch (Exception e) {
                 nextPhase.onFailure(e);
@@ -364,14 +364,14 @@ public class RollupIndexerStateTests extends ESTestCase {
                 }
 
                 @Override
-                protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
+                protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse> nextPhase) {
                     try {
                         latch.await();
                     } catch (InterruptedException e) {
                         throw new IllegalStateException(e);
                     }
                     state.set(IndexerState.ABORTING);   // <-- Set to aborting right before we return the (empty) search response
-                    super.doNextSearch(request, nextPhase);
+                    super.doNextSearch(waitTimeInNanos, nextPhase);
                 }
 
                 @Override
@@ -412,7 +412,7 @@ public class RollupIndexerStateTests extends ESTestCase {
                 }
 
                 @Override
-                protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
+                protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse> nextPhase) {
                     try {
                         doNextSearchLatch.await();
                     } catch (InterruptedException e) {

+ 2 - 2
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java

@@ -126,7 +126,7 @@ class ClientTransformIndexer extends TransformIndexer {
     }
 
     @Override
-    protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
+    protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse> nextPhase) {
         if (context.getTaskState() == TransformTaskState.FAILED) {
             logger.debug("[{}] attempted to search while failed.", getJobId());
             nextPhase.onFailure(new ElasticsearchException("Attempted to do a search request for failed transform [{}].", getJobId()));
@@ -137,7 +137,7 @@ class ClientTransformIndexer extends TransformIndexer {
             ClientHelper.TRANSFORM_ORIGIN,
             client,
             SearchAction.INSTANCE,
-            request,
+            buildSearchRequest(),
             nextPhase
         );
     }

+ 1 - 2
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java

@@ -700,8 +700,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
         return queryBuilder;
     }
 
-    @Override
-    protected SearchRequest buildSearchRequest(long waitTimeInNanos) {
+    protected SearchRequest buildSearchRequest() {
         assert nextCheckpoint != null;
 
         SearchRequest searchRequest = new SearchRequest(getConfig().getSource().getIndex()).allowPartialSearchResults(false)

+ 2 - 2
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java

@@ -141,7 +141,7 @@ public class TransformIndexerTests extends ESTestCase {
         }
 
         @Override
-        protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
+        protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse> nextPhase) {
             assert latch != null;
             try {
                 latch.await();
@@ -150,7 +150,7 @@ public class TransformIndexerTests extends ESTestCase {
             }
 
             try {
-                SearchResponse response = searchFunction.apply(request);
+                SearchResponse response = searchFunction.apply(buildSearchRequest());
                 nextPhase.onResponse(response);
             } catch (Exception e) {
                 nextPhase.onFailure(e);