|
@@ -32,17 +32,21 @@ import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
|
|
|
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
|
|
|
import org.elasticsearch.xpack.core.ml.utils.PhaseProgress;
|
|
|
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
|
|
|
+import org.hamcrest.Matcher;
|
|
|
+import org.hamcrest.Matchers;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.List;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
import static org.hamcrest.Matchers.anyOf;
|
|
|
import static org.hamcrest.Matchers.equalTo;
|
|
|
+import static org.hamcrest.Matchers.hasItems;
|
|
|
+import static org.hamcrest.Matchers.hasSize;
|
|
|
import static org.hamcrest.Matchers.is;
|
|
|
import static org.hamcrest.Matchers.nullValue;
|
|
|
-import static org.hamcrest.Matchers.startsWith;
|
|
|
|
|
|
/**
|
|
|
* Base class of ML integration tests that use a native data_frame_analytics process
|
|
@@ -187,18 +191,16 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
|
|
|
// Since calls to write the AbstractAuditor are sent and forgot (async) we could have returned from the start,
|
|
|
// finished the job (as this is a very short analytics job), all without the audit being fully written.
|
|
|
assertBusy(() -> assertTrue(indexExists(AuditorField.NOTIFICATIONS_INDEX)));
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ Matcher<String>[] itemMatchers = Arrays.stream(expectedAuditMessagePrefixes).map(Matchers::startsWith).toArray(Matcher[]::new);
|
|
|
assertBusy(() -> {
|
|
|
- String[] actualAuditMessages = fetchAllAuditMessages(configId);
|
|
|
- assertThat("Messages: " + Arrays.toString(actualAuditMessages), actualAuditMessages.length,
|
|
|
- equalTo(expectedAuditMessagePrefixes.length));
|
|
|
- for (int i = 0; i < actualAuditMessages.length; i++) {
|
|
|
- assertThat(actualAuditMessages[i], startsWith(expectedAuditMessagePrefixes[i]));
|
|
|
- }
|
|
|
+ final List<String> allAuditMessages = fetchAllAuditMessages(configId);
|
|
|
+ assertThat(allAuditMessages, hasItems(itemMatchers));
|
|
|
+ assertThat("Messages: " + allAuditMessages, allAuditMessages, hasSize(expectedAuditMessagePrefixes.length));
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- private static String[] fetchAllAuditMessages(String dataFrameAnalyticsId) throws Exception {
|
|
|
+ private static List<String> fetchAllAuditMessages(String dataFrameAnalyticsId) {
|
|
|
RefreshRequest refreshRequest = new RefreshRequest(AuditorField.NOTIFICATIONS_INDEX);
|
|
|
RefreshResponse refreshResponse = client().execute(RefreshAction.INSTANCE, refreshRequest).actionGet();
|
|
|
assertThat(refreshResponse.getStatus().getStatus(), anyOf(equalTo(200), equalTo(201)));
|
|
@@ -212,6 +214,6 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
|
|
|
|
|
|
return Arrays.stream(searchResponse.getHits().getHits())
|
|
|
.map(hit -> (String) hit.getSourceAsMap().get("message"))
|
|
|
- .toArray(String[]::new);
|
|
|
+ .collect(Collectors.toList());
|
|
|
}
|
|
|
}
|