Ver Fonte

Add seq no powered optimistic locking support to the index and delete transport actions (#36619)

This commit add support for using sequence numbers to power [optimistic concurrency control](http://en.wikipedia.org/wiki/Optimistic_concurrency_control) 
in the delete and index transport actions and requests. A follow up will come with adding sequence
numbers to the update and get results.

Relates #36148 
Relates #10708
Boaz Leskes há 6 anos atrás
pai
commit
733a6d34c1
20 ficheiros alterados com 301 adições e 71 exclusões
  1. 5 4
      plugins/mapper-annotated-text/src/test/java/org/elasticsearch/index/mapper/annotatedtext/AnnotatedTextFieldMapperTests.java
  2. 14 4
      server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java
  3. 4 3
      server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
  4. 51 0
      server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java
  5. 10 0
      server/src/main/java/org/elasticsearch/action/delete/DeleteRequestBuilder.java
  6. 60 0
      server/src/main/java/org/elasticsearch/action/index/IndexRequest.java
  7. 9 0
      server/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java
  8. 33 25
      server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
  9. 14 9
      server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java
  10. 3 2
      server/src/test/java/org/elasticsearch/index/mapper/TextFieldMapperTests.java
  11. 2 1
      server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
  12. 3 2
      server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java
  13. 13 11
      server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
  14. 2 1
      server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java
  15. 1 1
      server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
  16. 68 0
      server/src/test/java/org/elasticsearch/versioning/SimpleVersioningIT.java
  17. 1 1
      test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java
  18. 4 3
      test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
  19. 2 3
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java
  20. 2 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java

+ 5 - 4
plugins/mapper-annotated-text/src/test/java/org/elasticsearch/index/mapper/annotatedtext/AnnotatedTextFieldMapperTests.java

@@ -51,6 +51,7 @@ import org.elasticsearch.index.mapper.MapperService.MergeReason;
 import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.index.mapper.TextFieldMapper;
+import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.termvectors.TermVectorsService;
 import org.elasticsearch.indices.IndicesService;
@@ -130,7 +131,7 @@ public class AnnotatedTextFieldMapperTests extends ESSingleNodeTestCase {
 
         IndexShard shard = indexService.getShard(0);
         shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
-            sourceToParse, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
+            sourceToParse, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
         shard.refresh("test");
         try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
             LeafReader leaf = searcher.getDirectoryReader().leaves().get(0).reader();
@@ -185,7 +186,7 @@ public class AnnotatedTextFieldMapperTests extends ESSingleNodeTestCase {
 
         IndexShard shard = indexService.getShard(0);
         shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
-            sourceToParse, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
+            sourceToParse, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
         shard.refresh("test");
         try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
             LeafReader leaf = searcher.getDirectoryReader().leaves().get(0).reader();
@@ -384,7 +385,7 @@ public class AnnotatedTextFieldMapperTests extends ESSingleNodeTestCase {
 
         IndexShard shard = indexService.getShard(0);
         shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
-            sourceToParse, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
+            sourceToParse, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
         shard.refresh("test");
         try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
             LeafReader leaf = searcher.getDirectoryReader().leaves().get(0).reader();
@@ -426,7 +427,7 @@ public class AnnotatedTextFieldMapperTests extends ESSingleNodeTestCase {
 
         IndexShard shard = indexService.getShard(0);
         shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
-            sourceToParse, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
+            sourceToParse, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
         shard.refresh("test");
         try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
             LeafReader leaf = searcher.getDirectoryReader().leaves().get(0).reader();

+ 14 - 4
server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java

@@ -44,6 +44,7 @@ import org.elasticsearch.common.xcontent.XContent;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.VersionType;
+import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
 
 import java.io.IOException;
@@ -77,6 +78,8 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
     private static final ParseField RETRY_ON_CONFLICT = new ParseField("retry_on_conflict");
     private static final ParseField PIPELINE = new ParseField("pipeline");
     private static final ParseField SOURCE = new ParseField("_source");
+    private static final ParseField IF_SEQ_NO_MATCH = new ParseField("if_seq_no_match");
+    private static final ParseField IF_PRIMARY_TERM_MATCH = new ParseField("if_primary_term_match");
 
     /**
      * Requests that are part of this request. It is only possible to add things that are both {@link ActionRequest}s and
@@ -347,6 +350,8 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
                 String opType = null;
                 long version = Versions.MATCH_ANY;
                 VersionType versionType = VersionType.INTERNAL;
+                long ifSeqNoMatch = SequenceNumbers.UNASSIGNED_SEQ_NO;
+                long ifPrimaryTermMatch = 0;
                 int retryOnConflict = 0;
                 String pipeline = valueOrDefault(defaultPipeline, globalPipeline);
 
@@ -377,6 +382,10 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
                                 version = parser.longValue();
                             } else if (VERSION_TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
                                 versionType = VersionType.fromString(parser.text());
+                            } else if (IF_SEQ_NO_MATCH.match(currentFieldName, parser.getDeprecationHandler())) {
+                                ifSeqNoMatch = parser.longValue();
+                            } else if (IF_PRIMARY_TERM_MATCH.match(currentFieldName, parser.getDeprecationHandler())) {
+                                ifPrimaryTermMatch = parser.longValue();
                             } else if (RETRY_ON_CONFLICT.match(currentFieldName, parser.getDeprecationHandler())) {
                                 retryOnConflict = parser.intValue();
                             } else if (PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
@@ -404,7 +413,8 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
                 }
 
                 if ("delete".equals(action)) {
-                    add(new DeleteRequest(index, type, id).routing(routing).version(version).versionType(versionType), payload);
+                    add(new DeleteRequest(index, type, id).routing(routing)
+                        .version(version).versionType(versionType).setIfMatch(ifSeqNoMatch, ifPrimaryTermMatch), payload);
                 } else {
                     nextMarker = findNextMarker(marker, from, data, length);
                     if (nextMarker == -1) {
@@ -417,16 +427,16 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
                     if ("index".equals(action)) {
                         if (opType == null) {
                             internalAdd(new IndexRequest(index, type, id).routing(routing).version(version).versionType(versionType)
-                                    .setPipeline(pipeline)
+                                    .setPipeline(pipeline).ifMatch(ifSeqNoMatch, ifPrimaryTermMatch)
                                     .source(sliceTrimmingCarriageReturn(data, from, nextMarker,xContentType), xContentType), payload);
                         } else {
                             internalAdd(new IndexRequest(index, type, id).routing(routing).version(version).versionType(versionType)
-                                    .create("create".equals(opType)).setPipeline(pipeline)
+                                    .create("create".equals(opType)).setPipeline(pipeline).ifMatch(ifSeqNoMatch, ifPrimaryTermMatch)
                                     .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType), payload);
                         }
                     } else if ("create".equals(action)) {
                         internalAdd(new IndexRequest(index, type, id).routing(routing).version(version).versionType(versionType)
-                                .create(true).setPipeline(pipeline)
+                                .create(true).setPipeline(pipeline).ifMatch(ifSeqNoMatch, ifPrimaryTermMatch)
                                 .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType), payload);
                     } else if ("update".equals(action)) {
                         UpdateRequest updateRequest = new UpdateRequest(index, type, id).routing(routing).retryOnConflict(retryOnConflict)

+ 4 - 3
server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

@@ -19,8 +19,8 @@
 
 package org.elasticsearch.action.bulk;
 
-import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.DocWriteRequest;
@@ -460,7 +460,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
         executeOnPrimaryWhileHandlingMappingUpdates(context,
             () ->
                 primary.applyIndexOperationOnPrimary(request.version(), request.versionType(), sourceToParse,
-                    request.getAutoGeneratedTimestamp(), request.isRetry()),
+                    request.ifSeqNoMatch(), request.ifPrimaryTermMatch(), request.getAutoGeneratedTimestamp(), request.isRetry()),
             e -> primary.getFailedIndexResult(e, request.version()),
             context::markOperationAsExecuted,
             mapping -> mappingUpdater.updateMappings(mapping, primary.shardId(), request.type()));
@@ -471,7 +471,8 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
         final DeleteRequest request = context.getRequestToExecute();
         final IndexShard primary = context.getPrimary();
         executeOnPrimaryWhileHandlingMappingUpdates(context,
-            () -> primary.applyDeleteOperationOnPrimary(request.version(), request.type(), request.id(), request.versionType()),
+            () -> primary.applyDeleteOperationOnPrimary(request.version(), request.type(), request.id(), request.versionType(),
+                request.ifSeqNoMatch(), request.ifPrimaryTermMatch()),
             e -> primary.getFailedDeleteResult(e, request.version()),
             context::markOperationAsExecuted,
             mapping -> mappingUpdater.updateMappings(mapping, primary.shardId(), request.type()));

+ 51 - 0
server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java

@@ -31,6 +31,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.ShardId;
 
 import java.io.IOException;
@@ -57,6 +58,8 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
     private String routing;
     private long version = Versions.MATCH_ANY;
     private VersionType versionType = VersionType.INTERNAL;
+    private long ifSeqNoMatch = SequenceNumbers.UNASSIGNED_SEQ_NO;
+    private long ifPrimaryTermMatch = 0;
 
     public DeleteRequest() {
     }
@@ -112,6 +115,12 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
         if (versionType == VersionType.FORCE) {
             validationException = addValidationError("version type [force] may no longer be used", validationException);
         }
+
+        if (ifSeqNoMatch != SequenceNumbers.UNASSIGNED_SEQ_NO && (
+            versionType != VersionType.INTERNAL || version != Versions.MATCH_ANY
+        )) {
+            validationException = addValidationError("compare and write operations can not use versioning", validationException);
+        }
         return validationException;
     }
 
@@ -194,6 +203,32 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
         return this;
     }
 
+    public long ifSeqNoMatch() {
+        return ifSeqNoMatch;
+    }
+
+    public long ifPrimaryTermMatch() {
+        return ifPrimaryTermMatch;
+    }
+
+    public DeleteRequest setIfMatch(long seqNo, long term) {
+        if (term == 0 && seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
+            throw new IllegalArgumentException("seqNo is set, but primary term is [0]");
+        }
+        if (term != 0 && seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) {
+            throw new IllegalArgumentException("seqNo is unassigned, but primary term is [" + term + "]");
+        }
+        if (seqNo < 0 && seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
+            throw new IllegalArgumentException("sequence numbers must be non negative. got [" +  seqNo + "].");
+        }
+        if (term < 0) {
+            throw new IllegalArgumentException("primary term must be non negative. got [" + term + "]");
+        }
+        ifSeqNoMatch = seqNo;
+        ifPrimaryTermMatch = term;
+        return this;
+    }
+
     @Override
     public VersionType versionType() {
         return this.versionType;
@@ -215,6 +250,13 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
         }
         version = in.readLong();
         versionType = VersionType.fromValue(in.readByte());
+        if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
+            ifSeqNoMatch = in.readZLong();
+            ifPrimaryTermMatch = in.readVLong();
+        } else {
+            ifSeqNoMatch = SequenceNumbers.UNASSIGNED_SEQ_NO;
+            ifPrimaryTermMatch = 0;
+        }
     }
 
     @Override
@@ -228,6 +270,15 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
         }
         out.writeLong(version);
         out.writeByte(versionType.getValue());
+        if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
+            out.writeZLong(ifSeqNoMatch);
+            out.writeVLong(ifPrimaryTermMatch);
+        } else if (ifSeqNoMatch != SequenceNumbers.UNASSIGNED_SEQ_NO || ifPrimaryTermMatch != 0) {
+            assert false : "setIfMatch [" + ifSeqNoMatch + "], currentDocTem [" + ifPrimaryTermMatch + "]";
+            throw new IllegalStateException(
+                "sequence number based compare and write is not supported until all nodes are on version 7.0 or higher. " +
+                    "Stream version [" + out.getVersion() + "]");
+        }
     }
 
     @Override

+ 10 - 0
server/src/main/java/org/elasticsearch/action/delete/DeleteRequestBuilder.java

@@ -80,4 +80,14 @@ public class DeleteRequestBuilder extends ReplicationRequestBuilder<DeleteReques
         request.versionType(versionType);
         return this;
     }
+
+    /**
+     * only performs this delete request if the document was last modification was assigned the given
+     * sequence number and primary term
+     */
+    public DeleteRequestBuilder setIfMatch(long seqNo, long term) {
+        request.setIfMatch(seqNo, term);
+        return this;
+    }
+
 }

+ 60 - 0
server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

@@ -43,6 +43,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.VersionType;
+import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.ShardId;
 
 import java.io.IOException;
@@ -104,6 +105,8 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
     private long autoGeneratedTimestamp = UNSET_AUTO_GENERATED_TIMESTAMP;
 
     private boolean isRetry = false;
+    private long ifSeqNoMatch = SequenceNumbers.UNASSIGNED_SEQ_NO;
+    private long ifPrimaryTermMatch = 0;
 
 
     public IndexRequest() {
@@ -164,6 +167,12 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
                     validationException);
                 return validationException;
             }
+
+            if (ifSeqNoMatch != SequenceNumbers.UNASSIGNED_SEQ_NO || ifPrimaryTermMatch != 0) {
+                validationException = addValidationError("create operations do not support compare and set. use index instead",
+                    validationException);
+                return validationException;
+            }
         }
 
         if (opType() != OpType.INDEX && id == null) {
@@ -192,6 +201,12 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
             validationException = addValidationError("pipeline cannot be an empty string", validationException);
         }
 
+        if (ifSeqNoMatch != SequenceNumbers.UNASSIGNED_SEQ_NO && (
+            versionType != VersionType.INTERNAL || version != Versions.MATCH_ANY
+            )) {
+            validationException = addValidationError("compare and write operations can not use versioning", validationException);
+        }
+
         return validationException;
     }
 
@@ -471,6 +486,33 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
         return this;
     }
 
+    public IndexRequest ifMatch(long seqNo, long term) {
+        if (term == 0 && seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
+            throw new IllegalArgumentException("seqNo is set, but primary term is [0]");
+        }
+
+        if (term != 0 && seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) {
+            throw new IllegalArgumentException("seqNo is unassigned, but primary term is [" + term + "]");
+        }
+        if (seqNo < 0 && seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
+            throw new IllegalArgumentException("sequence numbers must be non negative. got [" +  seqNo + "].");
+        }
+        if (term < 0) {
+            throw new IllegalArgumentException("primary term must be non negative. got [" + term + "]");
+        }
+        ifSeqNoMatch = seqNo;
+        ifPrimaryTermMatch = term;
+        return this;
+    }
+
+    public long ifSeqNoMatch() {
+        return ifSeqNoMatch;
+    }
+
+    public long ifPrimaryTermMatch() {
+        return ifPrimaryTermMatch;
+    }
+
     @Override
     public VersionType versionType() {
         return this.versionType;
@@ -492,6 +534,8 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
         // generate id if not already provided
         if (id == null) {
             assert autoGeneratedTimestamp == -1 : "timestamp has already been generated!";
+            assert ifSeqNoMatch == SequenceNumbers.UNASSIGNED_SEQ_NO;
+            assert ifPrimaryTermMatch == 0;
             autoGeneratedTimestamp = Math.max(0, System.currentTimeMillis()); // extra paranoia
             String uid;
             if (indexCreatedVersion.onOrAfter(Version.V_6_0_0_beta1)) {
@@ -533,6 +577,13 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
         } else {
             contentType = null;
         }
+        if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
+            ifSeqNoMatch = in.readZLong();
+            ifPrimaryTermMatch = in.readVLong();
+        } else {
+            ifSeqNoMatch = SequenceNumbers.UNASSIGNED_SEQ_NO;
+            ifPrimaryTermMatch = 0;
+        }
     }
 
     @Override
@@ -564,6 +615,15 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
         } else {
             out.writeBoolean(false);
         }
