|
|
@@ -23,7 +23,6 @@ import org.elasticsearch.plugins.Plugin;
|
|
|
import org.elasticsearch.reindex.ReindexPlugin;
|
|
|
import org.elasticsearch.test.AbstractMultiClustersTestCase;
|
|
|
import org.elasticsearch.test.disruption.NetworkDisruption;
|
|
|
-import org.elasticsearch.test.junit.annotations.TestIssueLogging;
|
|
|
import org.elasticsearch.xpack.core.XPackSettings;
|
|
|
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
|
|
|
import org.elasticsearch.xpack.core.ml.MachineLearningField;
|
|
|
@@ -102,10 +101,6 @@ public class DatafeedCcsIT extends AbstractMultiClustersTestCase {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
- @TestIssueLogging(
|
|
|
- value = "org.elasticsearch.xpack.ml.datafeed:DEBUG",
|
|
|
- issueUrl = "https://github.com/elastic/elasticsearch/issues/84290"
|
|
|
- )
|
|
|
public void testDatafeedWithCcsRemoteHealthy() throws Exception {
|
|
|
setSkipUnavailable(randomBoolean());
|
|
|
String jobId = "ccs-healthy-job";
|
|
|
@@ -113,14 +108,27 @@ public class DatafeedCcsIT extends AbstractMultiClustersTestCase {
|
|
|
long numDocs = randomIntBetween(32, 2048);
|
|
|
long endTimeMs = indexRemoteDocs(numDocs);
|
|
|
setupJobAndDatafeed(jobId, datafeedId, endTimeMs);
|
|
|
- // Datafeed should complete and auto-close the job
|
|
|
- // Use a 60 second timeout because multiple suites run in parallel in CI which slows things down a lot
|
|
|
- assertBusy(() -> {
|
|
|
- JobStats jobStats = getJobStats(jobId);
|
|
|
- assertThat(jobStats.getState(), is(JobState.CLOSED));
|
|
|
- assertThat(jobStats.getDataCounts().getProcessedRecordCount(), is(numDocs));
|
|
|
- }, 60, TimeUnit.SECONDS);
|
|
|
- clearSkipUnavailable();
|
|
|
+ try {
|
|
|
+ // Datafeed should complete and auto-close the job.
|
|
|
+ // Use a 3 minute timeout because multiple suites run in parallel in CI which slows things down a lot.
|
|
|
+ // (Usually the test completes within 1 minute and much faster than that if run locally with nothing major running in parallel.)
|
|
|
+ assertBusy(() -> {
|
|
|
+ JobStats jobStats = getJobStats(jobId);
|
|
|
+ assertThat(jobStats.getState(), is(JobState.CLOSED));
|
|
|
+ assertThat(jobStats.getDataCounts().getProcessedRecordCount(), is(numDocs));
|
|
|
+ }, 3, TimeUnit.MINUTES);
|
|
|
+ } catch (AssertionError ae) {
|
|
|
+ // On failure close the job, because otherwise there will be masses of noise in the logs from the job fighting with the
|
|
|
+ // post-test cleanup which obscures the original failure. Force closing the job also stops the datafeed if necessary.
|
|
|
+ try {
|
|
|
+ client(LOCAL_CLUSTER).execute(CloseJobAction.INSTANCE, new CloseJobAction.Request(jobId).setForce(true)).actionGet();
|
|
|
+ } catch (Exception e) {
|
|
|
+ ae.addSuppressed(e);
|
|
|
+ }
|
|
|
+ throw ae;
|
|
|
+ } finally {
|
|
|
+ clearSkipUnavailable();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/84268")
|
|
|
@@ -148,13 +156,14 @@ public class DatafeedCcsIT extends AbstractMultiClustersTestCase {
|
|
|
}
|
|
|
});
|
|
|
networkDisruption.removeAndEnsureHealthy(cluster(REMOTE_CLUSTER));
|
|
|
- // Datafeed should eventually read all the docs
|
|
|
- // Use a 60 second timeout because multiple suites run in parallel in CI which slows things down a lot
|
|
|
+ // Datafeed should eventually read all the docs.
|
|
|
+ // Use a 3 minute timeout because multiple suites run in parallel in CI which slows things down a lot.
|
|
|
+ // (Usually the test completes within 1 minute and much faster than that if run locally with nothing major running in parallel.)
|
|
|
assertBusy(() -> {
|
|
|
JobStats jobStats = getJobStats(jobId);
|
|
|
assertThat(jobStats.getState(), is(JobState.OPENED));
|
|
|
assertThat(jobStats.getDataCounts().getProcessedRecordCount(), is(numDocs));
|
|
|
- }, 60, TimeUnit.SECONDS);
|
|
|
+ }, 3, TimeUnit.MINUTES);
|
|
|
} finally {
|
|
|
client(LOCAL_CLUSTER).execute(StopDatafeedAction.INSTANCE, new StopDatafeedAction.Request(datafeedId)).actionGet();
|
|
|
client(LOCAL_CLUSTER).execute(CloseJobAction.INSTANCE, new CloseJobAction.Request(jobId)).actionGet();
|