Sfoglia il codice sorgente

Close translog view after primary-replica resync (#25862)

The translog view was being closed too early, possibly causing a failed resync. Note: The bug only affects unreleased code.

Relates to #24841
Yannick Welsch 8 anni fa
parent
commit
020ba41c5d

+ 1 - 2
core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -49,7 +49,6 @@ import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.Booleans;
-import org.elasticsearch.common.CheckedBiConsumer;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -360,7 +359,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     @Override
     public void updateShardState(final ShardRouting newRouting,
                                  final long newPrimaryTerm,
-                                 final CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer,
+                                 final BiConsumer<IndexShard, ActionListener<ResyncTask>> primaryReplicaSyncer,
                                  final long applyingClusterStateVersion,
                                  final Set<String> inSyncAllocationIds,
                                  final IndexShardRoutingTable routingTable,

+ 32 - 6
core/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java

@@ -78,8 +78,30 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
         this.chunkSize = chunkSize;
     }
 
-    public void resync(IndexShard indexShard, ActionListener<ResyncTask> listener) throws IOException {
-        try (Translog.View view = indexShard.acquireTranslogView()) {
+    public void resync(IndexShard indexShard, ActionListener<ResyncTask> listener) {
+        final Translog.View view = indexShard.acquireTranslogView();
+        ActionListener<ResyncTask> wrappedListener = new ActionListener<ResyncTask>() {
+            @Override
+            public void onResponse(ResyncTask resyncTask) {
+                try {
+                    view.close();
+                } catch (IOException e) {
+                    onFailure(e);
+                }
+                listener.onResponse(resyncTask);
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                try {
+                    view.close();
+                } catch (IOException inner) {
+                    e.addSuppressed(inner);
+                }
+                listener.onFailure(e);
+            }
+        };
+        try {
             final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1;
             Translog.Snapshot snapshot = view.snapshot(startingSeqNo);
             ShardId shardId = indexShard.shardId();
@@ -96,16 +118,20 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
 
                 @Override
                 public synchronized Translog.Operation next() throws IOException {
-                    if (indexShard.state() != IndexShardState.STARTED) {
-                        assert indexShard.state() != IndexShardState.RELOCATED : "resync should never happen on a relocated shard";
-                        throw new IndexShardNotStartedException(shardId, indexShard.state());
+                    IndexShardState state = indexShard.state();
+                    if (state == IndexShardState.CLOSED) {
+                        throw new IndexShardClosedException(shardId);
+                    } else {
+                        assert state == IndexShardState.STARTED : "resync should only happen on a started shard, but state was: " + state;
                     }
                     return snapshot.next();
                 }
             };
 
             resync(shardId, indexShard.routingEntry().allocationId().getId(), indexShard.getPrimaryTerm(), wrappedSnapshot,
-                startingSeqNo, listener);
+                startingSeqNo, wrappedListener);
+        } catch (Exception e) {
+            wrappedListener.onFailure(e);
         }
     }
 

+ 2 - 2
core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java

@@ -41,7 +41,6 @@ import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.common.CheckedBiConsumer;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.inject.Inject;
@@ -87,6 +86,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -746,7 +746,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
          */
         void updateShardState(ShardRouting shardRouting,
                               long primaryTerm,
-                              CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer,
+                              BiConsumer<IndexShard, ActionListener<ResyncTask>> primaryReplicaSyncer,
                               long applyingClusterStateVersion,
                               Set<String> inSyncAllocationIds,
                               IndexShardRoutingTable routingTable,

+ 45 - 0
core/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java

@@ -18,6 +18,7 @@
  */
 package org.elasticsearch.index.shard;
 
+import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.action.resync.ResyncReplicationResponse;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
@@ -33,6 +34,7 @@ import org.elasticsearch.tasks.TaskManager;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.hamcrest.Matchers.containsString;
@@ -90,6 +92,49 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
         closeShards(shard);
     }
 
+    public void testSyncerOnClosingShard() throws Exception {
+        IndexShard shard = newStartedShard(true);
+        AtomicBoolean syncActionCalled = new AtomicBoolean();
+        CountDownLatch syncCalledLatch = new CountDownLatch(1);
+        PrimaryReplicaSyncer.SyncAction syncAction =
+            (request, parentTask, allocationId, primaryTerm, listener) -> {
+                logger.info("Sending off {} operations", request.getOperations().size());
+                syncActionCalled.set(true);
+                syncCalledLatch.countDown();
+                threadPool.generic().execute(() -> listener.onResponse(new ResyncReplicationResponse()));
+            };
+        PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(Settings.EMPTY, new TaskManager(Settings.EMPTY), syncAction);
+        syncer.setChunkSize(new ByteSizeValue(1)); // every document is sent off separately
+
+        int numDocs = 10;
+        for (int i = 0; i < numDocs; i++) {
+            indexDoc(shard, "test", Integer.toString(i));
+        }
+
+        String allocationId = shard.routingEntry().allocationId().getId();
+        shard.updateShardState(shard.routingEntry(), shard.getPrimaryTerm(), null, 1000L, Collections.singleton(allocationId),
+            new IndexShardRoutingTable.Builder(shard.shardId()).addShard(shard.routingEntry()).build(), Collections.emptySet());
+
+        PlainActionFuture<PrimaryReplicaSyncer.ResyncTask> fut = new PlainActionFuture<>();
+        threadPool.generic().execute(() -> {
+            try {
+                syncer.resync(shard, fut);
+            } catch (AlreadyClosedException ace) {
+                fut.onFailure(ace);
+            }
+        });
+        if (randomBoolean()) {
+            syncCalledLatch.await();
+        }
+        closeShards(shard);
+        try {
+            fut.actionGet();
+            assertTrue("Sync action was not called", syncActionCalled.get());
+        } catch (AlreadyClosedException | IndexShardClosedException ignored) {
+            // ignore
+        }
+    }
+
     public void testStatusSerialization() throws IOException {
         PrimaryReplicaSyncer.ResyncTask.Status status = new PrimaryReplicaSyncer.ResyncTask.Status(randomAlphaOfLength(10),
             randomIntBetween(0, 1000), randomIntBetween(0, 1000), randomIntBetween(0, 1000));

+ 2 - 3
core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java

@@ -25,7 +25,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.ShardRouting;
-import org.elasticsearch.common.CheckedBiConsumer;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
@@ -54,8 +53,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
-import java.util.stream.Collectors;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.unmodifiableMap;
@@ -345,7 +344,7 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
         @Override
         public void updateShardState(ShardRouting shardRouting,
                                      long newPrimaryTerm,
-                                     CheckedBiConsumer<IndexShard, ActionListener<ResyncTask>, IOException> primaryReplicaSyncer,
+                                     BiConsumer<IndexShard, ActionListener<ResyncTask>> primaryReplicaSyncer,
                                      long applyingClusterStateVersion,
                                      Set<String> inSyncAllocationIds,
                                      IndexShardRoutingTable routingTable,