Browse Source

[ML] Fix datafeed preview with remote indices (#81099)

In #77109 a bug was fixed with regard to `date_nanos` time fields
and the preview datafeed API. However, that fix introduces a new bug.
As we are calling the field caps API to find out whether the time field
is `date_nanos`, we are setting the datafeed indices on the request.
This may result to erroneous behaviour on local indices and it certainly
will result to an error if the datafeed's indices are remote.

This commit fixes that problem by setting the datafeed's indices on the
field caps request.
Dimitris Athanasiou 3 years ago
parent
commit
3d0c9efb97

+ 7 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java

@@ -119,7 +119,7 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction<Previ
                 new DatafeedTimingStatsReporter(new DatafeedTimingStats(datafeedConfig.getJobId()), (ts, refreshPolicy) -> {}),
                 listener.delegateFailure((l, dataExtractorFactory) -> {
                     isDateNanos(
-                        previewDatafeedConfig.getHeaders(),
+                        previewDatafeedConfig,
                         job.getDataDescription().getTimeField(),
                         listener.delegateFailure((l2, isDateNanos) -> {
                             DataExtractor dataExtractor = dataExtractorFactory.newExtractor(
@@ -151,13 +151,16 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction<Previ
         return previewDatafeed;
     }
 
-    private void isDateNanos(Map<String, String> headers, String timeField, ActionListener<Boolean> listener) {
+    private void isDateNanos(DatafeedConfig datafeed, String timeField, ActionListener<Boolean> listener) {
+        FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest();
+        fieldCapabilitiesRequest.indices(datafeed.getIndices().toArray(new String[0])).indicesOptions(datafeed.getIndicesOptions());
+        fieldCapabilitiesRequest.fields(timeField);
         executeWithHeadersAsync(
-            headers,
+            datafeed.getHeaders(),
             ML_ORIGIN,
             client,
             FieldCapabilitiesAction.INSTANCE,
-            new FieldCapabilitiesRequest().fields(timeField),
+            fieldCapabilitiesRequest,
             ActionListener.wrap(fieldCapsResponse -> {
                 Map<String, FieldCapabilities> timeFieldCaps = fieldCapsResponse.getField(timeField);
                 listener.onResponse(timeFieldCaps.keySet().contains(DateFieldMapper.DATE_NANOS_CONTENT_TYPE));

+ 10 - 17
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedActionTests.java

@@ -17,7 +17,6 @@ import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
 import org.junit.Before;
-import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.io.ByteArrayInputStream;
@@ -51,21 +50,15 @@ public class TransportPreviewDatafeedActionTests extends ESTestCase {
         dataExtractor = mock(DataExtractor.class);
         actionListener = mock(ActionListener.class);
 
-        doAnswer(new Answer<Void>() {
-            @Override
-            public Void answer(InvocationOnMock invocationOnMock) {
-                PreviewDatafeedAction.Response response = (PreviewDatafeedAction.Response) invocationOnMock.getArguments()[0];
-                capturedResponse = response.toString();
-                return null;
-            }
+        doAnswer((Answer<Void>) invocationOnMock -> {
+            PreviewDatafeedAction.Response response = (PreviewDatafeedAction.Response) invocationOnMock.getArguments()[0];
+            capturedResponse = response.toString();
+            return null;
         }).when(actionListener).onResponse(any());
 
-        doAnswer(new Answer<Void>() {
-            @Override
-            public Void answer(InvocationOnMock invocationOnMock) {
-                capturedFailure = (Exception) invocationOnMock.getArguments()[0];
-                return null;
-            }
+        doAnswer((Answer<Void>) invocationOnMock -> {
+            capturedFailure = (Exception) invocationOnMock.getArguments()[0];
+            return null;
         }).when(actionListener).onFailure(any());
     }
 
@@ -95,7 +88,7 @@ public class TransportPreviewDatafeedActionTests extends ESTestCase {
         assertThat(previewDatafeed.getChunkingConfig(), equalTo(datafeed.build().getChunkingConfig()));
     }
 
-    public void testPreviewDatafed_GivenEmptyStream() throws IOException {
+    public void testPreviewDatafeed_GivenEmptyStream() throws IOException {
         when(dataExtractor.next()).thenReturn(Optional.empty());
 
         TransportPreviewDatafeedAction.previewDatafeed(dataExtractor, actionListener);
@@ -105,7 +98,7 @@ public class TransportPreviewDatafeedActionTests extends ESTestCase {
         verify(dataExtractor).cancel();
     }
 
-    public void testPreviewDatafed_GivenNonEmptyStream() throws IOException {
+    public void testPreviewDatafeed_GivenNonEmptyStream() throws IOException {
         String streamAsString = "{\"a\":1, \"b\":2} {\"c\":3, \"d\":4}\n{\"e\":5, \"f\":6}";
         InputStream stream = new ByteArrayInputStream(streamAsString.getBytes(StandardCharsets.UTF_8));
         when(dataExtractor.next()).thenReturn(Optional.of(stream));
@@ -117,7 +110,7 @@ public class TransportPreviewDatafeedActionTests extends ESTestCase {
         verify(dataExtractor).cancel();
     }
 
-    public void testPreviewDatafed_GivenFailure() throws IOException {
+    public void testPreviewDatafeed_GivenFailure() throws IOException {
         doThrow(new RuntimeException("failed")).when(dataExtractor).next();
 
         TransportPreviewDatafeedAction.previewDatafeed(dataExtractor, actionListener);