+        if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
+            out.writeZLong(ifSeqNoMatch);
+            out.writeVLong(ifPrimaryTermMatch);
+        } else if (ifSeqNoMatch != SequenceNumbers.UNASSIGNED_SEQ_NO || ifPrimaryTermMatch != 0) {
+            assert false : "setIfMatch [" + ifSeqNoMatch + "], currentDocTem [" + ifPrimaryTermMatch + "]";
+            throw new IllegalStateException(
+                "sequence number based compare and write is not supported until all nodes are on version 7.0 or higher. " +
+                    "Stream version [" + out.getVersion() + "]");
+        }
     }
 
     @Override

+ 9 - 0
server/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java

@@ -199,6 +199,15 @@ public class IndexRequestBuilder extends ReplicationRequestBuilder<IndexRequest,
         return this;
     }
 
+    /**
+     * only performs this indexing request if the document was last modification was assigned the given
+     * sequence number and primary term
+     */
+    public IndexRequestBuilder setIfMatch(long seqNo, long term) {
+        request.ifMatch(seqNo, term);
+        return this;
+    }
+
     /**
      * Sets the ingest pipeline to be executed before indexing the document
      */

+ 33 - 25
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -510,7 +510,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                                  * the reverted operations on this shard by replaying the translog to avoid losing acknowledged writes.
                                  */
                                 final Engine engine = getEngine();
