
Do not ignore shard not-available exceptions in replication (#28571)

Shard not-available exceptions are currently ignored in replication as a
best effort to avoid failing shards that are not yet ready. However, these
exceptions can also come from fully active shards; when that happens, we
may have skipped important failures from replicas. Since #28049, only fully
initialized shards receive write requests. This restriction allows us to
handle all exceptions in replication.
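
In effect, the new onFailure handling reduces to the following (a simplified
sketch of the logic in the diff below, not a standalone compilable unit;
shard, opType, replicaRequest, and the callbacks all belong to the
surrounding ReplicationOperation):

    @Override
    public void onFailure(Exception replicaException) {
        // Only non-"shard not available" exceptions are surfaced to the
        // user in the response's ShardInfo.
        if (TransportActions.isShardNotAvailableException(replicaException) == false) {
            shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
                shard.shardId(), shard.currentNodeId(), replicaException,
                ExceptionsHelper.status(replicaException), false));
        }
        // Unlike before, the replica is asked to be failed for *every*
        // exception, including shard not-available ones.
        String message = String.format(Locale.ROOT,
            "failed to perform %s on replica %s", opType, shard);
        replicasProxy.failShardIfNeeded(shard, message, replicaException,
            ReplicationOperation.this::decPendingAndFinishIfNeeded,
            ReplicationOperation.this::onPrimaryDemoted,
            throwable -> decPendingAndFinishIfNeeded());
    }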

There is a side-effect to this change: if a replica retries its peer
recovery a second time after being tracked in the replication group, it can
receive replication requests even though it is not yet ready. That shard
may then be failed and allocated to another node even though it has a good
Lucene index on its current node.

This PR does not change the way we report replication errors to users;
as before, shard not-available exceptions won't be reported.

Relates #28049
Relates #28534
Nhat Nguyen
commit dbf9fb31e4

+ 9 - 15
server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java

@@ -178,25 +178,19 @@ public class ReplicationOperation<
 
             @Override
             public void onFailure(Exception replicaException) {
-                logger.trace(
-                    (org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage(
-                        "[{}] failure while performing [{}] on replica {}, request [{}]",
-                        shard.shardId(),
-                        opType,
-                        shard,
-                        replicaRequest),
-                    replicaException);
-                if (TransportActions.isShardNotAvailableException(replicaException)) {
-                    decPendingAndFinishIfNeeded();
-                } else {
+                logger.trace((org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage(
+                    "[{}] failure while performing [{}] on replica {}, request [{}]",
+                    shard.shardId(), opType, shard, replicaRequest), replicaException);
+                // Only report "critical" exceptions - TODO: Reach out to the master node to get the latest shard state then report.
+                if (TransportActions.isShardNotAvailableException(replicaException) == false) {
                     RestStatus restStatus = ExceptionsHelper.status(replicaException);
                     shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
                         shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
-                    String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
-                    replicasProxy.failShardIfNeeded(shard, message,
-                            replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded,
-                            ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded());
                 }
+                String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
+                replicasProxy.failShardIfNeeded(shard, message,
+                    replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded,
+                    ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded());
             }
         });
     }

+ 9 - 13
server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java

@@ -29,7 +29,6 @@ import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
@@ -54,7 +53,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
 
 import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state;
 import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary;
@@ -95,28 +93,26 @@ public class ReplicationOperationTests extends ESTestCase {
 
         final Set<ShardRouting> expectedReplicas = getExpectedReplicas(shardId, initialState, trackedShards);
 
-        final Map<ShardRouting, Exception> expectedFailures = new HashMap<>();
-        final Set<ShardRouting> expectedFailedShards = new HashSet<>();
+        final Map<ShardRouting, Exception> simulatedFailures = new HashMap<>();
+        final Map<ShardRouting, Exception> reportedFailures = new HashMap<>();
         for (ShardRouting replica : expectedReplicas) {
             if (randomBoolean()) {
                 Exception t;
                 boolean criticalFailure = randomBoolean();
                 if (criticalFailure) {
                     t = new CorruptIndexException("simulated", (String) null);
+                    reportedFailures.put(replica, t);
                 } else {
                     t = new IndexShardNotStartedException(shardId, IndexShardState.RECOVERING);
                 }
                 logger.debug("--> simulating failure on {} with [{}]", replica, t.getClass().getSimpleName());
-                expectedFailures.put(replica, t);
-                if (criticalFailure) {
-                    expectedFailedShards.add(replica);
-                }
+                simulatedFailures.put(replica, t);
             }
         }
 
         Request request = new Request(shardId);
         PlainActionFuture<TestPrimary.Result> listener = new PlainActionFuture<>();
-        final TestReplicaProxy replicasProxy = new TestReplicaProxy(primaryTerm, expectedFailures);
+        final TestReplicaProxy replicasProxy = new TestReplicaProxy(primaryTerm, simulatedFailures);
 
         final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup);
         final TestReplicationOperation op = new TestReplicationOperation(request,
@@ -125,13 +121,13 @@ public class ReplicationOperationTests extends ESTestCase {
 
         assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
         assertThat(request.processedOnReplicas, equalTo(expectedReplicas));
-        assertThat(replicasProxy.failedReplicas, equalTo(expectedFailedShards));
+        assertThat(replicasProxy.failedReplicas, equalTo(simulatedFailures.keySet()));
         assertThat(replicasProxy.markedAsStaleCopies, equalTo(staleAllocationIds));
         assertTrue("listener is not marked as done", listener.isDone());
         ShardInfo shardInfo = listener.actionGet().getShardInfo();
-        assertThat(shardInfo.getFailed(), equalTo(expectedFailedShards.size()));
-        assertThat(shardInfo.getFailures(), arrayWithSize(expectedFailedShards.size()));
-        assertThat(shardInfo.getSuccessful(), equalTo(1 + expectedReplicas.size() - expectedFailures.size()));
+        assertThat(shardInfo.getFailed(), equalTo(reportedFailures.size()));
+        assertThat(shardInfo.getFailures(), arrayWithSize(reportedFailures.size()));
+        assertThat(shardInfo.getSuccessful(), equalTo(1 + expectedReplicas.size() - simulatedFailures.size()));
         final List<ShardRouting> unassignedShards =
             indexShardRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED);
         final int totalShards = 1 + expectedReplicas.size() + unassignedShards.size() + untrackedShards.size();