|
@@ -24,12 +24,10 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
|
|
import org.elasticsearch.common.xcontent.XContentFactory;
|
|
|
import org.elasticsearch.index.engine.Engine;
|
|
|
-import org.elasticsearch.index.engine.IgnoreOnRecoveryEngineException;
|
|
|
import org.elasticsearch.index.mapper.DocumentMapperForType;
|
|
|
import org.elasticsearch.index.mapper.MapperException;
|
|
|
import org.elasticsearch.index.mapper.MapperService;
|
|
|
import org.elasticsearch.index.mapper.Mapping;
|
|
|
-import org.elasticsearch.index.mapper.Uid;
|
|
|
import org.elasticsearch.index.translog.Translog;
|
|
|
import org.elasticsearch.rest.RestStatus;
|
|
|
|
|
@@ -149,59 +147,39 @@ public class TranslogRecoveryPerformer {
|
|
|
* is encountered.
|
|
|
*/
|
|
|
private void performRecoveryOperation(Engine engine, Translog.Operation operation, boolean allowMappingUpdates, Engine.Operation.Origin origin) throws IOException {
|
|
|
-
|
|
|
- try {
|
|
|
- switch (operation.opType()) {
|
|
|
- case INDEX:
|
|
|
- Translog.Index index = (Translog.Index) operation;
|
|
|
- // we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all
|
|
|
- // autoGeneratedID docs that are coming from the primary are updated correctly.
|
|
|
- Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()),
|
|
|
- source(shardId.getIndexName(), index.type(), index.id(), index.source(), XContentFactory.xContentType(index.source()))
|
|
|
- .routing(index.routing()).parent(index.parent()), index.seqNo(), index.primaryTerm(),
|
|
|
- index.version(), index.versionType().versionTypeForReplicationAndRecovery(), origin, index.getAutoGeneratedIdTimestamp(), true);
|
|
|
- maybeAddMappingUpdate(engineIndex.type(), engineIndex.parsedDoc().dynamicMappingsUpdate(), engineIndex.id(), allowMappingUpdates);
|
|
|
- logger.trace("[translog] recover [index] op [({}, {})] of [{}][{}]", index.seqNo(), index.primaryTerm(), index.type(), index.id());
|
|
|
- index(engine, engineIndex);
|
|
|
- break;
|
|
|
- case DELETE:
|
|
|
- Translog.Delete delete = (Translog.Delete) operation;
|
|
|
- logger.trace("[translog] recover [delete] op [({}, {})] of [{}][{}]", delete.seqNo(), delete.primaryTerm(), delete.type(), delete.id());
|
|
|
- final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(),
|
|
|
- delete.primaryTerm(), delete.version(), delete.versionType().versionTypeForReplicationAndRecovery(),
|
|
|
- origin, System.nanoTime());
|
|
|
- delete(engine, engineDelete);
|
|
|
- break;
|
|
|
- case NO_OP:
|
|
|
- final Translog.NoOp noOp = (Translog.NoOp) operation;
|
|
|
- final long seqNo = noOp.seqNo();
|
|
|
- final long primaryTerm = noOp.primaryTerm();
|
|
|
- final String reason = noOp.reason();
|
|
|
- logger.trace("[translog] recover [no_op] op [({}, {})] of [{}]", seqNo, primaryTerm, reason);
|
|
|
- final Engine.NoOp engineNoOp =
|
|
|
- new Engine.NoOp(seqNo, primaryTerm, origin, System.nanoTime(), reason);
|
|
|
- noOp(engine, engineNoOp);
|
|
|
- break;
|
|
|
- default:
|
|
|
- throw new IllegalStateException("No operation defined for [" + operation + "]");
|
|
|
- }
|
|
|
- } catch (ElasticsearchException e) {
|
|
|
- boolean hasIgnoreOnRecoveryException = false;
|
|
|
- ElasticsearchException current = e;
|
|
|
- while (true) {
|
|
|
- if (current instanceof IgnoreOnRecoveryEngineException) {
|
|
|
- hasIgnoreOnRecoveryException = true;
|
|
|
- break;
|
|
|
- }
|
|
|
- if (current.getCause() instanceof ElasticsearchException) {
|
|
|
- current = (ElasticsearchException) current.getCause();
|
|
|
- } else {
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- if (!hasIgnoreOnRecoveryException) {
|
|
|
- throw e;
|
|
|
- }
|
|
|
+ switch (operation.opType()) {
|
|
|
+ case INDEX:
|
|
|
+ Translog.Index index = (Translog.Index) operation;
|
|
|
+ // we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all
|
|
|
+ // autoGeneratedID docs that are coming from the primary are updated correctly.
|
|
|
+ Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()),
|
|
|
+ source(shardId.getIndexName(), index.type(), index.id(), index.source(), XContentFactory.xContentType(index.source()))
|
|
|
+ .routing(index.routing()).parent(index.parent()), index.seqNo(), index.primaryTerm(),
|
|
|
+ index.version(), index.versionType().versionTypeForReplicationAndRecovery(), origin, index.getAutoGeneratedIdTimestamp(), true);
|
|
|
+ maybeAddMappingUpdate(engineIndex.type(), engineIndex.parsedDoc().dynamicMappingsUpdate(), engineIndex.id(), allowMappingUpdates);
|
|
|
+ logger.trace("[translog] recover [index] op [({}, {})] of [{}][{}]", index.seqNo(), index.primaryTerm(), index.type(), index.id());
|
|
|
+ index(engine, engineIndex);
|
|
|
+ break;
|
|
|
+ case DELETE:
|
|
|
+ Translog.Delete delete = (Translog.Delete) operation;
|
|
|
+ logger.trace("[translog] recover [delete] op [({}, {})] of [{}][{}]", delete.seqNo(), delete.primaryTerm(), delete.type(), delete.id());
|
|
|
+ final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(),
|
|
|
+ delete.primaryTerm(), delete.version(), delete.versionType().versionTypeForReplicationAndRecovery(),
|
|
|
+ origin, System.nanoTime());
|
|
|
+ delete(engine, engineDelete);
|
|
|
+ break;
|
|
|
+ case NO_OP:
|
|
|
+ final Translog.NoOp noOp = (Translog.NoOp) operation;
|
|
|
+ final long seqNo = noOp.seqNo();
|
|
|
+ final long primaryTerm = noOp.primaryTerm();
|
|
|
+ final String reason = noOp.reason();
|
|
|
+ logger.trace("[translog] recover [no_op] op [({}, {})] of [{}]", seqNo, primaryTerm, reason);
|
|
|
+ final Engine.NoOp engineNoOp =
|
|
|
+ new Engine.NoOp(seqNo, primaryTerm, origin, System.nanoTime(), reason);
|
|
|
+ noOp(engine, engineNoOp);
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ throw new IllegalStateException("No operation defined for [" + operation + "]");
|
|
|
}
|
|
|
operationProcessed();
|
|
|
}
|