|
|
@@ -17,7 +17,9 @@ import java.util.Map;
|
|
|
|
|
|
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
|
|
|
import static org.hamcrest.CoreMatchers.equalTo;
|
|
|
+import static org.hamcrest.CoreMatchers.not;
|
|
|
import static org.hamcrest.CoreMatchers.notNullValue;
|
|
|
+import static org.hamcrest.Matchers.empty;
|
|
|
import static org.hamcrest.Matchers.is;
|
|
|
|
|
|
public class DataFrameAuditorIT extends DataFrameRestTestCase {
|
|
|
@@ -49,7 +51,6 @@ public class DataFrameAuditorIT extends DataFrameRestTestCase {
|
|
|
setupUser(TEST_USER_NAME, Arrays.asList("data_frame_transforms_admin", DATA_ACCESS_ROLE));
|
|
|
}
|
|
|
|
|
|
- @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/40594")
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public void testAuditorWritesAudits() throws Exception {
|
|
|
String transformId = "simplePivotForAudit";
|
|
|
@@ -62,17 +63,26 @@ public class DataFrameAuditorIT extends DataFrameRestTestCase {
|
|
|
startAndWaitForTransform(transformId, dataFrameIndex, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
|
|
|
|
|
|
// Make sure we wrote to the audit
|
|
|
- assertTrue(indexExists(DataFrameInternalIndex.AUDIT_INDEX));
|
|
|
- refreshIndex(DataFrameInternalIndex.AUDIT_INDEX);
|
|
|
- Request request = new Request("GET", DataFrameInternalIndex.AUDIT_INDEX + "/_search");
|
|
|
+ final Request request = new Request("GET", DataFrameInternalIndex.AUDIT_INDEX + "/_search");
|
|
|
request.setJsonEntity("{\"query\":{\"term\":{\"transform_id\":\"simplePivotForAudit\"}}}");
|
|
|
- Map<String, Object> response = entityAsMap(client().performRequest(request));
|
|
|
- Map<?, ?> hitRsp = (Map<?, ?>) ((List<?>) ((Map<?, ?>)response.get("hits")).get("hits")).get(0);
|
|
|
- Map<String, Object> source = (Map<String, Object>)hitRsp.get("_source");
|
|
|
- assertThat(source.get("transform_id"), equalTo(transformId));
|
|
|
- assertThat(source.get("level"), equalTo("info"));
|
|
|
- assertThat(source.get("message"), is(notNullValue()));
|
|
|
- assertThat(source.get("node_name"), is(notNullValue()));
|
|
|
- assertThat(source.get("timestamp"), is(notNullValue()));
|
|
|
+ assertBusy(() -> {
|
|
|
+ assertTrue(indexExists(DataFrameInternalIndex.AUDIT_INDEX));
|
|
|
+ });
|
|
|
+ // Since calls to write the Auditor are sent and forgot (async) we could have returned from the start,
|
|
|
+ // finished the job (as this is a very short DF job), all without the audit being fully written.
|
|
|
+ assertBusy(() -> {
|
|
|
+ refreshIndex(DataFrameInternalIndex.AUDIT_INDEX);
|
|
|
+ Map<String, Object> response = entityAsMap(client().performRequest(request));
|
|
|
+ List<?> hitList = ((List<?>) ((Map<?, ?>)response.get("hits")).get("hits"));
|
|
|
+ assertThat(hitList, is(not(empty())));
|
|
|
+ Map<?, ?> hitRsp = (Map<?, ?>) hitList.get(0);
|
|
|
+ Map<String, Object> source = (Map<String, Object>)hitRsp.get("_source");
|
|
|
+ assertThat(source.get("transform_id"), equalTo(transformId));
|
|
|
+ assertThat(source.get("level"), equalTo("info"));
|
|
|
+ assertThat(source.get("message"), is(notNullValue()));
|
|
|
+ assertThat(source.get("node_name"), is(notNullValue()));
|
|
|
+ assertThat(source.get("timestamp"), is(notNullValue()));
|
|
|
+ });
|
|
|
+
|
|
|
}
|
|
|
}
|