|
@@ -488,6 +488,7 @@ public class SnapshotShutdownIT extends AbstractSnapshotIntegTestCase {
|
|
|
final var otherNode = internalCluster().startDataOnlyNode();
|
|
|
final var otherIndex = randomIdentifier();
|
|
|
createIndexWithContent(otherIndex, indexSettings(numShards, 0).put(REQUIRE_NODE_NAME_SETTING, otherNode).build());
|
|
|
+ indexAllShardsToAnEqualOrGreaterMinimumSize(otherIndex, ByteSizeValue.of(2, ByteSizeUnit.KB).getBytes());
|
|
|
blockDataNode(repoName, otherNode);
|
|
|
|
|
|
final var nodeForRemoval = internalCluster().startDataOnlyNode(
|
|
@@ -498,6 +499,7 @@ public class SnapshotShutdownIT extends AbstractSnapshotIntegTestCase {
|
|
|
final var indexName = randomIdentifier();
|
|
|
createIndexWithContent(indexName, indexSettings(numShards, 0).put(REQUIRE_NODE_NAME_SETTING, nodeForRemoval).build());
|
|
|
indexAllShardsToAnEqualOrGreaterMinimumSize(indexName, ByteSizeValue.of(2, ByteSizeUnit.KB).getBytes());
|
|
|
+ logger.info("---> nodeForRemovalId: " + nodeForRemovalId + ", numShards: " + numShards);
|
|
|
|
|
|
// Start the snapshot with blocking in place on the data node not to allow shard snapshots to finish yet.
|
|
|
final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
|
|
@@ -507,7 +509,21 @@ public class SnapshotShutdownIT extends AbstractSnapshotIntegTestCase {
|
|
|
|
|
|
waitForBlock(otherNode, repoName);
|
|
|
|
|
|
- logger.info("---> nodeForRemovalId: " + nodeForRemovalId + ", numShards: " + numShards);
|
|
|
+ // Block on the master when a shard snapshot request comes in, until we can verify that the Tracker saw the outgoing request.
|
|
|
+ final CountDownLatch snapshotStatusUpdateLatch = new CountDownLatch(1);
|
|
|
+ final var masterTransportService = MockTransportService.getInstance(internalCluster().getMasterName());
|
|
|
+ masterTransportService.addRequestHandlingBehavior(
|
|
|
+ SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME,
|
|
|
+ (handler, request, channel, task) -> masterTransportService.getThreadPool().generic().execute(() -> {
|
|
|
+ safeAwait(snapshotStatusUpdateLatch);
|
|
|
+ try {
|
|
|
+ handler.messageReceived(request, channel, task);
|
|
|
+ } catch (Exception e) {
|
|
|
+ fail(e);
|
|
|
+ }
|
|
|
+ })
|
|
|
+ );
|
|
|
+
|
|
|
mockLog.addExpectation(
|
|
|
new MockLog.SeenEventExpectation(
|
|
|
"SnapshotShutdownProgressTracker start log message",
|
|
@@ -555,21 +571,6 @@ public class SnapshotShutdownIT extends AbstractSnapshotIntegTestCase {
|
|
|
mockLog.awaitAllExpectationsMatched();
|
|
|
resetMockLog();
|
|
|
|
|
|
- // Block on the master when a shard snapshot request comes in, until we can verify that the Tracker saw the outgoing request.
|
|
|
- final CountDownLatch snapshotStatusUpdateLatch = new CountDownLatch(1);
|
|
|
- final var masterTransportService = MockTransportService.getInstance(internalCluster().getMasterName());
|
|
|
- masterTransportService.addRequestHandlingBehavior(
|
|
|
- SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME,
|
|
|
- (handler, request, channel, task) -> masterTransportService.getThreadPool().generic().execute(() -> {
|
|
|
- safeAwait(snapshotStatusUpdateLatch);
|
|
|
- try {
|
|
|
- handler.messageReceived(request, channel, task);
|
|
|
- } catch (Exception e) {
|
|
|
- fail(e);
|
|
|
- }
|
|
|
- })
|
|
|
- );
|
|
|
-
|
|
|
mockLog.addExpectation(
|
|
|
new MockLog.SeenEventExpectation(
|
|
|
"SnapshotShutdownProgressTracker shard snapshot has paused log message",
|