浏览代码

Replicate write actions before fsyncing them (#49746)

This commit fixes a number of issues with data replication:

- Local and global checkpoints are not updated after the new operations have been fsynced, but
might capture a state before the fsync. The reason why this probably went undetected for so
long is that AsyncIOProcessor is synchronous if you index one item at a time, and hence working
as intended unless you have a high enough level of concurrent indexing. As we rely in other
places on the assumption that we have an up-to-date local checkpoint in case of synchronous
translog durability, there's a risk for the local and global checkpoints not to be up-to-date after
replication completes, and that this won't be corrected by the periodic global checkpoint sync.
- AsyncIOProcessor also has another "bad" side effect here: if you index one bulk at a time, the
bulk is always first fsynced on the primary before being sent to the replica. Further, if one thread
is tasked by AsyncIOProcessor to drain the processing queue and fsync, other threads can
easily pile more bulk requests on top of that thread. Things are not very fair here, and the thread
might continue doing a lot more fsyncs before returning (as the other threads pile more and
more on top), which blocks it from returning as a replication request (e.g. if this thread is on the
primary, it blocks the replication requests to the replicas from going out, and delaying
checkpoint advancement).

This commit fixes all these issues, and also simplifies the code that coordinates all the after
write actions.
Yannick Welsch 5 年之前
父节点
当前提交
8c165e04a1

+ 43 - 13
server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java

@@ -46,6 +46,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Locale;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.LongSupplier;
 
 
 public class ReplicationOperation<
 public class ReplicationOperation<
             Request extends ReplicationRequest<Request>,
             Request extends ReplicationRequest<Request>,
@@ -110,8 +111,6 @@ public class ReplicationOperation<
 
 
     private void handlePrimaryResult(final PrimaryResultT primaryResult) {
     private void handlePrimaryResult(final PrimaryResultT primaryResult) {
         this.primaryResult = primaryResult;
         this.primaryResult = primaryResult;
-        primary.updateLocalCheckpointForShard(primary.routingEntry().allocationId().getId(), primary.localCheckpoint());
-        primary.updateGlobalCheckpointForShard(primary.routingEntry().allocationId().getId(), primary.globalCheckpoint());
         final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
         final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
         if (replicaRequest != null) {
         if (replicaRequest != null) {
             if (logger.isTraceEnabled()) {
             if (logger.isTraceEnabled()) {
@@ -134,8 +133,26 @@ public class ReplicationOperation<
             markUnavailableShardsAsStale(replicaRequest, replicationGroup);
             markUnavailableShardsAsStale(replicaRequest, replicationGroup);
             performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup);
             performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup);
         }
         }
-        successfulShards.incrementAndGet();  // mark primary as successful
-        decPendingAndFinishIfNeeded();
+        primaryResult.runPostReplicationActions(new ActionListener<>() {
+
+            @Override
+            public void onResponse(Void aVoid) {
+                successfulShards.incrementAndGet();
+                try {
+                    updateCheckPoints(primary.routingEntry(), primary::localCheckpoint, primary::globalCheckpoint);
+                } finally {
+                    decPendingAndFinishIfNeeded();
+                }
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                logger.trace("[{}] op [{}] post replication actions failed for [{}]", primary.routingEntry().shardId(), opType, request);
+                // TODO: fail shard? This will otherwise have the local / global checkpoint info lagging, or possibly have replicas
+                // go out of sync with the primary
+                finishAsFailed(e);
+            }
+        });
     }
     }
 
 
     private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, ReplicationGroup replicationGroup) {
     private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, ReplicationGroup replicationGroup) {
@@ -176,16 +193,10 @@ public class ReplicationOperation<
                 public void onResponse(ReplicaResponse response) {
                 public void onResponse(ReplicaResponse response) {
                     successfulShards.incrementAndGet();
                     successfulShards.incrementAndGet();
                     try {
                     try {
-                        primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint());
-                        primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), response.globalCheckpoint());
-                    } catch (final AlreadyClosedException e) {
-                        // the index was deleted or this shard was never activated after a relocation; fall through and finish normally
-                    } catch (final Exception e) {
-                        // fail the primary but fall through and let the rest of operation processing complete
-                        final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard);
-                        primary.failShard(message, e);
+                        updateCheckPoints(shard, response::localCheckpoint, response::globalCheckpoint);
+                    } finally {
+                        decPendingAndFinishIfNeeded();
                     }
                     }
-                    decPendingAndFinishIfNeeded();
                 }
                 }
 
 
                 @Override
                 @Override
@@ -211,6 +222,19 @@ public class ReplicationOperation<
             });
             });
     }
     }
 
 
+    private void updateCheckPoints(ShardRouting shard, LongSupplier localCheckpointSupplier, LongSupplier globalCheckpointSupplier) {
+        try {
+            primary.updateLocalCheckpointForShard(shard.allocationId().getId(), localCheckpointSupplier.getAsLong());
+            primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), globalCheckpointSupplier.getAsLong());
+        } catch (final AlreadyClosedException e) {
+            // the index was deleted or this shard was never activated after a relocation; fall through and finish normally
+        } catch (final Exception e) {
+            // fail the primary but fall through and let the rest of operation processing complete
+            final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard);
+            primary.failShard(message, e);
+        }
+    }
+
     private void onNoLongerPrimary(Exception failure) {
     private void onNoLongerPrimary(Exception failure) {
         final Throwable cause = ExceptionsHelper.unwrapCause(failure);
         final Throwable cause = ExceptionsHelper.unwrapCause(failure);
         final boolean nodeIsClosing = cause instanceof NodeClosedException;
         final boolean nodeIsClosing = cause instanceof NodeClosedException;
@@ -464,5 +488,11 @@ public class ReplicationOperation<
         @Nullable RequestT replicaRequest();
         @Nullable RequestT replicaRequest();
 
 
         void setShardInfo(ReplicationResponse.ShardInfo shardInfo);
         void setShardInfo(ReplicationResponse.ShardInfo shardInfo);
+
+        /**
+         * Run actions to be triggered post replication
+         * @param listener calllback that is invoked after post replication actions have completed
+         * */
+        void runPostReplicationActions(ActionListener<Void> listener);
     }
     }
 }
 }

+ 47 - 54
server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