-                                if (getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
+                                if (getMaxSeqNoOfUpdatesOrDeletes() == UNASSIGNED_SEQ_NO) {
                                     // If the old primary was on an old version that did not replicate the msu,
                                     // we need to bootstrap it manually from its local history.
                                     assert indexSettings.getIndexVersionCreated().before(Version.V_6_5_0);
@@ -686,30 +686,33 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     }
 
     public Engine.IndexResult applyIndexOperationOnPrimary(long version, VersionType versionType, SourceToParse sourceToParse,
-                                                           long autoGeneratedTimestamp, boolean isRetry) throws IOException {
+                                                           long ifSeqNoMatch, long ifPrimaryTermMatch, long autoGeneratedTimestamp,
+                                                           boolean isRetry)
+        throws IOException {
         assert versionType.validateVersionForWrites(version);
-        return applyIndexOperation(getEngine(), UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, versionType, autoGeneratedTimestamp,
-            isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse);
+        return applyIndexOperation(getEngine(), UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, versionType, ifSeqNoMatch,
+            ifPrimaryTermMatch, autoGeneratedTimestamp, isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse);
     }
 
     public Engine.IndexResult applyIndexOperationOnReplica(long seqNo, long version, long autoGeneratedTimeStamp,
                                                            boolean isRetry, SourceToParse sourceToParse)
         throws IOException {
-        return applyIndexOperation(getEngine(), seqNo, operationPrimaryTerm, version, null, autoGeneratedTimeStamp, isRetry,
-            Engine.Operation.Origin.REPLICA, sourceToParse);
+        return applyIndexOperation(getEngine(), seqNo, operationPrimaryTerm, version, null, UNASSIGNED_SEQ_NO, 0,
+            autoGeneratedTimeStamp, isRetry, Engine.Operation.Origin.REPLICA, sourceToParse);
     }
 
     private Engine.IndexResult applyIndexOperation(Engine engine, long seqNo, long opPrimaryTerm, long version,
-                                                   @Nullable VersionType versionType, long autoGeneratedTimeStamp, boolean isRetry,
-                                                   Engine.Operation.Origin origin, SourceToParse sourceToParse) throws IOException {
+                                                   @Nullable VersionType versionType, long ifSeqNoMatch, long ifPrimaryTermMatch,
+                                                   long autoGeneratedTimeStamp, boolean isRetry, Engine.Operation.Origin origin,
+                                                   SourceToParse sourceToParse) throws IOException {
         assert opPrimaryTerm <= this.operationPrimaryTerm: "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm
             + "]";
         ensureWriteAllowed(origin);
         Engine.Index operation;
         try {
             operation = prepareIndex(docMapper(sourceToParse.type()), indexSettings.getIndexVersionCreated(), sourceToParse, seqNo,
-                    opPrimaryTerm, version, versionType, origin,
-                autoGeneratedTimeStamp, isRetry);
+                opPrimaryTerm, version, versionType, origin, autoGeneratedTimeStamp, isRetry,
+                ifSeqNoMatch, ifPrimaryTermMatch);
             Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
             if (update != null) {
                 return new Engine.IndexResult(update);
@@ -727,8 +730,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     }
 
     public static Engine.Index prepareIndex(DocumentMapperForType docMapper, Version indexCreatedVersion, SourceToParse source, long seqNo,
-            long primaryTerm, long version, VersionType versionType, Engine.Operation.Origin origin, long autoGeneratedIdTimestamp,
-            boolean isRetry) {
+                                            long primaryTerm, long version, VersionType versionType, Engine.Operation.Origin origin,
+                                            long autoGeneratedIdTimestamp, boolean isRetry,
+                                            long ifSeqNoMatch, long ifPrimaryTermMatch) {
         long startTime = System.nanoTime();
         ParsedDocument doc = docMapper.getDocumentMapper().parse(source);
         if (docMapper.getMapping() != null) {
@@ -736,7 +740,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         }
         Term uid = new Term(IdFieldMapper.NAME, Uid.encodeId(doc.id()));
         return new Engine.Index(uid, doc, seqNo, primaryTerm, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry,
-            UNASSIGNED_SEQ_NO, 0);
+            ifSeqNoMatch, ifPrimaryTermMatch);
     }
 
     private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOException {
@@ -787,19 +791,22 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         return new Engine.DeleteResult(e, version, operationPrimaryTerm);
     }
 
-    public Engine.DeleteResult applyDeleteOperationOnPrimary(long version, String type, String id, VersionType versionType)
+    public Engine.DeleteResult applyDeleteOperationOnPrimary(long version, String type, String id, VersionType versionType,
+                                                             long ifSeqNoMatch, long ifPrimaryTermMatch)
         throws IOException {
         assert versionType.validateVersionForWrites(version);
         return applyDeleteOperation(getEngine(), UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, type, id, versionType,
-            Engine.Operation.Origin.PRIMARY);
+            ifSeqNoMatch, ifPrimaryTermMatch, Engine.Operation.Origin.PRIMARY);
     }
 
     public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long version, String type, String id) throws IOException {
-        return applyDeleteOperation(getEngine(), seqNo, operationPrimaryTerm, version, type, id, null, Engine.Operation.Origin.REPLICA);
+        return applyDeleteOperation(
+            getEngine(), seqNo, operationPrimaryTerm, version, type, id, null, UNASSIGNED_SEQ_NO, 0, Engine.Operation.Origin.REPLICA);
     }
 
     private Engine.DeleteResult applyDeleteOperation(Engine engine, long seqNo, long opPrimaryTerm, long version, String type, String id,
-                                                     @Nullable VersionType versionType, Engine.Operation.Origin origin) throws IOException {
+                                                     @Nullable VersionType versionType, long ifSeqNoMatch, long ifPrimaryTermMatch,
+                                                     Engine.Operation.Origin origin) throws IOException {
         assert opPrimaryTerm <= this.operationPrimaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm
             + "]";
         ensureWriteAllowed(origin);
@@ -828,15 +835,16 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         }
         final Term uid = new Term(IdFieldMapper.NAME, Uid.encodeId(id));
         final Engine.Delete delete = prepareDelete(type, id, uid, seqNo, opPrimaryTerm, version,
-            versionType, origin);
+            versionType, origin, ifSeqNoMatch, ifPrimaryTermMatch);
         return delete(engine, delete);
     }
 
     private Engine.Delete prepareDelete(String type, String id, Term uid, long seqNo, long primaryTerm, long version,
-                                               VersionType versionType, Engine.Operation.Origin origin) {
+                                               VersionType versionType, Engine.Operation.Origin origin,
+                                               long ifSeqNoMatch, long ifPrimaryTermMatch) {
         long startTime = System.nanoTime();
         return new Engine.Delete(resolveType(type), id, uid, seqNo, primaryTerm, version, versionType, origin, startTime,
-            UNASSIGNED_SEQ_NO, 0);
+            ifSeqNoMatch, ifPrimaryTermMatch);
     }
 
     private Engine.DeleteResult delete(Engine engine, Engine.Delete delete) throws IOException {
@@ -1283,14 +1291,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                 // we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all
                 // autoGeneratedID docs that are coming from the primary are updated correctly.
                 result = applyIndexOperation(engine, index.seqNo(), index.primaryTerm(), index.version(),
-                    versionType, index.getAutoGeneratedIdTimestamp(), true, origin,
+                    versionType, UNASSIGNED_SEQ_NO, 0, index.getAutoGeneratedIdTimestamp(), true, origin,
                     source(shardId.getIndexName(), index.type(), index.id(), index.source(),
                         XContentHelper.xContentType(index.source())).routing(index.routing()));
                 break;
             case DELETE:
                 final Translog.Delete delete = (Translog.Delete) operation;
                 result = applyDeleteOperation(engine, delete.seqNo(), delete.primaryTerm(), delete.version(), delete.type(), delete.id(),
-                    versionType, origin);
+                    versionType, UNASSIGNED_SEQ_NO, 0, origin);
                 break;
             case NO_OP:
                 final Translog.NoOp noOp = (Translog.NoOp) operation;
