|
@@ -15,7 +15,6 @@ import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse
|
|
|
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
|
|
|
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
|
|
|
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
|
|
|
-import org.elasticsearch.action.admin.indices.stats.ShardStats;
|
|
|
import org.elasticsearch.action.index.IndexRequestBuilder;
|
|
|
import org.elasticsearch.action.search.SearchRequestBuilder;
|
|
|
import org.elasticsearch.action.search.SearchResponse;
|
|
@@ -25,7 +24,6 @@ import org.elasticsearch.cluster.metadata.RepositoryMetadata;
|
|
|
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
|
|
import org.elasticsearch.cluster.routing.RecoverySource;
|
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
|
-import org.elasticsearch.common.Strings;
|
|
|
import org.elasticsearch.common.blobstore.BlobContainer;
|
|
|
import org.elasticsearch.common.blobstore.OperationPurpose;
|
|
|
import org.elasticsearch.common.blobstore.support.FilterBlobContainer;
|
|
@@ -1563,26 +1561,23 @@ public class SnapshotBasedIndexRecoveryIT extends AbstractSnapshotIntegTestCase
|
|
|
|
|
|
// Ensure that the safe commit == latest commit
|
|
|
assertBusy(() -> {
|
|
|
- ShardStats stats = indicesAdmin().prepareStats(indexName)
|
|
|
- .clear()
|
|
|
- .get()
|
|
|
- .asMap()
|
|
|
- .entrySet()
|
|
|
- .stream()
|
|
|
- .filter(e -> e.getKey().shardId().getId() == 0)
|
|
|
- .map(Map.Entry::getValue)
|
|
|
- .findFirst()
|
|
|
- .orElse(null);
|
|
|
- assertThat(stats, is(notNullValue()));
|
|
|
- assertThat(stats.getSeqNoStats(), is(notNullValue()));
|
|
|
-
|
|
|
- assertThat(stats.getSeqNoStats().getMaxSeqNo(), is(greaterThan(-1L)));
|
|
|
- assertThat(stats.getSeqNoStats().getGlobalCheckpoint(), is(greaterThan(-1L)));
|
|
|
- assertThat(
|
|
|
- Strings.toString(stats.getSeqNoStats()),
|
|
|
- stats.getSeqNoStats().getMaxSeqNo(),
|
|
|
- equalTo(stats.getSeqNoStats().getGlobalCheckpoint())
|
|
|
- );
|
|
|
+ ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
|
|
|
+ var indexShardRoutingTable = clusterState.routingTable().index(indexName).shard(0);
|
|
|
+ assertThat(indexShardRoutingTable, is(notNullValue()));
|
|
|
+
|
|
|
+ var assignedNodeId = indexShardRoutingTable.primaryShard().currentNodeId();
|
|
|
+ var assignedNodeName = clusterState.nodes().resolveNode(assignedNodeId).getName();
|
|
|
+
|
|
|
+ var indexShard = internalCluster().getInstance(IndicesService.class, assignedNodeName)
|
|
|
+ .indexService(resolveIndex(indexName))
|
|
|
+ .getShard(0);
|
|
|
+ assertThat(indexShard, is(notNullValue()));
|
|
|
+
|
|
|
+ // The safe commit is determined using the last synced global checkpoint, hence we should wait until the translog is synced
|
|
|
+ // to cover cases where the translog is synced asynchronously
|
|
|
+ var lastSyncedGlobalCheckpoint = indexShard.getLastSyncedGlobalCheckpoint();
|
|
|
+ var maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
|
|
|
+ assertThat(lastSyncedGlobalCheckpoint, equalTo(maxSeqNo));
|
|
|
}, 60, TimeUnit.SECONDS);
|
|
|
}
|
|
|
|