Browse Source

Rollup/DataFrame: disallow partial results (#41114)

disallow partial results in rollup and data frame, after this change the client throws an error directly
replacing the previous runtime exception thrown, allowing better error handling in implementations.
Hendrik Muhs 6 years ago
parent
commit
36ea605177

+ 13 - 11
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java

@@ -14,7 +14,6 @@ import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 
-import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicReference;
@@ -150,8 +149,7 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
                 // fire off the search. Note this is async, the method will return from here
                 executor.execute(() -> {
                     onStart(now, ActionListener.wrap(r -> {
-                        stats.markStartSearch();
-                        doNextSearch(buildSearchRequest(), ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure));
+                        nextSearch(ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure));
                     }, e -> {
                         finishAndSetState();
                         onFailure(e);
@@ -305,10 +303,9 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
             if (checkState(getState()) == false) {
                 return;
             }
-            if (searchResponse.getShardFailures().length != 0) {
-                throw new RuntimeException("Shard failures encountered while running indexer for job [" + getJobId() + "]: "
-                        + Arrays.toString(searchResponse.getShardFailures()));
-            }
+
+            // allowPartialSearchResults is set to false, so we should never see shard failures here
+            assert (searchResponse.getShardFailures().length == 0);
 
             stats.incrementNumPages(1);
             IterationResult<JobPosition> iterationResult = doProcess(searchResponse);
@@ -362,18 +359,23 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
             // TODO probably something more intelligent than every-50 is needed
             if (stats.getNumPages() > 0 && stats.getNumPages() % 50 == 0) {
                 doSaveState(IndexerState.INDEXING, position, () -> {
-                    stats.markStartSearch();
-                    doNextSearch(buildSearchRequest(), listener);
+                    nextSearch(listener);
                 });
             } else {
-                stats.markStartSearch();
-                doNextSearch(buildSearchRequest(), listener);
+                nextSearch(listener);
             }
         } catch (Exception e) {
             finishWithIndexingFailure(e);
         }
     }
 
+    private void nextSearch(ActionListener<SearchResponse> listener) {
+        stats.markStartSearch();
+        // ensure that partial results are not accepted and cause a search failure
+        SearchRequest searchRequest = buildSearchRequest().allowPartialSearchResults(false);
+        doNextSearch(searchRequest, listener);
+    }
+
     /**
      * Checks the {@link IndexerState} and returns false if the execution should be
      * stopped.

+ 2 - 2
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java

@@ -72,7 +72,7 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
         protected SearchRequest buildSearchRequest() {
             assertThat(step, equalTo(1));
             ++step;
-            return null;
+            return new SearchRequest();
         }
 
         @Override
@@ -151,7 +151,7 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
         protected SearchRequest buildSearchRequest() {
             assertThat(step, equalTo(1));
             ++step;
-            return null;
+            return new SearchRequest();
         }
 
         @Override

+ 11 - 5
x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java

@@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.search.SearchPhaseExecutionException;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchResponseSections;
@@ -197,7 +198,13 @@ public class RollupIndexerStateTests extends ESTestCase {
             } catch (InterruptedException e) {
                 throw new IllegalStateException(e);
             }
-            nextPhase.onResponse(searchFunction.apply(request));
+
+            try {
+                SearchResponse response = searchFunction.apply(request);
+                nextPhase.onResponse(response);
+            } catch (Exception e) {
+                nextPhase.onFailure(e);
+            }
         }
 
         @Override
@@ -800,15 +807,14 @@ public class RollupIndexerStateTests extends ESTestCase {
         RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap());
         AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
         Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> {
-            ShardSearchFailure[] failures = new ShardSearchFailure[]{new ShardSearchFailure(new RuntimeException("failed"))};
-            return new SearchResponse(null, null, 1, 1, 0, 0,
-                failures, null);
+            throw new SearchPhaseExecutionException("query", "Partial shards failure",
+                    new ShardSearchFailure[] { new ShardSearchFailure(new RuntimeException("failed")) });
         };
 
         Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);
 
         Consumer<Exception> failureConsumer = e -> {
-            assertThat(e.getMessage(), startsWith("Shard failures encountered while running indexer for job"));
+            assertThat(e.getMessage(), startsWith("Partial shards failure"));
             isFinished.set(true);
         };