|
@@ -19,6 +19,7 @@
|
|
|
|
|
|
package org.elasticsearch.cluster.action.shard;
|
|
|
|
|
|
+import org.elasticsearch.ExceptionsHelper;
|
|
|
import org.elasticsearch.cluster.ClusterService;
|
|
|
import org.elasticsearch.cluster.ClusterState;
|
|
|
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
|
@@ -30,6 +31,7 @@ import org.elasticsearch.cluster.routing.*;
|
|
|
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
|
|
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
|
|
|
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
|
|
+import org.elasticsearch.common.Nullable;
|
|
|
import org.elasticsearch.common.Priority;
|
|
|
import org.elasticsearch.common.component.AbstractComponent;
|
|
|
import org.elasticsearch.common.inject.Inject;
|
|
@@ -77,22 +79,22 @@ public class ShardStateAction extends AbstractComponent {
|
|
|
transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ShardRoutingEntry.class, ThreadPool.Names.SAME, new ShardFailedTransportHandler());
|
|
|
}
|
|
|
|
|
|
- public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String reason) {
|
|
|
+ public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure) {
|
|
|
DiscoveryNode masterNode = clusterService.state().nodes().masterNode();
|
|
|
if (masterNode == null) {
|
|
|
logger.warn("can't send shard failed for {}, no master known.", shardRouting);
|
|
|
return;
|
|
|
}
|
|
|
- innerShardFailed(shardRouting, indexUUID, reason, masterNode);
|
|
|
+ innerShardFailed(shardRouting, indexUUID, masterNode, message, failure);
|
|
|
}
|
|
|
|
|
|
- public void resendShardFailed(final ShardRouting shardRouting, final String indexUUID, final String reason, final DiscoveryNode masterNode) {
|
|
|
- logger.trace("{} re-sending failed shard for {}, indexUUID [{}], reason [{}]", shardRouting.shardId(), shardRouting, indexUUID, reason);
|
|
|
- innerShardFailed(shardRouting, indexUUID, reason, masterNode);
|
|
|
+ public void resendShardFailed(final ShardRouting shardRouting, final String indexUUID, final DiscoveryNode masterNode, final String message, @Nullable final Throwable failure) {
|
|
|
+ logger.trace("{} re-sending failed shard for {}, indexUUID [{}], reason [{}]", failure, shardRouting.shardId(), shardRouting, indexUUID, message);
|
|
|
+ innerShardFailed(shardRouting, indexUUID, masterNode, message, failure);
|
|
|
}
|
|
|
|
|
|
- private void innerShardFailed(final ShardRouting shardRouting, final String indexUUID, final String reason, final DiscoveryNode masterNode) {
|
|
|
- ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason);
|
|
|
+ private void innerShardFailed(final ShardRouting shardRouting, final String indexUUID, final DiscoveryNode masterNode, final String message, final Throwable failure) {
|
|
|
+ ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, message, failure);
|
|
|
transportService.sendRequest(masterNode,
|
|
|
SHARD_FAILED_ACTION_NAME, shardRoutingEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
|
|
|
@Override
|
|
@@ -105,20 +107,17 @@ public class ShardStateAction extends AbstractComponent {
|
|
|
public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason) {
|
|
|
DiscoveryNode masterNode = clusterService.state().nodes().masterNode();
|
|
|
if (masterNode == null) {
|
|
|
- logger.warn("can't send shard started for {}. no master known.", shardRouting);
|
|
|
+ logger.warn("{} can't send shard started for {}, no master known.", shardRouting.shardId(), shardRouting);
|
|
|
return;
|
|
|
}
|
|
|
shardStarted(shardRouting, indexUUID, reason, masterNode);
|
|
|
}
|
|
|
|
|
|
public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String reason, final DiscoveryNode masterNode) {
|
|
|
-
|
|
|
- ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason);
|
|
|
-
|
|
|
- logger.debug("sending shard started for {}", shardRoutingEntry);
|
|
|
-
|
|
|
+ ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, reason, null);
|
|
|
+ logger.debug("{} sending shard started for {}", shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
|
|
|
transportService.sendRequest(masterNode,
|
|
|
- SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
|
|
|
+ SHARD_STARTED_ACTION_NAME, new ShardRoutingEntry(shardRouting, indexUUID, reason, null), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
|
|
|
@Override
|
|
|
public void handleException(TransportException exp) {
|
|
|
logger.warn("failed to send shard started to [{}]", exp, masterNode);
|
|
@@ -128,9 +127,9 @@ public class ShardStateAction extends AbstractComponent {
|
|
|
}
|
|
|
|
|
|
private void handleShardFailureOnMaster(final ShardRoutingEntry shardRoutingEntry) {
|
|
|
- logger.warn("{} received shard failed for {}", shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
|
|
|
+ logger.warn("{} received shard failed for {}", shardRoutingEntry.failure, shardRoutingEntry.shardRouting.shardId(), shardRoutingEntry);
|
|
|
failedShardQueue.add(shardRoutingEntry);
|
|
|
- clusterService.submitStateUpdateTask("shard-failed (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.reason + "]", Priority.HIGH, new ProcessedClusterStateUpdateTask() {
|
|
|
+ clusterService.submitStateUpdateTask("shard-failed (" + shardRoutingEntry.shardRouting + "), message [" + shardRoutingEntry.message + "]", Priority.HIGH, new ProcessedClusterStateUpdateTask() {
|
|
|
|
|
|
@Override
|
|
|
public ClusterState execute(ClusterState currentState) {
|
|
@@ -151,7 +150,7 @@ public class ShardStateAction extends AbstractComponent {
|
|
|
|
|
|
List<FailedRerouteAllocation.FailedShard> shardRoutingsToBeApplied = new ArrayList<>(shardRoutingEntries.size());
|
|
|
for (ShardRoutingEntry entry : extractShardsToBeApplied(shardRoutingEntries, "failed", metaData, logger)) {
|
|
|
- shardRoutingsToBeApplied.add(new FailedRerouteAllocation.FailedShard(entry.shardRouting, entry.reason));
|
|
|
+ shardRoutingsToBeApplied.add(new FailedRerouteAllocation.FailedShard(entry.shardRouting, entry.message, entry.failure));
|
|
|
}
|
|
|
|
|
|
// mark all entries as processed
|
|
@@ -214,7 +213,7 @@ public class ShardStateAction extends AbstractComponent {
|
|
|
// process started events as fast as possible, to make shards available
|
|
|
startedShardsQueue.add(shardRoutingEntry);
|
|
|
|
|
|
- clusterService.submitStateUpdateTask("shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.reason + "]", Priority.URGENT,
|
|
|
+ clusterService.submitStateUpdateTask("shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]", Priority.URGENT,
|
|
|
new ClusterStateUpdateTask() {
|
|
|
@Override
|
|
|
public ClusterState execute(ClusterState currentState) {
|
|
@@ -284,41 +283,43 @@ public class ShardStateAction extends AbstractComponent {
|
|
|
static class ShardRoutingEntry extends TransportRequest {
|
|
|
|
|
|
ShardRouting shardRouting;
|
|
|
-
|
|
|
String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE;
|
|
|
-
|
|
|
- String reason;
|
|
|
+ String message;
|
|
|
+ Throwable failure;
|
|
|
|
|
|
volatile boolean processed; // state field, no need to serialize
|
|
|
|
|
|
ShardRoutingEntry() {
|
|
|
}
|
|
|
|
|
|
- ShardRoutingEntry(ShardRouting shardRouting, String indexUUID, String reason) {
|
|
|
+ ShardRoutingEntry(ShardRouting shardRouting, String indexUUID, String message, @Nullable Throwable failure) {
|
|
|
this.shardRouting = shardRouting;
|
|
|
- this.reason = reason;
|
|
|
this.indexUUID = indexUUID;
|
|
|
+ this.message = message;
|
|
|
+ this.failure = failure;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void readFrom(StreamInput in) throws IOException {
|
|
|
super.readFrom(in);
|
|
|
shardRouting = readShardRoutingEntry(in);
|
|
|
- reason = in.readString();
|
|
|
indexUUID = in.readString();
|
|
|
+ message = in.readString();
|
|
|
+ failure = in.readThrowable();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void writeTo(StreamOutput out) throws IOException {
|
|
|
super.writeTo(out);
|
|
|
shardRouting.writeTo(out);
|
|
|
- out.writeString(reason);
|
|
|
out.writeString(indexUUID);
|
|
|
+ out.writeString(message);
|
|
|
+ out.writeThrowable(failure);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public String toString() {
|
|
|
- return "" + shardRouting + ", indexUUID [" + indexUUID + "], reason [" + reason + "]";
|
|
|
+ return "" + shardRouting + ", indexUUID [" + indexUUID + "], message [" + message + "], failure [" + ExceptionsHelper.detailedMessage(failure) + "]";
|
|
|
}
|
|
|
}
|
|
|
}
|