浏览代码

Apply fix for LUCENE-5330 pruning the IndexWriter queue to get rid of pending event

Closes #4093
Simon Willnauer 12 年之前
父节点
当前提交
bb777a2dfe

+ 75 - 0
src/main/java/org/apache/lucene/index/XIndexWriter.java

@@ -0,0 +1,75 @@
+/*
+ * Licensed to ElasticSearch and Shay Banon under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. ElasticSearch licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.lucene.index;
+
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.Version;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+
+public final class XIndexWriter extends IndexWriter {
+
+    private static final Method processEvents;
+
+
+    static {
+        // fix for https://issues.apache.org/jira/browse/LUCENE-5330
+        assert Version.LUCENE_45.onOrAfter(org.elasticsearch.Version.CURRENT.luceneVersion) : "This should be fixed in LUCENE-4.6";
+        try {
+            processEvents = IndexWriter.class.getDeclaredMethod("processEvents", boolean.class, boolean.class);
+            processEvents.setAccessible(true);
+        } catch (NoSuchMethodException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public XIndexWriter(Directory d, IndexWriterConfig conf) throws IOException {
+        super(d, conf);
+    }
+
+    private void processEvents() {
+        try {
+            processEvents.invoke(this, false, true);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void rollback() throws IOException {
+        super.rollback();
+        processEvents();
+    }
+
+    @Override
+    public void close(boolean waitForMerges) throws IOException {
+        super.close(waitForMerges);
+        processEvents();
+    }
+
+    @Override
+    DirectoryReader getReader(boolean applyAllDeletes) throws IOException {
+        DirectoryReader reader = super.getReader(applyAllDeletes);
+        processEvents();
+        return reader;
+    }
+
+}

+ 2 - 1
src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java

@@ -1361,7 +1361,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
                     }
                 }
             });
-            return new IndexWriter(store.directory(), config);
+            return new XIndexWriter(store.directory(), config);
         } catch (LockObtainFailedException ex) {
             boolean isLocked = IndexWriter.isLocked(store.directory());
             logger.warn("Could not lock IndexWriter isLocked [{}]", ex, isLocked);
@@ -1605,4 +1605,5 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
             return ongoingRecoveries;
         }
     }
+
 }

+ 26 - 0
src/test/java/org/elasticsearch/index/engine/robin/RobinEngineIntegrationTest.java

@@ -25,11 +25,16 @@ import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
 import org.elasticsearch.action.admin.indices.segments.ShardSegments;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.index.engine.Segment;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.test.AbstractIntegrationTest;
 import org.hamcrest.Matchers;
 import org.junit.Test;
 
 import java.util.Collection;
+import java.util.UUID;
+
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
 
 public class RobinEngineIntegrationTest extends AbstractIntegrationTest {
 
@@ -75,4 +80,25 @@ public class RobinEngineIntegrationTest extends AbstractIntegrationTest {
         assertThat(total, Matchers.equalTo(t));
 
     }
+    @Test
+    public void test4093() {
+        assertAcked(prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder()
+                .put("index.store.type", "memory")
+                .put("index.number_of_shards", "1")
+                .put("index.number_of_replicas", "0")
+                .put("gateway.type", "none")
+                .put("http.enabled", false)
+                .put(RobinEngine.INDEX_COMPOUND_ON_FLUSH, randomBoolean())
+                .put("index.warmer.enabled", false)
+                .build()).get());
+        final int iters = between(500, 1000);
+        for (int i = 0; i < iters; i++) {
+            client().prepareIndex("test", "type1")
+                    .setSource("a", "" + i)
+                    .setRefresh(true)
+                    .execute()
+                    .actionGet();
+        }
+        assertHitCount(client().prepareCount("test").setQuery(QueryBuilders.matchAllQuery()).get(), iters);
+    }
 }