|
@@ -60,8 +60,9 @@ import java.nio.charset.StandardCharsets;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Collections;
|
|
|
-import java.util.HashSet;
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.Future;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
@@ -559,11 +560,11 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
|
|
|
boolean assertMaxSeqNoOfUpdatesOrDeletes) throws Exception {
|
|
|
final List<Tuple<String, Long>> docAndSeqNosOnLeader = getDocIdAndSeqNos(leader.getPrimary()).stream()
|
|
|
.map(d -> Tuple.tuple(d.getId(), d.getSeqNo())).collect(Collectors.toList());
|
|
|
- final Set<Tuple<Long, Translog.Operation.Type>> operationsOnLeader = new HashSet<>();
|
|
|
+ final Map<Long, Translog.Operation> operationsOnLeader = new HashMap<>();
|
|
|
try (Translog.Snapshot snapshot = leader.getPrimary().newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) {
|
|
|
Translog.Operation op;
|
|
|
while ((op = snapshot.next()) != null) {
|
|
|
- operationsOnLeader.add(Tuple.tuple(op.seqNo(), op.opType()));
|
|
|
+ operationsOnLeader.put(op.seqNo(), op);
|
|
|
}
|
|
|
}
|
|
|
for (IndexShard followingShard : follower) {
|
|
@@ -573,14 +574,14 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
|
|
|
List<Tuple<String, Long>> docAndSeqNosOnFollower = getDocIdAndSeqNos(followingShard).stream()
|
|
|
.map(d -> Tuple.tuple(d.getId(), d.getSeqNo())).collect(Collectors.toList());
|
|
|
assertThat(docAndSeqNosOnFollower, equalTo(docAndSeqNosOnLeader));
|
|
|
- final Set<Tuple<Long, Translog.Operation.Type>> operationsOnFollower = new HashSet<>();
|
|
|
try (Translog.Snapshot snapshot = followingShard.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) {
|
|
|
Translog.Operation op;
|
|
|
while ((op = snapshot.next()) != null) {
|
|
|
- operationsOnFollower.add(Tuple.tuple(op.seqNo(), op.opType()));
|
|
|
+ Translog.Operation leaderOp = operationsOnLeader.get(op.seqNo());
|
|
|
+ assertThat(TransportBulkShardOperationsAction.rewriteOperationWithPrimaryTerm(op, leaderOp.primaryTerm()),
|
|
|
+ equalTo(leaderOp));
|
|
|
}
|
|
|
}
|
|
|
- assertThat(followingShard.routingEntry().toString(), operationsOnFollower, equalTo(operationsOnLeader));
|
|
|
}
|
|
|
}
|
|
|
|