浏览代码

Force flush in ReplicaShardAllocatorSyncIdIT (#52028)

We must not flush after performing a synced-flush; otherwise, we will 
destroy sync_ids.

1. Force flush before synced flush to advance the translog_generation 
commit tag so we won't recover any translog operation in store recovery.

2. Force merge before synced flush to ensure that background merges 
won't happen. The condition IndexWriter#hasUncommittedChanges 
becomes true after merges.

Closes #51926
Nhat Nguyen 5 年之前
父节点
当前提交
e63ef39e12
共有 1 个文件被更改，包括 28 次插入和 11 次删除
  1. 28 11
      server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorSyncIdIT.java

+ 28 - 11
server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorSyncIdIT.java

@@ -20,7 +20,6 @@
 package org.elasticsearch.gateway;
 
 import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.util.LuceneTestCase;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.UnassignedInfo;
@@ -29,11 +28,11 @@ import org.elasticsearch.common.util.concurrent.ReleasableLock;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.IndexSettings;
-import org.elasticsearch.index.MergePolicyConfig;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.EngineConfig;
 import org.elasticsearch.index.engine.EngineFactory;
 import org.elasticsearch.index.engine.InternalEngine;
+import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardTestCase;
 import org.elasticsearch.index.translog.Translog;
@@ -47,6 +46,7 @@ import org.elasticsearch.test.InternalSettingsPlugin;
 import org.elasticsearch.test.InternalTestCluster;
 import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.transport.TransportService;
+import org.junit.Before;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -56,6 +56,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -71,9 +72,10 @@ import static org.hamcrest.Matchers.hasSize;
  * TODO: Remove this test in 9.0
  */
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
-@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/51926")
 public class ReplicaShardAllocatorSyncIdIT extends ESIntegTestCase {
 
+    private static final AtomicBoolean allowFlush = new AtomicBoolean();
+
     private static class SyncedFlushEngine extends InternalEngine {
         private volatile IndexWriter indexWriter;
 
@@ -83,15 +85,27 @@ public class ReplicaShardAllocatorSyncIdIT extends ESIntegTestCase {
 
         @Override
         protected void commitIndexWriter(IndexWriter writer, Translog translog) throws IOException {
+            if (allowFlush.get() == false) {
+                throw new AssertionError("flush is not allowed:" +
+                    "global checkpoint [" + getLastSyncedGlobalCheckpoint() + "] " +
+                    "last commit [" + getLastCommittedSegmentInfos().userData + "]");
+            }
             indexWriter = writer;
             super.commitIndexWriter(writer, translog);
         }
 
         void syncFlush(String syncId) throws IOException {
+            // make sure that we have committed translog; otherwise, we can flush after relaying translog in store recovery
+            flush(true, true);
+            // make sure that background merges won't happen; otherwise, IndexWriter#hasUncommittedChanges can become true again
+            forceMerge(false);
             assertNotNull(indexWriter);
             try (ReleasableLock ignored = writeLock.acquire()) {
-                assertFalse(indexWriter.hasUncommittedChanges());
+                assertThat(getTranslogStats().getUncommittedOperations(), equalTo(0));
                 Map<String, String> userData = new HashMap<>(getLastCommittedSegmentInfos().userData);
+                SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(userData.entrySet());
+                assertThat(commitInfo.localCheckpoint, equalTo(getLastSyncedGlobalCheckpoint()));
+                assertThat(commitInfo.maxSeqNo, equalTo(getLastSyncedGlobalCheckpoint()));
                 userData.put(Engine.SYNC_COMMIT_ID, syncId);
                 indexWriter.setLiveCommitData(userData.entrySet());
                 indexWriter.commit();
@@ -116,6 +130,11 @@ public class ReplicaShardAllocatorSyncIdIT extends ESIntegTestCase {
         return Arrays.asList(MockTransportService.TestPlugin.class, InternalSettingsPlugin.class, SyncedFlushPlugin.class);
     }
 
+    @Before
+    public void allowFlush() {
+        allowFlush.set(true);
+    }
+
     private void syncFlush(String index) throws IOException {
         String syncId = randomAlphaOfLength(10);
         final Set<String> nodes = internalCluster().nodesInclude(index);
@@ -126,6 +145,8 @@ public class ReplicaShardAllocatorSyncIdIT extends ESIntegTestCase {
                 engine.syncFlush(syncId);
             }
         }
+        // Once we have synced flushed, we do not allow regular flush as it will destroy the sync_id.
+        allowFlush.set(false);
     }
 
     public void testPreferCopyCanPerformNoopRecovery() throws Exception {
@@ -136,7 +157,6 @@ public class ReplicaShardAllocatorSyncIdIT extends ESIntegTestCase {
                 .setSettings(Settings.builder()
                     .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
                     .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
-                    .put(MergePolicyConfig.INDEX_MERGE_ENABLED, "false")
                     .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.getKey(), "1ms") // expire PRRLs quickly
                     .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms")
                     .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms")
@@ -146,9 +166,8 @@ public class ReplicaShardAllocatorSyncIdIT extends ESIntegTestCase {
         ensureGreen(indexName);
         indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(100, 500))
             .mapToObj(n -> client().prepareIndex(indexName).setSource("f", "v")).collect(Collectors.toList()));
-        client().admin().indices().prepareFlush(indexName).get();
         if (randomBoolean()) {
-            client().admin().indices().prepareForceMerge(indexName).get();
+            client().admin().indices().prepareFlush(indexName).get();
         }
         ensureGlobalCheckpointAdvancedAndSynced(indexName);
         syncFlush(indexName);
@@ -195,17 +214,15 @@ public class ReplicaShardAllocatorSyncIdIT extends ESIntegTestCase {
                 .setSettings(Settings.builder()
                     .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
                     .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numOfReplicas)
-                    .put(MergePolicyConfig.INDEX_MERGE_ENABLED, "false")
                     .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), randomIntBetween(10, 100) + "kb")
                     .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.getKey(), "1ms") // expire PRRLs quickly
                     .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms")
                     .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms")));
         ensureGreen(indexName);
-        indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, between(200, 500))
+        indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(200, 500))
             .mapToObj(n -> client().prepareIndex(indexName).setSource("f", "v")).collect(Collectors.toList()));
-        client().admin().indices().prepareFlush(indexName).get();
         if (randomBoolean()) {
-            client().admin().indices().prepareForceMerge(indexName).get();
+            client().admin().indices().prepareFlush(indexName).get();
         }
         ensureGlobalCheckpointAdvancedAndSynced(indexName);
         syncFlush(indexName);