@@ -16,17 +16,16 @@
 * specific language governing permissions and limitations
 * under the License.
 */
-package org.elasticsearch.indices;
+package org.elasticsearch.indices.flush;

-import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
+import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.routing.ImmutableShardRouting;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
@@ -36,26 +35,27 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.engine.Engine;
-import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardException;
import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.indices.IndexClosedException;
+import org.elasticsearch.indices.IndexMissingException;
+import org.elasticsearch.indices.IndicesLifecycle;
+import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;

import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;

public class SyncedFlushService extends AbstractComponent {

@@ -82,10 +82,10 @@ public class SyncedFlushService extends AbstractComponent {
            public void onShardInactive(final IndexShard indexShard) {
                // we only want to call sync flush once, so only trigger it when we are on a primary
                if (indexShard.routingEntry().primary()) {
-                    attemptSyncedFlush(indexShard.shardId(), new ActionListener<SyncedFlushResult>() {
+                    attemptSyncedFlush(indexShard.shardId(), new ActionListener<ShardsSyncedFlushResult>() {
                        @Override
-                        public void onResponse(SyncedFlushResult syncedFlushResult) {
-                            logger.debug("{} sync flush on inactive shard returned successfully for sync_id: {}", syncedFlushResult.getShardId(), syncedFlushResult.syncId());
+                        public void onResponse(ShardsSyncedFlushResult syncedFlushResult) {
+                            logger.trace("{} sync flush on inactive shard returned successfully for sync_id: {}", syncedFlushResult.getShardId(), syncedFlushResult.syncId());
                        }

                        @Override
@@ -98,6 +98,48 @@ public class SyncedFlushService extends AbstractComponent {
        });
    }

+    public void attemptSyncedFlush(final String[] aliasesOrIndices, IndicesOptions indicesOptions, final ActionListener<IndicesSyncedFlushResult> listener) {
+        final ClusterState state = clusterService.state();
+        final String[] concreteIndices = state.metaData().concreteIndices(indicesOptions, aliasesOrIndices);
+        final Map<String, List<ShardsSyncedFlushResult>> results = ConcurrentCollections.newConcurrentMap();
+        int totalNumberOfShards = 0;
+        int numberOfShards = 0;
+        for (String index : concreteIndices) {
+            final IndexMetaData indexMetaData = state.metaData().index(index);
+            totalNumberOfShards += indexMetaData.totalNumberOfShards();
+            numberOfShards += indexMetaData.getNumberOfShards();
+            results.put(index, Collections.synchronizedList(new ArrayList<ShardsSyncedFlushResult>()));
+
+        }
+        final int finalTotalNumberOfShards = totalNumberOfShards;
+        final CountDown countDown = new CountDown(numberOfShards);
+
+        for (final String index : concreteIndices) {
+            final int indexNumberOfShards = state.metaData().index(index).getNumberOfShards();
+            for (int shard = 0; shard < indexNumberOfShards; shard++) {
+                final ShardId shardId = new ShardId(index, shard);
+                attemptSyncedFlush(shardId, new ActionListener<ShardsSyncedFlushResult>() {
+                    @Override
+                    public void onResponse(ShardsSyncedFlushResult syncedFlushResult) {
+                        results.get(index).add(syncedFlushResult);
+                        if (countDown.countDown()) {
+                            listener.onResponse(new IndicesSyncedFlushResult(results));
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(Throwable e) {
+                        logger.debug("{} unexpected error while executing synced flush", shardId);
+                        results.get(index).add(new ShardsSyncedFlushResult(shardId, finalTotalNumberOfShards, e.getMessage()));
+                        if (countDown.countDown()) {
+                            listener.onResponse(new IndicesSyncedFlushResult(results));
+                        }
+                    }
+                });
+            }
+        }
+    }
+
    /*
     * Tries to flush all copies of a shard and write a sync id to it.
     * After a synced flush two shard copies may only contain the same sync id if they contain the same documents.
@@ -124,28 +166,36 @@ public class SyncedFlushService extends AbstractComponent {
     *
     * Synced flush is a best effort operation. The sync id may be written on all, some or none of the copies.
     **/
-    public void attemptSyncedFlush(final ShardId shardId, final ActionListener<SyncedFlushResult> actionListener) {
+    public void attemptSyncedFlush(final ShardId shardId, final ActionListener<ShardsSyncedFlushResult> actionListener) {
        try {
            final ClusterState state = clusterService.state();
-            final IndexShardRoutingTable shardRoutingTable = getActiveShardRoutings(shardId, state);
+            final IndexShardRoutingTable shardRoutingTable = getShardRoutingTable(shardId, state);
            final List<ShardRouting> activeShards = shardRoutingTable.activeShards();
+            final int totalShards = shardRoutingTable.getSize();
+
+            if (activeShards.size() == 0) {
+                actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "no active shards"));
+                return;
+            }
+
            final ActionListener<Map<String, Engine.CommitId>> commitIdsListener = new ActionListener<Map<String, Engine.CommitId>>() {
                @Override
                public void onResponse(final Map<String, Engine.CommitId> commitIds) {
                    if (commitIds.isEmpty()) {
-                        actionListener.onResponse(new SyncedFlushResult(shardId, "all shards failed to commit on pre-sync"));
+                        actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "all shards failed to commit on pre-sync"));
+                        return;
                    }
                    final ActionListener<InFlightOpsResponse> inflightOpsListener = new ActionListener<InFlightOpsResponse>() {
                        @Override
                        public void onResponse(InFlightOpsResponse response) {
                            final int inflight = response.opCount();
-                            assert inflight >= -1;
-                            if (inflight != 1) { // 1 means that there are no write operations are in flight (>1) and the shard is not closed (0).
-                                actionListener.onResponse(new SyncedFlushResult(shardId, "operation counter on primary is non zero [" + inflight + "]"));
+                            assert inflight >= 0;
+                            if (inflight != 0) {
+                                actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "[" + inflight + "] ongoing operations on primary"));
                            } else {
                                // 3. now send the sync request to all the shards
                                String syncId = Strings.base64UUID();
-                                sendSyncRequests(syncId, activeShards, state, commitIds, shardId, actionListener);
+                                sendSyncRequests(syncId, activeShards, state, commitIds, shardId, totalShards, actionListener);
                            }
                        }

@@ -171,7 +221,7 @@ public class SyncedFlushService extends AbstractComponent {
        }
    }

-    final IndexShardRoutingTable getActiveShardRoutings(ShardId shardId, ClusterState state) {
+    final IndexShardRoutingTable getShardRoutingTable(ShardId shardId, ClusterState state) {
        final IndexRoutingTable indexRoutingTable = state.routingTable().index(shardId.index().name());
        if (indexRoutingTable == null) {
            IndexMetaData index = state.getMetaData().index(shardId.index().getName());
@@ -188,7 +238,7 @@ public class SyncedFlushService extends AbstractComponent {
    }

    /**
-     * returns the number of inflight operations on primary. -1 upon error.
+     * returns the number of in flight operations on primary. -1 upon error.
     */
    protected void getInflightOpsCount(final ShardId shardId, ClusterState state, IndexShardRoutingTable shardRoutingTable, final ActionListener<InFlightOpsResponse> listener) {
        try {
@@ -214,7 +264,7 @@ public class SyncedFlushService extends AbstractComponent {

                @Override
                public void handleException(TransportException exp) {
-                    logger.debug("{} unexpected error while retrieving inflight op count", shardId);
+                    logger.debug("{} unexpected error while retrieving in flight op count", shardId);
                    listener.onFailure(exp);
                }

@@ -229,7 +279,8 @@ public class SyncedFlushService extends AbstractComponent {
    }


-    void sendSyncRequests(final String syncId, final List<ShardRouting> shards, ClusterState state, Map<String, Engine.CommitId> expectedCommitIds, final ShardId shardId, final ActionListener<SyncedFlushResult> listener) {
+    void sendSyncRequests(final String syncId, final List<ShardRouting> shards, ClusterState state, Map<String, Engine.CommitId> expectedCommitIds,
+                          final ShardId shardId, final int totalShards, final ActionListener<ShardsSyncedFlushResult> listener) {
        final CountDown countDown = new CountDown(shards.size());
        final Map<ShardRouting, SyncedFlushResponse> results = ConcurrentCollections.newConcurrentMap();
        for (final ShardRouting shard : shards) {
@@ -237,14 +288,14 @@ public class SyncedFlushService extends AbstractComponent {
            if (node == null) {
                logger.trace("{} is assigned to an unknown node. skipping for sync id [{}]. shard routing {}", shardId, syncId, shard);
                results.put(shard, new SyncedFlushResponse("unknown node"));
-                contDownAndSendResponseIfDone(syncId, shards, shardId, listener, countDown, results);
+                contDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
                continue;
            }
            final Engine.CommitId expectedCommitId = expectedCommitIds.get(shard.currentNodeId());
            if (expectedCommitId == null) {
                logger.trace("{} can't resolve expected commit id for {}, skipping for sync id [{}]. shard routing {}", shardId, syncId, shard);
                results.put(shard, new SyncedFlushResponse("no commit id from pre-sync flush"));
-                contDownAndSendResponseIfDone(syncId, shards, shardId, listener, countDown, results);
+                contDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
                continue;
            }
            logger.trace("{} sending synced flush request to {}. sync id [{}].", shardId, shard, syncId);
@@ -260,14 +311,14 @@ public class SyncedFlushService extends AbstractComponent {
                            SyncedFlushResponse existing = results.put(shard, response);
                            assert existing == null : "got two answers for node [" + node + "]";
                            // count after the assert so we won't decrement twice in handleException
-                            contDownAndSendResponseIfDone(syncId, shards, shardId, listener, countDown, results);
+                            contDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
                        }

                        @Override
                        public void handleException(TransportException exp) {
                            logger.trace("{} error while performing synced flush on [{}], skipping", exp, shardId, shard);
                            results.put(shard, new SyncedFlushResponse(exp.getMessage()));
-                            contDownAndSendResponseIfDone(syncId, shards, shardId, listener, countDown, results);
+                            contDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
                        }

                        @Override
|
@@ -279,10 +330,12 @@ public class SyncedFlushService extends AbstractComponent {
|
|
|
|
|
|
}
|
|
|
|
|
|
- private void contDownAndSendResponseIfDone(String syncId, List<ShardRouting> shards, ShardId shardId, ActionListener<SyncedFlushResult> listener, CountDown countDown, Map<ShardRouting, SyncedFlushResponse> results) {
|
|
|
+ private void contDownAndSendResponseIfDone(String syncId, List<ShardRouting> shards, ShardId shardId, int totalShards,
|
|
|
+ ActionListener<ShardsSyncedFlushResult> listener, CountDown countDown, Map<ShardRouting,
|
|
|
+ SyncedFlushResponse> results) {
|
|
|
if (countDown.countDown()) {
|
|
|
assert results.size() == shards.size();
|
|
|
- listener.onResponse(new SyncedFlushResult(shardId, syncId, results));
|
|
|
+ listener.onResponse(new ShardsSyncedFlushResult(shardId, syncId, totalShards, results));
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -297,8 +350,8 @@ public class SyncedFlushService extends AbstractComponent {
            final DiscoveryNode node = state.nodes().get(shard.currentNodeId());
            if (node == null) {
                logger.trace("{} shard routing {} refers to an unknown node. skipping.", shardId, shard);
-                if(countDown.countDown()) {
-                    listener.onResponse(commitIds);
+                if (countDown.countDown()) {
+                    listener.onResponse(commitIds);
                }
                continue;
            }
@@ -313,7 +366,7 @@ public class SyncedFlushService extends AbstractComponent {
                            Engine.CommitId existing = commitIds.putIfAbsent(node.id(), response.commitId());
                            assert existing == null : "got two answers for node [" + node + "]";
                            // count after the assert so we won't decrement twice in handleException
-                            if(countDown.countDown()) {
+                            if (countDown.countDown()) {
                                listener.onResponse(commitIds);
                            }
                        }
@@ -321,7 +374,7 @@ public class SyncedFlushService extends AbstractComponent {
                        @Override
                        public void handleException(TransportException exp) {
                            logger.trace("{} error while performing pre synced flush on [{}], skipping", shardId, exp, shard);
-                            if(countDown.countDown()) {
+                            if (countDown.countDown()) {
                                listener.onResponse(commitIds);
                            }
                        }
@@ -348,7 +401,7 @@ public class SyncedFlushService extends AbstractComponent {
        IndexShard indexShard = indexService.shardSafe(request.shardId().id());
        logger.trace("{} performing sync flush. sync id [{}], expected commit id {}", request.shardId(), request.syncId(), request.expectedCommitId());
        Engine.SyncedFlushResult result = indexShard.syncFlush(request.syncId(), request.expectedCommitId());
-        logger.trace("{} sync flush done. sync id [{}], result [{}]", request.shardId(), request.syncId(), result);
+        logger.trace("{} sync flush done. sync id [{}], result [{}]", request.shardId(), request.syncId(), result);
        switch (result) {
            case SUCCESS:
                return new SyncedFlushResponse();
@@ -372,124 +425,6 @@ public class SyncedFlushService extends AbstractComponent {
        return new InFlightOpsResponse(opCount);
    }

-    /**
-     * Result for all copies of a shard
-     */
-    public static class SyncedFlushResult extends TransportResponse {
-        private String failureReason;
-        private Map<ShardRouting, SyncedFlushResponse> shardResponses;
-        private String syncId;
-        private ShardId shardId;
-
-        public SyncedFlushResult() {
-        }
-
-        public ShardId getShardId() {
-            return shardId;
-        }
-
-        /**
-         * failure constructor
-         */
-        public SyncedFlushResult(ShardId shardId, String failureReason) {
-            this.syncId = null;
-            this.failureReason = failureReason;
-            this.shardResponses = ImmutableMap.of();
-            this.shardId = shardId;
-        }
-
-        /**
-         * success constructor
-         */
-        public SyncedFlushResult(ShardId shardId, String syncId, Map<ShardRouting, SyncedFlushResponse> shardResponses) {
-            this.failureReason = null;
-            ImmutableMap.Builder<ShardRouting, SyncedFlushResponse> builder = ImmutableMap.builder();
-            this.shardResponses = builder.putAll(shardResponses).build();
-            this.syncId = syncId;
-            this.shardId = shardId;
-        }
-
-        /**
-         * @return true if one or more shard copies was successful, false if all failed before step three of synced flush
-         */
-        public boolean success() {
-            return syncId != null;
-        }
-
-        /**
-         * @return the reason for the failure if synced flush failed before step three of synced flush
-         */
-        public String failureReason() {
-            return failureReason;
-        }
-
-        public String syncId() {
-            return syncId;
-        }
-
-        /**
-         * @return total number of shards for which a sync attempt was made
-         */
-        public int totalShards() {
-            return shardResponses.size();
-        }
-
-        /**
-         * @return total number of successful shards
-         */
-        public int successfulShards() {
-            int i = 0;
-            for (SyncedFlushResponse result : shardResponses.values()) {
-                if (result.success()) {
-                    i++;
-                }
-            }
-            return i;
-        }
-
-        /**
-         * @return Individual responses for each shard copy with a detailed failure message if the copy failed to perform the synced flush.
-         * Empty if synced flush failed before step three.
-         */
-        public Map<ShardRouting, SyncedFlushResponse> shardResponses() {
-            return shardResponses;
-        }
-
-        @Override
-        public void writeTo(StreamOutput out) throws IOException {
-            super.writeTo(out);
-            out.writeOptionalString(failureReason);
-            out.writeOptionalString(syncId);
-            out.writeVInt(shardResponses.size());
-            for (Map.Entry<ShardRouting, SyncedFlushResponse> result : shardResponses.entrySet()) {
-                result.getKey().writeTo(out);
-                result.getValue().writeTo(out);
-            }
-            shardId.writeTo(out);
-        }
-
-        @Override
-        public void readFrom(StreamInput in) throws IOException {
-            super.readFrom(in);
-            failureReason = in.readOptionalString();
-            syncId = in.readOptionalString();
-            int size = in.readVInt();
-            ImmutableMap.Builder<ShardRouting, SyncedFlushResponse> builder = ImmutableMap.builder();
-            for (int i = 0; i < size; i++) {
-                ImmutableShardRouting shardRouting = ImmutableShardRouting.readShardRoutingEntry(in);
-                SyncedFlushResponse syncedFlushRsponse = new SyncedFlushResponse();
-                syncedFlushRsponse.readFrom(in);
-                builder.put(shardRouting, syncedFlushRsponse);
-            }
-            shardResponses = builder.build();
-            shardId = ShardId.readShardId(in);
-        }
-
-        public ShardId shardId() {
-            return shardId;
-        }
-    }
-
    final static class PreSyncedFlushRequest extends TransportRequest {
        private ShardId shardId;