|
|
@@ -11,6 +11,7 @@ import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
|
|
|
import org.elasticsearch.common.bytes.BytesReference;
|
|
|
import org.elasticsearch.common.xcontent.XContentHelper;
|
|
|
import org.elasticsearch.plugins.Plugin;
|
|
|
+import org.elasticsearch.tasks.TaskInfo;
|
|
|
import org.elasticsearch.xcontent.json.JsonXContent;
|
|
|
import org.elasticsearch.xpack.core.async.AsyncStopRequest;
|
|
|
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
|
|
|
@@ -28,10 +29,13 @@ import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
|
|
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
|
|
|
+import static org.elasticsearch.xpack.esql.action.AbstractCrossClusterTestCase.getDriverTasks;
|
|
|
import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.deleteAsyncId;
|
|
|
import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.startAsyncQuery;
|
|
|
import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.waitForCluster;
|
|
|
+import static org.hamcrest.Matchers.empty;
|
|
|
import static org.hamcrest.Matchers.equalTo;
|
|
|
+import static org.hamcrest.Matchers.not;
|
|
|
|
|
|
// This tests if enrich after stop works correctly
|
|
|
public class CrossClusterAsyncEnrichStopIT extends AbstractEnrichBasedCrossClusterTestCase {
|
|
|
@@ -87,10 +91,23 @@ public class CrossClusterAsyncEnrichStopIT extends AbstractEnrichBasedCrossClust
|
|
|
// wait until c1 is done
|
|
|
waitForCluster(client(), "c1", asyncExecutionId);
|
|
|
waitForCluster(client(), LOCAL_CLUSTER, asyncExecutionId);
|
|
|
+ // wait until remote reduce task starts on c2
|
|
|
+ assertBusy(() -> {
|
|
|
+ List<TaskInfo> tasks = getDriverTasks(client(REMOTE_CLUSTER_2));
|
|
|
+ List<TaskInfo> reduceTasks = tasks.stream().filter(t -> t.description().contains("_LuceneSourceOperator") == false).toList();
|
|
|
+ assertThat(reduceTasks, not(empty()));
|
|
|
+ });
|
|
|
|
|
|
// Run the stop request
|
|
|
var stopRequest = new AsyncStopRequest(asyncExecutionId);
|
|
|
var stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest);
|
|
|
+ // wait until remote reduce tasks are gone
|
|
|
+ assertBusy(() -> {
|
|
|
+ List<TaskInfo> tasks = getDriverTasks(client(REMOTE_CLUSTER_2));
|
|
|
+ List<TaskInfo> reduceTasks = tasks.stream().filter(t -> t.description().contains("_LuceneSourceOperator") == false).toList();
|
|
|
+ assertThat(reduceTasks, empty());
|
|
|
+ });
|
|
|
+
|
|
|
// Allow the processing to proceed
|
|
|
SimplePauseFieldPlugin.allowEmitting.countDown();
|
|
|
|