@@ -1997,7 +2005,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
             getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint();
         synchronized (mutex) {
             replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex
-            if (getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
+            if (getMaxSeqNoOfUpdatesOrDeletes() == UNASSIGNED_SEQ_NO) {
                 // If the old primary was on an old version that did not replicate the msu,
                 // we need to bootstrap it manually from its local history.
                 assert indexSettings.getIndexVersionCreated().before(Version.V_6_5_0);
@@ -2916,8 +2924,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
      * @see org.elasticsearch.indices.recovery.RecoveryTarget#indexTranslogOperations(List, int, long, long)
      */
     public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
-        assert seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO
-            || getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO :
+        assert seqNo != UNASSIGNED_SEQ_NO
+            || getMaxSeqNoOfUpdatesOrDeletes() == UNASSIGNED_SEQ_NO :
             "replica has max_seq_no_of_updates=" + getMaxSeqNoOfUpdatesOrDeletes() + " but primary does not";
         getEngine().advanceMaxSeqNoOfUpdatesOrDeletes(seqNo);
         assert seqNo <= getMaxSeqNoOfUpdatesOrDeletes() : getMaxSeqNoOfUpdatesOrDeletes() + " < " + seqNo;

+ 14 - 9
server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java

@@ -268,7 +268,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
         Engine.IndexResult success = new FakeIndexResult(1, 1, 13, true, resultLocation);
 
         IndexShard shard = mock(IndexShard.class);
-        when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean())).thenReturn(mappingUpdate);
+        when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()))
+            .thenReturn(mappingUpdate);
 
         // Pretend the mappings haven't made it to the node yet
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
@@ -285,9 +286,10 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
         assertThat("mappings were \"updated\" once", updateCalled.get(), equalTo(1));
 
         // Verify that the shard "executed" the operation twice
