Browse Source

Upgrade to Lucene 4.8.1

This commit upgrades to the latest Lucene 4.8.1 release including the
following bugfixes:

 * An IndexThrottle now kicks in when merges start falling behind
   limiting index threads to 1 until merges caught up. Closes #6066
 * RateLimiter now kicks in at the configured rate where previously
   the limiter was limiting at ~8MB/sec almost all the time. Closes #6018
Simon Willnauer 11 years ago
parent
commit
85a0b76dbb

+ 22 - 2
docs/reference/index-modules/merge.asciidoc

@@ -193,5 +193,25 @@ scheduler supports this setting:
 
 
 `index.merge.scheduler.max_thread_count`::
 `index.merge.scheduler.max_thread_count`::
 
 
-The maximum number of threads to perform the merge operation. Defaults to
-`Math.max(1, Math.min(3, Runtime.getRuntime().availableProcessors() / 2))`.
+The maximum number of concurrent merge threads that may run at once. Defaults
+to `1` which works best with spinning-magnets disks.  If you are using
+a good solid-state disk (SSD) instead then try setting this to `3`.
+
+[float]
+==== SerialMergeScheduler
+
+A merge scheduler that simply does each merge sequentially using the
+calling thread (blocking the operations that triggered the merge or the
+index operation). This merge scheduler has a merge thread pool that
+explicitly schedules merges, and it makes sure that merges are serial
+within a shard, yet concurrent across multiple shards.
+
+The scheduler supports the following settings:
+
+`index.merge.scheduler.max_merge_at_once`::
+
+The maximum number of merges a single merge run performs. This setting prevents
+executing unlimited amount of merges in a loop until another shards has a
+chance to get a merge thread from the pool. If this limit is reached the
+merge thread returns to the pool and continues once the the call to a single
+shards is executed. The default is `5`

+ 5 - 1
pom.xml

@@ -31,7 +31,7 @@
     </parent>
     </parent>
 
 
     <properties>
     <properties>
-        <lucene.version>4.8.0</lucene.version>
+        <lucene.version>4.8.1</lucene.version>
         <tests.jvms>auto</tests.jvms>
         <tests.jvms>auto</tests.jvms>
         <tests.shuffle>true</tests.shuffle>
         <tests.shuffle>true</tests.shuffle>
         <tests.output>onerror</tests.output>
         <tests.output>onerror</tests.output>
@@ -47,6 +47,10 @@
             <id>Codehaus Snapshots</id>
             <id>Codehaus Snapshots</id>
             <url>http://repository.codehaus.org/</url>
             <url>http://repository.codehaus.org/</url>
         </repository>
         </repository>
+        <repository>
+            <id>Apache Maven Repository</id>
+            <url>https://repository.apache.org/content/repositories/releases/</url>
+        </repository>
     </repositories>
     </repositories>
 
 
     <dependencies>
     <dependencies>

+ 7 - 1
src/main/java/org/apache/lucene/store/RateLimitedFSDirectory.java

