Browse Source

Primary and replica tasks must share parent task IDs (#93055)

David Turner 2 years ago
parent
commit
17f2696102

+ 3 - 1
server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

@@ -1140,8 +1140,10 @@ public abstract class TransportReplicationAction<
         public void perform(Request request, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener) {
             if (Assertions.ENABLED) {
                 listener = listener.map(result -> {
-                    assert result.replicaRequest() == null || result.finalFailure == null
+                    assert (result.replicaRequest() == null) != (result.finalFailure == null)
                         : "a replica request [" + result.replicaRequest() + "] with a primary failure [" + result.finalFailure + "]";
+                    assert result.replicaRequest() == null || result.replicaRequest().getParentTask().equals(request.getParentTask())
+                        : "a replica request [" + result.replicaRequest() + "] with a different parent task from the primary request";
                     return result;
                 });
             }

+ 6 - 5
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java

@@ -97,17 +97,18 @@ public class TransportBulkShardOperationsAction extends TransportWriteAction<
         if (logger.isTraceEnabled()) {
             logger.trace("index [{}] on the following primary shard {}", request.getOperations(), primary.routingEntry());
         }
-        ActionListener.completeWith(
-            listener,
-            () -> shardOperationOnPrimary(
+        ActionListener.completeWith(listener, () -> {
+            final var result = shardOperationOnPrimary(
                 request.shardId(),
                 request.getHistoryUUID(),
                 request.getOperations(),
                 request.getMaxSeqNoOfUpdatesOrDeletes(),
                 primary,
                 logger
-            )
-        );
+            );
+            result.replicaRequest().setParentTask(request.getParentTask());
+            return result;
+        });
     }
 
     @Override