Browse Source

Close Index API should force a flush if a sync is needed (#37961)

This commit changes the TransportVerifyShardBeforeCloseAction so that it issues a 
forced flush, forcing the translog and the Lucene commit to contain the same max seq 
number and global checkpoint in the case the Translog contains operations that were 
not written in the IndexWriter (like a Delete that touches a non existing doc). This way 
the assertion added in #37426 won't trip.

Related to #33888
Tanguy Leroux 6 years ago
parent
commit
460f10ce60

+ 7 - 2
server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java

@@ -18,6 +18,8 @@
  */
 package org.elasticsearch.action.admin.indices.close;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.support.ActionFilters;
@@ -50,6 +52,7 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA
     TransportVerifyShardBeforeCloseAction.ShardRequest, TransportVerifyShardBeforeCloseAction.ShardRequest, ReplicationResponse> {
 
     public static final String NAME = CloseIndexAction.NAME + "[s]";
+    protected Logger logger = LogManager.getLogger(getClass());
 
     @Inject
     public TransportVerifyShardBeforeCloseAction(final Settings settings, final TransportService transportService,
@@ -111,8 +114,10 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA
             throw new IllegalStateException("Global checkpoint [" + indexShard.getGlobalCheckpoint()
                 + "] mismatches maximum sequence number [" + maxSeqNo + "] on index shard " + shardId);
         }
-        indexShard.flush(new FlushRequest());
-        logger.debug("{} shard is ready for closing", shardId);
+
+        final boolean forced = indexShard.isSyncNeeded();
+        indexShard.flush(new FlushRequest().force(forced));
+        logger.trace("{} shard is ready for closing [forced:{}]", shardId, forced);
     }
 
     @Override

+ 30 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java

@@ -8,6 +8,7 @@ package org.elasticsearch.index.engine;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
+import org.elasticsearch.action.delete.DeleteResponse;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchType;
 import org.elasticsearch.action.support.IndicesOptions;
@@ -26,6 +27,7 @@ import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardTestCase;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchService;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.internal.AliasFilter;
@@ -46,6 +48,8 @@ import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDI
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
 
 public class FrozenIndexTests extends ESSingleNodeTestCase {
 
@@ -340,4 +344,30 @@ public class FrozenIndexTests extends ESSingleNodeTestCase {
         assertThat(client().admin().cluster().prepareState().get().getState().metaData().index(index).getSettingsVersion(),
             equalTo(settingsVersion + 1));
     }
+
+    public void testFreezeEmptyIndexWithTranslogOps() throws Exception {
+        final String indexName = "empty";
+        createIndex(indexName, Settings.builder()
+            .put("index.number_of_shards", 1)
+            .put("index.number_of_replicas", 0)
+            .put("index.refresh_interval", TimeValue.MINUS_ONE)
+            .build());
+
+        final long nbNoOps = randomIntBetween(1, 10);
+        for (long i = 0; i < nbNoOps; i++) {
+            final DeleteResponse deleteResponse = client().prepareDelete(indexName, "_doc", Long.toString(i)).get();
+            assertThat(deleteResponse.status(), is(RestStatus.NOT_FOUND));
+        }
+
+        final IndicesService indicesService = getInstanceFromNode(IndicesService.class);
+        assertBusy(() -> {
+            final Index index = client().admin().cluster().prepareState().get().getState().metaData().index(indexName).getIndex();
+            final IndexService indexService = indicesService.indexService(index);
+            assertThat(indexService.hasShard(0), is(true));
+            assertThat(indexService.getShard(0).getGlobalCheckpoint(), greaterThanOrEqualTo(nbNoOps - 1L));
+        });
+
+        assertAcked(new XPackClient(client()).freeze(new TransportFreezeIndexAction.FreezeRequest(indexName)));
+        assertIndexFrozen(indexName);
+    }
 }