@@ -73,8 +73,6 @@ import org.elasticsearch.transport.TransportChannel;
 import org.elasticsearch.transport.TransportException;
 import org.elasticsearch.transport.TransportException;
 import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.transport.TransportRequestOptions;
 import org.elasticsearch.transport.TransportRequestOptions;
-import org.elasticsearch.transport.TransportResponse;
-import org.elasticsearch.transport.TransportResponse.Empty;
 import org.elasticsearch.transport.TransportResponseHandler;
 import org.elasticsearch.transport.TransportResponseHandler;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.transport.TransportService;
 
 
@@ -346,17 +344,12 @@ public abstract class TransportReplicationAction<
                 } else {
                 } else {
                     setPhase(replicationTask, "primary");
                     setPhase(replicationTask, "primary");
 
 
-                    final ActionListener<Response> referenceClosingListener = ActionListener.wrap(response -> {
-                        primaryShardReference.close(); // release shard operation lock before responding to caller
-                        setPhase(replicationTask, "finished");
-                        onCompletionListener.onResponse(response);
-                    }, e -> handleException(primaryShardReference, e));
+                    final ActionListener<Response> responseListener = ActionListener.wrap(response -> {
+                        adaptResponse(response, primaryShardReference.indexShard);
 
 
-                    final ActionListener<Response> globalCheckpointSyncingListener = ActionListener.wrap(response -> {
                         if (syncGlobalCheckpointAfterOperation) {
                         if (syncGlobalCheckpointAfterOperation) {
-                            final IndexShard shard = primaryShardReference.indexShard;
                             try {
                             try {
-                                shard.maybeSyncGlobalCheckpoint("post-operation");
+                                primaryShardReference.indexShard.maybeSyncGlobalCheckpoint("post-operation");
                             } catch (final Exception e) {
                             } catch (final Exception e) {
                                 // only log non-closed exceptions
                                 // only log non-closed exceptions
                                 if (ExceptionsHelper.unwrap(
                                 if (ExceptionsHelper.unwrap(
@@ -364,15 +357,19 @@ public abstract class TransportReplicationAction<
                                     // intentionally swallow, a missed global checkpoint sync should not fail this operation
                                     // intentionally swallow, a missed global checkpoint sync should not fail this operation
                                     logger.info(
                                     logger.info(
                                         new ParameterizedMessage(
                                         new ParameterizedMessage(
-                                            "{} failed to execute post-operation global checkpoint sync", shard.shardId()), e);
+                                            "{} failed to execute post-operation global checkpoint sync",
+                                            primaryShardReference.indexShard.shardId()), e);
                                 }
                                 }
                             }
                             }
                         }
                         }
-                        referenceClosingListener.onResponse(response);
-                    }, referenceClosingListener::onFailure);
+
+                        primaryShardReference.close(); // release shard operation lock before responding to caller
+                        setPhase(replicationTask, "finished");
+                        onCompletionListener.onResponse(response);
+                    }, e -> handleException(primaryShardReference, e));
 
 
                     new ReplicationOperation<>(primaryRequest.getRequest(), primaryShardReference,
                     new ReplicationOperation<>(primaryRequest.getRequest(), primaryShardReference,
-                        ActionListener.wrap(result -> result.respond(globalCheckpointSyncingListener), referenceClosingListener::onFailure),
+                        ActionListener.map(responseListener, result -> result.finalResponseIfSuccessful),
                         newReplicasProxy(), logger, actionName, primaryRequest.getPrimaryTerm()).execute();
                         newReplicasProxy(), logger, actionName, primaryRequest.getPrimaryTerm()).execute();
                 }
                 }
             } catch (Exception e) {
             } catch (Exception e) {
@@ -393,10 +390,19 @@ public abstract class TransportReplicationAction<
 
 
     }
     }
 
 
+    // allows subclasses to adapt the response
+    protected void adaptResponse(Response response, IndexShard indexShard) {
+
+    }
+
+    protected ActionListener<Response> wrapResponseActionListener(ActionListener<Response> listener, IndexShard shard) {
+        return listener;
+    }
+
     public static class PrimaryResult<ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
     public static class PrimaryResult<ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
             Response extends ReplicationResponse>
             Response extends ReplicationResponse>
             implements ReplicationOperation.PrimaryResult<ReplicaRequest> {
             implements ReplicationOperation.PrimaryResult<ReplicaRequest> {
-        final ReplicaRequest replicaRequest;
+        protected final ReplicaRequest replicaRequest;
         public final Response finalResponseIfSuccessful;
         public final Response finalResponseIfSuccessful;
         public final Exception finalFailure;
         public final Exception finalFailure;
 
 
@@ -429,11 +435,12 @@ public abstract class TransportReplicationAction<
             }
             }
         }
         }
 
 
-        public void respond(ActionListener<Response> listener) {
-            if (finalResponseIfSuccessful != null) {
-                listener.onResponse(finalResponseIfSuccessful);
-            } else {
+        @Override
+        public void runPostReplicationActions(ActionListener<Void> listener) {
+            if (finalFailure != null) {
                 listener.onFailure(finalFailure);
                 listener.onFailure(finalFailure);
+            } else {
+                listener.onResponse(null);
             }
             }
         }
         }
     }
     }
@@ -449,11 +456,11 @@ public abstract class TransportReplicationAction<
             this(null);
             this(null);
         }
         }
 
 
-        public void respond(ActionListener<TransportResponse.Empty> listener) {
-            if (finalFailure == null) {
-                listener.onResponse(TransportResponse.Empty.INSTANCE);
-            } else {
+        public void runPostReplicaActions(ActionListener<Void> listener) {
+            if (finalFailure != null) {
                 listener.onFailure(finalFailure);
                 listener.onFailure(finalFailure);
+            } else {
+                listener.onResponse(null);
             }
             }
         }
         }
     }
     }
