Bläddra i källkod

Merge pull request #15877 from s1monw/make_indexing_memory_controller_private

Make IndexingMemoryController private to IndicesService
Simon Willnauer 9 år sedan
förälder
incheckning
6fcb45de60

+ 2 - 2
core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

@@ -35,7 +35,7 @@ import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.translog.TranslogConfig;
-import org.elasticsearch.indices.memory.IndexingMemoryController;
+import org.elasticsearch.indices.IndexingMemoryController;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.concurrent.TimeUnit;
@@ -192,7 +192,7 @@ public final class EngineConfig {
     }
 
     /**
-     * Returns the initial index buffer size. This setting is only read on startup and otherwise controlled by {@link org.elasticsearch.indices.memory.IndexingMemoryController}
+     * Returns the initial index buffer size. This setting is only read on startup and otherwise controlled by {@link IndexingMemoryController}
      */
     public ByteSizeValue getIndexingBufferSize() {
         return indexingBufferSize;

+ 1 - 1
core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -111,7 +111,7 @@ import org.elasticsearch.index.warmer.ShardIndexWarmerService;
 import org.elasticsearch.index.warmer.WarmerStats;
 import org.elasticsearch.indices.IndicesWarmer;
 import org.elasticsearch.indices.cache.query.IndicesQueryCache;
-import org.elasticsearch.indices.memory.IndexingMemoryController;
+import org.elasticsearch.indices.IndexingMemoryController;
 import org.elasticsearch.indices.recovery.RecoveryFailedException;
 import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.percolator.PercolatorService;

+ 13 - 25
core/src/main/java/org/elasticsearch/indices/memory/IndexingMemoryController.java → core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java

@@ -17,10 +17,9 @@
  * under the License.
  */
 
-package org.elasticsearch.indices.memory;
+package org.elasticsearch.indices;
 
-import org.elasticsearch.common.component.AbstractLifecycleComponent;
-import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
@@ -32,16 +31,16 @@ import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
 import org.elasticsearch.index.shard.IndexEventListener;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardState;
-import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.monitor.jvm.JvmInfo;
 import org.elasticsearch.threadpool.ThreadPool;
 
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.ScheduledFuture;
 
-public class IndexingMemoryController extends AbstractLifecycleComponent<IndexingMemoryController> implements IndexEventListener {
+public class IndexingMemoryController extends AbstractComponent implements IndexEventListener, Closeable {
 
     /** How much heap (% or bytes) we will share across all actively indexing shards on this node (default: 10%). */
     public static final String INDEX_BUFFER_SIZE_SETTING = "indices.memory.index_buffer_size";
@@ -70,10 +69,6 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
     /** Once a shard becomes inactive, we reduce the {@code IndexWriter} buffer to this value (500 KB) to let active shards use the heap instead. */
     public static final ByteSizeValue INACTIVE_SHARD_INDEXING_BUFFER = ByteSizeValue.parseBytesSizeValue("500kb", "INACTIVE_SHARD_INDEXING_BUFFER");
 
-    /** Once a shard becomes inactive, we reduce the {@code Translog} buffer to this value (1 KB) to let active shards use the heap instead. */
-    public static final ByteSizeValue INACTIVE_SHARD_TRANSLOG_BUFFER = ByteSizeValue.parseBytesSizeValue("1kb", "INACTIVE_SHARD_TRANSLOG_BUFFER");
-
-    private final ThreadPool threadPool;
     private final IndicesService indicesService;
 
     private final ByteSizeValue indexingBuffer;
@@ -81,22 +76,20 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
     private final ByteSizeValue maxShardIndexBufferSize;
     private final TimeValue interval;
 
-    private volatile ScheduledFuture scheduler;
+    private final ScheduledFuture scheduler;
 
     private static final EnumSet<IndexShardState> CAN_UPDATE_INDEX_BUFFER_STATES = EnumSet.of(
             IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED);
 
     private final ShardsIndicesStatusChecker statusChecker;
 
-    @Inject
-    public IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService) {
+    IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService) {
         this(settings, threadPool, indicesService, JvmInfo.jvmInfo().getMem().getHeapMax().bytes());
     }
 
     // for testing
-    protected IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService, long jvmMemoryInBytes) {
+    IndexingMemoryController(Settings settings, ThreadPool threadPool, IndicesService indicesService, long jvmMemoryInBytes) {
         super(settings);
-        this.threadPool = threadPool;
         this.indicesService = indicesService;
 
         ByteSizeValue indexingBuffer;
@@ -131,29 +124,24 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
                 MIN_SHARD_INDEX_BUFFER_SIZE_SETTING, this.minShardIndexBufferSize,
                 MAX_SHARD_INDEX_BUFFER_SIZE_SETTING, this.maxShardIndexBufferSize,
                 SHARD_INACTIVE_INTERVAL_TIME_SETTING, this.interval);
+        this.scheduler = scheduleTask(threadPool);
     }
 
-    @Override
-    protected void doStart() {
+    protected ScheduledFuture<?> scheduleTask(ThreadPool threadPool) {
         // it's fine to run it on the scheduler thread, no busy work
-        this.scheduler = threadPool.scheduleWithFixedDelay(statusChecker, interval);
+        return threadPool.scheduleWithFixedDelay(statusChecker, interval);
     }
 
     @Override
-    protected void doStop() {
+    public void close() {
         FutureUtils.cancel(scheduler);
-        scheduler = null;
-    }
-
-    @Override
-    protected void doClose() {
     }
 
     /**
      * returns the current budget for the total amount of indexing buffers of
      * active shards on this node
      */
-    public ByteSizeValue indexingBufferSize() {
+    ByteSizeValue indexingBufferSize() {
         return indexingBuffer;
     }
 
@@ -188,7 +176,7 @@ public class IndexingMemoryController extends AbstractLifecycleComponent<Indexin
     }
 
     /** check if any shards active status changed, now. */
-    public void forceCheck() {
+    void forceCheck() {
         statusChecker.run();
     }
 

+ 0 - 2
core/src/main/java/org/elasticsearch/indices/IndicesModule.java

@@ -111,7 +111,6 @@ import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
 import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener;
 import org.elasticsearch.indices.flush.SyncedFlushService;
 import org.elasticsearch.indices.mapper.MapperRegistry;
-import org.elasticsearch.indices.memory.IndexingMemoryController;
 import org.elasticsearch.indices.query.IndicesQueriesRegistry;
 import org.elasticsearch.indices.recovery.RecoverySettings;
 import org.elasticsearch.indices.recovery.RecoverySource;
@@ -273,7 +272,6 @@ public class IndicesModule extends AbstractModule {
         bind(RecoverySource.class).asEagerSingleton();
         bind(IndicesStore.class).asEagerSingleton();
         bind(IndicesClusterStateService.class).asEagerSingleton();
-        bind(IndexingMemoryController.class).asEagerSingleton();
         bind(SyncedFlushService.class).asEagerSingleton();
         bind(IndicesQueryCache.class).asEagerSingleton();
         bind(IndicesRequestCache.class).asEagerSingleton();

+ 6 - 3
core/src/main/java/org/elasticsearch/indices/IndicesService.java

@@ -65,6 +65,7 @@ import org.elasticsearch.index.store.IndexStoreConfig;
 import org.elasticsearch.indices.mapper.MapperRegistry;
 import org.elasticsearch.indices.query.IndicesQueriesRegistry;
 import org.elasticsearch.plugins.PluginsService;
+import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
 import java.nio.file.Files;
@@ -105,6 +106,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
     private final OldShardsStats oldShardsStats = new OldShardsStats();
     private final IndexStoreConfig indexStoreConfig;
     private final MapperRegistry mapperRegistry;
+    private final IndexingMemoryController indexingMemoryController;
 
     @Override
     protected void doStart() {
@@ -114,7 +116,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
     public IndicesService(Settings settings, PluginsService pluginsService, NodeEnvironment nodeEnv,
                           ClusterSettings clusterSettings, AnalysisRegistry analysisRegistry,
                           IndicesQueriesRegistry indicesQueriesRegistry, IndexNameExpressionResolver indexNameExpressionResolver,
-                          ClusterService clusterService, MapperRegistry mapperRegistry) {
+                          ClusterService clusterService, MapperRegistry mapperRegistry, ThreadPool threadPool) {
         super(settings);
         this.pluginsService = pluginsService;
         this.nodeEnv = nodeEnv;
@@ -127,7 +129,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
         this.mapperRegistry = mapperRegistry;
         clusterSettings.addSettingsUpdateConsumer(IndexStoreConfig.INDICES_STORE_THROTTLE_TYPE_SETTING, indexStoreConfig::setRateLimitingType);
         clusterSettings.addSettingsUpdateConsumer(IndexStoreConfig.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING, indexStoreConfig::setRateLimitingThrottle);
-
+        indexingMemoryController = new IndexingMemoryController(settings, threadPool, this);
     }
 
     @Override
@@ -161,7 +163,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
 
     @Override
     protected void doClose() {
-        IOUtils.closeWhileHandlingException(analysisRegistry);
+        IOUtils.closeWhileHandlingException(analysisRegistry, indexingMemoryController);
     }
 
     /**
@@ -291,6 +293,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
 
         final IndexModule indexModule = new IndexModule(idxSettings, indexStoreConfig, analysisRegistry);
         pluginsService.onIndexModule(indexModule);
+        indexModule.addIndexEventListener(indexingMemoryController);
         for (IndexEventListener listener : builtInListeners) {
             indexModule.addIndexEventListener(listener);
         }

+ 2 - 3
core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java

@@ -63,7 +63,6 @@ import org.elasticsearch.index.shard.ShardNotFoundException;
 import org.elasticsearch.index.snapshots.IndexShardRepository;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.flush.SyncedFlushService;
-import org.elasticsearch.indices.memory.IndexingMemoryController;
 import org.elasticsearch.indices.recovery.RecoveryFailedException;
 import org.elasticsearch.indices.recovery.RecoverySource;
 import org.elasticsearch.indices.recovery.RecoveryState;
@@ -130,9 +129,9 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
                                       NodeMappingRefreshAction nodeMappingRefreshAction,
                                       RepositoriesService repositoriesService, RestoreService restoreService,
                                       SearchService searchService, SyncedFlushService syncedFlushService,
-                                      RecoverySource recoverySource, NodeServicesProvider nodeServicesProvider, IndexingMemoryController indexingMemoryController) {
+                                      RecoverySource recoverySource, NodeServicesProvider nodeServicesProvider) {
         super(settings);
-        this.buildInIndexListener = Arrays.asList(recoverySource, recoveryTarget, searchService, syncedFlushService, indexingMemoryController);
+        this.buildInIndexListener = Arrays.asList(recoverySource, recoveryTarget, searchService, syncedFlushService);
         this.indicesService = indicesService;
         this.clusterService = clusterService;
         this.threadPool = threadPool;

+ 0 - 5
core/src/main/java/org/elasticsearch/node/Node.java

@@ -69,8 +69,6 @@ import org.elasticsearch.indices.breaker.CircuitBreakerModule;
 import org.elasticsearch.indices.cache.query.IndicesQueryCache;
 import org.elasticsearch.indices.cluster.IndicesClusterStateService;
 import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
-import org.elasticsearch.indices.memory.IndexingMemoryController;
-import org.elasticsearch.indices.recovery.RecoverySettings;
 import org.elasticsearch.indices.store.IndicesStore;
 import org.elasticsearch.indices.ttl.IndicesTTLService;
 import org.elasticsearch.monitor.MonitorService;
@@ -249,7 +247,6 @@ public class Node implements Releasable {
 
         injector.getInstance(MappingUpdatedAction.class).setClient(client);
         injector.getInstance(IndicesService.class).start();
-        injector.getInstance(IndexingMemoryController.class).start();
         injector.getInstance(IndicesClusterStateService.class).start();
         injector.getInstance(IndicesTTLService.class).start();
         injector.getInstance(SnapshotsService.class).start();
@@ -308,7 +305,6 @@ public class Node implements Releasable {
         // stop any changes happening as a result of cluster state changes
         injector.getInstance(IndicesClusterStateService.class).stop();
         // we close indices first, so operations won't be allowed on it
-        injector.getInstance(IndexingMemoryController.class).stop();
         injector.getInstance(IndicesTTLService.class).stop();
         injector.getInstance(RoutingService.class).stop();
         injector.getInstance(ClusterService.class).stop();
@@ -360,7 +356,6 @@ public class Node implements Releasable {
         stopWatch.stop().start("indices_cluster");
         injector.getInstance(IndicesClusterStateService.class).close();
         stopWatch.stop().start("indices");
-        injector.getInstance(IndexingMemoryController.class).close();
         injector.getInstance(IndicesTTLService.class).close();
         injector.getInstance(IndicesService.class).close();
         // close filter/fielddata caches after indices

+ 1 - 1
core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerIT.java → core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java

@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.elasticsearch.indices.memory;
+package org.elasticsearch.indices;
 
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.metadata.IndexMetaData;

+ 8 - 2
core/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java → core/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java

@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.elasticsearch.indices.memory;
+package org.elasticsearch.indices;
 
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -24,8 +24,8 @@ import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.shard.IndexShard;
-import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.test.ESSingleNodeTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -33,6 +33,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
 
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
@@ -120,6 +121,11 @@ public class IndexingMemoryControllerTests extends ESSingleNodeTestCase {
             activeShards.add(shard);
             forceCheck();
         }
+
+        @Override
+        protected ScheduledFuture<?> scheduleTask(ThreadPool threadPool) {
+            return null;
+        }
     }
 
     public void testShardAdditionAndRemoval() {