-        verify(shard, times(2)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean());
+        verify(shard, times(2)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean());
 
-        when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean())).thenReturn(success);
+        when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()))
+            .thenReturn(success);
 
         TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis,
             (update, shardId, type) -> fail("should not have had to update the mappings"), () -> {});
@@ -295,7 +297,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
 
         // Verify that the shard "executed" the operation only once (2 for previous invocations plus
         // 1 for this execution)
-        verify(shard, times(3)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean());
+        verify(shard, times(3)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean());
 
 
         BulkItemResponse primaryResponse = bulkShardRequest.items()[0].getPrimaryResponse();
@@ -488,7 +490,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
         Exception err = new ElasticsearchException("I'm dead <(x.x)>");
         Engine.IndexResult indexResult = new Engine.IndexResult(err, 0, 0, 0);
         IndexShard shard = mock(IndexShard.class);
-        when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean())).thenReturn(indexResult);
+        when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()))
+            .thenReturn(indexResult);
         when(shard.indexSettings()).thenReturn(indexSettings);
 
         UpdateHelper updateHelper = mock(UpdateHelper.class);
@@ -536,7 +539,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
             "I'm conflicted <(;_;)>");
         Engine.IndexResult indexResult = new Engine.IndexResult(err, 0, 0, 0);
         IndexShard shard = mock(IndexShard.class);
-        when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean())).thenReturn(indexResult);
+        when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()))
+            .thenReturn(indexResult);
         when(shard.indexSettings()).thenReturn(indexSettings);
 
         UpdateHelper updateHelper = mock(UpdateHelper.class);
@@ -581,7 +585,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
         Translog.Location resultLocation = new Translog.Location(42, 42, 42);
         Engine.IndexResult indexResult = new FakeIndexResult(1, 1, 13, created, resultLocation);
         IndexShard shard = mock(IndexShard.class);
-        when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean())).thenReturn(indexResult);
+        when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()))
+            .thenReturn(indexResult);
         when(shard.indexSettings()).thenReturn(indexSettings);
 
         UpdateHelper updateHelper = mock(UpdateHelper.class);
@@ -626,7 +631,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
         final long resultSeqNo = 13;
         Engine.DeleteResult deleteResult = new FakeDeleteResult(1, 1, resultSeqNo, found, resultLocation);
         IndexShard shard = mock(IndexShard.class);
-        when(shard.applyDeleteOperationOnPrimary(anyLong(), any(), any(), any())).thenReturn(deleteResult);
+        when(shard.applyDeleteOperationOnPrimary(anyLong(), any(), any(), any(), anyLong(), anyLong())).thenReturn(deleteResult);
         when(shard.indexSettings()).thenReturn(indexSettings);
 
         UpdateHelper updateHelper = mock(UpdateHelper.class);
@@ -769,7 +774,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
         Engine.IndexResult success = new FakeIndexResult(1, 1, 13, true, resultLocation);
 
         IndexShard shard = mock(IndexShard.class);
