|
@@ -226,10 +226,6 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|
|
return this.store;
|
|
|
}
|
|
|
|
|
|
- public Engine engine() {
|
|
|
- return engineSafe();
|
|
|
- }
|
|
|
-
|
|
|
public Translog translog() {
|
|
|
return translog;
|
|
|
}
|
|
@@ -315,7 +311,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|
|
if (newRouting.state() == ShardRoutingState.STARTED || newRouting.state() == ShardRoutingState.RELOCATING) {
|
|
|
// we want to refresh *before* we move to internal STARTED state
|
|
|
try {
|
|
|
- engineSafe().refresh("cluster_state_started", true);
|
|
|
+ engine().refresh("cluster_state_started");
|
|
|
} catch (Throwable t) {
|
|
|
logger.debug("failed to refresh due to move to cluster wide started", t);
|
|
|
}
|
|
@@ -410,7 +406,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|
|
if (logger.isTraceEnabled()) {
|
|
|
logger.trace("index [{}][{}]{}", create.type(), create.id(), create.docs());
|
|
|
}
|
|
|
- engineSafe().create(create);
|
|
|
+ engine().create(create);
|
|
|
create.endTime(System.nanoTime());
|
|
|
} catch (Throwable ex) {
|
|
|
indexingService.postCreate(create, ex);
|
|
@@ -434,7 +430,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|
|
if (logger.isTraceEnabled()) {
|
|
|
logger.trace("index [{}][{}]{}", index.type(), index.id(), index.docs());
|
|
|
}
|
|
|
- engineSafe().index(index);
|
|
|
+ engine().index(index);
|
|
|
index.endTime(System.nanoTime());
|
|
|
} catch (Throwable ex) {
|
|
|
indexingService.postIndex(index, ex);
|
|
@@ -457,7 +453,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|
|
if (logger.isTraceEnabled()) {
|
|
|
logger.trace("delete [{}]", delete.uid().text());
|
|
|
}
|
|
|
- engineSafe().delete(delete);
|
|
|
+ engine().delete(delete);
|
|
|
delete.endTime(System.nanoTime());
|
|
|
} catch (Throwable ex) {
|
|
|
indexingService.postDelete(delete, ex);
|
|
@@ -485,23 +481,23 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|
|
logger.trace("delete_by_query [{}]", deleteByQuery.query());
|
|
|
}
|
|
|
deleteByQuery = indexingService.preDeleteByQuery(deleteByQuery);
|
|
|
- engineSafe().delete(deleteByQuery);
|
|
|
+ engine().delete(deleteByQuery);
|
|
|
deleteByQuery.endTime(System.nanoTime());
|
|
|
indexingService.postDeleteByQuery(deleteByQuery);
|
|
|
}
|
|
|
|
|
|
public Engine.GetResult get(Engine.Get get) throws ElasticsearchException {
|
|
|
readAllowed();
|
|
|
- return engineSafe().get(get);
|
|
|
+ return engine().get(get);
|
|
|
}
|
|
|
|
|
|
- public void refresh(String source, boolean force) throws ElasticsearchException {
|
|
|
+ public void refresh(String source) throws ElasticsearchException {
|
|
|
verifyNotClosed();
|
|
|
if (logger.isTraceEnabled()) {
|
|
|
- logger.trace("refresh with soruce: {} force: {}", source, force);
|
|
|
+ logger.trace("refresh with soruce: {}", source);
|
|
|
}
|
|
|
long time = System.nanoTime();
|
|
|
- engineSafe().refresh(source, force);
|
|
|
+ engine().refresh(source);
|
|
|
refreshMetric.inc(System.nanoTime() - time);
|
|
|
}
|
|
|
|
|
@@ -549,7 +545,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|
|
}
|
|
|
|
|
|
public SegmentsStats segmentStats() {
|
|
|
- SegmentsStats segmentsStats = engineSafe().segmentsStats();
|
|
|
+ SegmentsStats segmentsStats = engine().segmentsStats();
|
|
|
segmentsStats.addBitsetMemoryInBytes(shardBitsetFilterCache.getMemorySizeInBytes());
|
|
|
return segmentsStats;
|
|
|
}
|
|
@@ -611,7 +607,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|
|
logger.trace("flush with {}", request);
|
|
|
}
|
|
|
long time = System.nanoTime();
|
|
|
- engineSafe().flush(request.full() ? Engine.FlushType.NEW_WRITER : Engine.FlushType.COMMIT_TRANSLOG, request.force(), request.waitIfOngoing());
|
|
|
+ engine().flush(request.full() ? Engine.FlushType.NEW_WRITER : Engine.FlushType.COMMIT_TRANSLOG, request.force(), request.waitIfOngoing());
|
|
|
flushMetric.inc(System.nanoTime() - time);
|
|
|
}
|
|
|
|
|
@@ -620,7 +616,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|
|
if (logger.isTraceEnabled()) {
|
|
|
logger.trace("optimize with {}", optimize);
|
|
|
}
|
|
|
- engineSafe().forceMerge(optimize.flush(), optimize.waitForMerge(), optimize
|
|
|
+ engine().forceMerge(optimize.flush(), optimize.waitForMerge(), optimize
|
|
|
.maxNumSegments(), optimize.onlyExpungeDeletes(), optimize.upgrade());
|
|
|
}
|
|
|
|
|
@@ -628,7 +624,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|
|
IndexShardState state = this.state; // one time volatile read
|
|
|
// we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
|
|
|
if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) {
|
|
|
- return engineSafe().snapshotIndex();
|
|
|
+ return engine().snapshotIndex();
|
|
|
} else {
|
|
|
throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed");
|
|
|
}
|
|
@@ -636,12 +632,12 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|
|
|
|
|
public void recover(Engine.RecoveryHandler recoveryHandler) throws EngineException {
|
|
|
verifyStarted();
|
|
|
- engineSafe().recover(recoveryHandler);
|
|
|
+ engine().recover(recoveryHandler);
|
|
|
}
|
|
|
|
|
|
public void failShard(String reason, Throwable e) {
|
|
|
// fail the engine. This will cause this shard to also be removed from the node's index service.
|
|
|
- engineSafe().failEngine(reason, e);
|
|
|
+ engine().failEngine(reason, e);
|
|
|
}
|
|
|
|
|
|
public Engine.Searcher acquireSearcher(String source) {
|
|
@@ -650,7 +646,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|
|
|
|
|
public Engine.Searcher acquireSearcher(String source, boolean searcherForWriteOperation) {
|
|
|
readAllowed(searcherForWriteOperation);
|
|
|
- return engineSafe().acquireSearcher(source);
|
|
|
+ return engine().acquireSearcher(source);
|
|
|
}
|
|
|
|
|
|
public void close(String reason) throws IOException {
|
|
@@ -740,11 +736,11 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|
|
|
|
|
public void performRecoveryFinalization(boolean withFlush) throws ElasticsearchException {
|
|
|
if (withFlush) {
|
|
|
- engineSafe().flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
|
|
|
+ engine().flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
|
|
|
}
|
|
|
// clear unreferenced files
|
|
|
translog.clearUnreferenced();
|
|
|
- engineSafe().refresh("recovery_finalization", true);
|
|
|
+ engine().refresh("recovery_finalization");
|
|
|
synchronized (mutex) {
|
|
|
changeState(IndexShardState.POST_RECOVERY, "post recovery");
|
|
|
}
|
|
@@ -770,7 +766,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|
|
source(create.source()).type(create.type()).id(create.id())
|
|
|
.routing(create.routing()).parent(create.parent()).timestamp(create.timestamp()).ttl(create.ttl()),
|
|
|
create.version(), create.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true, false);
|
|
|
- engineSafe().create(engineCreate);
|
|
|
+ engine().create(engineCreate);
|
|
|
indexOperation = engineCreate;
|
|
|
break;
|
|
|
case SAVE:
|
|
@@ -778,18 +774,18 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|
|
Engine.Index engineIndex = prepareIndex(source(index.source()).type(index.type()).id(index.id())
|
|
|
.routing(index.routing()).parent(index.parent()).timestamp(index.timestamp()).ttl(index.ttl()),
|
|
|
index.version(), index.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true);
|
|
|
- engineSafe().index(engineIndex);
|
|
|
+ engine().index(engineIndex);
|
|
|
indexOperation = engineIndex;
|
|
|
break;
|
|
|
case DELETE:
|
|
|
Translog.Delete delete = (Translog.Delete) operation;
|
|
|
Uid uid = Uid.createUid(delete.uid().text());
|
|
|
- engineSafe().delete(new Engine.Delete(uid.type(), uid.id(), delete.uid(), delete.version(),
|
|
|
+ engine().delete(new Engine.Delete(uid.type(), uid.id(), delete.uid(), delete.version(),
|
|
|
delete.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, System.nanoTime(), false));
|
|
|
break;
|
|
|
case DELETE_BY_QUERY:
|
|
|
Translog.DeleteByQuery deleteByQuery = (Translog.DeleteByQuery) operation;
|
|
|
- engineSafe().delete(prepareDeleteByQuery(deleteByQuery.source(), deleteByQuery.filteringAliases(), Engine.Operation.Origin.RECOVERY, deleteByQuery.types()));
|
|
|
+ engine().delete(prepareDeleteByQuery(deleteByQuery.source(), deleteByQuery.filteringAliases(), Engine.Operation.Origin.RECOVERY, deleteByQuery.types()));
|
|
|
break;
|
|
|
default:
|
|
|
throw new ElasticsearchIllegalStateException("No operation defined for [" + operation + "]");
|
|
@@ -905,13 +901,13 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|
|
}
|
|
|
|
|
|
public void updateBufferSize(ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {
|
|
|
- Engine engine = engineSafe();
|
|
|
+ Engine engine = engine();
|
|
|
engine.updateIndexingBufferSize(shardIndexingBufferSize);
|
|
|
translog().updateBuffer(shardTranslogBufferSize);
|
|
|
}
|
|
|
|
|
|
public void markAsInactive() {
|
|
|
- Engine engine = engineSafe();
|
|
|
+ Engine engine = engine();
|
|
|
engine.updateIndexingBufferSize(EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER);
|
|
|
translog().updateBuffer(Translog.INACTIVE_SHARD_TRANSLOG_BUFFER);
|
|
|
}
|
|
@@ -954,8 +950,8 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|
|
@Override
|
|
|
public void run() {
|
|
|
try {
|
|
|
- if (engineSafe().refreshNeeded()) {
|
|
|
- refresh("schedule", false);
|
|
|
+ if (engine().refreshNeeded()) {
|
|
|
+ refresh("schedule");
|
|
|
}
|
|
|
} catch (EngineClosedException e) {
|
|
|
// we are being closed, ignore
|
|
@@ -1036,7 +1032,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private Engine engineSafe() {
|
|
|
+ public Engine engine() {
|
|
|
Engine engine = this.currentEngineReference.get();
|
|
|
if (engine == null) {
|
|
|
throw new EngineClosedException(shardId);
|
|
@@ -1059,7 +1055,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|
|
}
|
|
|
}
|
|
|
} finally {
|
|
|
- // close the engine all bets are off... don't use engineSafe() here it can throw an exception
|
|
|
+ // close the engine all bets are off... don't use engine() here it can throw an exception
|
|
|
IOUtils.closeWhileHandlingException(currentEngineReference.get());
|
|
|
}
|
|
|
}
|