@@ -81,6 +81,7 @@ public final class RateLimitedFSDirectory extends FilterDirectory{
         private final BufferedIndexOutput bufferedDelegate;
         private final BufferedIndexOutput bufferedDelegate;
         private final RateLimiter rateLimiter;
         private final RateLimiter rateLimiter;
         private final StoreRateLimiting.Listener rateListener;
         private final StoreRateLimiting.Listener rateListener;
+        private long bytesSinceLastRateLimit;
 
 
         RateLimitedIndexOutput(final RateLimiter rateLimiter, final StoreRateLimiting.Listener rateListener, final IndexOutput delegate) {
         RateLimitedIndexOutput(final RateLimiter rateLimiter, final StoreRateLimiting.Listener rateListener, final IndexOutput delegate) {
             super(delegate instanceof BufferedIndexOutput ? ((BufferedIndexOutput) delegate).getBufferSize() : BufferedIndexOutput.DEFAULT_BUFFER_SIZE);
             super(delegate instanceof BufferedIndexOutput ? ((BufferedIndexOutput) delegate).getBufferSize() : BufferedIndexOutput.DEFAULT_BUFFER_SIZE);
@@ -97,7 +98,12 @@ public final class RateLimitedFSDirectory extends FilterDirectory{
 
 
         @Override
         @Override
         protected void flushBuffer(byte[] b, int offset, int len) throws IOException {
         protected void flushBuffer(byte[] b, int offset, int len) throws IOException {
-            rateListener.onPause(rateLimiter.pause(len));
+            bytesSinceLastRateLimit += len;
+            if (bytesSinceLastRateLimit >= rateLimiter.getMinPauseCheckBytes()) {
+                long pause = rateLimiter.pause(bytesSinceLastRateLimit);
+                bytesSinceLastRateLimit = 0;
+                rateListener.onPause(pause);
+            }
             if (bufferedDelegate != null) {
             if (bufferedDelegate != null) {
                 bufferedDelegate.flushBuffer(b, offset, len);
                 bufferedDelegate.flushBuffer(b, offset, len);
             } else {
             } else {

+ 102 - 17
src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java

@@ -19,7 +19,19 @@
 
 
 package org.elasticsearch.index.engine.internal;
 package org.elasticsearch.index.engine.internal;
 
 
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.lucene.index.*;
 import org.apache.lucene.index.*;
 import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
 import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.IndexSearcher;
@@ -28,6 +40,7 @@ import org.apache.lucene.search.SearcherFactory;
 import org.apache.lucene.search.SearcherManager;
 import org.apache.lucene.search.SearcherManager;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.LockObtainFailedException;
 import org.apache.lucene.store.LockObtainFailedException;
+import org.apache.lucene.store.NoLockFactory;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.IOUtils;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchException;
@@ -39,6 +52,8 @@ import org.elasticsearch.common.collect.MapBuilder;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.lucene.HashedBytesRef;
 import org.elasticsearch.common.lucene.HashedBytesRef;
 import org.elasticsearch.common.lucene.LoggerInfoStream;
 import org.elasticsearch.common.lucene.LoggerInfoStream;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.Lucene;
@@ -75,18 +90,7 @@ import org.elasticsearch.index.translog.TranslogStreams;
 import org.elasticsearch.indices.warmer.IndicesWarmer;
 import org.elasticsearch.indices.warmer.IndicesWarmer;
 import org.elasticsearch.indices.warmer.InternalIndicesWarmer;
 import org.elasticsearch.indices.warmer.InternalIndicesWarmer;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import com.google.common.collect.Lists;
 
 
 /**
 /**
  *
  *
@@ -163,6 +167,8 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
 
 
     private SegmentInfos lastCommittedSegmentInfos;
     private SegmentInfos lastCommittedSegmentInfos;
 
 
+    private IndexThrottle throttle;
+
     @Inject
     @Inject
     public InternalEngine(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool,
     public InternalEngine(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool,
                           IndexSettingsService indexSettingsService, ShardIndexingService indexingService, @Nullable IndicesWarmer warmer,
                           IndexSettingsService indexSettingsService, ShardIndexingService indexingService, @Nullable IndicesWarmer warmer,
@@ -257,6 +263,9 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
             }
             }
             try {
             try {
                 this.indexWriter = createWriter();
                 this.indexWriter = createWriter();
+                mergeScheduler.removeListener(this.throttle);
+                this.throttle = new IndexThrottle(mergeScheduler.getMaxMerges(), logger);
+                mergeScheduler.addListener(throttle);
             } catch (IOException e) {
             } catch (IOException e) {
                 throw new EngineCreationFailureException(shardId, "failed to create engine", e);
                 throw new EngineCreationFailureException(shardId, "failed to create engine", e);
             }
             }
@@ -373,7 +382,9 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
             if (writer == null) {
             if (writer == null) {
                 throw new EngineClosedException(shardId, failedEngine);
                 throw new EngineClosedException(shardId, failedEngine);
             }
             }
-            innerCreate(create, writer);
+            try (Releasable r = throttle.acquireThrottle()) {
+                innerCreate(create, writer);
+            }
             dirty = true;
             dirty = true;
             possibleMergeNeeded = true;
             possibleMergeNeeded = true;
             flushNeeded = true;
             flushNeeded = true;
@@ -462,8 +473,9 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
             if (writer == null) {
             if (writer == null) {
                 throw new EngineClosedException(shardId, failedEngine);
                 throw new EngineClosedException(shardId, failedEngine);
             }
             }
-
-            innerIndex(index, writer);
+            try (Releasable r = throttle.acquireThrottle()) {
+                innerIndex(index, writer);
+            }
             dirty = true;
             dirty = true;
             possibleMergeNeeded = true;
             possibleMergeNeeded = true;
             flushNeeded = true;
             flushNeeded = true;
@@ -744,7 +756,9 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
                         // to be allocated to a different node
                         // to be allocated to a different node
                         currentIndexWriter().close(false);
                         currentIndexWriter().close(false);
                         indexWriter = createWriter();
                         indexWriter = createWriter();
-
+                        mergeScheduler.removeListener(this.throttle);
+                        this.throttle = new IndexThrottle(mergeScheduler.getMaxMerges(), this.logger);
+                        mergeScheduler.addListener(throttle);
                         // commit on a just opened writer will commit even if there are no changes done to it
                         // commit on a just opened writer will commit even if there are no changes done to it
                         // we rely on that for the commit data translog id key
                         // we rely on that for the commit data translog id key
                         if (flushNeeded || flush.force()) {
                         if (flushNeeded || flush.force()) {
@@ -1559,4 +1573,75 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
             return aBoolean != null && aBoolean.booleanValue();
             return aBoolean != null && aBoolean.booleanValue();
         }
         }
     }
     }
+
+
+    private static final class IndexThrottle implements MergeSchedulerProvider.Listener {
+
+        private static final InternalLock NOOP_LOCK = new InternalLock(new NoOpLock());
+        private final InternalLock lockReference = new InternalLock(new ReentrantLock());
+        private final AtomicInteger numMergesInFlight = new AtomicInteger(0);
+        private final AtomicBoolean isThrottling = new AtomicBoolean();
+        private final int maxNumMerges;
+        private final ESLogger logger;
+
+        private volatile InternalLock lock = NOOP_LOCK;
+
+        public IndexThrottle(int maxNumMerges, ESLogger logger) {
+            this.maxNumMerges = maxNumMerges;
+            this.logger = logger;
+        }
+
+        public Releasable acquireThrottle() {
+            return lock.acquire();
+        }
+
+        @Override
+        public void beforeMerge(OnGoingMerge merge) {
+          if (numMergesInFlight.incrementAndGet() > maxNumMerges) {
+              if (isThrottling.getAndSet(true) == false) {
+                  logger.info("now throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
+              }
+              lock = lockReference;
+            }
+        }
+
+        @Override
+        public void afterMerge(OnGoingMerge merge) {
+            if (numMergesInFlight.decrementAndGet() < maxNumMerges) {
+                if (isThrottling.getAndSet(false)) {
+                    logger.info("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
+                }
+                lock = NOOP_LOCK;
+            }
+        }
+    }
+
+    private static final class NoOpLock implements Lock {
+
+        @Override
+        public void lock() {}
+
+        @Override
+        public void lockInterruptibly() throws InterruptedException {
+        }
+
+        @Override
+        public boolean tryLock() {
+            return true;
+        }
+
+        @Override
+        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
+            return true;
+        }
+
+        @Override
+        public void unlock() {
+        }
+
+        @Override
+        public Condition newCondition() {
+            throw new UnsupportedOperationException("NoOpLock can't provide a condition");
+        }
+    }
 }
 }

+ 9 - 1
src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java

@@ -73,7 +73,11 @@ public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider {
     @Override
     @Override
     public MergeScheduler buildMergeScheduler() {
     public MergeScheduler buildMergeScheduler() {
         CustomConcurrentMergeScheduler concurrentMergeScheduler = new CustomConcurrentMergeScheduler(logger, shardId, this);
         CustomConcurrentMergeScheduler concurrentMergeScheduler = new CustomConcurrentMergeScheduler(logger, shardId, this);
-        concurrentMergeScheduler.setMaxMergesAndThreads(maxMergeCount, maxThreadCount);
+        // nocommit but this doesn't handle SMS ... should we even expose/allow SMS?  or, if user requests SMS can we just use CMS(1,1),
+        // which would then stall if there are 2 merges in flight, and unstall once we are back to 1 or 0 merges
+        // NOTE: we pass maxMergeCount+1 here so that CMS will allow one too many merges to kick off which then allows
+        // InternalEngine.IndexThrottle to detect too-many-merges and throttle:
+        concurrentMergeScheduler.setMaxMergesAndThreads(maxMergeCount+1, maxThreadCount);
         schedulers.add(concurrentMergeScheduler);
         schedulers.add(concurrentMergeScheduler);
         return concurrentMergeScheduler;
         return concurrentMergeScheduler;
     }
     }
@@ -101,6 +105,10 @@ public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider {
         indexSettingsService.removeListener(applySettings);
         indexSettingsService.removeListener(applySettings);
     }
     }
 
 
+    public int getMaxMerges() {
+        return this.maxMergeCount;
+    }
+
     public static class CustomConcurrentMergeScheduler extends TrackingConcurrentMergeScheduler {
     public static class CustomConcurrentMergeScheduler extends TrackingConcurrentMergeScheduler {
 
 
         private final ShardId shardId;
         private final ShardId shardId;

+ 3 - 0
src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerProvider.java

@@ -122,6 +122,9 @@ public abstract class MergeSchedulerProvider extends AbstractIndexShardComponent
         return scheduler;
         return scheduler;
     }
     }
 
 
+    /** Maximum number of allowed running merges before index throttling kicks in. */
+    public abstract int getMaxMerges();
+
     protected abstract MergeScheduler buildMergeScheduler();
     protected abstract MergeScheduler buildMergeScheduler();
 
 
     public abstract MergeStats stats();
     public abstract MergeStats stats();

+ 15 - 5
src/main/java/org/elasticsearch/index/snapshots/blobstore/RateLimitingInputStream.java

@@ -35,6 +35,8 @@ public class RateLimitingInputStream extends InputStream {
 
 
     private final Listener listener;
     private final Listener listener;
 
 
+    private long bytesSinceLastRateLimit;
+
     public interface Listener {
     public interface Listener {
         void onPause(long nanos);
         void onPause(long nanos);
     }
     }
@@ -45,13 +47,21 @@ public class RateLimitingInputStream extends InputStream {
         this.listener = listener;
         this.listener = listener;
     }
     }
 
 
+    private void maybePause(int bytes) {
+        bytesSinceLastRateLimit += bytes;
+        if (bytesSinceLastRateLimit >= rateLimiter.getMinPauseCheckBytes()) {
+            long pause = rateLimiter.pause(bytesSinceLastRateLimit);
+            bytesSinceLastRateLimit = 0;
+            if (pause > 0) {
+                listener.onPause(pause);
+            }
+        }
+    }
+
     @Override
     @Override
     public int read() throws IOException {
     public int read() throws IOException {
         int b = delegate.read();
         int b = delegate.read();
-        long pause = rateLimiter.pause(1);
-        if (pause > 0) {
-            listener.onPause(pause);
-        }
+        maybePause(1);
         return b;
         return b;
     }
     }
 
 
@@ -64,7 +74,7 @@ public class RateLimitingInputStream extends InputStream {
     public int read(byte[] b, int off, int len) throws IOException {
     public int read(byte[] b, int off, int len) throws IOException {
         int n = delegate.read(b, off, len);
         int n = delegate.read(b, off, len);
         if (n > 0) {
         if (n > 0) {
-            listener.onPause(rateLimiter.pause(n));
+            maybePause(n);
         }
         }
         return n;
         return n;
     }
     }