|
@@ -8,14 +8,21 @@
|
|
|
*/
|
|
|
package org.elasticsearch.cluster.routing.allocation;
|
|
|
|
|
|
+import org.elasticsearch.ElasticsearchException;
|
|
|
+import org.elasticsearch.ExceptionsHelper;
|
|
|
import org.elasticsearch.cluster.ClusterState;
|
|
|
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
|
|
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
|
|
+import org.elasticsearch.cluster.metadata.Metadata;
|
|
|
+import org.elasticsearch.cluster.service.ClusterService;
|
|
|
+import org.elasticsearch.common.Priority;
|
|
|
import org.elasticsearch.index.IndexService;
|
|
|
import org.elasticsearch.index.shard.IndexShard;
|
|
|
import org.elasticsearch.indices.IndicesService;
|
|
|
import org.elasticsearch.test.ESIntegTestCase;
|
|
|
|
|
|
+import java.util.concurrent.CyclicBarrier;
|
|
|
+
|
|
|
import static org.hamcrest.Matchers.equalTo;
|
|
|
|
|
|
public class ShardStateIT extends ESIntegTestCase {
|
|
@@ -76,4 +83,58 @@ public class ShardStateIT extends ESIntegTestCase {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ public void testGetPendingTasksSourceStringDataForFailedShard() throws Exception { // Checks the source string of a pending shard-failed master task: reason and exception message present, stack trace absent.
|
|
|
+ internalCluster().ensureAtLeastNumDataNodes(1);
|
|
|
+ prepareCreate("test").setSettings(indexSettings(1, 0)).get(); // presumably 1 shard and 0 replicas, so shard 0 is the only shard - confirm against indexSettings
|
|
|
+ ensureGreen();
|
|
|
+
|
|
|
+ final var masterNodeClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
|
|
|
+ final var barrier = new CyclicBarrier(2); // two parties: this test thread and the blocking executor below
|
|
|
+
|
|
|
+ // Used to block the master service task processing so we have a chance to get the pending shard-failed task.
|
|
|
+ masterNodeClusterService.createTaskQueue("initial-block", Priority.NORMAL, batchExecutionContext -> {
|
|
|
+ safeAwait(barrier); // first await: pairs with the test thread's "sync up" await, signalling that this executor is running
|
|
|
+ safeAwait(barrier); // second await: parks here until the test thread's final await releases it
|
|
|
+ batchExecutionContext.taskContexts().forEach(c -> c.success(() -> {})); // mark every task in the batch successful with a no-op callback
|
|
|
+ return batchExecutionContext.initialState(); // hand back the initial state unchanged
|
|
|
+ }).submitTask("initial-block", e -> fail(e, "unexpected"), null); // the lambda fails the test if it is ever called with an exception; null is presumably "no timeout" - confirm
|
|
|
+
|
|
|
+ // Sync up with the blocking executor: once this returns the executor has passed its first await and is heading for its second.
|
|
|
+ safeAwait(barrier);
|
|
|
+
|
|
|
+ // Obtain a reference to the IndexShard for shard 0, located via the node that currently holds its primary.
|
|
|
+ final var state = masterNodeClusterService.state();
|
|
|
+ final var shard0RoutingTable = state.routingTable(Metadata.DEFAULT_PROJECT_ID).index("test").shard(0);
|
|
|
+ assertNotNull(shard0RoutingTable);
|
|
|
+ final var nodeId = shard0RoutingTable.primaryShard().currentNodeId();
|
|
|
+ final var node = state.nodes().get(nodeId).getName(); // node name, used below to look up that node's IndicesService
|
|
|
+ final var indicesService = internalCluster().getInstance(IndicesService.class, node);
|
|
|
+ final var shard0 = indicesService.indexService(resolveIndex("test")).getShard(0);
|
|
|
+ assertNotNull(shard0);
|
|
|
+
|
|
|
+ // Create a failed shard state action for shard 0 by failing the shard with a known reason and exception.
|
|
|
+ final var shardFailedReason = "simulated test failure";
|
|
|
+ final var shardFailedException = new ElasticsearchException("simulated exception");
|
|
|
+ shard0.failShard(shardFailedReason, shardFailedException);
|
|
|
+
|
|
|
+ // Get the pending tasks and verify we see the shard-failed state action and expected source string components.
|
|
|
+ final var masterService = masterNodeClusterService.getMasterService();
|
|
|
+ assertBusy(() -> { // presumably retries because the shard-failed task may not be queued on the master immediately - confirm
|
|
|
+ assertTrue(masterService.pendingTasks().stream().anyMatch(task -> {
|
|
|
+ final var src = task.getSource().string();
|
|
|
+ // We expect the "shard-failed " prefix, the shard id, the failure reason and exception message, but not the stack trace.
|
|
|
+ return src.startsWith("shard-failed ")
|
|
|
+ && src.contains("[test][0]")
|
|
|
+ && src.contains(shardFailedReason)
|
|
|
+ && src.contains(shardFailedException.getMessage())
|
|
|
+ && src.contains(ExceptionsHelper.stackTrace(shardFailedException)) == false;
|
|
|
+ }));
|
|
|
+ });
|
|
|
+
|
|
|
+ // Unblock the master service from the executor above: this await pairs with the executor's second await.
|
|
|
+ safeAwait(barrier);
|
|
|
+ // Wait for the failed shard task to get processed and then for the shard and cluster to recover.
|
|
|
+ ensureGreen();
|
|
|
+ }
|
|
|
}
|