فهرست منبع

Override merge scheduler when setting up store (#72084)

Today we create a temporary `IndexWriter` when setting up a shard
(either from empty, or from an existing store) to create a Lucene commit
from which the shard will start. This `IndexWriter` has the default
merge scheduler, `ConcurrentMergeScheduler`, which is rather expensive
to initialize.

This commit overrides it to use `NoMergeScheduler` instead, and also
renames the related methods to make it clear that the `IndexWriter` is
only for temporary use and shouldn't see any actual writes to the index
since merging is disabled.
David Turner 4 سال پیش
والد
کامیت
e5873ced4e
1فایلهای تغییر یافته به همراه16 افزوده شده و 13 حذف شده
  1. 16 13
      server/src/main/java/org/elasticsearch/index/store/Store.java

+ 16 - 13
server/src/main/java/org/elasticsearch/index/store/Store.java

@@ -22,6 +22,7 @@ import org.apache.lucene.index.IndexNotFoundException;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.NoMergePolicy;
+import org.apache.lucene.index.NoMergeScheduler;
 import org.apache.lucene.index.SegmentCommitInfo;
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.store.AlreadyClosedException;
@@ -1391,7 +1392,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
     public void createEmpty() throws IOException {
         Version luceneVersion = indexSettings.getIndexVersionCreated().luceneVersion;
         metadataLock.writeLock().lock();
-        try (IndexWriter writer = newEmptyIndexWriter(directory, luceneVersion)) {
+        try (IndexWriter writer = newTemporaryEmptyIndexWriter(directory, luceneVersion)) {
             final Map<String, String> map = new HashMap<>();
             map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
             map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(SequenceNumbers.NO_OPS_PERFORMED));
@@ -1429,7 +1430,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
      */
     public void bootstrapNewHistory(long localCheckpoint, long maxSeqNo) throws IOException {
         metadataLock.writeLock().lock();
-        try (IndexWriter writer = newAppendingIndexWriter(directory, null)) {
+        try (IndexWriter writer = newTemporaryAppendingIndexWriter(directory, null)) {
             final Map<String, String> map = new HashMap<>();
             map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
             map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
@@ -1447,7 +1448,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
      */
     public void associateIndexWithNewTranslog(final String translogUUID) throws IOException {
         metadataLock.writeLock().lock();
-        try (IndexWriter writer = newAppendingIndexWriter(directory, null)) {
+        try (IndexWriter writer = newTemporaryAppendingIndexWriter(directory, null)) {
             if (translogUUID.equals(getUserData(writer).get(Translog.TRANSLOG_UUID_KEY))) {
                 throw new IllegalArgumentException("a new translog uuid can't be equal to existing one. got [" + translogUUID + "]");
             }
@@ -1485,7 +1486,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
             final long lastSyncedGlobalCheckpoint = Translog.readGlobalCheckpoint(translogPath, translogUUID);
             final IndexCommit startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint);
             if (startingIndexCommit.equals(lastIndexCommit) == false) {
-                try (IndexWriter writer = newAppendingIndexWriter(directory, startingIndexCommit)) {
+                try (IndexWriter writer = newTemporaryAppendingIndexWriter(directory, startingIndexCommit)) {
                     // this achieves two things:
                     // - by committing a new commit based on the starting commit, it make sure the starting commit will be opened
                     // - deletes any other commit (by lucene standard deletion policy)
@@ -1533,27 +1534,29 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
         return userData;
     }
 
-    private static IndexWriter newAppendingIndexWriter(final Directory dir, final IndexCommit commit) throws IOException {
-        IndexWriterConfig iwc = newIndexWriterConfig()
+    private static IndexWriter newTemporaryAppendingIndexWriter(final Directory dir, final IndexCommit commit) throws IOException {
+        IndexWriterConfig iwc = newTemporaryIndexWriterConfig()
             .setIndexCommit(commit)
             .setOpenMode(IndexWriterConfig.OpenMode.APPEND);
         return new IndexWriter(dir, iwc);
     }
 
-    private static IndexWriter newEmptyIndexWriter(final Directory dir, final Version luceneVersion) throws IOException {
-        IndexWriterConfig iwc = newIndexWriterConfig()
+    private static IndexWriter newTemporaryEmptyIndexWriter(final Directory dir, final Version luceneVersion) throws IOException {
+        IndexWriterConfig iwc = newTemporaryIndexWriterConfig()
             .setOpenMode(IndexWriterConfig.OpenMode.CREATE)
             .setIndexCreatedVersionMajor(luceneVersion.major);
         return new IndexWriter(dir, iwc);
     }
 
-    private static IndexWriterConfig newIndexWriterConfig() {
+    private static IndexWriterConfig newTemporaryIndexWriterConfig() {
         return new IndexWriterConfig(null)
                 .setSoftDeletesField(Lucene.SOFT_DELETES_FIELD)
                 .setCommitOnClose(false)
-                // we don't want merges to happen here - we call maybe merge on the engine
-                // later once we stared it up otherwise we would need to wait for it here
-                // we also don't specify a codec here and merges should use the engines for this index
-                .setMergePolicy(NoMergePolicy.INSTANCE);
+                // this config is only used for temporary IndexWriter instances, used to initialize the index or update the commit data,
+                // so we don't want any merges to happen
+                .setMergePolicy(NoMergePolicy.INSTANCE)
+                // also override the default of ConcurrentMergeScheduler which calls IOUtils#spins on all mount points, which is problematic
+                // if the system has an otherwise-irrelevant mount point that's not responding
+                .setMergeScheduler(NoMergeScheduler.INSTANCE);
     }
 }