|
@@ -539,9 +539,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
|
|
|
"checkpoints map should always have an entry for the current shard";
|
|
|
|
|
|
// local checkpoints only set during primary mode
|
|
|
- assert primaryMode || checkpoints.values().stream()
|
|
|
- .allMatch(lcps -> lcps.localCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO ||
|
|
|
- lcps.localCheckpoint == SequenceNumbers.PRE_60_NODE_CHECKPOINT);
|
|
|
+ assert primaryMode || checkpoints.values().stream().allMatch(lcps -> lcps.localCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO);
|
|
|
|
|
|
// global checkpoints for other shards only set during primary mode
|
|
|
assert primaryMode
|
|
@@ -550,9 +548,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
|
|
|
.stream()
|
|
|
.filter(e -> e.getKey().equals(shardAllocationId) == false)
|
|
|
.map(Map.Entry::getValue)
|
|
|
- .allMatch(cps ->
|
|
|
- (cps.globalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO
|
|
|
- || cps.globalCheckpoint == SequenceNumbers.PRE_60_NODE_CHECKPOINT));
|
|
|
+ .allMatch(cps -> cps.globalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO);
|
|
|
|
|
|
// relocation handoff can only occur in primary mode
|
|
|
assert !handoffInProgress || primaryMode;
|
|
@@ -631,7 +627,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
|
|
|
.stream()
|
|
|
.filter(cps -> cps.inSync)
|
|
|
.mapToLong(function)
|
|
|
- .filter(v -> v != SequenceNumbers.PRE_60_NODE_CHECKPOINT && v != SequenceNumbers.UNASSIGNED_SEQ_NO));
|
|
|
+ .filter(v -> v != SequenceNumbers.UNASSIGNED_SEQ_NO));
|
|
|
return value.isPresent() ? value.getAsLong() : SequenceNumbers.UNASSIGNED_SEQ_NO;
|
|
|
}
|
|
|
|
|
@@ -916,13 +912,9 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
|
|
|
}
|
|
|
|
|
|
private boolean updateLocalCheckpoint(String allocationId, CheckpointState cps, long localCheckpoint) {
|
|
|
- // a local checkpoint of PRE_60_NODE_CHECKPOINT cannot be overridden
|
|
|
- assert cps.localCheckpoint != SequenceNumbers.PRE_60_NODE_CHECKPOINT ||
|
|
|
- localCheckpoint == SequenceNumbers.PRE_60_NODE_CHECKPOINT :
|
|
|
- "pre-6.0 shard copy " + allocationId + " unexpected to send valid local checkpoint " + localCheckpoint;
|
|
|
- // a local checkpoint for a shard copy should be a valid sequence number or the pre-6.0 sequence number indicator
|
|
|
- assert localCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO :
|
|
|
- "invalid local checkpoint for shard copy [" + allocationId + "]";
|
|
|
+ // a local checkpoint for a shard copy should be a valid sequence number
|
|
|
+ assert localCheckpoint >= SequenceNumbers.NO_OPS_PERFORMED :
|
|
|
+ "invalid local checkpoint [" + localCheckpoint + "] for shard copy [" + allocationId + "]";
|
|
|
if (localCheckpoint > cps.localCheckpoint) {
|
|
|
logger.trace("updated local checkpoint of [{}] from [{}] to [{}]", allocationId, cps.localCheckpoint, localCheckpoint);
|
|
|
cps.localCheckpoint = localCheckpoint;
|
|
@@ -981,8 +973,6 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
|
|
|
if (cps.localCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO) {
|
|
|
// unassigned in-sync replica
|
|
|
return fallback;
|
|
|
- } else if (cps.localCheckpoint == SequenceNumbers.PRE_60_NODE_CHECKPOINT) {
|
|
|
- // 5.x replica, ignore for global checkpoint calculation
|
|
|
} else {
|
|
|
minLocalCheckpoint = Math.min(cps.localCheckpoint, minLocalCheckpoint);
|
|
|
}
|
|
@@ -1054,18 +1044,11 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
|
|
|
handoffInProgress = false;
|
|
|
relocated = true;
|
|
|
// forget all checkpoint information except for global checkpoint of current shard
|
|
|
- checkpoints.entrySet().stream().forEach(e -> {
|
|
|
- final CheckpointState cps = e.getValue();
|
|
|
- if (cps.localCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO &&
|
|
|
- cps.localCheckpoint != SequenceNumbers.PRE_60_NODE_CHECKPOINT) {
|
|
|
- cps.localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
|
|
|
- }
|
|
|
- if (e.getKey().equals(shardAllocationId) == false) {
|
|
|
+ checkpoints.forEach((key, cps) -> {
|
|
|
+ cps.localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
|
|
|
+ if (key.equals(shardAllocationId) == false) {
|
|
|
// don't throw global checkpoint information of current shard away
|
|
|
- if (cps.globalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO &&
|
|
|
- cps.globalCheckpoint != SequenceNumbers.PRE_60_NODE_CHECKPOINT) {
|
|
|
- cps.globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
|
|
|
- }
|
|
|
+ cps.globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
|
|
|
}
|
|
|
});
|
|
|
assert invariant();
|