|
@@ -72,6 +72,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|
|
private Queue<Long> mappingVersions;
|
|
|
private Queue<Exception> settingsUpdateFailures;
|
|
|
private Queue<Long> settingsVersions;
|
|
|
+ private Queue<Exception> aliasesUpdateFailures;
|
|
|
+ private Queue<Long> aliasesVersions;
|
|
|
private Queue<Long> leaderGlobalCheckpoints;
|
|
|
private Queue<Long> followerGlobalCheckpoints;
|
|
|
private Queue<Long> maxSeqNos;
|
|
@@ -88,7 +90,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|
|
task.coordinateReads();
|
|
|
assertThat(shardChangesRequests, contains(new long[]{0L, 8L})); // treat this a peak request
|
|
|
shardChangesRequests.clear();
|
|
|
- task.innerHandleReadResponse(0, 5L, generateShardChangesResponse(0, 5L, 0L, 0L, 60L));
|
|
|
+ task.innerHandleReadResponse(0, 5L, generateShardChangesResponse(0, 5L, 0L, 0L, 1L, 60L));
|
|
|
assertThat(shardChangesRequests, contains(new long[][]{
|
|
|
{6L, 8L}, {14L, 8L}, {22L, 8L}, {30L, 8L}, {38L, 8L}, {46L, 8L}, {54L, 7L}}
|
|
|
));
|
|
@@ -113,7 +115,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|
|
|
|
|
shardChangesRequests.clear();
|
|
|
// Also invokes the coordinatesReads() method:
|
|
|
- task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 0L, 128L));
|
|
|
+ task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 0L, 1L, 128L));
|
|
|
assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because write buffer count limit has been reached
|
|
|
|
|
|
ShardFollowNodeTaskStatus status = task.getStatus();
|
|
@@ -139,7 +141,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|
|
|
|
|
shardChangesRequests.clear();
|
|
|
// Also invokes the coordinatesReads() method:
|
|
|
- task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 0L, 128L));
|
|
|
+ task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 0L, 1L, 128L));
|
|
|
assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because write buffer size limit has been reached
|
|
|
|
|
|
ShardFollowNodeTaskStatus status = task.getStatus();
|
|
@@ -204,7 +206,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|
|
task.markAsCompleted();
|
|
|
shardChangesRequests.clear();
|
|
|
// Also invokes the coordinatesReads() method:
|
|
|
- task.innerHandleReadResponse(0L, 15L, generateShardChangesResponse(0, 15, 0L, 0L, 31L));
|
|
|
+ task.innerHandleReadResponse(0L, 15L, generateShardChangesResponse(0, 15, 0L, 0L, 1L, 31L));
|
|
|
assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because task has been cancelled
|
|
|
assertThat(bulkShardOperationRequests.size(), equalTo(0)); // no more writes, because task has been cancelled
|
|
|
|
|
@@ -234,7 +236,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|
|
task.markAsCompleted();
|
|
|
shardChangesRequests.clear();
|
|
|
// Also invokes the coordinatesReads() method:
|
|
|
- task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 0L, 128L));
|
|
|
+ task.innerHandleReadResponse(0L, 63L, generateShardChangesResponse(0, 63, 0L, 0L, 1L, 128L));
|
|
|
assertThat(shardChangesRequests.size(), equalTo(0)); // no more reads, because task has been cancelled
|
|
|
assertThat(bulkShardOperationRequests.size(), equalTo(0)); // no more writes, because task has been cancelled
|
|
|
|
|
@@ -483,7 +485,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|
|
startTask(task, 63, -1);
|
|
|
|
|
|
task.coordinateReads();
|
|
|
- ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 63L);
|
|
|
+ ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 1L, 63L);
|
|
|
task.innerHandleReadResponse(0L, 63L, response);
|
|
|
|
|
|
assertThat(bulkShardOperationRequests.size(), equalTo(1));
|
|
@@ -513,7 +515,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|
|
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
|
|
|
|
|
|
shardChangesRequests.clear();
|
|
|
- ShardChangesAction.Response response = generateShardChangesResponse(0, 20, 0L, 0L, 31L);
|
|
|
+ ShardChangesAction.Response response = generateShardChangesResponse(0, 20, 0L, 0L, 1L, 31L);
|
|
|
task.innerHandleReadResponse(0L, 63L, response);
|
|
|
|
|
|
assertThat(shardChangesRequests.size(), equalTo(1));
|
|
@@ -542,7 +544,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|
|
|
|
|
shardChangesRequests.clear();
|
|
|
task.markAsCompleted();
|
|
|
- ShardChangesAction.Response response = generateShardChangesResponse(0, 31, 0L, 0L, 31L);
|
|
|
+ ShardChangesAction.Response response = generateShardChangesResponse(0, 31, 0L, 0L, 1L, 31L);
|
|
|
task.innerHandleReadResponse(0L, 64L, response);
|
|
|
|
|
|
assertThat(shardChangesRequests.size(), equalTo(0));
|
|
@@ -568,7 +570,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|
|
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
|
|
|
|
|
|
shardChangesRequests.clear();
|
|
|
- task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, 0, 100, new Translog.Operation[0], 1L));
|
|
|
+ task.innerHandleReadResponse(0L, 63L, new ShardChangesAction.Response(0, 0, 0, 0, 0, 100, new Translog.Operation[0], 1L));
|
|
|
|
|
|
assertThat(shardChangesRequests.size(), equalTo(1));
|
|
|
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
|
|
@@ -591,7 +593,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|
|
|
|
|
mappingVersions.add(1L);
|
|
|
task.coordinateReads();
|
|
|
- ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 0L, 63L);
|
|
|
+ ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 0L, 0L, 63L);
|
|
|
task.handleReadResponse(0L, 63L, response);
|
|
|
|
|
|
assertThat(bulkShardOperationRequests.size(), equalTo(1));
|
|
@@ -620,7 +622,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|
|
}
|
|
|
mappingVersions.add(1L);
|
|
|
task.coordinateReads();
|
|
|
- ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 0L, 63L);
|
|
|
+ ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 0L, 0L, 63L);
|
|
|
task.handleReadResponse(0L, 63L, response);
|
|
|
|
|
|
assertThat(mappingUpdateFailures.size(), equalTo(0));
|
|
@@ -645,7 +647,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|
|
|
|
|
mappingUpdateFailures.add(new RuntimeException());
|
|
|
task.coordinateReads();
|
|
|
- ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 0L, 64L);
|
|
|
+ ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 0L, 0L, 64L);
|
|
|
task.handleReadResponse(0L, 64L, response);
|
|
|
|
|
|
assertThat(bulkShardOperationRequests.size(), equalTo(0));
|
|
@@ -668,7 +670,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|
|
|
|
|
settingsVersions.add(1L);
|
|
|
task.coordinateReads();
|
|
|
- ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 1L, 63L);
|
|
|
+ ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 1L, 0L, 63L);
|
|
|
task.handleReadResponse(0L, 63L, response);
|
|
|
|
|
|
assertThat(bulkShardOperationRequests.size(), equalTo(1));
|
|
@@ -677,6 +679,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|
|
ShardFollowNodeTaskStatus status = task.getStatus();
|
|
|
assertThat(status.followerMappingVersion(), equalTo(0L));
|
|
|
assertThat(status.followerSettingsVersion(), equalTo(1L));
|
|
|
+ assertThat(status.followerAliasesVersion(), equalTo(0L));
|
|
|
assertThat(status.outstandingReadRequests(), equalTo(1));
|
|
|
assertThat(status.outstandingWriteRequests(), equalTo(1));
|
|
|
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
|
|
@@ -698,15 +701,16 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|
|
}
|
|
|
settingsVersions.add(1L);
|
|
|
task.coordinateReads();
|
|
|
- ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 1L, 63L);
|
|
|
+ ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 1L, 0L, 63L);
|
|
|
task.handleReadResponse(0L, 63L, response);
|
|
|
|
|
|
- assertThat(mappingUpdateFailures.size(), equalTo(0));
|
|
|
+ assertThat(settingsUpdateFailures.size(), equalTo(0));
|
|
|
assertThat(bulkShardOperationRequests.size(), equalTo(1));
|
|
|
assertThat(task.isStopped(), equalTo(false));
|
|
|
ShardFollowNodeTaskStatus status = task.getStatus();
|
|
|
assertThat(status.followerMappingVersion(), equalTo(0L));
|
|
|
assertThat(status.followerSettingsVersion(), equalTo(1L));
|
|
|
+ assertThat(status.followerAliasesVersion(), equalTo(0L));
|
|
|
assertThat(status.outstandingReadRequests(), equalTo(1));
|
|
|
assertThat(status.outstandingWriteRequests(), equalTo(1));
|
|
|
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
|
|
@@ -723,7 +727,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|
|
|
|
|
settingsUpdateFailures.add(new RuntimeException());
|
|
|
task.coordinateReads();
|
|
|
- ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 1L, 64L);
|
|
|
+ ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 1L, 0L, 64L);
|
|
|
task.handleReadResponse(0L, 64L, response);
|
|
|
|
|
|
assertThat(bulkShardOperationRequests.size(), equalTo(0));
|
|
@@ -731,6 +735,89 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|
|
ShardFollowNodeTaskStatus status = task.getStatus();
|
|
|
assertThat(status.followerMappingVersion(), equalTo(0L));
|
|
|
assertThat(status.followerSettingsVersion(), equalTo(0L));
|
|
|
+ assertThat(status.followerAliasesVersion(), equalTo(0L));
|
|
|
+ assertThat(status.outstandingReadRequests(), equalTo(1));
|
|
|
+ assertThat(status.outstandingWriteRequests(), equalTo(0));
|
|
|
+ assertThat(status.lastRequestedSeqNo(), equalTo(63L));
|
|
|
+ assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testAliasUpdate() {
|
|
|
+ final ShardFollowTaskParams params = new ShardFollowTaskParams();
|
|
|
+ params.maxReadRequestOperationCount = 64;
|
|
|
+ params.maxOutstandingReadRequests = 1;
|
|
|
+ params.maxOutstandingWriteRequests = 1;
|
|
|
+ final ShardFollowNodeTask task = createShardFollowTask(params);
|
|
|
+ startTask(task, 63, -1);
|
|
|
+
|
|
|
+ aliasesVersions.add(1L);
|
|
|
+ task.coordinateReads();
|
|
|
+ final ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 1L, 63L);
|
|
|
+ task.handleReadResponse(0L, 63L, response);
|
|
|
+
|
|
|
+ assertThat(bulkShardOperationRequests.size(), equalTo(1));
|
|
|
+ assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations())));
|
|
|
+
|
|
|
+ final ShardFollowNodeTaskStatus status = task.getStatus();
|
|
|
+ assertThat(status.followerMappingVersion(), equalTo(0L));
|
|
|
+ assertThat(status.followerSettingsVersion(), equalTo(0L));
|
|
|
+ assertThat(status.followerAliasesVersion(), equalTo(1L));
|
|
|
+ assertThat(status.outstandingReadRequests(), equalTo(1));
|
|
|
+ assertThat(status.outstandingWriteRequests(), equalTo(1));
|
|
|
+ assertThat(status.lastRequestedSeqNo(), equalTo(63L));
|
|
|
+ assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
|
|
|
+ assertThat(status.followerGlobalCheckpoint(), equalTo(-1L));
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testAliasUpdateRetryableError() {
|
|
|
+ final ShardFollowTaskParams params = new ShardFollowTaskParams();
|
|
|
+ params.maxReadRequestOperationCount = 64;
|
|
|
+ params.maxOutstandingReadRequests = 1;
|
|
|
+ params.maxOutstandingWriteRequests = 1;
|
|
|
+ final ShardFollowNodeTask task = createShardFollowTask(params);
|
|
|
+ startTask(task, 63, -1);
|
|
|
+
|
|
|
+ int max = randomIntBetween(1, 30);
|
|
|
+ for (int i = 0; i < max; i++) {
|
|
|
+ aliasesUpdateFailures.add(new ConnectException());
|
|
|
+ }
|
|
|
+ aliasesVersions.add(1L);
|
|
|
+ task.coordinateReads();
|
|
|
+ final ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 1L, 63L);
|
|
|
+ task.handleReadResponse(0L, 63L, response);
|
|
|
+
|
|
|
+ assertThat(aliasesUpdateFailures.size(), equalTo(0));
|
|
|
+ assertThat(bulkShardOperationRequests.size(), equalTo(1));
|
|
|
+ assertThat(task.isStopped(), equalTo(false));
|
|
|
+ final ShardFollowNodeTaskStatus status = task.getStatus();
|
|
|
+ assertThat(status.followerMappingVersion(), equalTo(0L));
|
|
|
+ assertThat(status.followerSettingsVersion(), equalTo(0L));
|
|
|
+ assertThat(status.followerAliasesVersion(), equalTo(1L));
|
|
|
+ assertThat(status.outstandingReadRequests(), equalTo(1));
|
|
|
+ assertThat(status.outstandingWriteRequests(), equalTo(1));
|
|
|
+ assertThat(status.lastRequestedSeqNo(), equalTo(63L));
|
|
|
+ assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testAliasUpdateNonRetryableError() {
|
|
|
+ final ShardFollowTaskParams params = new ShardFollowTaskParams();
|
|
|
+ params.maxReadRequestOperationCount = 64;
|
|
|
+ params.maxOutstandingReadRequests = 1;
|
|
|
+ params.maxOutstandingWriteRequests = 1;
|
|
|
+ final ShardFollowNodeTask task = createShardFollowTask(params);
|
|
|
+ startTask(task, 63, -1);
|
|
|
+
|
|
|
+ aliasesUpdateFailures.add(new RuntimeException());
|
|
|
+ task.coordinateReads();
|
|
|
+ final ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 0L, 0L, 1L, 64L);
|
|
|
+ task.handleReadResponse(0L, 64L, response);
|
|
|
+
|
|
|
+ assertThat(bulkShardOperationRequests.size(), equalTo(0));
|
|
|
+ assertThat(task.isStopped(), equalTo(true));
|
|
|
+ final ShardFollowNodeTaskStatus status = task.getStatus();
|
|
|
+ assertThat(status.followerMappingVersion(), equalTo(0L));
|
|
|
+ assertThat(status.followerSettingsVersion(), equalTo(0L));
|
|
|
+ assertThat(status.followerAliasesVersion(), equalTo(0L));
|
|
|
assertThat(status.outstandingReadRequests(), equalTo(1));
|
|
|
assertThat(status.outstandingWriteRequests(), equalTo(0));
|
|
|
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
|
|
@@ -752,7 +839,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|
|
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
|
|
|
assertThat(shardChangesRequests.get(0)[1], equalTo(128L));
|
|
|
|
|
|
- ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 63L);
|
|
|
+ ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 1L, 63L);
|
|
|
// Also invokes coordinatesWrites()
|
|
|
task.innerHandleReadResponse(0L, 63L, response);
|
|
|
|
|
@@ -772,7 +859,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|
|
params.maxWriteRequestOperationCount = 64;
|
|
|
params.maxOutstandingWriteRequests = 2;
|
|
|
ShardFollowNodeTask task = createShardFollowTask(params);
|
|
|
- ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 0L, 256L);
|
|
|
+ ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 0L, 1L, 256L);
|
|
|
// Also invokes coordinatesWrites()
|
|
|
task.innerHandleReadResponse(0L, 64L, response);
|
|
|
|
|
@@ -785,7 +872,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|
|
|
|
|
params.maxOutstandingWriteRequests = 4; // change to 4 outstanding writers
|
|
|
task = createShardFollowTask(params);
|
|
|
- response = generateShardChangesResponse(0, 256, 0L, 0L, 256L);
|
|
|
+ response = generateShardChangesResponse(0, 256, 0L, 0L, 1L, 256L);
|
|
|
// Also invokes coordinatesWrites()
|
|
|
task.innerHandleReadResponse(0L, 64L, response);
|
|
|
|
|
@@ -804,7 +891,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|
|
params.maxWriteRequestOperationCount = 8;
|
|
|
params.maxOutstandingWriteRequests = 32;
|
|
|
ShardFollowNodeTask task = createShardFollowTask(params);
|
|
|
- ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 0L, 256L);
|
|
|
+ ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 0L, 1L, 256L);
|
|
|
// Also invokes coordinatesWrites()
|
|
|
task.innerHandleReadResponse(0L, 64L, response);
|
|
|
|
|
@@ -835,7 +922,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|
|
for (int i = 0; i < max; i++) {
|
|
|
writeFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0)));
|
|
|
}
|
|
|
- ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 63L);
|
|
|
+ ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 1L, 63L);
|
|
|
// Also invokes coordinatesWrites()
|
|
|
task.innerHandleReadResponse(0L, 63L, response);
|
|
|
|
|
@@ -864,7 +951,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|
|
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
|
|
|
|
|
|
writeFailures.add(new RuntimeException());
|
|
|
- ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 63L);
|
|
|
+ ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 1L, 63L);
|
|
|
// Also invokes coordinatesWrites()
|
|
|
task.innerHandleReadResponse(0L, 63L, response);
|
|
|
|
|
@@ -891,7 +978,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|
|
assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
|
|
|
assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
|
|
|
|
|
|
- ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 64L);
|
|
|
+ ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 1L, 64L);
|
|
|
// Also invokes coordinatesWrites()
|
|
|
task.innerHandleReadResponse(0L, 64L, response);
|
|
|
|
|
@@ -914,7 +1001,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|
|
|
|
|
shardChangesRequests.clear();
|
|
|
followerGlobalCheckpoints.add(63L);
|
|
|
- ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 63L);
|
|
|
+ ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 0L, 1L, 63L);
|
|
|
// Also invokes coordinatesWrites()
|
|
|
task.innerHandleReadResponse(0L, 63L, response);
|
|
|
|
|
@@ -1013,6 +1100,8 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|
|
mappingVersions = new LinkedList<>();
|
|
|
settingsUpdateFailures = new LinkedList<>();
|
|
|
settingsVersions = new LinkedList<>();
|
|
|
+ aliasesUpdateFailures = new LinkedList<>();
|
|
|
+ aliasesVersions = new LinkedList<>();
|
|
|
leaderGlobalCheckpoints = new LinkedList<>();
|
|
|
followerGlobalCheckpoints = new LinkedList<>();
|
|
|
maxSeqNos = new LinkedList<>();
|
|
@@ -1048,6 +1137,20 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ protected void innerUpdateAliases(final LongConsumer handler, final Consumer<Exception> errorHandler) {
|
|
|
+ final Exception failure = aliasesUpdateFailures.poll();
|
|
|
+ if (failure != null) {
|
|
|
+ errorHandler.accept(failure);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ final Long aliasesVersion = aliasesVersions.poll();
|
|
|
+ if (aliasesVersion != null) {
|
|
|
+ handler.accept(aliasesVersion);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
protected void innerSendBulkShardOperationsRequest(
|
|
|
String followerHistoryUUID, final List<Translog.Operation> operations,
|
|
@@ -1086,6 +1189,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|
|
final ShardChangesAction.Response response = new ShardChangesAction.Response(
|
|
|
mappingVersions.poll(),
|
|
|
0L,
|
|
|
+ 0L,
|
|
|
leaderGlobalCheckpoints.poll(),
|
|
|
maxSeqNos.poll(),
|
|
|
randomNonNegativeLong(),
|
|
@@ -1153,6 +1257,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|
|
long toSeqNo,
|
|
|
long mappingVersion,
|
|
|
long settingsVersion,
|
|
|
+ long aliasesVersion,
|
|
|
long leaderGlobalCheckPoint) {
|
|
|
List<Translog.Operation> ops = new ArrayList<>();
|
|
|
for (long seqNo = fromSeqNo; seqNo <= toSeqNo; seqNo++) {
|
|
@@ -1163,6 +1268,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
|
|
|
return new ShardChangesAction.Response(
|
|
|
mappingVersion,
|
|
|
settingsVersion,
|
|
|
+ aliasesVersion,
|
|
|
leaderGlobalCheckPoint,
|
|
|
leaderGlobalCheckPoint,
|
|
|
randomNonNegativeLong(),
|