Browse Source

[Transform] Fix issue if upgrade runs right after a rolling cluster upgrade (#80579)

do not fail a running transform if upgrader rewrites state inbetween

fixes #80073
Hendrik Muhs 3 years ago
parent
commit
44314120fc

+ 3 - 1
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformUpdater.java

@@ -224,14 +224,16 @@ public class TransformUpdater {
 
             long lastCheckpoint = currentState.v1().getTransformState().getCheckpoint();
 
+            // if: the state is stored on the latest index, it does not need an update
             if (currentState.v2().getIndex().equals(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME)) {
                 listener.onResponse(lastCheckpoint);
                 return;
             }
 
+            // else: the state is on an old index, update by persisting it to the latest index
             transformConfigManager.putOrUpdateTransformStoredDoc(
                 currentState.v1(),
-                currentState.v2(),
+                null, // set seqNoPrimaryTermAndIndex to `null` to force optype `create`, gh#80073
                 ActionListener.wrap(r -> { listener.onResponse(lastCheckpoint); }, e -> {
                     if (org.elasticsearch.ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException) {
                         // if a version conflict occurs a new state has been written between us reading and writing.

+ 11 - 7
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/IndexBasedTransformConfigManager.java

@@ -682,16 +682,20 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
             IndexRequest indexRequest = new IndexRequest(TransformInternalIndexConstants.LATEST_INDEX_NAME).setRefreshPolicy(
                 WriteRequest.RefreshPolicy.IMMEDIATE
             ).id(TransformStoredDoc.documentId(storedDoc.getId())).source(source);
-            if (seqNoPrimaryTermAndIndex != null
-                && seqNoPrimaryTermAndIndex.getIndex().equals(TransformInternalIndexConstants.LATEST_INDEX_NAME)) {
-                indexRequest.opType(DocWriteRequest.OpType.INDEX)
-                    .setIfSeqNo(seqNoPrimaryTermAndIndex.getSeqNo())
-                    .setIfPrimaryTerm(seqNoPrimaryTermAndIndex.getPrimaryTerm());
+            if (seqNoPrimaryTermAndIndex != null) {
+                // if seqNoPrimaryTermAndIndex is set, use optype index even if not on the latest index, because the upgrader
+                // could have been called, see gh#80073
+                indexRequest.opType(DocWriteRequest.OpType.INDEX);
+                // if on the latest index use optimistic concurrency control in addition
+                if (seqNoPrimaryTermAndIndex.getIndex().equals(TransformInternalIndexConstants.LATEST_INDEX_NAME)) {
+                    indexRequest.setIfSeqNo(seqNoPrimaryTermAndIndex.getSeqNo())
+                        .setIfPrimaryTerm(seqNoPrimaryTermAndIndex.getPrimaryTerm());
+                }
             } else {
-                // If the index is NOT the latest or we are null, that means we have not created this doc before
-                // so, it should be a create option without the seqNo and primaryTerm set
+                // we have not created this doc before or we are called from the upgrader
                 indexRequest.opType(DocWriteRequest.OpType.CREATE);
             }
+
             executeAsyncWithOrigin(
                 client,
                 TRANSFORM_ORIGIN,