浏览代码

call maybeMerge outside of writeLock to reduce work done under writeLock

kimchy 14 年之前
父节点
当前提交
e1d56ea467

+ 22 - 3
modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java

@@ -107,6 +107,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
 
     private volatile int disableFlushCounter = 0;
 
+    private final AtomicBoolean flushing = new AtomicBoolean();
+
     private final ConcurrentMap<String, VersionValue> versionMap;
 
     private final Object[] dirtyLocks;
@@ -621,6 +623,20 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
         if (disableFlushCounter > 0) {
             throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
         }
+        // don't allow for concurrent flush operations...
+        if (!flushing.compareAndSet(false, true)) {
+            throw new FlushNotAllowedEngineException(shardId, "Already flushing...");
+        }
+
+        // call maybeMerge outside of the write lock since it gets called anyhow within commit/refresh
+        // and we want not to suffer this cost within the write lock
+        // We can't do prepareCommit here, since we rely on the the segment version for the translog version
+        try {
+            indexWriter.maybeMerge();
+        } catch (Exception e) {
+            flushing.set(false);
+            throw new FlushFailedEngineException(shardId, e);
+        }
         rwl.writeLock().lock();
         try {
             if (indexWriter == null) {
@@ -658,13 +674,16 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
             }
             versionMap.clear();
             dirty = true; // force a refresh
+            // we need to do a refresh here so we sync versioning support
             refresh(new Refresh(true));
         } finally {
             rwl.writeLock().unlock();
+            flushing.set(false);
         }
-        if (flush.refresh()) {
-            refresh(new Refresh(false));
-        }
+        // we flush anyhow before...
+//        if (flush.refresh()) {
+//            refresh(new Refresh(false));
+//        }
     }
 
     @Override public void optimize(Optimize optimize) throws EngineException {

+ 19 - 0
modules/elasticsearch/src/test/java/org/elasticsearch/deps/lucene/SimpleLuceneTests.java

@@ -44,6 +44,25 @@ import static org.hamcrest.Matchers.*;
  */
 public class SimpleLuceneTests {
 
+    @Test public void testAddDocAfterPrepareCommit() throws Exception {
+        Directory dir = new RAMDirectory();
+        IndexWriter indexWriter = new IndexWriter(dir, Lucene.STANDARD_ANALYZER, true, IndexWriter.MaxFieldLength.UNLIMITED);
+        indexWriter.addDocument(doc()
+                .add(field("_id", "1")).build());
+        IndexReader reader = indexWriter.getReader();
+        assertThat(reader.numDocs(), equalTo(1));
+
+        indexWriter.prepareCommit();
+        reader = indexWriter.getReader();
+        assertThat(reader.numDocs(), equalTo(1));
+
+        indexWriter.addDocument(doc()
+                .add(field("_id", "2")).build());
+        indexWriter.commit();
+        reader = indexWriter.getReader();
+        assertThat(reader.numDocs(), equalTo(2));
+    }
+
     @Test public void testSimpleNumericOps() throws Exception {
         Directory dir = new RAMDirectory();
         IndexWriter indexWriter = new IndexWriter(dir, Lucene.STANDARD_ANALYZER, true, IndexWriter.MaxFieldLength.UNLIMITED);