Browse Source

TEST: write ops should execute under shard permit (#28966)

Currently ESIndexLevelReplicationTestCase executes write operations
without acquiring  index shard permit. This may prevent the primary term
on replica from being updated or cause a race between resync and
indexing on primary. This commit ensures that write operations are
always executed under shard permit like the production code.
Nhat Nguyen 7 years ago
parent
commit
c75790e7c0

+ 21 - 8
server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

@@ -456,6 +456,10 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
             }
         }
 
+        IndexShard getPrimaryShard() {
+            return replicationGroup.primary;
+        }
+
         protected abstract PrimaryResult performOnPrimary(IndexShard primary, Request request) throws Exception;
 
         protected abstract void performOnReplica(ReplicaRequest request, IndexShard replica) throws Exception;
@@ -592,7 +596,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
 
         @Override
         protected void performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
-            executeShardBulkOnReplica(replica, request);
+            executeShardBulkOnReplica(request, replica, getPrimaryShard().getPrimaryTerm(), getPrimaryShard().getGlobalCheckpoint());
         }
     }
 
@@ -602,15 +606,24 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
                 ((IndexRequest) itemRequest.request()).process(Version.CURRENT, null, index.getName());
             }
         }
-        final TransportWriteAction.WritePrimaryResult<BulkShardRequest, BulkShardResponse> result =
-                TransportShardBulkAction.performOnPrimary(request, primary, null,
-                System::currentTimeMillis, new TransportShardBulkActionTests.NoopMappingUpdatePerformer());
+        final PlainActionFuture<Releasable> permitAcquiredFuture = new PlainActionFuture<>();
+        primary.acquirePrimaryOperationPermit(permitAcquiredFuture, ThreadPool.Names.SAME, request);
+        final TransportWriteAction.WritePrimaryResult<BulkShardRequest, BulkShardResponse> result;
+        try (Releasable ignored = permitAcquiredFuture.actionGet()) {
+            result = TransportShardBulkAction.performOnPrimary(request, primary, null, System::currentTimeMillis,
+                new TransportShardBulkActionTests.NoopMappingUpdatePerformer());
+        }
         TransportWriteActionTestHelper.performPostWriteActions(primary, request, result.location, logger);
         return result;
     }
 
-    private void executeShardBulkOnReplica(IndexShard replica, BulkShardRequest request) throws Exception {
-        final Translog.Location location = TransportShardBulkAction.performOnReplica(request, replica);
+    private void executeShardBulkOnReplica(BulkShardRequest request, IndexShard replica, long operationPrimaryTerm, long globalCheckpointOnPrimary) throws Exception {
+        final PlainActionFuture<Releasable> permitAcquiredFuture = new PlainActionFuture<>();
+        replica.acquireReplicaOperationPermit(operationPrimaryTerm, globalCheckpointOnPrimary, permitAcquiredFuture, ThreadPool.Names.SAME, request);
+        final Translog.Location location;
+        try (Releasable ignored = permitAcquiredFuture.actionGet()) {
+            location = TransportShardBulkAction.performOnReplica(request, replica);
+        }
         TransportWriteActionTestHelper.performPostWriteActions(replica, request, location, logger);
     }
 
@@ -630,8 +643,8 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
     /**
      * indexes the given requests on the supplied replica shard
      */
-    void indexOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
-        executeShardBulkOnReplica(replica, request);
+    void indexOnReplica(BulkShardRequest request, ReplicationGroup group, IndexShard replica) throws Exception {
+        executeShardBulkOnReplica(request, replica, group.primary.getPrimaryTerm(), group.primary.getGlobalCheckpoint());
     }
 
     class GlobalCheckpointSync extends ReplicationAction<

+ 2 - 2
server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java

@@ -209,7 +209,7 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
             logger.info("--> isolated replica " + replica1.routingEntry());
             BulkShardRequest replicationRequest = indexOnPrimary(indexRequest, shards.getPrimary());
             for (int i = 1; i < replicas.size(); i++) {
-                indexOnReplica(replicationRequest, replicas.get(i));
+                indexOnReplica(replicationRequest, shards, replicas.get(i));
             }
 
             logger.info("--> promoting replica to primary " + replica1.routingEntry());
@@ -318,7 +318,7 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
             logger.info("--> Isolate replica1");
             IndexRequest indexDoc1 = new IndexRequest(index.getName(), "type", "d1").source("{}", XContentType.JSON);
             BulkShardRequest replicationRequest = indexOnPrimary(indexDoc1, shards.getPrimary());
-            indexOnReplica(replicationRequest, replica2);
+            indexOnReplica(replicationRequest, shards, replica2);
 
             final Translog.Operation op1;
             final List<Translog.Operation> initOperations = new ArrayList<>(initDocs);

+ 3 - 3
server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java

@@ -236,7 +236,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
                     final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "rollback_" + i)
                             .source("{}", XContentType.JSON);
                     final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, oldPrimary);
-                    indexOnReplica(bulkShardRequest, replica);
+                    indexOnReplica(bulkShardRequest, shards, replica);
                 }
                 if (randomBoolean()) {
                     oldPrimary.flush(new FlushRequest(index.getName()));
@@ -326,7 +326,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
                 final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "stale_" + i)
                     .source("{}", XContentType.JSON);
                 final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, oldPrimary);
-                indexOnReplica(bulkShardRequest, replica);
+                indexOnReplica(bulkShardRequest, shards, replica);
             }
             shards.flush();
             shards.promoteReplicaToPrimary(newPrimary).get();
@@ -374,7 +374,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
                 final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "extra_" + i)
                     .source("{}", XContentType.JSON);
                 final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, oldPrimary);
-                indexOnReplica(bulkShardRequest, newPrimary);
+                indexOnReplica(bulkShardRequest, shards, newPrimary);
             }
             logger.info("--> resyncing replicas");
             PrimaryReplicaSyncer.ResyncTask task = shards.promoteReplicaToPrimary(newPrimary).get();