@@ -503,10 +510,23 @@ public abstract class TransportReplicationAction<
             try {
             try {
                 assert replica.getActiveOperationsCount() != 0 : "must perform shard operation under a permit";
                 assert replica.getActiveOperationsCount() != 0 : "must perform shard operation under a permit";
                 final ReplicaResult replicaResult = shardOperationOnReplica(replicaRequest.getRequest(), replica);
                 final ReplicaResult replicaResult = shardOperationOnReplica(replicaRequest.getRequest(), replica);
-                releasable.close(); // release shard operation lock before responding to caller
-                final TransportReplicationAction.ReplicaResponse response =
-                        new ReplicaResponse(replica.getLocalCheckpoint(), replica.getLastSyncedGlobalCheckpoint());
-                replicaResult.respond(new ResponseListener(response));
+                replicaResult.runPostReplicaActions(
+                    ActionListener.wrap(r -> {
+                        final TransportReplicationAction.ReplicaResponse response =
+                            new ReplicaResponse(replica.getLocalCheckpoint(), replica.getLastSyncedGlobalCheckpoint());
+                        releasable.close(); // release shard operation lock before responding to caller
+                        if (logger.isTraceEnabled()) {
+                            logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction,
+                                replicaRequest.getRequest().shardId(),
+                                replicaRequest.getRequest());
+                        }
+                        setPhase(task, "finished");
+                        onCompletionListener.onResponse(response);
+                    }, e -> {
+                        Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
+                        this.responseWithFailure(e);
+                    })
+                );
             } catch (final Exception e) {
             } catch (final Exception e) {
                 Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
                 Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
                 AsyncReplicaAction.this.onFailure(e);
                 AsyncReplicaAction.this.onFailure(e);
@@ -564,33 +584,6 @@ public abstract class TransportReplicationAction<
             acquireReplicaOperationPermit(replica, replicaRequest.getRequest(), this, replicaRequest.getPrimaryTerm(),
             acquireReplicaOperationPermit(replica, replicaRequest.getRequest(), this, replicaRequest.getPrimaryTerm(),
                 replicaRequest.getGlobalCheckpoint(), replicaRequest.getMaxSeqNoOfUpdatesOrDeletes());
                 replicaRequest.getGlobalCheckpoint(), replicaRequest.getMaxSeqNoOfUpdatesOrDeletes());
         }
         }
