Selaa lähdekoodia

[ENGINE] Implement retries for ShadowEngine creation

When using a filesystem that may have lag between an index being created
on the primary and it becoming visible on the replica, creation of the
ShadowEngine can fail because there are no segments in the directory.

In these situations, we retry during engine creation to wait until an
index is present in the directory. The wait delay is configurable,
defaulting to waiting up to 5 seconds for an index to
become available.

Resolves #10637
Lee Hinman 10 vuotta sitten
vanhempi
commit
2627324ac2

+ 29 - 0
src/main/java/org/elasticsearch/common/lucene/Lucene.java

@@ -632,6 +632,35 @@ public class Lucene {
         return DirectoryReader.indexExists(directory);
     }
 
+    /**
+     * Waits up to {@code timeLimitMillis} for an index to appear in the
+     * given directory, polling roughly once per second.
+     *
+     * @param directory       the directory to poll for an index
+     * @param timeLimitMillis the maximum time to wait, in milliseconds
+     * @return true if an index exists in the directory before the time limit
+     *         elapses (including one final check after it does), false if the
+     *         limit elapses without an index appearing or the thread is
+     *         interrupted while waiting
+     * @throws IOException if checking the directory fails
+     */
+    public static final boolean waitForIndex(final Directory directory, final long timeLimitMillis)
+            throws IOException {
+        final long delayMillis = 1000;
+        long waited = 0;
+        try {
+            while (waited < timeLimitMillis) {
+                if (indexExists(directory)) {
+                    return true;
+                }
+                // never sleep past the requested time limit (the limit may
+                // not be a multiple of the polling delay)
+                final long sleepTime = Math.min(delayMillis, timeLimitMillis - waited);
+                Thread.sleep(sleepTime);
+                waited += sleepTime;
+            }
+        } catch (InterruptedException e) {
+            // restore the interrupt flag and report the index as absent
+            Thread.currentThread().interrupt();
+            return false;
+        }
+        // one last check after the full wait has elapsed
+        return indexExists(directory);
+    }
+
     /**
      * Returns <tt>true</tt> iff the given exception or
      * one of it's causes is an instance of {@link CorruptIndexException}, 

+ 19 - 4
src/main/java/org/elasticsearch/index/engine/ShadowEngine.java

@@ -28,8 +28,10 @@ import org.apache.lucene.util.IOUtils;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ReleasableLock;
 import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
+import org.elasticsearch.index.shard.IndexShardException;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -55,6 +57,10 @@ import java.util.List;
  */
 public class ShadowEngine extends Engine {
 
+    /** how long to wait for an index to exist */
+    public final static String NONEXISTENT_INDEX_RETRY_WAIT = "index.shadow.wait_for_initial_commit";
+    public final static TimeValue DEFAULT_NONEXISTENT_INDEX_RETRY_WAIT = TimeValue.timeValueSeconds(5);
+
     private volatile SearcherManager searcherManager;
 
     private volatile SegmentInfos lastCommittedSegmentInfos;
@@ -62,15 +68,24 @@ public class ShadowEngine extends Engine {
     public ShadowEngine(EngineConfig engineConfig)  {
         super(engineConfig);
         SearcherFactory searcherFactory = new EngineSearcherFactory(engineConfig);
+        final long nonexistentRetryTime = engineConfig.getIndexSettings()
+                .getAsTime(NONEXISTENT_INDEX_RETRY_WAIT, DEFAULT_NONEXISTENT_INDEX_RETRY_WAIT)
+                .getMillis();
         try {
             DirectoryReader reader = null;
             store.incRef();
             boolean success = false;
             try {
-                reader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(store.directory()), shardId);
-                this.searcherManager = new SearcherManager(reader, searcherFactory);
-                this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
-                success = true;
+                if (Lucene.waitForIndex(store.directory(), nonexistentRetryTime)) {
+                    reader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(store.directory()), shardId);
+                    this.searcherManager = new SearcherManager(reader, searcherFactory);
+                    this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
+                    success = true;
+                } else {
+                    throw new IndexShardException(shardId, "failed to open a shadow engine after" +
+                            nonexistentRetryTime + "ms, " +
+                            "directory is not an index");
+                }
             } catch (Throwable e) {
                 logger.warn("failed to create new reader", e);
                 throw e;

+ 51 - 0
src/test/java/org/elasticsearch/common/lucene/LuceneTest.java

@@ -32,6 +32,8 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * 
@@ -48,6 +50,55 @@ public class LuceneTest extends ElasticsearchTestCase {
         assertEquals(Lucene.VERSION, Version.LATEST);
     }
 
+    public void testWaitForIndex() throws Exception {
+        final MockDirectoryWrapper dir = newMockDirectory();
+
+        final AtomicBoolean succeeded = new AtomicBoolean(false);
+        final CountDownLatch latch = new CountDownLatch(1);
+        // Failures inside the background thread are captured here: an
+        // AssertionError thrown by fail() in another thread is silently
+        // swallowed by that thread and never fails the test.
+        final Throwable[] failure = new Throwable[1];
+
+        // Start a waiter that blocks on the latch and then polls the
+        // (initially empty) directory until an index shows up
+        Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    latch.await();
+                    if (Lucene.waitForIndex(dir, 5000)) {
+                        succeeded.set(true);
+                    } else {
+                        failure[0] = new AssertionError("index should have eventually existed!");
+                    }
+                } catch (InterruptedException e) {
+                    // ignore interruptions
+                } catch (Exception e) {
+                    failure[0] = e;
+                }
+            }
+        });
+        t.start();
+
+        // release the waiter; it will now start polling for the index
+        latch.countDown();
+
+        dir.setEnableVirusScanner(false);
+        IndexWriterConfig iwc = newIndexWriterConfig();
+        iwc.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE);
+        iwc.setMergePolicy(NoMergePolicy.INSTANCE);
+        iwc.setMaxBufferedDocs(2);
+        IndexWriter writer = new IndexWriter(dir, iwc);
+        Document doc = new Document();
+        doc.add(new TextField("id", "1", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
+        writer.addDocument(doc);
+        // committing creates the index, which the waiter is polling for
+        writer.commit();
+
+        t.join();
+
+        writer.close();
+        dir.close();
+        if (failure[0] != null) {
+            throw new AssertionError("background waiter failed", failure[0]);
+        }
+        assertTrue("index should have eventually existed", succeeded.get());
+    }
+
     public void testCleanIndex() throws IOException {
         MockDirectoryWrapper dir = newMockDirectory();
         dir.setEnableVirusScanner(false);

+ 55 - 0
src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java

@@ -30,6 +30,7 @@ import org.apache.lucene.index.Term;
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.MockDirectoryWrapper;
+import org.apache.lucene.util.IOUtils;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.Nullable;
@@ -73,6 +74,8 @@ import java.io.IOException;
 import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
 import static org.hamcrest.Matchers.*;
@@ -939,4 +942,56 @@ public class ShadowEngineTests extends ElasticsearchTestCase {
         assertEquals(replicaEngine.config().getCodec().getName(), codecService.codec(codecName).getName());
         assertEquals(replicaEngine.config().getIndexConcurrency(), indexConcurrency);
     }
+
+    @Test
+    public void testShadowEngineCreationRetry() throws Exception {
+        final Path srDir = createTempDir();
+        final Store srStore = createStore(srDir);
+        Lucene.cleanLuceneIndex(srStore.directory());
+        final Translog srTranslog = createTranslogReplica();
+
+        final AtomicBoolean succeeded = new AtomicBoolean(false);
+        final CountDownLatch latch = new CountDownLatch(1);
+        // Failures from the background thread are captured here: fail() in
+        // another thread only raises an AssertionError in that thread and
+        // would never be seen by the test runner. The original cause is
+        // preserved and rethrown after join().
+        final Throwable[] failure = new Throwable[1];
+
+        // Create a shadow engine on an empty directory; it should retry
+        // until the primary engine below commits an index
+        Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    latch.await();
+                } catch (InterruptedException e) {
+                    // ignore interruptions
+                }
+                try (ShadowEngine srEngine = createShadowEngine(srStore, srTranslog)) {
+                    succeeded.set(true);
+                } catch (Exception e) {
+                    failure[0] = e;
+                }
+            }
+        });
+        t.start();
+
+        // release the latch so the shadow engine creation begins
+        latch.countDown();
+
+        // Create an InternalEngine, which creates the index so the shadow
+        // replica will handle it correctly
+        Store pStore = createStore(srDir);
+        Translog pTranslog = createTranslog();
+        InternalEngine pEngine = createInternalEngine(pStore, pTranslog);
+
+        // index and commit a document so the directory contains an index
+        ParseContext.Document document = testDocumentWithTextField();
+        document.add(new Field(SourceFieldMapper.NAME, B_1.toBytes(), SourceFieldMapper.Defaults.FIELD_TYPE));
+        ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, null);
+        pEngine.create(new Engine.Create(null, newUid("1"), doc));
+        pEngine.flush(true, true);
+
+        t.join();
+        if (failure[0] != null) {
+            throw new AssertionError("should have been able to create the engine!", failure[0]);
+        }
+        assertTrue("ShadowEngine should have been able to be created", succeeded.get());
+        // (shadow engine is already shut down in the try-with-resources)
+        IOUtils.close(srTranslog, srStore, pTranslog, pEngine, pStore);
+    }
 }