Michael McCandless 9 lat temu
rodzic
commit
07e8370322

+ 3 - 2
core/src/main/java/org/elasticsearch/index/IndexModule.java

@@ -29,6 +29,7 @@ import org.elasticsearch.index.cache.query.none.NoneQueryCache;
 import org.elasticsearch.index.engine.EngineFactory;
 import org.elasticsearch.index.shard.IndexEventListener;
 import org.elasticsearch.index.shard.IndexSearcherWrapper;
+import org.elasticsearch.index.shard.IndexingOperationListener;
 import org.elasticsearch.index.similarity.BM25SimilarityProvider;
 import org.elasticsearch.index.similarity.SimilarityProvider;
 import org.elasticsearch.index.similarity.SimilarityService;
@@ -243,7 +244,7 @@ public final class IndexModule {
     }
 
     public IndexService newIndexService(NodeEnvironment environment, IndexService.ShardStoreDeleter shardStoreDeleter, NodeServicesProvider servicesProvider, MapperRegistry mapperRegistry,
-                                        IndexingMemoryController indexingMemoryController) throws IOException {
+                                        IndexingOperationListener... listeners) throws IOException {
         final IndexSettings settings = indexSettings.newWithListener(settingsConsumers);
         IndexSearcherWrapperFactory searcherWrapperFactory = indexSearcherWrapper.get() == null ? (shard) -> null : indexSearcherWrapper.get();
         IndexEventListener eventListener = freeze();
@@ -265,6 +266,6 @@ public final class IndexModule {
         final BiFunction<IndexSettings, IndicesQueryCache, QueryCache> queryCacheProvider = queryCaches.get(queryCacheType);
         final QueryCache queryCache = queryCacheProvider.apply(settings, servicesProvider.getIndicesQueryCache());
         return new IndexService(settings, environment, new SimilarityService(settings, similarities), shardStoreDeleter, analysisRegistry, engineFactory.get(),
-                servicesProvider, queryCache, store, eventListener, searcherWrapperFactory, mapperRegistry, indexingMemoryController);
+                servicesProvider, queryCache, store, eventListener, searcherWrapperFactory, mapperRegistry, listeners);
     }
 }

+ 20 - 16
core/src/main/java/org/elasticsearch/index/IndexService.java

@@ -19,6 +19,17 @@
 
 package org.elasticsearch.index;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.lucene.search.BooleanClause;
 import org.apache.lucene.search.BooleanQuery;
 import org.apache.lucene.search.Query;
@@ -51,6 +62,7 @@ import org.elasticsearch.index.query.QueryShardContext;
 import org.elasticsearch.index.shard.IndexEventListener;
 import org.elasticsearch.index.shard.IndexSearcherWrapper;
 import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.IndexingOperationListener;
 import org.elasticsearch.index.shard.ShadowIndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardNotFoundException;
@@ -60,22 +72,10 @@ import org.elasticsearch.index.store.IndexStore;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.indices.AliasFilterParsingException;
-import org.elasticsearch.indices.IndexingMemoryController;
 import org.elasticsearch.indices.InvalidAliasNameException;
 import org.elasticsearch.indices.mapper.MapperRegistry;
 import org.elasticsearch.threadpool.ThreadPool;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.unmodifiableMap;
 import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder;
@@ -103,7 +103,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
     private final AtomicBoolean deleted = new AtomicBoolean(false);
     private final IndexSettings indexSettings;
     private final IndexingSlowLog slowLog;
-    private final IndexingMemoryController indexingMemoryController;
+    private final IndexingOperationListener[] listeners;
 
     public IndexService(IndexSettings indexSettings, NodeEnvironment nodeEnv,
                         SimilarityService similarityService,
@@ -116,7 +116,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
                         IndexEventListener eventListener,
                         IndexModule.IndexSearcherWrapperFactory wrapperFactory,
                         MapperRegistry mapperRegistry,
-                        IndexingMemoryController indexingMemoryController) throws IOException {
+                        IndexingOperationListener... listenersIn) throws IOException {
         super(indexSettings);
         this.indexSettings = indexSettings;
         this.analysisService = registry.build(indexSettings);
@@ -135,7 +135,11 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
         // initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE
         this.searcherWrapper = wrapperFactory.newWrapper(this);
         this.slowLog = new IndexingSlowLog(indexSettings.getSettings());
-        this.indexingMemoryController = indexingMemoryController;
+
+        // Add our slowLog to the incoming IndexingOperationListeners:
+        this.listeners = new IndexingOperationListener[1+listenersIn.length];
+        this.listeners[0] = slowLog;
+        System.arraycopy(listenersIn, 0, this.listeners, 1, listenersIn.length);
     }
 
     public int numberOfShards() {
@@ -300,7 +304,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
             if (useShadowEngine(primary, indexSettings)) {
                 indexShard = new ShadowIndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider); // no indexing listeners - shadow  engines don't index
             } else {
-                indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, slowLog, indexingMemoryController);
+                indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, listeners);
             }
 
             eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");

