浏览代码

Assert TransportReplicationActions acquire permits (#41271)

Today we do not distinguish "no operations in flight" from "operations are
blocked", since both return `0` from `IndexShard#getActiveOperationsCount()`.
We therefore cannot assert that every `TransportReplicationAction` performs its
actions under permit(s). This commit removes that ambiguity by having
`IndexShard#getActiveOperationsCount()` return the sentinel
`IndexShard#OPERATIONS_BLOCKED` (`-1`) when operations are blocked, so that the
two cases can be distinguished.
David Turner 6 年之前
父节点
当前提交
918232a9c6

+ 2 - 2
server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java

@@ -101,8 +101,8 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA
 
     private void executeShardOperation(final ShardRequest request, final IndexShard indexShard) {
         final ShardId shardId = indexShard.shardId();
-        if (indexShard.getActiveOperationsCount() != 0) {
-            throw new IllegalStateException("On-going operations in progress while checking index shard " + shardId + " before closing");
+        if (indexShard.getActiveOperationsCount() != IndexShard.OPERATIONS_BLOCKED) {
+            throw new IllegalStateException("Index shard " + shardId + " is not blocking all operations during closing");
         }
 
         final ClusterBlocks clusterBlocks = clusterService.state().blocks();

+ 2 - 0
server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

@@ -513,6 +513,7 @@ public abstract class TransportReplicationAction<
         @Override
         public void onResponse(Releasable releasable) {
             try {
+                assert replica.getActiveOperationsCount() != 0 : "must perform shard operation under a permit";
                 final ReplicaResult replicaResult = shardOperationOnReplica(replicaRequest.getRequest(), replica);
                 releasable.close(); // release shard operation lock before responding to caller
                 final TransportReplicationAction.ReplicaResponse response =
@@ -912,6 +913,7 @@ public abstract class TransportReplicationAction<
                     return result;
                 });
             }
+            assert indexShard.getActiveOperationsCount() != 0 : "must perform shard operation under a permit";
             shardOperationOnPrimary(request, indexShard, listener);
         }
 

+ 13 - 4
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -636,7 +636,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
             indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
                 forceRefreshes.close();
                 // no shard operation permits are being held here, move state from started to relocated
-                assert indexShardOperationPermits.getActiveOperationsCount() == 0 :
+                assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED :
                         "in-flight operations in progress while moving shard state to relocated";
                 /*
                  * We should not invoke the runnable under the mutex as the expected implementation is to handoff the primary context via a
@@ -1553,7 +1553,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                 assert assertReplicationTarget();
             } else {
                 assert origin == Engine.Operation.Origin.LOCAL_RESET;
-                assert getActiveOperationsCount() == 0 : "Ongoing writes [" + getActiveOperations() + "]";
+                assert getActiveOperationsCount() == OPERATIONS_BLOCKED
+                    : "locally resetting without blocking operations, active operations are [" + getActiveOperations() + "]";
             }
             if (writeAllowedStates.contains(state) == false) {
                 throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when shard state is one of " +
@@ -2740,8 +2741,15 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         return (opPrimaryTerm > pendingPrimaryTerm) || (allPermits && opPrimaryTerm > getOperationPrimaryTerm());
     }
 
+    public static final int OPERATIONS_BLOCKED = -1;
+
+    /**
+     * Obtain the active operation count, or {@link IndexShard#OPERATIONS_BLOCKED} if all permits are held (even if there are
+     * outstanding operations in flight).
+     *
+     * @return the active operation count, or {@link IndexShard#OPERATIONS_BLOCKED} when all permits are held.
+     */
     public int getActiveOperationsCount() {
-        // refCount is incremented on successful acquire and decremented on close
         return indexShardOperationPermits.getActiveOperationsCount();
     }
 
@@ -3069,7 +3077,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
      * Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint.
      */
     void resetEngineToGlobalCheckpoint() throws IOException {
-        assert getActiveOperationsCount() == 0 : "Ongoing writes [" + getActiveOperations() + "]";
+        assert getActiveOperationsCount() == OPERATIONS_BLOCKED
+            : "resetting engine without blocking operations; active operations are [" + getActiveOperations() + ']';
         sync(); // persist the global checkpoint to disk
         final SeqNoStats seqNoStats = seqNoStats();
         final TranslogStats translogStats = translogStats();

+ 3 - 8
server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java

@@ -293,19 +293,14 @@ final class IndexShardOperationPermits implements Closeable {
     }
 
     /**
-     * Obtain the active operation count, or zero if all permits are held (even if there are outstanding operations in flight).
+     * Obtain the active operation count, or {@link IndexShard#OPERATIONS_BLOCKED} if all permits are held.
      *
-     * @return the active operation count, or zero when all permits are held
+     * @return the active operation count, or {@link IndexShard#OPERATIONS_BLOCKED} when all permits are held.
      */
     int getActiveOperationsCount() {
         int availablePermits = semaphore.availablePermits();
         if (availablePermits == 0) {
-            /*
-             * This occurs when either doBlockOperations is holding all the permits or there are outstanding operations in flight and the
-             * remainder of the permits are held by doBlockOperations. We do not distinguish between these two cases and simply say that
-             * the active operations count is zero.
-             */
-            return 0;
+            return IndexShard.OPERATIONS_BLOCKED; // This occurs when blockOperations() has acquired all the permits.
         } else {
             return TOTAL_PERMITS - availablePermits;
         }

+ 2 - 1
server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java

@@ -538,7 +538,7 @@ public class SyncedFlushService implements IndexEventListener {
             throw new IllegalStateException("[" + request.shardId() +"] expected a primary shard");
         }
         int opCount = indexShard.getActiveOperationsCount();
-        return new InFlightOpsResponse(opCount);
+        return new InFlightOpsResponse(opCount == IndexShard.OPERATIONS_BLOCKED ? 0 : opCount);
     }
 
     public static final class PreShardSyncedFlushRequest extends TransportRequest {
@@ -781,6 +781,7 @@ public class SyncedFlushService implements IndexEventListener {
         }
 
         InFlightOpsResponse(int opCount) {
+            assert opCount >= 0 : opCount;
             this.opCount = opCount;
         }
 

+ 4 - 4
server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java

@@ -100,7 +100,7 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase {
         super.setUp();
 
         indexShard = mock(IndexShard.class);
-        when(indexShard.getActiveOperationsCount()).thenReturn(0);
+        when(indexShard.getActiveOperationsCount()).thenReturn(IndexShard.OPERATIONS_BLOCKED);
 
         final ShardId shardId = new ShardId("index", "_na_", randomIntBetween(0, 3));
         when(indexShard.shardId()).thenReturn(shardId);
@@ -165,12 +165,12 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase {
         assertThat(flushRequest.getValue().force(), is(true));
     }
 
-    public void testOperationFailsWithOnGoingOps() {
-        when(indexShard.getActiveOperationsCount()).thenReturn(randomIntBetween(1, 10));
+    public void testOperationFailsWhenNotBlocked() {
+        when(indexShard.getActiveOperationsCount()).thenReturn(randomIntBetween(0, 10));
 
         IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica);
         assertThat(exception.getMessage(),
-            equalTo("On-going operations in progress while checking index shard " + indexShard.shardId() + " before closing"));
+            equalTo("Index shard " + indexShard.shardId() + " is not blocking all operations during closing"));
         verify(indexShard, times(0)).flush(any(FlushRequest.class));
     }
 

+ 6 - 1
server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java

@@ -58,6 +58,7 @@ import java.nio.charset.Charset;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.util.Collections.emptyList;
 import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state;
@@ -118,13 +119,17 @@ public class TransportResyncReplicationActionTests extends ESTestCase {
                 final String allocationId = primaryShardRouting.allocationId().getId();
                 final long primaryTerm = indexMetaData.primaryTerm(shardId.id());
 
+                final AtomicInteger acquiredPermits = new AtomicInteger();
                 final IndexShard indexShard = mock(IndexShard.class);
                 when(indexShard.shardId()).thenReturn(shardId);
                 when(indexShard.routingEntry()).thenReturn(primaryShardRouting);
                 when(indexShard.getPendingPrimaryTerm()).thenReturn(primaryTerm);
+                when(indexShard.getOperationPrimaryTerm()).thenReturn(primaryTerm);
+                when(indexShard.getActiveOperationsCount()).then(i -> acquiredPermits.get());
                 doAnswer(invocation -> {
                     ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[0];
-                    callback.onResponse(() -> logger.trace("released"));
+                    acquiredPermits.incrementAndGet();
+                    callback.onResponse(acquiredPermits::decrementAndGet);
                     return null;
                 }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject());
                 when(indexShard.getReplicationGroup()).thenReturn(

+ 14 - 8
server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java

@@ -87,6 +87,7 @@ import org.elasticsearch.transport.nio.MockNioTransport;
 import org.hamcrest.Matcher;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 
@@ -678,16 +679,17 @@ public class TransportReplicationActionTests extends ESTestCase {
         };
         TestAction.PrimaryShardReference primary = action.new PrimaryShardReference(shard, releasable);
         final Request request = new Request(NO_SHARD_ID);
-        primary.perform(request, ActionTestUtils.assertNoFailureListener(r -> {
-            final ElasticsearchException exception = new ElasticsearchException("testing");
-            primary.failShard("test", exception);
+        shard.runUnderPrimaryPermit(() ->
+            primary.perform(request, ActionTestUtils.assertNoFailureListener(r -> {
+                final ElasticsearchException exception = new ElasticsearchException("testing");
+                primary.failShard("test", exception);
 
-            verify(shard).failShard("test", exception);
+                verify(shard).failShard("test", exception);
 
-            primary.close();
+                primary.close();
 
-            assertTrue(closed.get());
-        }));
+                assertTrue(closed.get());
+            })), Assert::assertNotNull, null, null);
     }
 
     public void testReplicaProxy() throws InterruptedException, ExecutionException {
@@ -775,10 +777,12 @@ public class TransportReplicationActionTests extends ESTestCase {
                 inSyncIds,
                 shardRoutingTable.getAllAllocationIds()));
         doAnswer(invocation -> {
+            count.incrementAndGet();
             //noinspection unchecked
-            ((ActionListener<Releasable>)invocation.getArguments()[0]).onResponse(() -> {});
+            ((ActionListener<Releasable>)invocation.getArguments()[0]).onResponse(count::decrementAndGet);
             return null;
         }).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject());
+        when(shard.getActiveOperationsCount()).thenAnswer(i -> count.get());
 
         final IndexService indexService = mock(IndexService.class);
         when(indexService.getShard(shard.shardId().id())).thenReturn(shard);
@@ -1286,6 +1290,8 @@ public class TransportReplicationActionTests extends ESTestCase {
             return null;
         }).when(indexShard)
             .acquireReplicaOperationPermit(anyLong(), anyLong(), anyLong(), any(ActionListener.class), anyString(), anyObject());
+        when(indexShard.getActiveOperationsCount()).thenAnswer(i -> count.get());
+
         when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> {
             final ClusterState state = clusterService.state();
             final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId());

+ 4 - 3
server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java

@@ -316,7 +316,8 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe
                 allPermitsAction.new AsyncPrimaryAction(primaryRequest, allPermitFuture, null) {
                     @Override
                     void runWithPrimaryShardReference(final TransportReplicationAction.PrimaryShardReference reference) {
-                        assertEquals("All permits must be acquired", 0, reference.indexShard.getActiveOperationsCount());
+                        assertEquals("All permits must be acquired",
+                            IndexShard.OPERATIONS_BLOCKED, reference.indexShard.getActiveOperationsCount());
                         assertSame(primary, reference.indexShard);
 
                         final ClusterState clusterState = clusterService.state();
@@ -549,13 +550,13 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe
         @Override
         protected void shardOperationOnPrimary(Request shardRequest, IndexShard shard,
                 ActionListener<PrimaryResult<Request, Response>> listener) {
-            assertEquals("All permits must be acquired", 0, shard.getActiveOperationsCount());
+            assertEquals("All permits must be acquired", IndexShard.OPERATIONS_BLOCKED, shard.getActiveOperationsCount());
             super.shardOperationOnPrimary(shardRequest, shard, listener);
         }
 
         @Override
         protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard shard) throws Exception {
-            assertEquals("All permits must be acquired", 0, shard.getActiveOperationsCount());
+            assertEquals("All permits must be acquired", IndexShard.OPERATIONS_BLOCKED, shard.getActiveOperationsCount());
             return super.shardOperationOnReplica(shardRequest, shard);
         }
     }

+ 13 - 1
server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java

@@ -84,6 +84,7 @@ import org.elasticsearch.test.InternalSettingsPlugin;
 import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPoolStats;
+import org.junit.Assert;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -878,7 +879,18 @@ public class IndexShardIT extends ESSingleNodeTestCase {
         shard.refresh("test");
         assertThat(client().search(countRequest).actionGet().getHits().getTotalHits().value, equalTo(numDocs));
         assertThat(shard.getLocalCheckpoint(), equalTo(shard.seqNoStats().getMaxSeqNo()));
-        shard.resetEngineToGlobalCheckpoint();
+
+        final CountDownLatch engineResetLatch = new CountDownLatch(1);
+        shard.acquireAllPrimaryOperationsPermits(ActionListener.wrap(r -> {
+            try {
+                shard.resetEngineToGlobalCheckpoint();
+            } finally {
+                r.close();
+                engineResetLatch.countDown();
+            }
+        }, Assert::assertNotNull), TimeValue.timeValueMinutes(1L));
+        engineResetLatch.await();
+
         final long moreDocs = between(10, 20);
         for (int i = 0; i < moreDocs; i++) {
             client().prepareIndex("test", "_doc", Long.toString(i + numDocs)).setSource("{}", XContentType.JSON).get();

+ 2 - 2
server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java

@@ -523,8 +523,8 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
         future2.get().close();
         assertThat(permits.getActiveOperationsCount(), equalTo(0));
 
-        try (Releasable releasable = blockAndWait()) {
-            assertThat(permits.getActiveOperationsCount(), equalTo(0));
+        try (Releasable ignored = blockAndWait()) {
+            assertThat(permits.getActiveOperationsCount(), equalTo(IndexShard.OPERATIONS_BLOCKED));
         }
 
         PlainActionFuture<Releasable> future3 = new PlainActionFuture<>();

+ 14 - 3
server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -126,6 +126,7 @@ import org.elasticsearch.test.DummyShardLock;
 import org.elasticsearch.test.FieldMaskingReader;
 import org.elasticsearch.test.VersionUtils;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.junit.Assert;
 
 import java.io.IOException;
 import java.nio.charset.Charset;
@@ -707,7 +708,7 @@ public class IndexShardTests extends IndexShardTestCase {
                     if (singlePermit) {
                         assertThat(indexShard.getActiveOperationsCount(), greaterThan(0));
                     } else {
-                        assertThat(indexShard.getActiveOperationsCount(), equalTo(0));
+                        assertThat(indexShard.getActiveOperationsCount(), equalTo(IndexShard.OPERATIONS_BLOCKED));
                     }
                     releasable.close();
                     super.onResponse(releasable);
@@ -757,7 +758,7 @@ public class IndexShardTests extends IndexShardTestCase {
         indexShard.acquireAllPrimaryOperationsPermits(futureAllPermits, TimeValue.timeValueSeconds(30L));
         allPermitsAcquired.await();
         assertTrue(blocked.get());
-        assertEquals(0, indexShard.getActiveOperationsCount());
+        assertEquals(IndexShard.OPERATIONS_BLOCKED, indexShard.getActiveOperationsCount());
         assertTrue("Expected no results, operations are blocked", results.asList().isEmpty());
         futures.forEach(future -> assertFalse(future.isDone()));
 
@@ -3660,7 +3661,17 @@ public class IndexShardTests extends IndexShardTestCase {
         });
         thread.start();
         latch.await();
-        shard.resetEngineToGlobalCheckpoint();
+
+        final CountDownLatch engineResetLatch = new CountDownLatch(1);
+        shard.acquireAllReplicaOperationsPermits(shard.getOperationPrimaryTerm(), globalCheckpoint, 0L, ActionListener.wrap(r -> {
+            try {
+                shard.resetEngineToGlobalCheckpoint();
+            } finally {
+                r.close();
+                engineResetLatch.countDown();
+            }
+        }, Assert::assertNotNull), TimeValue.timeValueMinutes(1L));
+        engineResetLatch.await();
         assertThat(getShardDocUIDs(shard), equalTo(docBelowGlobalCheckpoint));
         assertThat(shard.seqNoStats().getMaxSeqNo(), equalTo(globalCheckpoint));
         assertThat(shard.translogStats().estimatedNumberOfOperations(), equalTo(translogStats.estimatedNumberOfOperations()));