|
@@ -23,7 +23,6 @@ import com.carrotsearch.hppc.ObjectLongHashMap;
|
|
|
import com.carrotsearch.hppc.ObjectLongMap;
|
|
|
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
|
|
|
import org.elasticsearch.common.SuppressForbidden;
|
|
|
-import org.elasticsearch.common.collect.LongTuple;
|
|
|
import org.elasticsearch.index.IndexSettings;
|
|
|
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
|
|
|
import org.elasticsearch.index.shard.PrimaryContext;
|
|
@@ -377,22 +376,43 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
|
|
|
* that we have to sort the incoming local checkpoints from smallest to largest lest we violate that the global checkpoint does not
|
|
|
* regress.
|
|
|
*/
|
|
|
- final List<LongTuple<String>> inSync =
|
|
|
+
|
|
|
+ class AllocationIdLocalCheckpointPair {
|
|
|
+
|
|
|
+ private final String allocationId;
|
|
|
+
|
|
|
+ public String allocationId() {
|
|
|
+ return allocationId;
|
|
|
+ }
|
|
|
+
|
|
|
+ private final long localCheckpoint;
|
|
|
+
|
|
|
+ public long localCheckpoint() {
|
|
|
+ return localCheckpoint;
|
|
|
+ }
|
|
|
+
|
|
|
+ private AllocationIdLocalCheckpointPair(final String allocationId, final long localCheckpoint) {
|
|
|
+ this.allocationId = allocationId;
|
|
|
+ this.localCheckpoint = localCheckpoint;
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ final List<AllocationIdLocalCheckpointPair> inSync =
|
|
|
StreamSupport
|
|
|
.stream(primaryContext.inSyncLocalCheckpoints().spliterator(), false)
|
|
|
- .map(e -> LongTuple.tuple(e.key, e.value))
|
|
|
+ .map(e -> new AllocationIdLocalCheckpointPair(e.key, e.value))
|
|
|
.collect(Collectors.toList());
|
|
|
+ inSync.sort(Comparator.comparingLong(AllocationIdLocalCheckpointPair::localCheckpoint));
|
|
|
|
|
|
- inSync.sort(Comparator.comparingLong(LongTuple::v2));
|
|
|
-
|
|
|
- for (final LongTuple<String> cursor : inSync) {
|
|
|
- assert cursor.v2() >= globalCheckpoint
|
|
|
- : "local checkpoint [" + cursor.v2() + "] "
|
|
|
- + "for allocation ID [" + cursor.v1() + "] "
|
|
|
+ for (final AllocationIdLocalCheckpointPair cursor : inSync) {
|
|
|
+ assert cursor.localCheckpoint() >= globalCheckpoint
|
|
|
+ : "local checkpoint [" + cursor.localCheckpoint() + "] "
|
|
|
+ + "for allocation ID [" + cursor.allocationId() + "] "
|
|
|
+ "violates being at least the global checkpoint [" + globalCheckpoint + "]";
|
|
|
- updateLocalCheckpoint(cursor.v1(), cursor.v2());
|
|
|
- if (trackingLocalCheckpoints.containsKey(cursor.v1())) {
|
|
|
- moveAllocationIdFromTrackingToInSync(cursor.v1(), "relocation");
|
|
|
+ updateLocalCheckpoint(cursor.allocationId(), cursor.localCheckpoint());
|
|
|
+ if (trackingLocalCheckpoints.containsKey(cursor.allocationId())) {
|
|
|
+ moveAllocationIdFromTrackingToInSync(cursor.allocationId(), "relocation");
|
|
|
updateGlobalCheckpointOnPrimary();
|
|
|
}
|
|
|
}
|