+ 3 - 7
core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -488,7 +488,7 @@ public class IndexShard extends AbstractIndexShardComponent {
      */
     public boolean index(Engine.Index index) {
         ensureWriteAllowed(index);
-        markLastWrite();
+        active.set(true);
         index = indexingOperationListeners.preIndex(index);
         final boolean created;
         try {
@@ -532,7 +532,7 @@ public class IndexShard extends AbstractIndexShardComponent {
 
     public void delete(Engine.Delete delete) {
         ensureWriteAllowed(delete);
-        markLastWrite();
+        active.set(true);
         delete = indexingOperationListeners.preDelete(delete);
         try {
             if (logger.isTraceEnabled()) {
@@ -974,11 +974,6 @@ public class IndexShard extends AbstractIndexShardComponent {
         }
     }
 
-    /** Sets {@code active} to true if we were inactive. */
-    private void markLastWrite() {
-        active.set(true);
-    }
-
     private void ensureWriteAllowed(Engine.Operation op) throws IllegalIndexShardStateException {
         Engine.Operation.Origin origin = op.origin();
         IndexShardState state = this.state; // one time volatile read
@@ -1036,6 +1031,7 @@ public class IndexShard extends AbstractIndexShardComponent {
         }
     }
 
+    /** Returns number of heap bytes used by the indexing buffer for this shard, or 0 if the shard is closed */
     public long getIndexBufferRAMBytesUsed() {
         Engine engine = getEngineOrNull();
         if (engine == null) {

+ 13 - 10
core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java

@@ -24,6 +24,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.engine.Engine;
@@ -133,12 +134,7 @@ public class IndexingMemoryController extends AbstractComponent implements Index
 
     protected ScheduledFuture<?> scheduleTask(ThreadPool threadPool) {
         // it's fine to run it on the scheduler thread, no busy work
-        if (threadPool != null) {
-            return threadPool.scheduleWithFixedDelay(statusChecker, interval);
-        } else {
-            // tests pass null for threadPool --> no periodic checking
-            return null;
-        }
+        return threadPool.scheduleWithFixedDelay(statusChecker, interval);
     }
 
     @Override
@@ -180,11 +176,16 @@ public class IndexingMemoryController extends AbstractComponent implements Index
 
     /** ask this shard to refresh, in the background, to free up heap */
     protected void writeIndexingBufferAsync(IndexShard shard) {
-        threadPool.executor(ThreadPool.Names.REFRESH).execute(new Runnable() {
+        threadPool.executor(ThreadPool.Names.REFRESH).execute(new AbstractRunnable() {
             @Override
-            public void run() {
+            public void doRun() {
                 shard.writeIndexingBuffer();
             }
+
+            @Override
+            public void onFailure(Throwable t) {
+                logger.warn("failed to write indexing buffer for shard [{}]; ignoring", t, shard.shardId());
+            }
         });
     }
 
@@ -243,7 +244,7 @@ public class IndexingMemoryController extends AbstractComponent implements Index
         /** Shard calls this on each indexing/delete op */
         public void bytesWritten(int bytes) {
             long totalBytes = bytesWrittenSinceCheck.addAndGet(bytes);
-            if (totalBytes > indexingBuffer.bytes()/30) {
+            while (totalBytes > indexingBuffer.bytes()/30) {
                 if (runLock.tryLock()) {
                     try {
                         bytesWrittenSinceCheck.addAndGet(-totalBytes);
@@ -251,10 +252,12 @@ public class IndexingMemoryController extends AbstractComponent implements Index
                         // typically smaller but can be larger in extreme cases (many unique terms).  This logic is here only as a safety against
                         // thread starvation or too infrequent checking, to ensure we are still checking periodically, in proportion to bytes
                         // processed by indexing:
-                        run();
+                        runUnlocked();
                     } finally {
                         runLock.unlock();
                     }
+                } else {
+                    break;
                 }
             }
         }

+ 9 - 9
core/src/test/java/org/elasticsearch/index/IndexModuleTests.java

@@ -138,7 +138,7 @@ public class IndexModuleTests extends ESTestCase {
         IndexModule module = new IndexModule(indexSettings, null, new AnalysisRegistry(null, environment));
         module.setSearcherWrapper((s) -> new Wrapper());
         module.engineFactory.set(new MockEngineFactory(AssertingDirectoryReader.class));
-        IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, null);
+        IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
         assertTrue(indexService.getSearcherWrapper() instanceof Wrapper);
         assertSame(indexService.getEngineFactory(), module.engineFactory.get());
         indexService.close("simon says", false);
@@ -151,7 +151,7 @@ public class IndexModuleTests extends ESTestCase {
         IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(index, settings);
         IndexModule module = new IndexModule(indexSettings, null, new AnalysisRegistry(null, environment));
         module.addIndexStore("foo_store", FooStore::new);
-        IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, null);
+        IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
         assertTrue(indexService.getIndexStore() instanceof FooStore);
         try {
             module.addIndexStore("foo_store", FooStore::new);
@@ -175,7 +175,7 @@ public class IndexModuleTests extends ESTestCase {
         Consumer<Settings> listener = (s) -> {};
         module.addIndexSettingsListener(listener);
         module.addIndexEventListener(eventListener);
-        IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, null);
+        IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
         IndexSettings x = indexService.getIndexSettings();
         assertEquals(x.getSettings().getAsMap(), indexSettings.getSettings().getAsMap());
         assertEquals(x.getIndex(), index);
@@ -205,7 +205,7 @@ public class IndexModuleTests extends ESTestCase {
         } catch (IllegalArgumentException ex) {
 
         }
-        IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, null);
+        IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
         IndexSettings x = indexService.getIndexSettings();
         assertEquals(1, x.getUpdateListeners().size());
         assertSame(x.getUpdateListeners().get(0), listener);
@@ -232,7 +232,7 @@ public class IndexModuleTests extends ESTestCase {
             }
         });
 
-        IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, null);
+        IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
         SimilarityService similarityService = indexService.similarityService();
         assertNotNull(similarityService.getSimilarity("my_similarity"));
         assertTrue(similarityService.getSimilarity("my_similarity").get() instanceof TestSimilarity);
@@ -249,7 +249,7 @@ public class IndexModuleTests extends ESTestCase {
                 .build();
         IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings(new Index("foo"), indexSettings), null, new AnalysisRegistry(null, environment));
         try {
-            module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, null);
+            module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
         } catch (IllegalArgumentException ex) {
             assertEquals("Unknown Similarity type [test_similarity] for [my_similarity]", ex.getMessage());
         }
@@ -263,7 +263,7 @@ public class IndexModuleTests extends ESTestCase {
                 .build();
         IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings(new Index("foo"), indexSettings), null, new AnalysisRegistry(null, environment));
         try {
-            module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, null);
+            module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
         } catch (IllegalArgumentException ex) {
             assertEquals("Similarity [my_similarity] must have an associated type", ex.getMessage());
         }
@@ -310,7 +310,7 @@ public class IndexModuleTests extends ESTestCase {
             assertEquals(e.getMessage(), "Can't register the same [query_cache] more than once for [custom]");
         }
 
-        IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, null);
+        IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
         assertTrue(indexService.cache().query() instanceof CustomQueryCache);
         indexService.close("simon says", false);
     }
@@ -320,7 +320,7 @@ public class IndexModuleTests extends ESTestCase {
                 .put("path.home", createTempDir().toString())
                 .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
         IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings(new Index("foo"), indexSettings), null, new AnalysisRegistry(null, environment));
-        IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry, null);
+        IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, mapperRegistry);
         assertTrue(indexService.cache().query() instanceof IndexQueryCache);
         indexService.close("simon says", false);
     }

+ 4 - 0
core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java

@@ -350,6 +350,10 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
                 shard.writeIndexingBuffer();
             }
 
+            @Override
+            protected ScheduledFuture<?> scheduleTask(ThreadPool threadPool) {
+                return null;
+            }
         };
 
         for (int i = 0; i < 100; i++) {

+ 5 - 1
docs/reference/migration/migrate_3_0.asciidoc

@@ -194,7 +194,6 @@ Previously, there were three settings for the ping timeout: `discovery.zen.initi
 the only setting key for the ping timeout is now `discovery.zen.ping_timeout`. The default value for
 ping timeouts remains at three seconds.
 
-
 ==== Recovery settings
 
 Recovery settings deprecated in 1.x have been removed:
@@ -238,6 +237,11 @@ Please change the setting in your configuration files or in the clusterstate to
 
 The 'default' similarity has been renamed to 'classic'.
 
+==== Indexing settings
+
+`indices.memory.min_shard_index_buffer_size` and `indices.memory.max_shard_index_buffer_size` are removed since Elasticsearch now allows any one shard to any
+amount of heap as long as the total indexing buffer heap used across all shards is below the node's `indices.memory.index_buffer_size` (default: 10% of the JVM heap)
+
 [[breaking_30_mapping_changes]]
 === Mapping changes