|
@@ -24,16 +24,11 @@ import org.apache.lucene.util.Accountable;
|
|
|
import org.apache.lucene.util.IOUtils;
|
|
|
import org.elasticsearch.ElasticsearchException;
|
|
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
|
|
-import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
|
|
import org.elasticsearch.common.Nullable;
|
|
|
import org.elasticsearch.common.Strings;
|
|
|
-import org.elasticsearch.common.inject.CreationException;
|
|
|
import org.elasticsearch.common.inject.Inject;
|
|
|
import org.elasticsearch.common.inject.Injector;
|
|
|
-import org.elasticsearch.common.inject.Injectors;
|
|
|
-import org.elasticsearch.common.inject.Module;
|
|
|
-import org.elasticsearch.common.inject.ModulesBuilder;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.env.NodeEnvironment;
|
|
|
import org.elasticsearch.env.ShardLock;
|
|
@@ -49,15 +44,10 @@ import org.elasticsearch.index.mapper.MapperService;
|
|
|
import org.elasticsearch.index.query.IndexQueryParserService;
|
|
|
import org.elasticsearch.index.settings.IndexSettings;
|
|
|
import org.elasticsearch.index.settings.IndexSettingsService;
|
|
|
-import org.elasticsearch.index.shard.IndexShard;
|
|
|
-import org.elasticsearch.index.shard.IndexShardModule;
|
|
|
-import org.elasticsearch.index.shard.ShardId;
|
|
|
-import org.elasticsearch.index.shard.ShardNotFoundException;
|
|
|
-import org.elasticsearch.index.shard.ShardPath;
|
|
|
+import org.elasticsearch.index.shard.*;
|
|
|
import org.elasticsearch.index.similarity.SimilarityService;
|
|
|
import org.elasticsearch.index.store.IndexStore;
|
|
|
import org.elasticsearch.index.store.Store;
|
|
|
-import org.elasticsearch.index.store.StoreModule;
|
|
|
import org.elasticsearch.indices.IndicesLifecycle;
|
|
|
import org.elasticsearch.indices.IndicesService;
|
|
|
import org.elasticsearch.indices.InternalIndicesLifecycle;
|
|
@@ -110,25 +100,8 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
|
|
|
private final NodeEnvironment nodeEnv;
|
|
|
private final IndicesService indicesServices;
|
|
|
|
|
|
- private volatile ImmutableMap<Integer, IndexShardInjectorPair> shards = ImmutableMap.of();
|
|
|
+ private volatile ImmutableMap<Integer, IndexShard> shards = ImmutableMap.of();
|
|
|
|
|
|
- private static class IndexShardInjectorPair {
|
|
|
- private final IndexShard indexShard;
|
|
|
- private final Injector injector;
|
|
|
-
|
|
|
- public IndexShardInjectorPair(IndexShard indexShard, Injector injector) {
|
|
|
- this.indexShard = indexShard;
|
|
|
- this.injector = injector;
|
|
|
- }
|
|
|
-
|
|
|
- public IndexShard getIndexShard() {
|
|
|
- return indexShard;
|
|
|
- }
|
|
|
-
|
|
|
- public Injector getInjector() {
|
|
|
- return injector;
|
|
|
- }
|
|
|
- }
|
|
|
|
|
|
private final AtomicBoolean closed = new AtomicBoolean(false);
|
|
|
private final AtomicBoolean deleted = new AtomicBoolean(false);
|
|
@@ -173,7 +146,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
|
|
|
|
|
|
@Override
|
|
|
public Iterator<IndexShard> iterator() {
|
|
|
- return shards.values().stream().map((p) -> p.getIndexShard()).iterator();
|
|
|
+ return shards.values().iterator();
|
|
|
}
|
|
|
|
|
|
public boolean hasShard(int shardId) {
|
|
@@ -185,11 +158,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
|
|
|
*/
|
|
|
@Nullable
|
|
|
public IndexShard shard(int shardId) {
|
|
|
- IndexShardInjectorPair indexShardInjectorPair = shards.get(shardId);
|
|
|
- if (indexShardInjectorPair != null) {
|
|
|
- return indexShardInjectorPair.getIndexShard();
|
|
|
- }
|
|
|
- return null;
|
|
|
+ return shards.get(shardId);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -261,16 +230,6 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Return the shard injector for the provided id, or throw an exception if there is no such shard.
|
|
|
- */
|
|
|
- public Injector shardInjectorSafe(int shardId) {
|
|
|
- IndexShardInjectorPair indexShardInjectorPair = shards.get(shardId);
|
|
|
- if (indexShardInjectorPair == null) {
|
|
|
- throw new ShardNotFoundException(new ShardId(index, shardId));
|
|
|
- }
|
|
|
- return indexShardInjectorPair.getInjector();
|
|
|
- }
|
|
|
|
|
|
public String indexUUID() {
|
|
|
return indexSettings.get(IndexMetaData.SETTING_INDEX_UUID, IndexMetaData.INDEX_UUID_NA_VALUE);
|
|
@@ -304,7 +263,8 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
|
|
|
final ShardId shardId = new ShardId(index, sShardId);
|
|
|
ShardLock lock = null;
|
|
|
boolean success = false;
|
|
|
- Injector shardInjector = null;
|
|
|
+ Store store = null;
|
|
|
+ IndexShard indexShard = null;
|
|
|
try {
|
|
|
lock = nodeEnv.shardLock(shardId, TimeUnit.SECONDS.toMillis(5));
|
|
|
indicesLifecycle.beforeIndexShardCreated(shardId, indexSettings);
|
|
@@ -351,38 +311,18 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
|
|
|
// if we are on a shared FS we only own the shard (ie. we can safely delete it) if we are the primary.
|
|
|
final boolean canDeleteShardContent = IndexMetaData.isOnSharedFilesystem(indexSettings) == false ||
|
|
|
(primary && IndexMetaData.isOnSharedFilesystem(indexSettings));
|
|
|
- ModulesBuilder modules = new ModulesBuilder();
|
|
|
- // plugin modules must be added here, before others or we can get crazy injection errors...
|
|
|
- for (Module pluginModule : pluginsService.shardModules(indexSettings)) {
|
|
|
- modules.add(pluginModule);
|
|
|
- }
|
|
|
- modules.add(new IndexShardModule(shardId, primary, indexSettings));
|
|
|
- modules.add(new StoreModule(injector.getInstance(IndexStore.class).shardDirectory(), lock,
|
|
|
- new StoreCloseListener(shardId, canDeleteShardContent, new Closeable() {
|
|
|
- @Override
|
|
|
- public void close() throws IOException {
|
|
|
- injector.getInstance(IndicesQueryCache.class).onClose(shardId);
|
|
|
- }
|
|
|
- }), path));
|
|
|
- pluginsService.processModules(modules);
|
|
|
-
|
|
|
- try {
|
|
|
- shardInjector = modules.createChildInjector(injector);
|
|
|
- } catch (CreationException e) {
|
|
|
- ElasticsearchException ex = new ElasticsearchException("failed to create shard", Injectors.getFirstErrorFailure(e));
|
|
|
- ex.setShard(shardId);
|
|
|
- throw ex;
|
|
|
- } catch (Throwable e) {
|
|
|
- ElasticsearchException ex = new ElasticsearchException("failed to create shard", e);
|
|
|
- ex.setShard(shardId);
|
|
|
- throw ex;
|
|
|
+ IndexStore indexStore = injector.getInstance(IndexStore.class);
|
|
|
+ store = new Store(shardId, indexSettings, indexStore.newDirectoryService(path), lock, new StoreCloseListener(shardId, canDeleteShardContent, () -> injector.getInstance(IndicesQueryCache.class).onClose(shardId)));
|
|
|
+ if (primary && IndexMetaData.isIndexUsingShadowReplicas(indexSettings)) {
|
|
|
+ indexShard = new ShadowIndexShard(shardId, indexSettings, path, store, injector.getInstance(IndexServicesProvider.class));
|
|
|
+ } else {
|
|
|
+ indexShard = new IndexShard(shardId, indexSettings, path, store, injector.getInstance(IndexServicesProvider.class));
|
|
|
}
|
|
|
|
|
|
- IndexShard indexShard = shardInjector.getInstance(IndexShard.class);
|
|
|
indicesLifecycle.indexShardStateChanged(indexShard, null, "shard created");
|
|
|
indicesLifecycle.afterIndexShardCreated(indexShard);
|
|
|
|
|
|
- shards = newMapBuilder(shards).put(shardId.id(), new IndexShardInjectorPair(indexShard, shardInjector)).immutableMap();
|
|
|
+ shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
|
|
|
settingsService.addListener(indexShard);
|
|
|
success = true;
|
|
|
return indexShard;
|
|
@@ -393,10 +333,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
|
|
|
} finally {
|
|
|
if (success == false) {
|
|
|
IOUtils.closeWhileHandlingException(lock);
|
|
|
- if (shardInjector != null) {
|
|
|
- IndexShard indexShard = shardInjector.getInstance(IndexShard.class);
|
|
|
- closeShardInjector("initialization failed", shardId, shardInjector, indexShard);
|
|
|
- }
|
|
|
+ closeShard("initialization failed", shardId, indexShard, store);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -409,29 +346,19 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
|
|
|
return;
|
|
|
}
|
|
|
logger.debug("[{}] closing... (reason: [{}])", shardId, reason);
|
|
|
- HashMap<Integer, IndexShardInjectorPair> tmpShardsMap = new HashMap<>(shards);
|
|
|
- IndexShardInjectorPair indexShardInjectorPair = tmpShardsMap.remove(shardId);
|
|
|
- indexShard = indexShardInjectorPair.getIndexShard();
|
|
|
- shardInjector = indexShardInjectorPair.getInjector();
|
|
|
+ HashMap<Integer, IndexShard> tmpShardsMap = new HashMap<>(shards);
|
|
|
+ indexShard = tmpShardsMap.remove(shardId);
|
|
|
shards = ImmutableMap.copyOf(tmpShardsMap);
|
|
|
- closeShardInjector(reason, sId, shardInjector, indexShard);
|
|
|
+ closeShard(reason, sId, indexShard, indexShard.store());
|
|
|
logger.debug("[{}] closed (reason: [{}])", shardId, reason);
|
|
|
}
|
|
|
|
|
|
- private void closeShardInjector(String reason, ShardId sId, Injector shardInjector, IndexShard indexShard) {
|
|
|
+ private void closeShard(String reason, ShardId sId, IndexShard indexShard, Store store) {
|
|
|
final int shardId = sId.id();
|
|
|
try {
|
|
|
try {
|
|
|
indicesLifecycle.beforeIndexShardClosed(sId, indexShard, indexSettings);
|
|
|
} finally {
|
|
|
- // close everything else even if the beforeIndexShardClosed threw an exception
|
|
|
- for (Class<? extends Closeable> closeable : pluginsService.shardServices()) {
|
|
|
- try {
|
|
|
- shardInjector.getInstance(closeable).close();
|
|
|
- } catch (Throwable e) {
|
|
|
- logger.debug("[{}] failed to clean plugin shard service [{}]", e, shardId, closeable);
|
|
|
- }
|
|
|
- }
|
|
|
// this logic is tricky, we want to close the engine so we rollback the changes done to it
|
|
|
// and close the shard so no operations are allowed to it
|
|
|
if (indexShard != null) {
|
|
@@ -449,7 +376,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
|
|
|
}
|
|
|
} finally {
|
|
|
try {
|
|
|
- shardInjector.getInstance(Store.class).close();
|
|
|
+ store.close();
|
|
|
} catch (Throwable e) {
|
|
|
logger.warn("[{}] failed to close store on shard removal (reason: [{}])", e, shardId, reason);
|
|
|
}
|