|
@@ -14,6 +14,7 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
|
|
|
import org.elasticsearch.action.search.SearchResponse;
|
|
|
import org.elasticsearch.action.search.ShardSearchFailure;
|
|
|
import org.elasticsearch.client.Client;
|
|
|
+import org.elasticsearch.common.Nullable;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
|
|
import org.elasticsearch.index.query.QueryBuilder;
|
|
@@ -43,6 +44,7 @@ import java.util.stream.Collectors;
|
|
|
import static org.hamcrest.Matchers.containsString;
|
|
|
import static org.hamcrest.Matchers.equalTo;
|
|
|
import static org.hamcrest.Matchers.is;
|
|
|
+import static org.hamcrest.Matchers.nullValue;
|
|
|
import static org.mockito.Matchers.same;
|
|
|
import static org.mockito.Mockito.mock;
|
|
|
import static org.mockito.Mockito.when;
|
|
@@ -82,7 +84,7 @@ public class DataFrameDataExtractorTests extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
public void testTwoPageExtraction() throws IOException {
|
|
|
- TestExtractor dataExtractor = createExtractor(true);
|
|
|
+ TestExtractor dataExtractor = createExtractor(true, false);
|
|
|
|
|
|
// First batch
|
|
|
SearchResponse response1 = createSearchResponse(Arrays.asList(1_1, 1_2, 1_3), Arrays.asList(2_1, 2_2, 2_3));
|
|
@@ -142,7 +144,7 @@ public class DataFrameDataExtractorTests extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
public void testRecoveryFromErrorOnSearchAfterRetry() throws IOException {
|
|
|
- TestExtractor dataExtractor = createExtractor(true);
|
|
|
+ TestExtractor dataExtractor = createExtractor(true, false);
|
|
|
|
|
|
// First search will fail
|
|
|
dataExtractor.setNextResponse(createResponseWithShardFailures());
|
|
@@ -176,7 +178,7 @@ public class DataFrameDataExtractorTests extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
public void testErrorOnSearchTwiceLeadsToFailure() {
|
|
|
- TestExtractor dataExtractor = createExtractor(true);
|
|
|
+ TestExtractor dataExtractor = createExtractor(true, false);
|
|
|
|
|
|
// First search will fail
|
|
|
dataExtractor.setNextResponse(createResponseWithShardFailures());
|
|
@@ -189,7 +191,7 @@ public class DataFrameDataExtractorTests extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
public void testRecoveryFromErrorOnContinueScrollAfterRetry() throws IOException {
|
|
|
- TestExtractor dataExtractor = createExtractor(true);
|
|
|
+ TestExtractor dataExtractor = createExtractor(true, false);
|
|
|
|
|
|
// Search will succeed
|
|
|
SearchResponse response1 = createSearchResponse(Arrays.asList(1_1), Arrays.asList(2_1));
|
|
@@ -238,7 +240,7 @@ public class DataFrameDataExtractorTests extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
public void testErrorOnContinueScrollTwiceLeadsToFailure() throws IOException {
|
|
|
- TestExtractor dataExtractor = createExtractor(true);
|
|
|
+ TestExtractor dataExtractor = createExtractor(true, false);
|
|
|
|
|
|
// Search will succeed
|
|
|
SearchResponse response1 = createSearchResponse(Arrays.asList(1_1), Arrays.asList(2_1));
|
|
@@ -263,7 +265,7 @@ public class DataFrameDataExtractorTests extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
public void testIncludeSourceIsFalseAndNoSourceFields() throws IOException {
|
|
|
- TestExtractor dataExtractor = createExtractor(false);
|
|
|
+ TestExtractor dataExtractor = createExtractor(false, false);
|
|
|
|
|
|
SearchResponse response = createSearchResponse(Arrays.asList(1_1), Arrays.asList(2_1));
|
|
|
dataExtractor.setNextResponse(response);
|
|
@@ -291,7 +293,7 @@ public class DataFrameDataExtractorTests extends ESTestCase {
|
|
|
ExtractedField.newField("field_1", Collections.singleton("keyword"), ExtractedField.ExtractionMethod.DOC_VALUE),
|
|
|
ExtractedField.newField("field_2", Collections.singleton("text"), ExtractedField.ExtractionMethod.SOURCE)));
|
|
|
|
|
|
- TestExtractor dataExtractor = createExtractor(false);
|
|
|
+ TestExtractor dataExtractor = createExtractor(false, false);
|
|
|
|
|
|
SearchResponse response = createSearchResponse(Arrays.asList(1_1), Arrays.asList(2_1));
|
|
|
dataExtractor.setNextResponse(response);
|
|
@@ -314,9 +316,77 @@ public class DataFrameDataExtractorTests extends ESTestCase {
|
|
|
assertThat(searchRequest, containsString("\"_source\":{\"includes\":[\"field_2\"],\"excludes\":[]}"));
|
|
|
}
|
|
|
|
|
|
- private TestExtractor createExtractor(boolean includeSource) {
|
|
|
+ public void testMissingValues_GivenShouldNotInclude() throws IOException {
|
|
|
+ TestExtractor dataExtractor = createExtractor(true, false);
|
|
|
+
|
|
|
+ // First and only batch
|
|
|
+ SearchResponse response1 = createSearchResponse(Arrays.asList(1_1, null, 1_3), Arrays.asList(2_1, 2_2, 2_3));
|
|
|
+ dataExtractor.setNextResponse(response1);
|
|
|
+
|
|
|
+ // Empty
|
|
|
+ SearchResponse lastAndEmptyResponse = createEmptySearchResponse();
|
|
|
+ dataExtractor.setNextResponse(lastAndEmptyResponse);
|
|
|
+
|
|
|
+ assertThat(dataExtractor.hasNext(), is(true));
|
|
|
+
|
|
|
+ // First batch
|
|
|
+ Optional<List<DataFrameDataExtractor.Row>> rows = dataExtractor.next();
|
|
|
+ assertThat(rows.isPresent(), is(true));
|
|
|
+ assertThat(rows.get().size(), equalTo(3));
|
|
|
+
|
|
|
+ assertThat(rows.get().get(0).getValues(), equalTo(new String[] {"11", "21"}));
|
|
|
+ assertThat(rows.get().get(1).getValues(), is(nullValue()));
|
|
|
+ assertThat(rows.get().get(2).getValues(), equalTo(new String[] {"13", "23"}));
|
|
|
+
|
|
|
+ assertThat(rows.get().get(0).shouldSkip(), is(false));
|
|
|
+ assertThat(rows.get().get(1).shouldSkip(), is(true));
|
|
|
+ assertThat(rows.get().get(2).shouldSkip(), is(false));
|
|
|
+
|
|
|
+ assertThat(dataExtractor.hasNext(), is(true));
|
|
|
+
|
|
|
+ // Third batch should return empty
|
|
|
+ rows = dataExtractor.next();
|
|
|
+ assertThat(rows.isEmpty(), is(true));
|
|
|
+ assertThat(dataExtractor.hasNext(), is(false));
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testMissingValues_GivenShouldInclude() throws IOException {
|
|
|
+ TestExtractor dataExtractor = createExtractor(true, true);
|
|
|
+
|
|
|
+ // First and only batch
|
|
|
+ SearchResponse response1 = createSearchResponse(Arrays.asList(1_1, null, 1_3), Arrays.asList(2_1, 2_2, 2_3));
|
|
|
+ dataExtractor.setNextResponse(response1);
|
|
|
+
|
|
|
+ // Empty
|
|
|
+ SearchResponse lastAndEmptyResponse = createEmptySearchResponse();
|
|
|
+ dataExtractor.setNextResponse(lastAndEmptyResponse);
|
|
|
+
|
|
|
+ assertThat(dataExtractor.hasNext(), is(true));
|
|
|
+
|
|
|
+ // First batch
|
|
|
+ Optional<List<DataFrameDataExtractor.Row>> rows = dataExtractor.next();
|
|
|
+ assertThat(rows.isPresent(), is(true));
|
|
|
+ assertThat(rows.get().size(), equalTo(3));
|
|
|
+
|
|
|
+ assertThat(rows.get().get(0).getValues(), equalTo(new String[] {"11", "21"}));
|
|
|
+ assertThat(rows.get().get(1).getValues(), equalTo(new String[] {"", "22"}));
|
|
|
+ assertThat(rows.get().get(2).getValues(), equalTo(new String[] {"13", "23"}));
|
|
|
+
|
|
|
+ assertThat(rows.get().get(0).shouldSkip(), is(false));
|
|
|
+ assertThat(rows.get().get(1).shouldSkip(), is(false));
|
|
|
+ assertThat(rows.get().get(2).shouldSkip(), is(false));
|
|
|
+
|
|
|
+ assertThat(dataExtractor.hasNext(), is(true));
|
|
|
+
|
|
|
+ // Third batch should return empty
|
|
|
+ rows = dataExtractor.next();
|
|
|
+ assertThat(rows.isEmpty(), is(true));
|
|
|
+ assertThat(dataExtractor.hasNext(), is(false));
|
|
|
+ }
|
|
|
+
|
|
|
+ private TestExtractor createExtractor(boolean includeSource, boolean includeRowsWithMissingValues) {
|
|
|
DataFrameDataExtractorContext context = new DataFrameDataExtractorContext(
|
|
|
- JOB_ID, extractedFields, indices, query, scrollSize, headers, includeSource);
|
|
|
+ JOB_ID, extractedFields, indices, query, scrollSize, headers, includeSource, includeRowsWithMissingValues);
|
|
|
return new TestExtractor(client, context);
|
|
|
}
|
|
|
|
|
@@ -326,11 +396,10 @@ public class DataFrameDataExtractorTests extends ESTestCase {
|
|
|
when(searchResponse.getScrollId()).thenReturn(randomAlphaOfLength(1000));
|
|
|
List<SearchHit> hits = new ArrayList<>();
|
|
|
for (int i = 0; i < field1Values.size(); i++) {
|
|
|
- SearchHit hit = new SearchHit(randomInt());
|
|
|
- SearchHitBuilder searchHitBuilder = new SearchHitBuilder(randomInt())
|
|
|
- .addField("field_1", Collections.singletonList(field1Values.get(i)))
|
|
|
- .addField("field_2", Collections.singletonList(field2Values.get(i)))
|
|
|
- .setSource("{\"field_1\":" + field1Values.get(i) + ",\"field_2\":" + field2Values.get(i) + "}");
|
|
|
+ SearchHitBuilder searchHitBuilder = new SearchHitBuilder(randomInt());
|
|
|
+ addField(searchHitBuilder, "field_1", field1Values.get(i));
|
|
|
+ addField(searchHitBuilder, "field_2", field2Values.get(i));
|
|
|
+ searchHitBuilder.setSource("{\"field_1\":" + field1Values.get(i) + ",\"field_2\":" + field2Values.get(i) + "}");
|
|
|
hits.add(searchHitBuilder.build());
|
|
|
}
|
|
|
SearchHits searchHits = new SearchHits(hits.toArray(new SearchHit[0]), new TotalHits(hits.size(), TotalHits.Relation.EQUAL_TO), 1);
|
|
@@ -338,6 +407,10 @@ public class DataFrameDataExtractorTests extends ESTestCase {
|
|
|
return searchResponse;
|
|
|
}
|
|
|
|
|
|
+ private static void addField(SearchHitBuilder searchHitBuilder, String field, @Nullable Number value) {
|
|
|
+ searchHitBuilder.addField(field, value == null ? Collections.emptyList() : Collections.singletonList(value));
|
|
|
+ }
|
|
|
+
|
|
|
private SearchResponse createEmptySearchResponse() {
|
|
|
return createSearchResponse(Collections.emptyList(), Collections.emptyList());
|
|
|
}
|