-        when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyBoolean())).thenAnswer(ir -> {
+        when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())).thenAnswer(ir -> {
             if (randomBoolean()) {
                 return conflictedResult;
             }

+ 3 - 2
server/src/test/java/org/elasticsearch/index/mapper/TextFieldMapperTests.java

@@ -57,6 +57,7 @@ import org.elasticsearch.index.mapper.TextFieldMapper.TextFieldType;
 import org.elasticsearch.index.query.MatchPhraseQueryBuilder;
 import org.elasticsearch.index.query.QueryShardContext;
 import org.elasticsearch.index.search.MatchQuery;
+import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESSingleNodeTestCase;
@@ -251,7 +252,7 @@ public class TextFieldMapperTests extends ESSingleNodeTestCase {
 
         IndexShard shard = indexService.getShard(0);
         shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
-            sourceToParse, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
+            sourceToParse, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
         shard.refresh("test");
         try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
             LeafReader leaf = searcher.getDirectoryReader().leaves().get(0).reader();
@@ -293,7 +294,7 @@ public class TextFieldMapperTests extends ESSingleNodeTestCase {
 
         IndexShard shard = indexService.getShard(0);
         shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
-            sourceToParse, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
+            sourceToParse, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
         shard.refresh("test");
         try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
             LeafReader leaf = searcher.getDirectoryReader().leaves().get(0).reader();

+ 2 - 1
server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java

@@ -46,6 +46,7 @@ import org.elasticsearch.index.engine.EngineTestCase;
 import org.elasticsearch.index.engine.InternalEngineFactory;
 import org.elasticsearch.index.engine.InternalEngineTests;
 import org.elasticsearch.index.mapper.SourceToParse;
+import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardTestCase;
 import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
@@ -210,7 +211,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
                         Versions.MATCH_ANY,
                         VersionType.INTERNAL,
                         SourceToParse.source("index", "type", "primary", new BytesArray("{}"), XContentType.JSON),
-                        randomNonNegativeLong(),
+                        SequenceNumbers.UNASSIGNED_SEQ_NO, 0, randomNonNegativeLong(),
                         false);
             }
             final IndexShard recoveredReplica =

+ 3 - 2
server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java

@@ -63,6 +63,7 @@ import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.SegmentsStats;
 import org.elasticsearch.index.flush.FlushStats;
 import org.elasticsearch.index.mapper.SourceToParse;
+import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.breaker.CircuitBreakerService;
@@ -356,7 +357,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {
         assertFalse(shard.shouldPeriodicallyFlush());
         shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
             SourceToParse.source("test", "test", "1", new BytesArray("{}"), XContentType.JSON),
-            IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
+            SequenceNumbers.UNASSIGNED_SEQ_NO, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
         assertTrue(shard.shouldPeriodicallyFlush());
         final Translog translog = getTranslog(shard);
         assertEquals(2, translog.stats().getUncommittedOperations());
@@ -406,7 +407,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {
             assertThat(translog.currentFileGeneration(), equalTo(generation + rolls));
             final Engine.IndexResult result = shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
                 SourceToParse.source("test", "test", "1", new BytesArray("{}"), XContentType.JSON),
-                IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
+                SequenceNumbers.UNASSIGNED_SEQ_NO, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
             final Translog.Location location = result.getTranslogLocation();
             shard.afterWriteOperation();
             if (location.translogLocation + location.size > generationThreshold) {

+ 13 - 11
server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -172,6 +172,7 @@ import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting
 import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex;
 import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
 import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN;
 import static org.elasticsearch.test.hamcrest.RegexMatcher.matches;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -323,10 +324,10 @@ public class IndexShardTests extends IndexShardTestCase {
         expectThrows(IndexShardClosedException.class,
             () -> indexShard.acquireAllPrimaryOperationsPermits(null, TimeValue.timeValueSeconds(30L)));
         expectThrows(IndexShardClosedException.class,
-            () -> indexShard.acquireReplicaOperationPermit(indexShard.getPendingPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO,
+            () -> indexShard.acquireReplicaOperationPermit(indexShard.getPendingPrimaryTerm(), UNASSIGNED_SEQ_NO,
                 randomNonNegativeLong(), null, ThreadPool.Names.WRITE, ""));
         expectThrows(IndexShardClosedException.class,
-            () -> indexShard.acquireAllReplicaOperationsPermits(indexShard.getPendingPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO,
+            () -> indexShard.acquireAllReplicaOperationsPermits(indexShard.getPendingPrimaryTerm(), UNASSIGNED_SEQ_NO,
                 randomNonNegativeLong(), null, TimeValue.timeValueSeconds(30L)));
     }
 
@@ -334,7 +335,7 @@ public class IndexShardTests extends IndexShardTestCase {
         IndexShard indexShard = newShard(false);
         expectThrows(IndexShardNotStartedException.class, () ->
             randomReplicaOperationPermitAcquisition(indexShard, indexShard.getPendingPrimaryTerm() + randomIntBetween(1, 100),
-                SequenceNumbers.UNASSIGNED_SEQ_NO, randomNonNegativeLong(), null, ""));
+                UNASSIGNED_SEQ_NO, randomNonNegativeLong(), null, ""));
         closeShards(indexShard);
     }
 
@@ -828,7 +829,7 @@ public class IndexShardTests extends IndexShardTestCase {
                 newGlobalCheckPoint = randomIntBetween((int) indexShard.getGlobalCheckpoint(), (int) localCheckPoint);
             }
             final long expectedLocalCheckpoint;
-            if (newGlobalCheckPoint == SequenceNumbers.UNASSIGNED_SEQ_NO) {
+            if (newGlobalCheckPoint == UNASSIGNED_SEQ_NO) {
                 expectedLocalCheckpoint = SequenceNumbers.NO_OPS_PERFORMED;
             } else {
                 expectedLocalCheckpoint = newGlobalCheckPoint;
@@ -1039,10 +1040,10 @@ public class IndexShardTests extends IndexShardTestCase {
         indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED));
 
         final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
-        final long globalCheckpointOnReplica = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint());
+        final long globalCheckpointOnReplica = randomLongBetween(UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint());
         indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica, "test");
 
-        final long globalCheckpoint = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint());
+        final long globalCheckpoint = randomLongBetween(UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint());
         final long currentMaxSeqNoOfUpdates = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
         final long maxSeqNoOfUpdatesOrDeletes = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, maxSeqNo);
         final Set<String> docsBeforeRollback = getShardDocUIDs(indexShard);
@@ -1104,9 +1105,9 @@ public class IndexShardTests extends IndexShardTestCase {
         final int operations = 1024 - scaledRandomIntBetween(0, 1024);
         indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED));
 
-        final long globalCheckpointOnReplica = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint());
+        final long globalCheckpointOnReplica = randomLongBetween(UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint());
         indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica, "test");
