|
|
@@ -86,14 +86,20 @@ public class SyncedFlushService implements IndexEventListener {
|
|
|
private final IndexNameExpressionResolver indexNameExpressionResolver;
|
|
|
|
|
|
@Inject
|
|
|
- public SyncedFlushService(IndicesService indicesService, ClusterService clusterService, TransportService transportService, IndexNameExpressionResolver indexNameExpressionResolver) {
|
|
|
+ public SyncedFlushService(IndicesService indicesService,
|
|
|
+ ClusterService clusterService,
|
|
|
+ TransportService transportService,
|
|
|
+ IndexNameExpressionResolver indexNameExpressionResolver) {
|
|
|
this.indicesService = indicesService;
|
|
|
this.clusterService = clusterService;
|
|
|
this.transportService = transportService;
|
|
|
this.indexNameExpressionResolver = indexNameExpressionResolver;
|
|
|
- transportService.registerRequestHandler(PRE_SYNCED_FLUSH_ACTION_NAME, PreShardSyncedFlushRequest::new, ThreadPool.Names.FLUSH, new PreSyncedFlushTransportHandler());
|
|
|
- transportService.registerRequestHandler(SYNCED_FLUSH_ACTION_NAME, ShardSyncedFlushRequest::new, ThreadPool.Names.FLUSH, new SyncedFlushTransportHandler());
|
|
|
- transportService.registerRequestHandler(IN_FLIGHT_OPS_ACTION_NAME, InFlightOpsRequest::new, ThreadPool.Names.SAME, new InFlightOpCountTransportHandler());
|
|
|
+ transportService.registerRequestHandler(PRE_SYNCED_FLUSH_ACTION_NAME, PreShardSyncedFlushRequest::new, ThreadPool.Names.FLUSH,
|
|
|
+ new PreSyncedFlushTransportHandler());
|
|
|
+ transportService.registerRequestHandler(SYNCED_FLUSH_ACTION_NAME, ShardSyncedFlushRequest::new, ThreadPool.Names.FLUSH,
|
|
|
+ new SyncedFlushTransportHandler());
|
|
|
+ transportService.registerRequestHandler(IN_FLIGHT_OPS_ACTION_NAME, InFlightOpsRequest::new, ThreadPool.Names.SAME,
|
|
|
+ new InFlightOpCountTransportHandler());
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
@@ -103,7 +109,8 @@ public class SyncedFlushService implements IndexEventListener {
|
|
|
attemptSyncedFlush(indexShard.shardId(), new ActionListener<ShardsSyncedFlushResult>() {
|
|
|
@Override
|
|
|
public void onResponse(ShardsSyncedFlushResult syncedFlushResult) {
|
|
|
- logger.trace("{} sync flush on inactive shard returned successfully for sync_id: {}", syncedFlushResult.getShardId(), syncedFlushResult.syncId());
|
|
|
+ logger.trace("{} sync flush on inactive shard returned successfully for sync_id: {}",
|
|
|
+ syncedFlushResult.getShardId(), syncedFlushResult.syncId());
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
@@ -115,10 +122,13 @@ public class SyncedFlushService implements IndexEventListener {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * a utility method to perform a synced flush for all shards of multiple indices. see {@link #attemptSyncedFlush(ShardId, ActionListener)}
|
|
|
+ * a utility method to perform a synced flush for all shards of multiple indices.
|
|
|
+ * see {@link #attemptSyncedFlush(ShardId, ActionListener)}
|
|
|
* for more details.
|
|
|
*/
|
|
|
- public void attemptSyncedFlush(final String[] aliasesOrIndices, IndicesOptions indicesOptions, final ActionListener<SyncedFlushResponse> listener) {
|
|
|
+ public void attemptSyncedFlush(final String[] aliasesOrIndices,
|
|
|
+ IndicesOptions indicesOptions,
|
|
|
+ final ActionListener<SyncedFlushResponse> listener) {
|
|
|
final ClusterState state = clusterService.state();
|
|
|
final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, indicesOptions, aliasesOrIndices);
|
|
|
final Map<String, List<ShardsSyncedFlushResult>> results = ConcurrentCollections.newConcurrentMap();
|
|
|
@@ -176,12 +186,14 @@ public class SyncedFlushService implements IndexEventListener {
|
|
|
* a) the shard has no uncommitted changes since the last flush
|
|
|
* b) the last flush was the one executed in 1 (use the collected commit id to verify this)
|
|
|
*
|
|
|
- * This alone is not enough to ensure that all copies contain the same documents. Without step 2 a sync id would be written for inconsistent copies in the following scenario:
|
|
|
+ * This alone is not enough to ensure that all copies contain the same documents.
|
|
|
+ * Without step 2 a sync id would be written for inconsistent copies in the following scenario:
|
|
|
*
|
|
|
- * Write operation has completed on a primary and is being sent to replicas. The write request does not reach the replicas until sync flush is finished.
|
|
|
+ * Write operation has completed on a primary and is being sent to replicas. The write request does not reach the
|
|
|
+ * replicas until sync flush is finished.
|
|
|
* Step 1 is executed. After the flush the commit points on primary contains a write operation that the replica does not have.
|
|
|
- * Step 3 will be executed on primary and replica as well because there are no uncommitted changes on primary (the first flush committed them) and there are no uncommitted
|
|
|
- * changes on the replica (the write operation has not reached the replica yet).
|
|
|
+ * Step 3 will be executed on primary and replica as well because there are no uncommitted changes on primary (the first flush
|
|
|
+ * committed them) and there are no uncommitted changes on the replica (the write operation has not reached the replica yet).
|
|
|
*
|
|
|
* Step 2 detects this scenario and fails the whole synced flush if a write operation is ongoing on the primary.
|
|
|
* Together with the conditions for step 3 (same commit id and no uncommitted changes) this guarantees that a sync id will only
|
|
|
@@ -194,7 +206,9 @@ public class SyncedFlushService implements IndexEventListener {
|
|
|
innerAttemptSyncedFlush(shardId, clusterService.state(), actionListener);
|
|
|
}
|
|
|
|
|
|
- private void innerAttemptSyncedFlush(final ShardId shardId, final ClusterState state, final ActionListener<ShardsSyncedFlushResult> actionListener) {
|
|
|
+ private void innerAttemptSyncedFlush(final ShardId shardId,
|
|
|
+ final ClusterState state,
|
|
|
+ final ActionListener<ShardsSyncedFlushResult> actionListener) {
|
|
|
try {
|
|
|
final IndexShardRoutingTable shardRoutingTable = getShardRoutingTable(shardId, state);
|
|
|
final List<ShardRouting> activeShards = shardRoutingTable.activeShards();
|
|
|
@@ -205,11 +219,13 @@ public class SyncedFlushService implements IndexEventListener {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- final ActionListener<Map<String, PreSyncedFlushResponse>> presyncListener = new ActionListener<Map<String, PreSyncedFlushResponse>>() {
|
|
|
+ final ActionListener<Map<String, PreSyncedFlushResponse>> presyncListener =
|
|
|
+ new ActionListener<Map<String, PreSyncedFlushResponse>>() {
|
|
|
@Override
|
|
|
public void onResponse(final Map<String, PreSyncedFlushResponse> presyncResponses) {
|
|
|
if (presyncResponses.isEmpty()) {
|
|
|
- actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "all shards failed to commit on pre-sync"));
|
|
|
+ actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards,
|
|
|
+ "all shards failed to commit on pre-sync"));
|
|
|
return;
|
|
|
}
|
|
|
final ActionListener<InFlightOpsResponse> inflightOpsListener = new ActionListener<InFlightOpsResponse>() {
|
|
|
@@ -218,14 +234,17 @@ public class SyncedFlushService implements IndexEventListener {
|
|
|
final int inflight = response.opCount();
|
|
|
assert inflight >= 0;
|
|
|
if (inflight != 0) {
|
|
|
- actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "[" + inflight + "] ongoing operations on primary"));
|
|
|
+ actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "[" + inflight +
|
|
|
+ "] ongoing operations on primary"));
|
|
|
} else {
|
|
|
// 3. now send the sync request to all the shards;
|
|
|
final String sharedSyncId = sharedExistingSyncId(presyncResponses);
|
|
|
if (sharedSyncId != null) {
|
|
|
assert presyncResponses.values().stream().allMatch(r -> r.existingSyncId.equals(sharedSyncId)) :
|
|
|
- "Not all shards have the same existing sync id [" + sharedSyncId + "], responses [" + presyncResponses + "]";
|
|
|
- reportSuccessWithExistingSyncId(shardId, sharedSyncId, activeShards, totalShards, presyncResponses, actionListener);
|
|
|
+ "Not all shards have the same existing sync id [" + sharedSyncId + "], responses [" +
|
|
|
+ presyncResponses + "]";
|
|
|
+ reportSuccessWithExistingSyncId(shardId, sharedSyncId, activeShards, totalShards,
|
|
|
+ presyncResponses, actionListener);
|
|
|
}else {
|
|
|
String syncId = UUIDs.randomBase64UUID();
|
|
|
sendSyncRequests(syncId, activeShards, state, presyncResponses, shardId, totalShards, actionListener);
|
|
|
@@ -271,8 +290,12 @@ public class SyncedFlushService implements IndexEventListener {
|
|
|
return existingSyncId;
|
|
|
}
|
|
|
|
|
|
- private void reportSuccessWithExistingSyncId(ShardId shardId, String existingSyncId, List<ShardRouting> shards, int totalShards,
|
|
|
- Map<String, PreSyncedFlushResponse> preSyncResponses, ActionListener<ShardsSyncedFlushResult> listener) {
|
|
|
+ private void reportSuccessWithExistingSyncId(ShardId shardId,
|
|
|
+ String existingSyncId,
|
|
|
+ List<ShardRouting> shards,
|
|
|
+ int totalShards,
|
|
|
+ Map<String, PreSyncedFlushResponse> preSyncResponses,
|
|
|
+ ActionListener<ShardsSyncedFlushResult> listener) {
|
|
|
final Map<ShardRouting, ShardSyncedFlushResponse> results = new HashMap<>();
|
|
|
for (final ShardRouting shard : shards) {
|
|
|
if (preSyncResponses.containsKey(shard.currentNodeId())) {
|
|
|
@@ -301,7 +324,10 @@ public class SyncedFlushService implements IndexEventListener {
|
|
|
/**
|
|
|
* returns the number of in flight operations on primary. -1 upon error.
|
|
|
*/
|
|
|
- protected void getInflightOpsCount(final ShardId shardId, ClusterState state, IndexShardRoutingTable shardRoutingTable, final ActionListener<InFlightOpsResponse> listener) {
|
|
|
+ protected void getInflightOpsCount(final ShardId shardId,
|
|
|
+ ClusterState state,
|
|
|
+ IndexShardRoutingTable shardRoutingTable,
|
|
|
+ final ActionListener<InFlightOpsResponse> listener) {
|
|
|
try {
|
|
|
final ShardRouting primaryShard = shardRoutingTable.primaryShard();
|
|
|
final DiscoveryNode primaryNode = state.nodes().get(primaryShard.currentNodeId());
|
|
|
@@ -353,8 +379,13 @@ public class SyncedFlushService implements IndexEventListener {
|
|
|
return PreSyncedFlushResponse.UNKNOWN_NUM_DOCS;
|
|
|
}
|
|
|
|
|
|
- void sendSyncRequests(final String syncId, final List<ShardRouting> shards, ClusterState state, Map<String, PreSyncedFlushResponse> preSyncResponses,
|
|
|
- final ShardId shardId, final int totalShards, final ActionListener<ShardsSyncedFlushResult> listener) {
|
|
|
+ void sendSyncRequests(final String syncId,
|
|
|
+ final List<ShardRouting> shards,
|
|
|
+ ClusterState state,
|
|
|
+ Map<String, PreSyncedFlushResponse> preSyncResponses,
|
|
|
+ final ShardId shardId,
|
|
|
+ final int totalShards,
|
|
|
+ final ActionListener<ShardsSyncedFlushResult> listener) {
|
|
|
final CountDown countDown = new CountDown(shards.size());
|
|
|
final Map<ShardRouting, ShardSyncedFlushResponse> results = ConcurrentCollections.newConcurrentMap();
|
|
|
final int numDocsOnPrimary = numDocsOnPrimary(shards, preSyncResponses);
|
|
|
@@ -368,13 +399,15 @@ public class SyncedFlushService implements IndexEventListener {
|
|
|
}
|
|
|
final PreSyncedFlushResponse preSyncedResponse = preSyncResponses.get(shard.currentNodeId());
|
|
|
if (preSyncedResponse == null) {
|
|
|
- logger.trace("{} can't resolve expected commit id for current node, skipping for sync id [{}]. shard routing {}", shardId, syncId, shard);
|
|
|
+ logger.trace("{} can't resolve expected commit id for current node, skipping for sync id [{}]. shard routing {}",
|
|
|
+ shardId, syncId, shard);
|
|
|
results.put(shard, new ShardSyncedFlushResponse("no commit id from pre-sync flush"));
|
|
|
countDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
|
|
|
continue;
|
|
|
}
|
|
|
- if (preSyncedResponse.numDocs != numDocsOnPrimary
|
|
|
- && preSyncedResponse.numDocs != PreSyncedFlushResponse.UNKNOWN_NUM_DOCS && numDocsOnPrimary != PreSyncedFlushResponse.UNKNOWN_NUM_DOCS) {
|
|
|
+ if (preSyncedResponse.numDocs != numDocsOnPrimary &&
|
|
|
+ preSyncedResponse.numDocs != PreSyncedFlushResponse.UNKNOWN_NUM_DOCS &&
|
|
|
+ numDocsOnPrimary != PreSyncedFlushResponse.UNKNOWN_NUM_DOCS) {
|
|
|
logger.warn("{} can't to issue sync id [{}] for out of sync replica [{}] with num docs [{}]; num docs on primary [{}]",
|
|
|
shardId, syncId, shard, preSyncedResponse.numDocs, numDocsOnPrimary);
|
|
|
results.put(shard, new ShardSyncedFlushResponse("out of sync replica; " +
|
|
|
@@ -383,7 +416,8 @@ public class SyncedFlushService implements IndexEventListener {
|
|
|
continue;
|
|
|
}
|
|
|
logger.trace("{} sending synced flush request to {}. sync id [{}].", shardId, shard, syncId);
|
|
|
- transportService.sendRequest(node, SYNCED_FLUSH_ACTION_NAME, new ShardSyncedFlushRequest(shard.shardId(), syncId, preSyncedResponse.commitId),
|
|
|
+ ShardSyncedFlushRequest syncedFlushRequest = new ShardSyncedFlushRequest(shard.shardId(), syncId, preSyncedResponse.commitId);
|
|
|
+ transportService.sendRequest(node, SYNCED_FLUSH_ACTION_NAME, syncedFlushRequest,
|
|
|
new TransportResponseHandler<ShardSyncedFlushResponse>() {
|
|
|
@Override
|
|
|
public ShardSyncedFlushResponse read(StreamInput in) throws IOException {
|
|
|
@@ -402,7 +436,8 @@ public class SyncedFlushService implements IndexEventListener {
|
|
|
|
|
|
@Override
|
|
|
public void handleException(TransportException exp) {
|
|
|
- logger.trace(() -> new ParameterizedMessage("{} error while performing synced flush on [{}], skipping", shardId, shard), exp);
|
|
|
+ logger.trace(() -> new ParameterizedMessage("{} error while performing synced flush on [{}], skipping",
|
|
|
+ shardId, shard), exp);
|
|
|
results.put(shard, new ShardSyncedFlushResponse(exp.getMessage()));
|
|
|
countDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
|
|
|
}
|
|
|
@@ -416,8 +451,13 @@ public class SyncedFlushService implements IndexEventListener {
|
|
|
|
|
|
}
|
|
|
|
|
|
- private void countDownAndSendResponseIfDone(String syncId, List<ShardRouting> shards, ShardId shardId, int totalShards,
|
|
|
- ActionListener<ShardsSyncedFlushResult> listener, CountDown countDown, Map<ShardRouting, ShardSyncedFlushResponse> results) {
|
|
|
+ private void countDownAndSendResponseIfDone(String syncId,
|
|
|
+ List<ShardRouting> shards,
|
|
|
+ ShardId shardId,
|
|
|
+ int totalShards,
|
|
|
+ ActionListener<ShardsSyncedFlushResult> listener,
|
|
|
+ CountDown countDown,
|
|
|
+ Map<ShardRouting, ShardSyncedFlushResponse> results) {
|
|
|
if (countDown.countDown()) {
|
|
|
assert results.size() == shards.size();
|
|
|
listener.onResponse(new ShardsSyncedFlushResult(shardId, syncId, totalShards, results));
|
|
|
@@ -427,7 +467,10 @@ public class SyncedFlushService implements IndexEventListener {
|
|
|
/**
|
|
|
* send presync requests to all started copies of the given shard
|
|
|
*/
|
|
|
- void sendPreSyncRequests(final List<ShardRouting> shards, final ClusterState state, final ShardId shardId, final ActionListener<Map<String, PreSyncedFlushResponse>> listener) {
|
|
|
+ void sendPreSyncRequests(final List<ShardRouting> shards,
|
|
|
+ final ClusterState state,
|
|
|
+ final ShardId shardId,
|
|
|
+ final ActionListener<Map<String, PreSyncedFlushResponse>> listener) {
|
|
|
final CountDown countDown = new CountDown(shards.size());
|
|
|
final ConcurrentMap<String, PreSyncedFlushResponse> presyncResponses = ConcurrentCollections.newConcurrentMap();
|
|
|
for (final ShardRouting shard : shards) {
|
|
|
@@ -440,7 +483,8 @@ public class SyncedFlushService implements IndexEventListener {
|
|
|
}
|
|
|
continue;
|
|
|
}
|
|
|
- transportService.sendRequest(node, PRE_SYNCED_FLUSH_ACTION_NAME, new PreShardSyncedFlushRequest(shard.shardId()), new TransportResponseHandler<PreSyncedFlushResponse>() {
|
|
|
+ transportService.sendRequest(node, PRE_SYNCED_FLUSH_ACTION_NAME, new PreShardSyncedFlushRequest(shard.shardId()),
|
|
|
+ new TransportResponseHandler<PreSyncedFlushResponse>() {
|
|
|
@Override
|
|
|
public PreSyncedFlushResponse read(StreamInput in) throws IOException {
|
|
|
PreSyncedFlushResponse response = new PreSyncedFlushResponse();
|
|
|
@@ -460,7 +504,8 @@ public class SyncedFlushService implements IndexEventListener {
|
|
|
|
|
|
@Override
|
|
|
public void handleException(TransportException exp) {
|
|
|
- logger.trace(() -> new ParameterizedMessage("{} error while performing pre synced flush on [{}], skipping", shardId, shard), exp);
|
|
|
+ logger.trace(() -> new ParameterizedMessage("{} error while performing pre synced flush on [{}], skipping",
|
|
|
+ shardId, shard), exp);
|
|
|
if (countDown.countDown()) {
|
|
|
listener.onResponse(presyncResponses);
|
|
|
}
|
|
|
@@ -488,7 +533,8 @@ public class SyncedFlushService implements IndexEventListener {
|
|
|
private ShardSyncedFlushResponse performSyncedFlush(ShardSyncedFlushRequest request) {
|
|
|
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
|
|
|
IndexShard indexShard = indexService.getShard(request.shardId().id());
|
|
|
- logger.trace("{} performing sync flush. sync id [{}], expected commit id {}", request.shardId(), request.syncId(), request.expectedCommitId());
|
|
|
+ logger.trace("{} performing sync flush. sync id [{}], expected commit id {}",
|
|
|
+ request.shardId(), request.syncId(), request.expectedCommitId());
|
|
|
Engine.SyncedFlushResult result = indexShard.syncFlush(request.syncId(), request.expectedCommitId());
|
|
|
logger.trace("{} sync flush done. sync id [{}], result [{}]", request.shardId(), request.syncId(), result);
|
|
|
switch (result) {
|