|
|
@@ -15,7 +15,6 @@ import org.elasticsearch.action.ActionRequest;
|
|
|
import org.elasticsearch.action.ActionRequestBuilder;
|
|
|
import org.elasticsearch.action.ActionResponse;
|
|
|
import org.elasticsearch.action.index.IndexResponse;
|
|
|
-import org.elasticsearch.action.support.PlainActionFuture;
|
|
|
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
|
|
import org.elasticsearch.cluster.ClusterState;
|
|
|
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
|
|
@@ -29,7 +28,6 @@ import org.elasticsearch.cluster.routing.RoutingTable;
|
|
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
|
|
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
|
-import org.elasticsearch.common.Priority;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.core.TimeValue;
|
|
|
import org.elasticsearch.index.Index;
|
|
|
@@ -134,43 +132,15 @@ public class RareClusterStateIT extends ESIntegTestCase {
|
|
|
// Wait for no publication in progress to not accidentally cancel a publication different from the one triggered by the given
|
|
|
// request.
|
|
|
final Coordinator masterCoordinator = internalCluster().getCurrentMasterNodeInstance(Coordinator.class);
|
|
|
-
|
|
|
- ensureNoPendingMasterTasks().actionGet(TimeValue.timeValueSeconds(30));
|
|
|
+ assertBusy(() -> {
|
|
|
+ assertFalse(masterCoordinator.publicationInProgress());
|
|
|
+ final long applierVersion = masterCoordinator.getApplierState().version();
|
|
|
+ for (Coordinator instance : internalCluster().getInstances(Coordinator.class)) {
|
|
|
+ assertEquals(instance.getApplierState().version(), applierVersion);
|
|
|
+ }
|
|
|
+ });
|
|
|
ActionFuture<Res> future = req.execute();
|
|
|
-
|
|
|
- // cancel the first cluster state update produced by the request above
|
|
|
assertBusy(() -> assertTrue(masterCoordinator.cancelCommittedPublication()));
|
|
|
- // await and cancel any other forked cluster state updates that might be produced by the request
|
|
|
- var task = ensureNoPendingMasterTasks();
|
|
|
- while (task.isDone() == false) {
|
|
|
- masterCoordinator.cancelCommittedPublication();
|
|
|
- Thread.onSpinWait();
|
|
|
- }
|
|
|
- task.actionGet(TimeValue.timeValueSeconds(30));
|
|
|
-
|
|
|
- return future;
|
|
|
- }
|
|
|
-
|
|
|
- private PlainActionFuture<Void> ensureNoPendingMasterTasks() {
|
|
|
- var future = new PlainActionFuture<Void>();
|
|
|
- internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
|
|
|
- .submitUnbatchedStateUpdateTask("test", new ClusterStateUpdateTask(Priority.LANGUID, TimeValue.timeValueSeconds(30)) {
|
|
|
-
|
|
|
- @Override
|
|
|
- public ClusterState execute(ClusterState currentState) {
|
|
|
- return currentState;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
|
|
|
- future.onResponse(null);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onFailure(Exception e) {
|
|
|
- future.onFailure(e);
|
|
|
- }
|
|
|
- });
|
|
|
return future;
|
|
|
}
|
|
|
|
|
|
@@ -188,9 +158,8 @@ public class RareClusterStateIT extends ESIntegTestCase {
|
|
|
indexDoc("test", "1");
|
|
|
refresh();
|
|
|
disruption.startDisrupting();
|
|
|
- logger.info("--> delete index");
|
|
|
+ logger.info("--> delete index and recreate it");
|
|
|
executeAndCancelCommittedPublication(client().admin().indices().prepareDelete("test").setTimeout("0s")).get(10, TimeUnit.SECONDS);
|
|
|
- logger.info("--> and recreate it");
|
|
|
executeAndCancelCommittedPublication(
|
|
|
prepareCreate("test").setSettings(
|
|
|
Settings.builder()
|