-        final long globalCheckpoint = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint());
+        final long globalCheckpoint = randomLongBetween(UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint());
         Set<String> docsBelowGlobalCheckpoint = getShardDocUIDs(indexShard).stream()
             .filter(id -> Long.parseLong(id) <= Math.max(globalCheckpointOnReplica, globalCheckpoint)).collect(Collectors.toSet());
         final CountDownLatch latch = new CountDownLatch(1);
@@ -1132,7 +1133,7 @@ public class IndexShardTests extends IndexShardTestCase {
                 }, "");
 
         latch.await();
-        if (globalCheckpointOnReplica == SequenceNumbers.UNASSIGNED_SEQ_NO && globalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO) {
+        if (globalCheckpointOnReplica == UNASSIGNED_SEQ_NO && globalCheckpoint == UNASSIGNED_SEQ_NO) {
             assertThat(indexShard.getLocalCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
         } else {
             assertThat(indexShard.getLocalCheckpoint(), equalTo(Math.max(globalCheckpoint, globalCheckpointOnReplica)));
@@ -3711,10 +3712,11 @@ public class IndexShardTests extends IndexShardTestCase {
         Engine.IndexResult indexResult = indexDoc(shard, "some_type", "id", "{}");
         assertTrue(indexResult.isCreated());
 
-        DeleteResult deleteResult = shard.applyDeleteOperationOnPrimary(Versions.MATCH_ANY, "some_other_type", "id", VersionType.INTERNAL);
+        DeleteResult deleteResult = shard.applyDeleteOperationOnPrimary(Versions.MATCH_ANY, "some_other_type", "id", VersionType.INTERNAL,
+            UNASSIGNED_SEQ_NO, 0);
         assertFalse(deleteResult.isFound());
 
-        deleteResult = shard.applyDeleteOperationOnPrimary(Versions.MATCH_ANY, "_doc", "id", VersionType.INTERNAL);
+        deleteResult = shard.applyDeleteOperationOnPrimary(Versions.MATCH_ANY, "_doc", "id", VersionType.INTERNAL, UNASSIGNED_SEQ_NO, 0);
         assertTrue(deleteResult.isFound());
 
         closeShards(shard);

+ 2 - 1
server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java

@@ -81,6 +81,7 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
             // Index doc but not advance local checkpoint.
             shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
                 SourceToParse.source(shard.shardId().getIndexName(), "_doc", Integer.toString(i), new BytesArray("{}"), XContentType.JSON),
+                SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
                 randomBoolean() ? IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP : randomNonNegativeLong(), true);
         }
 
@@ -150,7 +151,7 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
             // Index doc but not advance local checkpoint.
             shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
                 SourceToParse.source(shard.shardId().getIndexName(), "_doc", Integer.toString(i), new BytesArray("{}"), XContentType.JSON),
-                IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
+                SequenceNumbers.UNASSIGNED_SEQ_NO, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
         }
 
         String allocationId = shard.routingEntry().allocationId().getId();

+ 1 - 1
server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java

@@ -314,7 +314,7 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
             Engine.IndexResult result = primaryShard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
                 SourceToParse.source(primaryShard.shardId().getIndexName(), "_doc", Integer.toString(i), new BytesArray("{}"),
                     XContentType.JSON),
-                IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
+                SequenceNumbers.UNASSIGNED_SEQ_NO, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
             assertThat(result.getResultType(), equalTo(Engine.Result.Type.SUCCESS));
             if (randomBoolean()) {
                 globalCheckpoint = randomLongBetween(globalCheckpoint, i);

+ 68 - 0
server/src/test/java/org/elasticsearch/versioning/SimpleVersioningIT.java

@@ -281,6 +281,74 @@ public class SimpleVersioningIT extends ESIntegTestCase {
         assertThat(deleteResponse.getVersion(), equalTo(4L));
     }
 
+    public void testCompareAndSet() {
+        createIndex("test");
+        ensureGreen();
+
+        IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").execute().actionGet();
+        assertThat(indexResponse.getSeqNo(), equalTo(0L));
+        assertThat(indexResponse.getPrimaryTerm(), equalTo(1L));
+
+        indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").setVersion(1).execute().actionGet();
+        assertThat(indexResponse.getSeqNo(), equalTo(1L));
+        assertThat(indexResponse.getPrimaryTerm(), equalTo(1L));
+
+        assertThrows(
+            client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setIfMatch(10, 1).execute(),
+            VersionConflictEngineException.class);
+
+        assertThrows(
+            client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setIfMatch(10, 2).execute(),
+            VersionConflictEngineException.class);
+
+        assertThrows(
+            client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setIfMatch(1, 2).execute(),
+            VersionConflictEngineException.class);
+
+
+        assertThrows(client().prepareDelete("test", "type", "1").setIfMatch(10, 1).execute(), VersionConflictEngineException.class);
+        assertThrows(client().prepareDelete("test", "type", "1").setIfMatch(10, 2).execute(), VersionConflictEngineException.class);
+        assertThrows(client().prepareDelete("test", "type", "1").setIfMatch(1, 2).execute(), VersionConflictEngineException.class);
+
+        client().admin().indices().prepareRefresh().execute().actionGet();
+        // TODO: Enable once get response returns seqNo
+//        for (int i = 0; i < 10; i++) {
+//            final GetResponse response = client().prepareGet("test", "type", "1").get();
+//            assertThat(response.getSeqNo(), equalTo(1L));
+//            assertThat(response.getPrimaryTerm(), equalTo(1L));
+//        }
+
+        // search with versioning
+        for (int i = 0; i < 10; i++) {
+            // TODO: ADD SEQ NO!
+            SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setVersion(true).execute().actionGet();
+            assertThat(searchResponse.getHits().getAt(0).getVersion(), equalTo(2L));
+        }
+
+        // search without versioning
+        for (int i = 0; i < 10; i++) {
+            SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).execute().actionGet();
+            assertThat(searchResponse.getHits().getAt(0).getVersion(), equalTo(Versions.NOT_FOUND));
+        }
+
+        DeleteResponse deleteResponse = client().prepareDelete("test", "type", "1").setIfMatch(1, 1).execute().actionGet();
+        assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());
+        assertThat(deleteResponse.getSeqNo(), equalTo(2L));
+        assertThat(deleteResponse.getPrimaryTerm(), equalTo(1L));
+
+        assertThrows(client().prepareDelete("test", "type", "1").setIfMatch(1, 1).execute(), VersionConflictEngineException.class);
+        assertThrows(client().prepareDelete("test", "type", "1").setIfMatch(3, 2).execute(), VersionConflictEngineException.class);
+        assertThrows(client().prepareDelete("test", "type", "1").setIfMatch(1, 2).execute(), VersionConflictEngineException.class);
+
+
+        // This is intricate - the object was deleted but a delete transaction was with the right version. We add another one
+        // and thus the transaction is increased.
+        deleteResponse = client().prepareDelete("test", "type", "1").setIfMatch(2, 1).execute().actionGet();
+        assertEquals(DocWriteResponse.Result.NOT_FOUND, deleteResponse.getResult());
+        assertThat(deleteResponse.getSeqNo(), equalTo(3L));
+        assertThat(deleteResponse.getPrimaryTerm(), equalTo(1L));
+    }
+
     public void testSimpleVersioningWithFlush() throws Exception {
         createIndex("test");
         ensureGreen();

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java

@@ -125,7 +125,7 @@ public class TranslogHandler implements Engine.TranslogRecoveryRunner {
                         source(indexName, index.type(), index.id(), index.source(),
                             XContentHelper.xContentType(index.source()))
                                 .routing(index.routing()), index.seqNo(), index.primaryTerm(),
-                        index.version(), null, origin, index.getAutoGeneratedIdTimestamp(), true);
+                        index.version(), null, origin, index.getAutoGeneratedIdTimestamp(), true, SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
                 return engineIndex;
             case DELETE:
                 final Translog.Delete delete = (Translog.Delete) operation;

+ 4 - 3
test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

@@ -702,12 +702,12 @@ public abstract class IndexShardTestCase extends ESTestCase {
         Engine.IndexResult result;
         if (shard.routingEntry().primary()) {
             result = shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, sourceToParse,
-                IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
+                SequenceNumbers.UNASSIGNED_SEQ_NO, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
             if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
                 updateMappings(shard, IndexMetaData.builder(shard.indexSettings().getIndexMetaData())
                     .putMapping(type, result.getRequiredMappingUpdate().toString()).build());
                 result = shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, sourceToParse,
-                    IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
+                    SequenceNumbers.UNASSIGNED_SEQ_NO, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
             }
             shard.updateLocalCheckpointForShard(shard.routingEntry().allocationId().getId(),
                 shard.getLocalCheckpoint());
@@ -731,7 +731,8 @@ public abstract class IndexShardTestCase extends ESTestCase {
     protected Engine.DeleteResult deleteDoc(IndexShard shard, String type, String id) throws IOException {
         final Engine.DeleteResult result;
         if (shard.routingEntry().primary()) {
-            result = shard.applyDeleteOperationOnPrimary(Versions.MATCH_ANY, type, id, VersionType.INTERNAL);
+            result = shard.applyDeleteOperationOnPrimary(
+                Versions.MATCH_ANY, type, id, VersionType.INTERNAL, SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
             shard.updateLocalCheckpointForShard(shard.routingEntry().allocationId().getId(), shard.getEngine().getLocalCheckpoint());
         } else {
             final long seqNo = shard.seqNoStats().getMaxSeqNo() + 1;

+ 2 - 3
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java

@@ -318,12 +318,11 @@ public class FollowingEngineTests extends ESTestCase {
             Engine.Index index = (Engine.Index) op;
             result = engine.index(new Engine.Index(index.uid(), index.parsedDoc(), index.seqNo(), primaryTerm, index.version(),
                 versionType, origin, index.startTime(), index.getAutoGeneratedIdTimestamp(), index.isRetry(),
-                SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
+                index.getIfSeqNoMatch(), index.getIfPrimaryTermMatch()));
         } else if (op instanceof Engine.Delete) {
             Engine.Delete delete = (Engine.Delete) op;
             result = engine.delete(new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), primaryTerm,
-                delete.version(), versionType, origin, delete.startTime(),
-                SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
+                delete.version(), versionType, origin, delete.startTime(), delete.getIfSeqNoMatch(), delete.getIfPrimaryTermMatch()));
         } else {
             Engine.NoOp noOp = (Engine.NoOp) op;
             result = engine.noOp(new Engine.NoOp(noOp.seqNo(), primaryTerm, origin, noOp.startTime(), noOp.reason()));

+ 2 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java

@@ -48,6 +48,7 @@ import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.seqno.SeqNoStats;
+import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardState;
 import org.elasticsearch.index.shard.IndexShardTestCase;
@@ -294,7 +295,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
                         assert source != null : "_source is null but should have been filtered out at snapshot time";
                         Engine.Result result = targetShard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, source
                             (index, uid.type(), uid.id(), source, XContentHelper.xContentType(source))
-                            .routing(rootFieldsVisitor.routing()), 1, false);
+                            .routing(rootFieldsVisitor.routing()), SequenceNumbers.UNASSIGNED_SEQ_NO, 0, 1, false);
                         if (result.getResultType() != Engine.Result.Type.SUCCESS) {
                             throw new IllegalStateException("failed applying post restore operation result: " + result
                                 .getResultType(), result.getFailure());