|
@@ -279,28 +279,30 @@ public class DataFrameTransformCheckpoint implements Writeable, ToXContentObject
|
|
|
throw new IllegalArgumentException("old checkpoint is newer than new checkpoint");
|
|
|
}
|
|
|
|
|
|
- // get the sum of of shard checkpoints
|
|
|
+ // get the sum of of shard operations (that are fully replicated), which is 1 higher than the global checkpoint for each shard
|
|
|
// note: we require shard checkpoints to strictly increase and never decrease
|
|
|
- long oldCheckPointSum = 0;
|
|
|
- long newCheckPointSum = 0;
|
|
|
+ long oldCheckPointOperationsSum = 0;
|
|
|
+ long newCheckPointOperationsSum = 0;
|
|
|
|
|
|
for (Entry<String, long[]> entry : oldCheckpoint.indicesCheckpoints.entrySet()) {
|
|
|
// ignore entries that aren't part of newCheckpoint, e.g. deleted indices
|
|
|
if (newCheckpoint.indicesCheckpoints.containsKey(entry.getKey())) {
|
|
|
- oldCheckPointSum += Arrays.stream(entry.getValue()).sum();
|
|
|
+ // Add 1 per shard as sequence numbers start at 0, i.e. sequence number 0 means there has been 1 operation
|
|
|
+ oldCheckPointOperationsSum += Arrays.stream(entry.getValue()).sum() + entry.getValue().length;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
for (long[] v : newCheckpoint.indicesCheckpoints.values()) {
|
|
|
- newCheckPointSum += Arrays.stream(v).sum();
|
|
|
+ // Add 1 per shard as sequence numbers start at 0, i.e. sequence number 0 means there has been 1 operation
|
|
|
+ newCheckPointOperationsSum += Arrays.stream(v).sum() + v.length;
|
|
|
}
|
|
|
|
|
|
// this should not be possible
|
|
|
- if (newCheckPointSum < oldCheckPointSum) {
|
|
|
+ if (newCheckPointOperationsSum < oldCheckPointOperationsSum) {
|
|
|
return -1L;
|
|
|
}
|
|
|
|
|
|
- return newCheckPointSum - oldCheckPointSum;
|
|
|
+ return newCheckPointOperationsSum - oldCheckPointOperationsSum;
|
|
|
}
|
|
|
|
|
|
private static Map<String, long[]> readCheckpoints(Map<String, Object> readMap) {
|