-
-        /**
-         * Listens for the response on the replica and sends the response back to the primary.
-         */
-        private class ResponseListener implements ActionListener<TransportResponse.Empty> {
-            private final ReplicaResponse replicaResponse;
-
-            ResponseListener(ReplicaResponse replicaResponse) {
-                this.replicaResponse = replicaResponse;
-            }
-
-            @Override
-            public void onResponse(Empty response) {
-                if (logger.isTraceEnabled()) {
-                    logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction,
-                        replicaRequest.getRequest().shardId(),
-                        replicaRequest.getRequest());
-                }
-                setPhase(task, "finished");
-                onCompletionListener.onResponse(replicaResponse);
-            }
-
-            @Override
-            public void onFailure(Exception e) {
-                responseWithFailure(e);
-            }
-        }
     }
     }
 
 
     private IndexShard getIndexShard(final ShardId shardId) {
     private IndexShard getIndexShard(final ShardId shardId) {

+ 42 - 76
server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java

@@ -41,7 +41,6 @@ import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.index.translog.Translog.Location;
 import org.elasticsearch.index.translog.Translog.Location;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.transport.TransportResponse;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.transport.TransportService;
 
 
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -124,12 +123,10 @@ public abstract class TransportWriteAction<
      * NOTE: public for testing
      * NOTE: public for testing
      */
      */
     public static class WritePrimaryResult<ReplicaRequest extends ReplicatedWriteRequest<ReplicaRequest>,
     public static class WritePrimaryResult<ReplicaRequest extends ReplicatedWriteRequest<ReplicaRequest>,
-            Response extends ReplicationResponse & WriteResponse> extends PrimaryResult<ReplicaRequest, Response>
-            implements RespondingWriteResult {
-        boolean finishedAsyncActions;
+            Response extends ReplicationResponse & WriteResponse> extends PrimaryResult<ReplicaRequest, Response> {
         public final Location location;
         public final Location location;
         public final IndexShard primary;
         public final IndexShard primary;
-        ActionListener<Response> listener = null;
+        private final Logger logger;
 
 
         public WritePrimaryResult(ReplicaRequest request, @Nullable Response finalResponse,
         public WritePrimaryResult(ReplicaRequest request, @Nullable Response finalResponse,
                                   @Nullable Location location, @Nullable Exception operationFailure,
                                   @Nullable Location location, @Nullable Exception operationFailure,
@@ -137,104 +134,73 @@ public abstract class TransportWriteAction<
             super(request, finalResponse, operationFailure);
             super(request, finalResponse, operationFailure);
             this.location = location;
             this.location = location;
             this.primary = primary;
             this.primary = primary;
+            this.logger = logger;
             assert location == null || operationFailure == null
             assert location == null || operationFailure == null
                     : "expected either failure to be null or translog location to be null, " +
                     : "expected either failure to be null or translog location to be null, " +
                     "but found: [" + location + "] translog location and [" + operationFailure + "] failure";
                     "but found: [" + location + "] translog location and [" + operationFailure + "] failure";
-            if (operationFailure != null) {
-                this.finishedAsyncActions = true;
+        }
+
+        @Override
+        public void runPostReplicationActions(ActionListener<Void> listener) {
+            if (finalFailure != null) {
+                listener.onFailure(finalFailure);
             } else {
             } else {
                 /*
                 /*
-                 * We call this before replication because this might wait for a refresh and that can take a while.
+                 * We call this after replication because this might wait for a refresh and that can take a while.
                  * This way we wait for the refresh in parallel on the primary and on the replica.
                  * This way we wait for the refresh in parallel on the primary and on the replica.
                  */
                  */
-                new AsyncAfterWriteAction(primary, request, location, this, logger).run();
-            }
-        }
-
-        @Override
-        public synchronized void respond(ActionListener<Response> listener) {
-            this.listener = listener;
-            respondIfPossible(null);
-        }
+                new AsyncAfterWriteAction(primary, replicaRequest, location, new RespondingWriteResult() {
+                    @Override
+                    public void onSuccess(boolean forcedRefresh) {
+                        finalResponseIfSuccessful.setForcedRefresh(forcedRefresh);
+                        listener.onResponse(null);
+                    }
 
 
-        /**
-         * Respond if the refresh has occurred and the listener is ready. Always called while synchronized on {@code this}.
-         */
-        protected void respondIfPossible(Exception ex) {
-            assert Thread.holdsLock(this);
-            if (finishedAsyncActions && listener != null) {
-                if (ex == null) {
-                    super.respond(listener);
-                } else {
-                    listener.onFailure(ex);
-                }
+                    @Override
+                    public void onFailure(Exception ex) {
+                        listener.onFailure(ex);
+                    }
+                }, logger).run();
             }
             }
         }
         }
-
-        public synchronized void onFailure(Exception exception) {
-            finishedAsyncActions = true;
-            respondIfPossible(exception);
-        }
-
-        @Override
-        public synchronized void onSuccess(boolean forcedRefresh) {
-            finalResponseIfSuccessful.setForcedRefresh(forcedRefresh);
-            finishedAsyncActions = true;
-            respondIfPossible(null);
-        }
     }
     }
 
 
     /**
     /**
      * Result of taking the action on the replica.
      * Result of taking the action on the replica.
      */
      */
-    public static class WriteReplicaResult<ReplicaRequest extends ReplicatedWriteRequest<ReplicaRequest>>
-            extends ReplicaResult implements RespondingWriteResult {
+    public static class WriteReplicaResult<ReplicaRequest extends ReplicatedWriteRequest<ReplicaRequest>> extends ReplicaResult {
         public final Location location;
         public final Location location;
-        boolean finishedAsyncActions;
-        private ActionListener<TransportResponse.Empty> listener;
+        private final ReplicaRequest request;
+        private final IndexShard replica;
+        private final Logger logger;
 
 
         public WriteReplicaResult(ReplicaRequest request, @Nullable Location location,
         public WriteReplicaResult(ReplicaRequest request, @Nullable Location location,
                                   @Nullable Exception operationFailure, IndexShard replica, Logger logger) {
                                   @Nullable Exception operationFailure, IndexShard replica, Logger logger) {
             super(operationFailure);
             super(operationFailure);
             this.location = location;
             this.location = location;
-            if (operationFailure != null) {
-                this.finishedAsyncActions = true;
-            } else {
-                new AsyncAfterWriteAction(replica, request, location, this, logger).run();
-            }
+            this.request = request;
+            this.replica = replica;
+            this.logger = logger;
         }
         }
 
 
         @Override
         @Override
-        public synchronized void respond(ActionListener<TransportResponse.Empty> listener) {
-            this.listener = listener;
-            respondIfPossible(null);
-        }
+        public void runPostReplicaActions(ActionListener<Void> listener) {
+            if (finalFailure != null) {
+                listener.onFailure(finalFailure);
+            } else {
+                new AsyncAfterWriteAction(replica, request, location, new RespondingWriteResult() {
+                    @Override
+                    public void onSuccess(boolean forcedRefresh) {
+                        listener.onResponse(null);
+                    }
 
 
-        /**
-         * Respond if the refresh has occurred and the listener is ready. Always called while synchronized on {@code this}.
-         */
-        protected void respondIfPossible(Exception ex) {
-            assert Thread.holdsLock(this);
-            if (finishedAsyncActions && listener != null) {
-                if (ex == null) {
-                    super.respond(listener);
-                } else {
-                    listener.onFailure(ex);
-                }
+                    @Override
+                    public void onFailure(Exception ex) {
+                        listener.onFailure(ex);
+                    }
+                }, logger).run();
             }
             }
         }
         }
-
-        @Override
-        public synchronized void onFailure(Exception ex) {
-            finishedAsyncActions = true;
-            respondIfPossible(ex);
-        }
-
-        @Override
-        public synchronized void onSuccess(boolean forcedRefresh) {
-            finishedAsyncActions = true;
-            respondIfPossible(null);
-        }
     }
     }
 
 
     @Override
     @Override

+ 5 - 0
server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java

@@ -348,6 +348,11 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase {
             this.shardInfo.set(shardInfo);
             this.shardInfo.set(shardInfo);
         }
         }
 
 
+        @Override
+        public void runPostReplicationActions(ActionListener<Void> listener) {
+            listener.onResponse(null);
+        }
+
         public ReplicationResponse.ShardInfo getShardInfo() {
         public ReplicationResponse.ShardInfo getShardInfo() {
             return shardInfo.get();
             return shardInfo.get();
         }
         }

+ 11 - 0
server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java

@@ -124,6 +124,7 @@ public class ReplicationOperationTests extends ESTestCase {
         assertThat(request.processedOnReplicas, equalTo(expectedReplicas));
         assertThat(request.processedOnReplicas, equalTo(expectedReplicas));
         assertThat(replicasProxy.failedReplicas, equalTo(simulatedFailures.keySet()));
         assertThat(replicasProxy.failedReplicas, equalTo(simulatedFailures.keySet()));
         assertThat(replicasProxy.markedAsStaleCopies, equalTo(staleAllocationIds));
         assertThat(replicasProxy.markedAsStaleCopies, equalTo(staleAllocationIds));
+        assertThat("post replication operations not run on primary", request.runPostReplicationActionsOnPrimary.get(), equalTo(true));
         assertTrue("listener is not marked as done", listener.isDone());
         assertTrue("listener is not marked as done", listener.isDone());
         ShardInfo shardInfo = listener.actionGet().getShardInfo();
         ShardInfo shardInfo = listener.actionGet().getShardInfo();
         assertThat(shardInfo.getFailed(), equalTo(reportedFailures.size()));
         assertThat(shardInfo.getFailed(), equalTo(reportedFailures.size()));
@@ -437,6 +438,7 @@ public class ReplicationOperationTests extends ESTestCase {
 
 
     public static class Request extends ReplicationRequest<Request> {
     public static class Request extends ReplicationRequest<Request> {
         public AtomicBoolean processedOnPrimary = new AtomicBoolean();
         public AtomicBoolean processedOnPrimary = new AtomicBoolean();
+        public AtomicBoolean runPostReplicationActionsOnPrimary = new AtomicBoolean();
         public Set<ShardRouting> processedOnReplicas = ConcurrentCollections.newConcurrentSet();
         public Set<ShardRouting> processedOnReplicas = ConcurrentCollections.newConcurrentSet();
 
 
         Request(ShardId shardId) {
         Request(ShardId shardId) {
@@ -505,6 +507,14 @@ public class ReplicationOperationTests extends ESTestCase {
                 this.shardInfo = shardInfo;
                 this.shardInfo = shardInfo;
             }
             }
 
 
+            @Override
+            public void runPostReplicationActions(ActionListener<Void> listener) {
+                if (request.runPostReplicationActionsOnPrimary.compareAndSet(false, true) == false) {
+                    fail("processed [" + request + "] twice");
+                }
+                listener.onResponse(null);
+            }
+
             public ShardInfo getShardInfo() {
             public ShardInfo getShardInfo() {
                 return shardInfo;
                 return shardInfo;
             }
             }
@@ -597,6 +607,7 @@ public class ReplicationOperationTests extends ESTestCase {
                 final long maxSeqNoOfUpdatesOrDeletes,
                 final long maxSeqNoOfUpdatesOrDeletes,
                 final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
                 final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
             assertTrue("replica request processed twice on [" + replica + "]", request.processedOnReplicas.add(replica));
             assertTrue("replica request processed twice on [" + replica + "]", request.processedOnReplicas.add(replica));
+            assertFalse("primary post replication actions should run after replication", request.runPostReplicationActionsOnPrimary.get());
             if (opFailures.containsKey(replica)) {
             if (opFailures.containsKey(replica)) {
                 listener.onFailure(opFailures.get(replica));
                 listener.onFailure(opFailures.get(replica));
             } else {
             } else {

+ 10 - 48
server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java

@@ -67,9 +67,6 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.HashSet;
 import java.util.Locale;
 import java.util.Locale;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -143,7 +140,7 @@ public class TransportWriteActionTests extends ESTestCase {
         testAction.shardOperationOnPrimary(request, indexShard,
         testAction.shardOperationOnPrimary(request, indexShard,
             ActionTestUtils.assertNoFailureListener(result -> {
             ActionTestUtils.assertNoFailureListener(result -> {
                 CapturingActionListener<TestResponse> listener = new CapturingActionListener<>();
                 CapturingActionListener<TestResponse> listener = new CapturingActionListener<>();
-                result.respond(listener);
+                result.runPostReplicationActions(ActionListener.map(listener, ignore -> result.finalResponseIfSuccessful));
                 assertNotNull(listener.response);
                 assertNotNull(listener.response);
                 assertNull(listener.failure);
                 assertNull(listener.failure);
                 verify(indexShard, never()).refresh(any());
                 verify(indexShard, never()).refresh(any());
@@ -158,7 +155,7 @@ public class TransportWriteActionTests extends ESTestCase {
         TransportWriteAction.WriteReplicaResult<TestRequest> result =
         TransportWriteAction.WriteReplicaResult<TestRequest> result =
                 testAction.shardOperationOnReplica(request, indexShard);
                 testAction.shardOperationOnReplica(request, indexShard);
         CapturingActionListener<TransportResponse.Empty> listener = new CapturingActionListener<>();
         CapturingActionListener<TransportResponse.Empty> listener = new CapturingActionListener<>();
-        result.respond(listener);
+        result.runPostReplicaActions(ActionListener.map(listener, ignore -> TransportResponse.Empty.INSTANCE));
         assertNotNull(listener.response);
         assertNotNull(listener.response);
         assertNull(listener.failure);
         assertNull(listener.failure);
         verify(indexShard, never()).refresh(any());
         verify(indexShard, never()).refresh(any());
@@ -172,7 +169,7 @@ public class TransportWriteActionTests extends ESTestCase {
         testAction.shardOperationOnPrimary(request, indexShard,
         testAction.shardOperationOnPrimary(request, indexShard,
             ActionTestUtils.assertNoFailureListener(result -> {
             ActionTestUtils.assertNoFailureListener(result -> {
                 CapturingActionListener<TestResponse> listener = new CapturingActionListener<>();
                 CapturingActionListener<TestResponse> listener = new CapturingActionListener<>();
-                result.respond(listener);
+                result.runPostReplicationActions(ActionListener.map(listener, ignore -> result.finalResponseIfSuccessful));
                 assertNotNull(listener.response);
                 assertNotNull(listener.response);
                 assertNull(listener.failure);
                 assertNull(listener.failure);
                 assertTrue(listener.response.forcedRefresh);
                 assertTrue(listener.response.forcedRefresh);
@@ -188,7 +185,7 @@ public class TransportWriteActionTests extends ESTestCase {
         TransportWriteAction.WriteReplicaResult<TestRequest> result =
         TransportWriteAction.WriteReplicaResult<TestRequest> result =
                 testAction.shardOperationOnReplica(request, indexShard);
                 testAction.shardOperationOnReplica(request, indexShard);
         CapturingActionListener<TransportResponse.Empty> listener = new CapturingActionListener<>();
         CapturingActionListener<TransportResponse.Empty> listener = new CapturingActionListener<>();
-        result.respond(listener);
+        result.runPostReplicaActions(ActionListener.map(listener, ignore -> TransportResponse.Empty.INSTANCE));
         assertNotNull(listener.response);
         assertNotNull(listener.response);
         assertNull(listener.failure);
         assertNull(listener.failure);
         verify(indexShard).refresh("refresh_flag_index");
         verify(indexShard).refresh("refresh_flag_index");
@@ -203,7 +200,7 @@ public class TransportWriteActionTests extends ESTestCase {
         testAction.shardOperationOnPrimary(request, indexShard,
         testAction.shardOperationOnPrimary(request, indexShard,
             ActionTestUtils.assertNoFailureListener(result -> {
             ActionTestUtils.assertNoFailureListener(result -> {
                 CapturingActionListener<TestResponse> listener = new CapturingActionListener<>();
                 CapturingActionListener<TestResponse> listener = new CapturingActionListener<>();
-                result.respond(listener);
+                result.runPostReplicationActions(ActionListener.map(listener, ignore -> result.finalResponseIfSuccessful));
                 assertNull(listener.response); // Haven't really responded yet
                 assertNull(listener.response); // Haven't really responded yet
 
 
                 @SuppressWarnings({"unchecked", "rawtypes"})
                 @SuppressWarnings({"unchecked", "rawtypes"})
@@ -226,7 +223,7 @@ public class TransportWriteActionTests extends ESTestCase {
         TestAction testAction = new TestAction();
         TestAction testAction = new TestAction();
         TransportWriteAction.WriteReplicaResult<TestRequest> result = testAction.shardOperationOnReplica(request, indexShard);
         TransportWriteAction.WriteReplicaResult<TestRequest> result = testAction.shardOperationOnReplica(request, indexShard);
         CapturingActionListener<TransportResponse.Empty> listener = new CapturingActionListener<>();
         CapturingActionListener<TransportResponse.Empty> listener = new CapturingActionListener<>();
-        result.respond(listener);
+        result.runPostReplicaActions(ActionListener.map(listener, ignore -> TransportResponse.Empty.INSTANCE));
         assertNull(listener.response); // Haven't responded yet
         assertNull(listener.response); // Haven't responded yet
         @SuppressWarnings({ "unchecked", "rawtypes" })
         @SuppressWarnings({ "unchecked", "rawtypes" })
         ArgumentCaptor<Consumer<Boolean>> refreshListener = ArgumentCaptor.forClass((Class) Consumer.class);
         ArgumentCaptor<Consumer<Boolean>> refreshListener = ArgumentCaptor.forClass((Class) Consumer.class);
@@ -244,9 +241,9 @@ public class TransportWriteActionTests extends ESTestCase {
         TestRequest request = new TestRequest();
         TestRequest request = new TestRequest();
         TestAction testAction = new TestAction(true, true);
         TestAction testAction = new TestAction(true, true);
         testAction.shardOperationOnPrimary(request, indexShard,
         testAction.shardOperationOnPrimary(request, indexShard,
-            ActionTestUtils.assertNoFailureListener(writePrimaryResult -> {
+            ActionTestUtils.assertNoFailureListener(result -> {
                 CapturingActionListener<TestResponse> listener = new CapturingActionListener<>();
                 CapturingActionListener<TestResponse> listener = new CapturingActionListener<>();
-                writePrimaryResult.respond(listener);
+                result.runPostReplicationActions(ActionListener.map(listener, ignore -> result.finalResponseIfSuccessful));
                 assertNull(listener.response);
                 assertNull(listener.response);
                 assertNotNull(listener.failure);
                 assertNotNull(listener.failure);
             }));
             }));
@@ -255,10 +252,10 @@ public class TransportWriteActionTests extends ESTestCase {
     public void testDocumentFailureInShardOperationOnReplica() throws Exception {
     public void testDocumentFailureInShardOperationOnReplica() throws Exception {
         TestRequest request = new TestRequest();
         TestRequest request = new TestRequest();
         TestAction testAction = new TestAction(randomBoolean(), true);
         TestAction testAction = new TestAction(randomBoolean(), true);
-        TransportWriteAction.WriteReplicaResult<TestRequest> writeReplicaResult =
+        TransportWriteAction.WriteReplicaResult<TestRequest> result =
                 testAction.shardOperationOnReplica(request, indexShard);
                 testAction.shardOperationOnReplica(request, indexShard);
         CapturingActionListener<TransportResponse.Empty> listener = new CapturingActionListener<>();
         CapturingActionListener<TransportResponse.Empty> listener = new CapturingActionListener<>();
-        writeReplicaResult.respond(listener);
+        result.runPostReplicaActions(ActionListener.map(listener, ignore -> TransportResponse.Empty.INSTANCE));
         assertNull(listener.response);
         assertNull(listener.response);
         assertNotNull(listener.failure);
         assertNotNull(listener.failure);
     }
     }
@@ -350,41 +347,6 @@ public class TransportWriteActionTests extends ESTestCase {
         }
         }
     }
     }
 
 
-    public void testConcurrentWriteReplicaResultCompletion() throws InterruptedException {
-        IndexShard replica = mock(IndexShard.class);
-        when(replica.getTranslogDurability()).thenReturn(Translog.Durability.ASYNC);
-        TestRequest request = new TestRequest();
-        request.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL);
-        TransportWriteAction.WriteReplicaResult<TestRequest> replicaResult = new TransportWriteAction.WriteReplicaResult<>(
-            request, new Translog.Location(0, 0, 0), null, replica, logger);
-        CyclicBarrier barrier = new CyclicBarrier(2);
-        Runnable waitForBarrier = () -> {
-            try {
-                barrier.await();
-            } catch (InterruptedException | BrokenBarrierException e) {
-                throw new AssertionError(e);
-            }
-        };
-        CountDownLatch completionLatch = new CountDownLatch(1);
-        threadPool.generic().execute(() -> {
-            waitForBarrier.run();
-            replicaResult.respond(ActionListener.wrap(completionLatch::countDown));
-        });
-        if (randomBoolean()) {
-            threadPool.generic().execute(() -> {
-                waitForBarrier.run();
-                replicaResult.onFailure(null);
-            });
-        } else {
-            threadPool.generic().execute(() -> {
-                waitForBarrier.run();
-                replicaResult.onSuccess(false);
-            });
-        }
-
-        assertTrue(completionLatch.await(30, TimeUnit.SECONDS));
-    }
-
     private class TestAction extends TransportWriteAction<TestRequest, TestRequest, TestResponse> {
     private class TestAction extends TransportWriteAction<TestRequest, TestRequest, TestResponse> {
 
 
         private final boolean withDocumentFailureOnPrimary;
         private final boolean withDocumentFailureOnPrimary;

+ 27 - 0
server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncIT.java

@@ -259,4 +259,31 @@ public class GlobalCheckpointSyncIT extends ESIntegTestCase {
             }
             }
         });
         });
     }
     }
+
+    public void testPersistLocalCheckpoint() {
+        internalCluster().ensureAtLeastNumDataNodes(2);
+        Settings.Builder indexSettings = Settings.builder()
+            .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "10m")
+            .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST)
+            .put("index.number_of_shards", 1)
+            .put("index.number_of_replicas", randomIntBetween(0, 1));
+        prepareCreate("test", indexSettings).get();
+        ensureGreen("test");
+        int numDocs = randomIntBetween(1, 20);
+        logger.info("numDocs {}", numDocs);
+        long maxSeqNo = 0;
+        for (int i = 0; i < numDocs; i++) {
+            maxSeqNo = client().prepareIndex("test").setId(Integer.toString(i)).setSource("{}", XContentType.JSON).get().getSeqNo();
+            logger.info("got {}", maxSeqNo);
+        }
+        for (IndicesService indicesService : internalCluster().getDataNodeInstances(IndicesService.class)) {
+            for (IndexService indexService : indicesService) {
+                for (IndexShard shard : indexService) {
+                    final SeqNoStats seqNoStats = shard.seqNoStats();
+                    assertThat(maxSeqNo, equalTo(seqNoStats.getMaxSeqNo()));
+                    assertThat(seqNoStats.getLocalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));;
+                }
+            }
+        }
+    }
 }
 }

+ 1 - 1
server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java

@@ -154,7 +154,7 @@ public class RetentionLeaseBackgroundSyncActionTests extends ESTestCase {
         verify(indexShard).persistRetentionLeases();
         verify(indexShard).persistRetentionLeases();
         // the result should indicate success
         // the result should indicate success
         final AtomicBoolean success = new AtomicBoolean();
         final AtomicBoolean success = new AtomicBoolean();
-        result.respond(ActionListener.wrap(r -> success.set(true), e -> fail(e.toString())));
+        result.runPostReplicaActions(ActionListener.wrap(r -> success.set(true), e -> fail(e.toString())));
         assertTrue(success.get());
         assertTrue(success.get());
     }
     }
 
 

+ 1 - 1
server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java

@@ -149,7 +149,7 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
         verify(indexShard).persistRetentionLeases();
         verify(indexShard).persistRetentionLeases();
         // the result should indicate success
         // the result should indicate success
         final AtomicBoolean success = new AtomicBoolean();
         final AtomicBoolean success = new AtomicBoolean();
-        result.respond(ActionListener.wrap(r -> success.set(true), e -> fail(e.toString())));
+        result.runPostReplicaActions(ActionListener.wrap(r -> success.set(true), e -> fail(e.toString())));
         assertTrue(success.get());
         assertTrue(success.get());
     }
     }
 
 

+ 15 - 5
test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

@@ -601,14 +601,23 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
         public void execute() {
         public void execute() {
             try {
             try {
                 new ReplicationOperation<>(request, new PrimaryRef(),
                 new ReplicationOperation<>(request, new PrimaryRef(),
-                    ActionListener.wrap(result -> result.respond(listener), listener::onFailure), new ReplicasRef(), logger, opType,
-                    primaryTerm).execute();
+                    ActionListener.map(listener, result -> {
+                        adaptResponse(result.finalResponse, getPrimaryShard());
+                        return result.finalResponse;
+                    }),
+                    new ReplicasRef(), logger, opType, primaryTerm)
+                    .execute();
             } catch (Exception e) {
             } catch (Exception e) {
                 listener.onFailure(e);
                 listener.onFailure(e);
             }
             }
         }
         }
 
 
-        IndexShard getPrimaryShard() {
+        // to be overridden by subclasses
+        protected void adaptResponse(Response response, IndexShard indexShard) {
+
+        }
+
+        protected IndexShard getPrimaryShard() {
             return replicationTargets.primary;
             return replicationTargets.primary;
         }
         }
 
 
@@ -731,8 +740,9 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
                 finalResponse.setShardInfo(shardInfo);
                 finalResponse.setShardInfo(shardInfo);
             }
             }
 
 
-            public void respond(ActionListener<Response> listener) {
-                listener.onResponse(finalResponse);
+            @Override
+            public void runPostReplicationActions(ActionListener<Void> listener) {
+                listener.onResponse(null);
             }
             }
         }
         }
 
 

+ 11 - 21
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java

@@ -102,7 +102,7 @@ public class TransportBulkShardOperationsAction
     }
     }
 
 
     // public for testing purposes only
     // public for testing purposes only
-    public static CcrWritePrimaryResult shardOperationOnPrimary(
+    public static WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> shardOperationOnPrimary(
             final ShardId shardId,
             final ShardId shardId,
             final String historyUUID,
             final String historyUUID,
             final List<Translog.Operation> sourceOperations,
             final List<Translog.Operation> sourceOperations,
@@ -154,7 +154,7 @@ public class TransportBulkShardOperationsAction
         }
         }
         final BulkShardOperationsRequest replicaRequest = new BulkShardOperationsRequest(
         final BulkShardOperationsRequest replicaRequest = new BulkShardOperationsRequest(
             shardId, historyUUID, appliedOperations, maxSeqNoOfUpdatesOrDeletes);
             shardId, historyUUID, appliedOperations, maxSeqNoOfUpdatesOrDeletes);
-        return new CcrWritePrimaryResult(replicaRequest, location, primary, logger);
+        return new WritePrimaryResult<>(replicaRequest, new BulkShardOperationsResponse(), location, null, primary, logger);
     }
     }
 
 
     @Override
     @Override
@@ -190,26 +190,16 @@ public class TransportBulkShardOperationsAction
         return new BulkShardOperationsResponse(in);
         return new BulkShardOperationsResponse(in);
     }
     }
 
 
-    /**
-     * Custom write result to include global checkpoint after ops have been replicated.
-     */
-    static final class CcrWritePrimaryResult extends WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> {
-        CcrWritePrimaryResult(BulkShardOperationsRequest request, Translog.Location location, IndexShard primary, Logger logger) {
-            super(request, new BulkShardOperationsResponse(), location, null, primary, logger);
-        }
-
-        @Override
-        public synchronized void respond(ActionListener<BulkShardOperationsResponse> listener) {
-            final ActionListener<BulkShardOperationsResponse> wrappedListener = ActionListener.wrap(response -> {
-                final SeqNoStats seqNoStats = primary.seqNoStats();
-                // return a fresh global checkpoint after the operations have been replicated for the shard follow task
-                response.setGlobalCheckpoint(seqNoStats.getGlobalCheckpoint());
-                response.setMaxSeqNo(seqNoStats.getMaxSeqNo());
-                listener.onResponse(response);
-            }, listener::onFailure);
-            super.respond(wrappedListener);
-        }
+    @Override
+    protected void adaptResponse(BulkShardOperationsResponse response, IndexShard indexShard) {
+        adaptBulkShardOperationsResponse(response, indexShard);
+    }
 
 
+    public static void adaptBulkShardOperationsResponse(BulkShardOperationsResponse response, IndexShard indexShard) {
+        final SeqNoStats seqNoStats = indexShard.seqNoStats();
+        // return a fresh global checkpoint after the operations have been replicated for the shard follow task
+        response.setGlobalCheckpoint(seqNoStats.getGlobalCheckpoint());
+        response.setMaxSeqNo(seqNoStats.getMaxSeqNo());
     }
     }
 
 
 }
 }

+ 24 - 16
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java

@@ -18,6 +18,7 @@ import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
 import org.elasticsearch.action.support.replication.TransportWriteAction;
 import org.elasticsearch.action.support.replication.TransportWriteAction;
+import org.elasticsearch.action.support.replication.TransportWriteActionTestHelper;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.RecoverySource;
@@ -67,6 +68,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.Set;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
 import java.util.function.BiConsumer;
@@ -668,26 +670,32 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
 
 
         @Override
         @Override
         protected void performOnPrimary(IndexShard primary, BulkShardOperationsRequest request, ActionListener<PrimaryResult> listener) {
         protected void performOnPrimary(IndexShard primary, BulkShardOperationsRequest request, ActionListener<PrimaryResult> listener) {
-            ActionListener.completeWith(listener, () -> {
-                final PlainActionFuture<Releasable> permitFuture = new PlainActionFuture<>();
-                primary.acquirePrimaryOperationPermit(permitFuture, ThreadPool.Names.SAME, request);
-                final TransportWriteAction.WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> ccrResult;
-                try (Releasable ignored = permitFuture.get()) {
-                    ccrResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(), request.getHistoryUUID(),
-                        request.getOperations(), request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger);
-                }
-                return new PrimaryResult(ccrResult.replicaRequest(), ccrResult.finalResponseIfSuccessful) {
-                    @Override
-                    public void respond(ActionListener<BulkShardOperationsResponse> listener) {
-                        ccrResult.respond(listener);
-                    }
-                };
-            });
+            final PlainActionFuture<Releasable> permitFuture = new PlainActionFuture<>();
+            primary.acquirePrimaryOperationPermit(permitFuture, ThreadPool.Names.SAME, request);
+            final TransportWriteAction.WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> ccrResult;
+            try (Releasable ignored = permitFuture.get()) {
+                ccrResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(), request.getHistoryUUID(),
+                    request.getOperations(), request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger);
+                TransportWriteActionTestHelper.performPostWriteActions(primary, request, ccrResult.location, logger);
+            } catch (InterruptedException | ExecutionException | IOException e) {
+                throw new AssertionError(e);
+            }
+            listener.onResponse(new PrimaryResult(ccrResult.replicaRequest(), ccrResult.finalResponseIfSuccessful));
+        }
+
+        @Override
+        protected void adaptResponse(BulkShardOperationsResponse response, IndexShard indexShard) {
+            TransportBulkShardOperationsAction.adaptBulkShardOperationsResponse(response, indexShard);
         }
         }
 
 
         @Override
         @Override
         protected void performOnReplica(BulkShardOperationsRequest request, IndexShard replica) throws Exception {
         protected void performOnReplica(BulkShardOperationsRequest request, IndexShard replica) throws Exception {
-            TransportBulkShardOperationsAction.shardOperationOnReplica(request, replica, logger);
+            try (Releasable ignored = PlainActionFuture.get(f -> replica.acquireReplicaOperationPermit(
+                getPrimaryShard().getPendingPrimaryTerm(), getPrimaryShard().getLastKnownGlobalCheckpoint(),
+                getPrimaryShard().getMaxSeqNoOfUpdatesOrDeletes(), f, ThreadPool.Names.SAME, request))) {
+                Translog.Location location = TransportBulkShardOperationsAction.shardOperationOnReplica(request, replica, logger).location;
+                TransportWriteActionTestHelper.performPostWriteActions(replica, request, location, logger);
+            }
         }
         }
     }
     }
 
 

+ 4 - 3
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java

@@ -33,7 +33,6 @@ import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
 import static java.util.Collections.emptySet;
 import static org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction.rewriteOperationWithPrimaryTerm;
 import static org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction.rewriteOperationWithPrimaryTerm;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.equalTo;
-import static org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction.CcrWritePrimaryResult;
 
 
 public class BulkShardOperationsTests extends IndexShardTestCase {
 public class BulkShardOperationsTests extends IndexShardTestCase {
 
 
@@ -124,7 +123,8 @@ public class BulkShardOperationsTests extends IndexShardTestCase {
         Randomness.shuffle(firstBulk);
         Randomness.shuffle(firstBulk);
         Randomness.shuffle(secondBulk);
         Randomness.shuffle(secondBulk);
         oldPrimary.advanceMaxSeqNoOfUpdatesOrDeletes(seqno);
         oldPrimary.advanceMaxSeqNoOfUpdatesOrDeletes(seqno);
-        final CcrWritePrimaryResult fullResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(oldPrimary.shardId(),
+        final TransportWriteAction.WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> fullResult =
+            TransportBulkShardOperationsAction.shardOperationOnPrimary(oldPrimary.shardId(),
             oldPrimary.getHistoryUUID(), firstBulk, seqno, oldPrimary, logger);
             oldPrimary.getHistoryUUID(), firstBulk, seqno, oldPrimary, logger);
         assertThat(fullResult.replicaRequest().getOperations(),
         assertThat(fullResult.replicaRequest().getOperations(),
             equalTo(firstBulk.stream().map(op -> rewriteOperationWithPrimaryTerm(op, oldPrimaryTerm)).collect(Collectors.toList())));
             equalTo(firstBulk.stream().map(op -> rewriteOperationWithPrimaryTerm(op, oldPrimaryTerm)).collect(Collectors.toList())));
@@ -138,7 +138,8 @@ public class BulkShardOperationsTests extends IndexShardTestCase {
         // The second bulk includes some operations from the first bulk which were processed already;
         // The second bulk includes some operations from the first bulk which were processed already;
         // only a subset of these operations will be included the result but with the old primary term.
         // only a subset of these operations will be included the result but with the old primary term.
         final List<Translog.Operation> existingOps = randomSubsetOf(firstBulk);
         final List<Translog.Operation> existingOps = randomSubsetOf(firstBulk);
-        final CcrWritePrimaryResult partialResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(newPrimary.shardId(),
+        final TransportWriteAction.WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> partialResult =
+            TransportBulkShardOperationsAction.shardOperationOnPrimary(newPrimary.shardId(),
             newPrimary.getHistoryUUID(), Stream.concat(secondBulk.stream(), existingOps.stream()).collect(Collectors.toList()),
             newPrimary.getHistoryUUID(), Stream.concat(secondBulk.stream(), existingOps.stream()).collect(Collectors.toList()),
             seqno, newPrimary, logger);
             seqno, newPrimary, logger);
         final long newPrimaryTerm = newPrimary.getOperationPrimaryTerm();
         final long newPrimaryTerm = newPrimary.getOperationPrimaryTerm();