Bläddra i källkod

Introduce long polling for changes (#33683)

Rather than scheduling pings to the leader index when we are caught up
to the leader, this commit introduces long polling for changes. We will
fire off a request to the leader which if we are already caught up will
enter a poll on the leader side to listen for global checkpoint
changes. These polls will timeout after a default of one minute, but can
also be specified when creating the following task. We use these time
outs as a way to keep statistics up to date, to not exaggerate time
since last fetches, and to avoid pipes being broken.
Jason Tedor 7 år sedan
förälder
incheckning
770ad53978

+ 105 - 11
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java

@@ -5,7 +5,9 @@
  */
 package org.elasticsearch.xpack.ccr.action;
 
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.action.Action;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.support.ActionFilters;
@@ -19,6 +21,7 @@ import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.shard.IndexShard;
@@ -36,8 +39,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.TimeoutException;
 
 import static org.elasticsearch.action.ValidateActions.addValidationError;
+import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
 
 public class ShardChangesAction extends Action<ShardChangesAction.Response> {
 
@@ -59,6 +64,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
         private int maxOperationCount;
         private ShardId shardId;
         private String expectedHistoryUUID;
+        private TimeValue pollTimeout = FollowIndexAction.DEFAULT_POLL_TIMEOUT;
         private long maxOperationSizeInBytes = FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES;
 
         public Request(ShardId shardId, String expectedHistoryUUID) {
@@ -102,6 +108,14 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
             return expectedHistoryUUID;
         }
 
+        public TimeValue getPollTimeout() {
+            return pollTimeout;
+        }
+
+        public void setPollTimeout(final TimeValue pollTimeout) {
+            this.pollTimeout = Objects.requireNonNull(pollTimeout, "pollTimeout");
+        }
+
         @Override
         public ActionRequestValidationException validate() {
             ActionRequestValidationException validationException = null;
@@ -126,6 +140,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
             maxOperationCount = in.readVInt();
             shardId = ShardId.readShardId(in);
             expectedHistoryUUID = in.readString();
+            pollTimeout = in.readTimeValue();
             maxOperationSizeInBytes = in.readVLong();
         }
 
@@ -136,6 +151,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
             out.writeVInt(maxOperationCount);
             shardId.writeTo(out);
             out.writeString(expectedHistoryUUID);
+            out.writeTimeValue(pollTimeout);
             out.writeVLong(maxOperationSizeInBytes);
         }
 
@@ -149,12 +165,13 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
                     maxOperationCount == request.maxOperationCount &&
                     Objects.equals(shardId, request.shardId) &&
                     Objects.equals(expectedHistoryUUID, request.expectedHistoryUUID) &&
+                    Objects.equals(pollTimeout, request.pollTimeout) &&
                     maxOperationSizeInBytes == request.maxOperationSizeInBytes;
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(fromSeqNo, maxOperationCount, shardId, expectedHistoryUUID, maxOperationSizeInBytes);
+            return Objects.hash(fromSeqNo, maxOperationCount, shardId, expectedHistoryUUID, pollTimeout, maxOperationSizeInBytes);
         }
 
         @Override
@@ -164,6 +181,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
                     ", maxOperationCount=" + maxOperationCount +
                     ", shardId=" + shardId +
                     ", expectedHistoryUUID=" + expectedHistoryUUID +
+                    ", pollTimeout=" + pollTimeout +
                     ", maxOperationSizeInBytes=" + maxOperationSizeInBytes +
                     '}';
         }
@@ -265,19 +283,90 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
 
         @Override
         protected Response shardOperation(Request request, ShardId shardId) throws IOException {
-            IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
-            IndexShard indexShard = indexService.getShard(request.getShard().id());
-            final SeqNoStats seqNoStats =  indexShard.seqNoStats();
+            final IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
+            final IndexShard indexShard = indexService.getShard(request.getShard().id());
+            final SeqNoStats seqNoStats = indexShard.seqNoStats();
             final long mappingVersion = clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion();
 
             final Translog.Operation[] operations = getOperations(
                     indexShard,
                     seqNoStats.getGlobalCheckpoint(),
-                    request.fromSeqNo,
-                    request.maxOperationCount,
-                    request.expectedHistoryUUID,
-                    request.maxOperationSizeInBytes);
-            return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), operations);
+                    request.getFromSeqNo(),
+                    request.getMaxOperationCount(),
+                    request.getExpectedHistoryUUID(),
+                    request.getMaxOperationSizeInBytes());
+            return getResponse(mappingVersion, seqNoStats, operations);
+        }
+
+        @Override
+        protected void asyncShardOperation(
+                final Request request,
+                final ShardId shardId,
+                final ActionListener<Response> listener) throws IOException {
+            final IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
+            final IndexShard indexShard = indexService.getShard(request.getShard().id());
+            final SeqNoStats seqNoStats = indexShard.seqNoStats();
+
+            if (request.getFromSeqNo() > seqNoStats.getGlobalCheckpoint()) {
+                logger.trace(
+                        "{} waiting for global checkpoint advancement from [{}] to [{}]",
+                        shardId,
+                        seqNoStats.getGlobalCheckpoint(),
+                        request.getFromSeqNo());
+                indexShard.addGlobalCheckpointListener(
+                        request.getFromSeqNo(),
+                        (g, e) -> {
+                            if (g != UNASSIGNED_SEQ_NO) {
+                                assert request.getFromSeqNo() <= g
+                                        : shardId + " only advanced to [" + g + "] while waiting for [" + request.getFromSeqNo() + "]";
+                                globalCheckpointAdvanced(shardId, g, request, listener);
+                            } else {
+                                assert e != null;
+                                globalCheckpointAdvancementFailure(shardId, e, request, listener, indexShard);
+                            }
+                        },
+                        request.getPollTimeout());
+            } else {
+                super.asyncShardOperation(request, shardId, listener);
+            }
+        }
+
+        private void globalCheckpointAdvanced(
+                final ShardId shardId,
+                final long globalCheckpoint,
+                final Request request,
+                final ActionListener<Response> listener) {
+            logger.trace("{} global checkpoint advanced to [{}] after waiting for [{}]", shardId, globalCheckpoint, request.getFromSeqNo());
+            try {
+                super.asyncShardOperation(request, shardId, listener);
+            } catch (final IOException caught) {
+                listener.onFailure(caught);
+            }
+        }
+
+        private void globalCheckpointAdvancementFailure(
+                final ShardId shardId,
+                final Exception e,
+                final Request request,
+                final ActionListener<Response> listener,
+                final IndexShard indexShard) {
+            logger.trace(
+                    () -> new ParameterizedMessage(
+                            "{} exception waiting for global checkpoint advancement to [{}]", shardId, request.getFromSeqNo()),
+                    e);
+            if (e instanceof TimeoutException) {
+                try {
+                    final long mappingVersion =
+                            clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion();
+                    final SeqNoStats latestSeqNoStats = indexShard.seqNoStats();
+                    listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, EMPTY_OPERATIONS_ARRAY));
+                } catch (final Exception caught) {
+                    caught.addSuppressed(e);
+                    listener.onFailure(caught);
+                }
+            } else {
+                listener.onFailure(e);
+            }
         }
 
         @Override
@@ -300,7 +389,7 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
 
     }
 
-    private static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0];
+    static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0];
 
     /**
      * Returns at most maxOperationCount operations from the specified from sequence number.
@@ -324,7 +413,8 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
                 historyUUID + "]");
         }
         if (fromSeqNo > globalCheckpoint) {
-            return EMPTY_OPERATIONS_ARRAY;
+            throw new IllegalStateException(
+                    "not exposing operations from [" + fromSeqNo + "] greater than the global checkpoint [" + globalCheckpoint + "]");
         }
         int seenBytes = 0;
         // - 1 is needed, because toSeqNo is inclusive
@@ -344,4 +434,8 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
         return operations.toArray(EMPTY_OPERATIONS_ARRAY);
     }
 
+    static Response getResponse(final long mappingVersion, final SeqNoStats seqNoStats, final Translog.Operation[] operations) {
+        return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), operations);
+    }
+
 }

+ 13 - 17
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java

@@ -50,8 +50,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
 
     private final String leaderIndex;
     private final ShardFollowTask params;
+    private final TimeValue pollTimeout;
     private final TimeValue maxRetryDelay;
-    private final TimeValue idleShardChangesRequestDelay;
     private final BiConsumer<TimeValue, Runnable> scheduler;
     private final LongSupplier relativeTimeProvider;
 
@@ -82,8 +82,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
         this.params = params;
         this.scheduler = scheduler;
         this.relativeTimeProvider = relativeTimeProvider;
+        this.pollTimeout = params.getPollTimeout();
         this.maxRetryDelay = params.getMaxRetryDelay();
-        this.idleShardChangesRequestDelay = params.getIdleShardRetryDelay();
         /*
          * We keep track of the most recent fetch exceptions, with the number of exceptions that we track equal to the maximum number of
          * concurrent fetches. For each failed fetch, we track the from sequence number associated with the request, and we clear the entry
@@ -229,12 +229,16 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
         }
         innerSendShardChangesRequest(from, maxOperationCount,
                 response -> {
-                    synchronized (ShardFollowNodeTask.this) {
-                        totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
-                        numberOfSuccessfulFetches++;
-                        fetchExceptions.remove(from);
-                        operationsReceived += response.getOperations().length;
-                        totalTransferredBytes += Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::estimateSize).sum();
+                    if (response.getOperations().length > 0) {
+                        // do not count polls against fetch stats
+                        synchronized (ShardFollowNodeTask.this) {
+                            totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
+                            numberOfSuccessfulFetches++;
+                            fetchExceptions.remove(from);
+                            operationsReceived += response.getOperations().length;
+                            totalTransferredBytes +=
+                                    Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::estimateSize).sum();
+                        }
                     }
                     handleReadResponse(from, maxRequiredSeqNo, response);
                 },
@@ -286,15 +290,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
         } else {
             // read is completed, decrement
             numConcurrentReads--;
-            if (response.getOperations().length == 0 && leaderGlobalCheckpoint == lastRequestedSeqNo)  {
-                // we got nothing and we have no reason to believe asking again well get us more, treat shard as idle and delay
-                // future requests
-                LOGGER.trace("{} received no ops and no known ops to fetch, scheduling to coordinate reads",
-                    params.getFollowShardId());
-                scheduler.accept(idleShardChangesRequestDelay, this::coordinateReads);
-            } else {
-                coordinateReads();
-            }
+            coordinateReads();
         }
     }
 

+ 24 - 24
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java

@@ -49,7 +49,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
     public static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches");
     public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size");
     public static final ParseField MAX_RETRY_DELAY = new ParseField("max_retry_delay");
-    public static final ParseField IDLE_SHARD_RETRY_DELAY = new ParseField("idle_shard_retry_delay");
+    public static final ParseField POLL_TIMEOUT = new ParseField("poll_timeout");
     public static final ParseField RECORDED_HISTORY_UUID = new ParseField("recorded_history_uuid");
 
     @SuppressWarnings("unchecked")
@@ -75,8 +75,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
             (p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY.getPreferredName()),
             MAX_RETRY_DELAY, ObjectParser.ValueType.STRING);
         PARSER.declareField(ConstructingObjectParser.constructorArg(),
-            (p, c) -> TimeValue.parseTimeValue(p.text(), IDLE_SHARD_RETRY_DELAY.getPreferredName()),
-            IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING);
+            (p, c) -> TimeValue.parseTimeValue(p.text(), POLL_TIMEOUT.getPreferredName()),
+                POLL_TIMEOUT, ObjectParser.ValueType.STRING);
         PARSER.declareString(ConstructingObjectParser.constructorArg(), RECORDED_HISTORY_UUID);
         PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), HEADERS);
     }
@@ -90,23 +90,23 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
     private final int maxConcurrentWriteBatches;
     private final int maxWriteBufferSize;
     private final TimeValue maxRetryDelay;
-    private final TimeValue idleShardRetryDelay;
+    private final TimeValue pollTimeout;
     private final String recordedLeaderIndexHistoryUUID;
     private final Map<String, String> headers;
 
     ShardFollowTask(
-                    String leaderClusterAlias,
-                    ShardId followShardId,
-                    ShardId leaderShardId,
-                    int maxBatchOperationCount,
-                    int maxConcurrentReadBatches,
-                    long maxBatchSizeInBytes,
-                    int maxConcurrentWriteBatches,
-                    int maxWriteBufferSize,
-                    TimeValue maxRetryDelay,
-                    TimeValue idleShardRetryDelay,
-                    String recordedLeaderIndexHistoryUUID,
-                    Map<String, String> headers) {
+            final String leaderClusterAlias,
+            final ShardId followShardId,
+            final ShardId leaderShardId,
+            final int maxBatchOperationCount,
+            final int maxConcurrentReadBatches,
+            final long maxBatchSizeInBytes,
+            final int maxConcurrentWriteBatches,
+            final int maxWriteBufferSize,
+            final TimeValue maxRetryDelay,
+            final TimeValue pollTimeout,
+            final String recordedLeaderIndexHistoryUUID,
+            final Map<String, String> headers) {
         this.leaderClusterAlias = leaderClusterAlias;
         this.followShardId = followShardId;
         this.leaderShardId = leaderShardId;
@@ -116,7 +116,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
         this.maxConcurrentWriteBatches = maxConcurrentWriteBatches;
         this.maxWriteBufferSize = maxWriteBufferSize;
         this.maxRetryDelay = maxRetryDelay;
-        this.idleShardRetryDelay = idleShardRetryDelay;
+        this.pollTimeout = pollTimeout;
         this.recordedLeaderIndexHistoryUUID = recordedLeaderIndexHistoryUUID;
         this.headers = headers != null ? Collections.unmodifiableMap(headers) : Collections.emptyMap();
     }
@@ -131,7 +131,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
         this.maxConcurrentWriteBatches = in.readVInt();
         this.maxWriteBufferSize = in.readVInt();
         this.maxRetryDelay = in.readTimeValue();
-        this.idleShardRetryDelay = in.readTimeValue();
+        this.pollTimeout = in.readTimeValue();
         this.recordedLeaderIndexHistoryUUID = in.readString();
         this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString));
     }
@@ -172,8 +172,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
         return maxRetryDelay;
     }
 
-    public TimeValue getIdleShardRetryDelay() {
-        return idleShardRetryDelay;
+    public TimeValue getPollTimeout() {
+        return pollTimeout;
     }
 
     public String getTaskId() {
@@ -204,7 +204,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
         out.writeVInt(maxConcurrentWriteBatches);
         out.writeVInt(maxWriteBufferSize);
         out.writeTimeValue(maxRetryDelay);
-        out.writeTimeValue(idleShardRetryDelay);
+        out.writeTimeValue(pollTimeout);
         out.writeString(recordedLeaderIndexHistoryUUID);
         out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
     }
@@ -231,7 +231,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
         builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches);
         builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize);
         builder.field(MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep());
-        builder.field(IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep());
+        builder.field(POLL_TIMEOUT.getPreferredName(), pollTimeout.getStringRep());
         builder.field(RECORDED_HISTORY_UUID.getPreferredName(), recordedLeaderIndexHistoryUUID);
         builder.field(HEADERS.getPreferredName(), headers);
         return builder.endObject();
@@ -251,7 +251,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
                 maxBatchSizeInBytes == that.maxBatchSizeInBytes &&
                 maxWriteBufferSize == that.maxWriteBufferSize &&
                 Objects.equals(maxRetryDelay, that.maxRetryDelay) &&
-                Objects.equals(idleShardRetryDelay, that.idleShardRetryDelay) &&
+                Objects.equals(pollTimeout, that.pollTimeout) &&
                 Objects.equals(recordedLeaderIndexHistoryUUID, that.recordedLeaderIndexHistoryUUID) &&
                 Objects.equals(headers, that.headers);
     }
@@ -268,7 +268,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
             maxBatchSizeInBytes,
             maxWriteBufferSize,
             maxRetryDelay,
-            idleShardRetryDelay,
+            pollTimeout,
             recordedLeaderIndexHistoryUUID,
             headers
         );

+ 1 - 0
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java

@@ -148,6 +148,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
                 request.setFromSeqNo(from);
                 request.setMaxOperationCount(maxOperationCount);
                 request.setMaxOperationSizeInBytes(params.getMaxBatchSizeInBytes());
+                request.setPollTimeout(params.getPollTimeout());
                 leaderClient.execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler));
             }
         };

+ 1 - 1
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowIndexAction.java

@@ -188,7 +188,7 @@ public class TransportFollowIndexAction extends HandledTransportAction<FollowInd
                     request.getMaxConcurrentWriteBatches(),
                     request.getMaxWriteBufferSize(),
                     request.getMaxRetryDelay(),
-                    request.getIdleShardRetryDelay(),
+                    request.getPollTimeout(),
                     recordedLeaderShardHistoryUUID,
                     filteredHeaders);
             persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask,

+ 1 - 1
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowTests.java

@@ -168,7 +168,7 @@ public class AutoFollowTests extends ESSingleNodeTestCase {
                 assertThat(shardFollowTask.getMaxRetryDelay(), equalTo(request.getMaxRetryDelay()));
             }
             if (request.getIdleShardRetryDelay() != null) {
-                assertThat(shardFollowTask.getIdleShardRetryDelay(), equalTo(request.getIdleShardRetryDelay()));
+                assertThat(shardFollowTask.getPollTimeout(), equalTo(request.getIdleShardRetryDelay()));
             }
         });
     }

+ 22 - 5
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java

@@ -24,12 +24,15 @@ import org.mockito.Mockito;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Locale;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasToString;
 import static org.hamcrest.Matchers.instanceOf;
 
 public class ShardChangesActionTests extends ESSingleNodeTestCase {
@@ -65,13 +68,27 @@ public class ShardChangesActionTests extends ESSingleNodeTestCase {
             assertThat(seenSeqNos, equalTo(expectedSeqNos));
         }
 
-        // get operations for a range no operations exists:
-        Translog.Operation[] operations =  ShardChangesAction.getOperations(indexShard, indexShard.getGlobalCheckpoint(),
-            numWrites, numWrites + 1, indexShard.getHistoryUUID(), Long.MAX_VALUE);
-        assertThat(operations.length, equalTo(0));
+        {
+            // get operations for a range for which no operations exist
+            final IllegalStateException e = expectThrows(
+                    IllegalStateException.class,
+                    () -> ShardChangesAction.getOperations(
+                            indexShard,
+                            indexShard.getGlobalCheckpoint(),
+                            numWrites,
+                            numWrites + 1,
+                            indexShard.getHistoryUUID(),
+                            Long.MAX_VALUE));
+            final String message = String.format(
+                    Locale.ROOT,
+                    "not exposing operations from [%d] greater than the global checkpoint [%d]",
+                    numWrites,
+                    indexShard.getGlobalCheckpoint());
+            assertThat(e, hasToString(containsString(message)));
+        }
 
         // get operations for a range some operations do not exist:
-        operations = ShardChangesAction.getOperations(indexShard, indexShard.getGlobalCheckpoint(),
+        Translog.Operation[] operations = ShardChangesAction.getOperations(indexShard, indexShard.getGlobalCheckpoint(),
             numWrites  - 10, numWrites + 10, indexShard.getHistoryUUID(), Long.MAX_VALUE);
         assertThat(operations.length, equalTo(10));
 

+ 69 - 25
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java

@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
@@ -58,6 +59,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
     private Queue<Long> leaderGlobalCheckpoints;
     private Queue<Long> followerGlobalCheckpoints;
     private Queue<Long> maxSeqNos;
+    private Queue<Integer> responseSizes;
 
     public void testCoordinateReads() {
         ShardFollowNodeTask task = createShardFollowTask(8, between(8, 20), between(1, 20), Integer.MAX_VALUE, Long.MAX_VALUE);
@@ -226,6 +228,69 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
         assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
     }
 
+    public void testReceiveTimeout() {
+        final ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
+        startTask(task, 63, -1);
+
+        final int numberOfTimeouts = randomIntBetween(1, 32);
+        for (int i = 0; i < numberOfTimeouts; i++) {
+            mappingVersions.add(1L);
+            leaderGlobalCheckpoints.add(63L);
+            maxSeqNos.add(63L);
+            responseSizes.add(0);
+        }
+
+        final AtomicInteger counter = new AtomicInteger();
+        beforeSendShardChangesRequest = status -> {
+            if (counter.get() <= numberOfTimeouts) {
+                assertThat(status.numberOfSuccessfulFetches(), equalTo(0L));
+                assertThat(status.totalFetchTimeMillis(), equalTo(0L));
+                assertThat(status.operationsReceived(), equalTo(0L));
+                assertThat(status.totalTransferredBytes(), equalTo(0L));
+
+                assertThat(status.fetchExceptions().entrySet(), hasSize(0));
+                assertThat(status.totalFetchTimeMillis(), equalTo(0L));
+                assertThat(status.numberOfFailedFetches(), equalTo(0L));
+            } else {
+                // otherwise we will keep looping as if we were repeatedly polling and timing out
+                simulateResponse.set(false);
+            }
+            counter.incrementAndGet();
+        };
+
+        mappingVersions.add(1L);
+        mappingVersions.add(1L);
+        leaderGlobalCheckpoints.add(63L);
+        maxSeqNos.add(63L);
+        simulateResponse.set(true);
+
+        task.coordinateReads();
+
+        // one request for each request that we simulate timedout, plus our request that receives a reply, and then a follow-up request
+        assertThat(shardChangesRequests, hasSize(1 + 1 + numberOfTimeouts));
+        for (final long[] shardChangesRequest : shardChangesRequests.subList(0, shardChangesRequests.size() - 2)) {
+            assertNotNull(shardChangesRequest);
+            assertThat(shardChangesRequest.length, equalTo(2));
+            assertThat(shardChangesRequest[0], equalTo(0L));
+            assertThat(shardChangesRequest[1], equalTo(64L));
+        }
+        final long[] lastShardChangesRequest = shardChangesRequests.get(shardChangesRequests.size() - 1);
+        assertNotNull(lastShardChangesRequest);
+        assertThat(lastShardChangesRequest.length, equalTo(2));
+        assertThat(lastShardChangesRequest[0], equalTo(64L));
+        assertThat(lastShardChangesRequest[1], equalTo(64L));
+
+        final ShardFollowNodeTaskStatus status = task.getStatus();
+        assertThat(status.numberOfSuccessfulFetches(), equalTo(1L));
+        assertThat(status.numberOfFailedFetches(), equalTo(0L));
+        assertThat(status.numberOfConcurrentReads(), equalTo(1));
+        assertThat(status.numberOfConcurrentWrites(), equalTo(1));
+        assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
+        assertThat(status.leaderMaxSeqNo(), equalTo(63L));
+
+        assertThat(counter.get(), equalTo(1 + 1 + numberOfTimeouts));
+    }
+
     public void testReceiveNonRetryableError() {
         ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
         startTask(task, 63, -1);
@@ -357,29 +422,6 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
         assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
     }
 
-    public void testDelayCoordinatesRead() {
-        int[] counter = new int[]{0};
-        scheduler = (delay, task) -> {
-            counter[0]++;
-            task.run();
-        };
-        ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
-        startTask(task, 63, -1);
-
-        task.coordinateReads();
-        assertThat(shardChangesRequests.size(), equalTo(1));
-        assertThat(shardChangesRequests.get(0)[0], equalTo(0L));
-        assertThat(shardChangesRequests.get(0)[1], equalTo(64L));
-
-        shardChangesRequests.clear();
-        ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 63L);
-        // Also invokes coordinateReads()
-        task.innerHandleReadResponse(0L, 63L, response);
-        task.innerHandleReadResponse(64L, 63L,
-            new ShardChangesAction.Response(0, 63L, 63L, new Translog.Operation[0]));
-        assertThat(counter[0], equalTo(1));
-    }
-
     public void testMappingUpdate() {
         ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE);
         startTask(task, 63, -1);
@@ -653,6 +695,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
         leaderGlobalCheckpoints = new LinkedList<>();
         followerGlobalCheckpoints = new LinkedList<>();
         maxSeqNos = new LinkedList<>();
+        responseSizes = new LinkedList<>();
         return new ShardFollowNodeTask(
                 1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler, System::nanoTime) {
 
@@ -699,8 +742,9 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
                 if (readFailure != null) {
                     errorHandler.accept(readFailure);
                 } else if (simulateResponse.get()) {
-                    final Translog.Operation[] operations = new Translog.Operation[requestBatchSize];
-                    for (int i = 0; i < requestBatchSize; i++) {
+                    final int responseSize = responseSizes.size() == 0 ? requestBatchSize : responseSizes.poll();
+                    final Translog.Operation[] operations = new Translog.Operation[responseSize];
+                    for (int i = 0; i < responseSize; i++) {
                         operations[i] = new Translog.NoOp(from + i, 0, "test");
                     }
                     final ShardChangesAction.Response response = new ShardChangesAction.Response(

+ 7 - 0
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java

@@ -160,6 +160,9 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
             recoverShardFromStore(leaderGroup.getPrimary());
             String newHistoryUUID = leaderGroup.getPrimary().getHistoryUUID();
 
+            // force the global checkpoint on the leader to advance
+            leaderGroup.appendDocs(64);
+
             assertBusy(() -> {
                 assertThat(shardFollowTask.isStopped(), is(true));
                 assertThat(shardFollowTask.getFailure().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID +
@@ -259,6 +262,10 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
                     for (IndexShard indexShard : indexShards) {
                         try {
                             final SeqNoStats seqNoStats = indexShard.seqNoStats();
+                            if (from > seqNoStats.getGlobalCheckpoint()) {
+                                handler.accept(ShardChangesAction.getResponse(1L, seqNoStats, ShardChangesAction.EMPTY_OPERATIONS_ARRAY));
+                                return;
+                            }
                             Translog.Operation[] ops = ShardChangesAction.getOperations(indexShard, seqNoStats.getGlobalCheckpoint(), from,
                                 maxOperationCount, params.getRecordedLeaderIndexHistoryUUID(), params.getMaxBatchSizeInBytes());
                             // hard code mapping version; this is ok, as mapping updates are not tested here

+ 15 - 15
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowIndexAction.java

@@ -36,8 +36,8 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
     public static final int DEFAULT_MAX_CONCURRENT_WRITE_BATCHES = 1;
     public static final long DEFAULT_MAX_BATCH_SIZE_IN_BYTES = Long.MAX_VALUE;
     static final TimeValue DEFAULT_MAX_RETRY_DELAY = new TimeValue(500);
-    static final TimeValue DEFAULT_IDLE_SHARD_RETRY_DELAY = TimeValue.timeValueSeconds(10);
     static final TimeValue MAX_RETRY_DELAY = TimeValue.timeValueMinutes(5);
+    public static final TimeValue DEFAULT_POLL_TIMEOUT = TimeValue.timeValueMinutes(1);
 
     private FollowIndexAction() {
         super(NAME);
@@ -58,7 +58,7 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
         private static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches");
         private static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size");
         private static final ParseField MAX_RETRY_DELAY_FIELD = new ParseField("max_retry_delay");
-        private static final ParseField IDLE_SHARD_RETRY_DELAY = new ParseField("idle_shard_retry_delay");
+        private static final ParseField POLL_TIMEOUT = new ParseField("poll_timeout");
         private static final ConstructingObjectParser<Request, String> PARSER = new ConstructingObjectParser<>(NAME, true,
             (args, followerIndex) -> {
                 if (args[1] != null) {
@@ -83,8 +83,8 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
                     ObjectParser.ValueType.STRING);
             PARSER.declareField(
                     ConstructingObjectParser.optionalConstructorArg(),
-                    (p, c) -> TimeValue.parseTimeValue(p.text(), IDLE_SHARD_RETRY_DELAY.getPreferredName()),
-                    IDLE_SHARD_RETRY_DELAY,
+                    (p, c) -> TimeValue.parseTimeValue(p.text(), POLL_TIMEOUT.getPreferredName()),
+                    POLL_TIMEOUT,
                     ObjectParser.ValueType.STRING);
         }
 
@@ -151,10 +151,10 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
             return maxRetryDelay;
         }
 
-        private TimeValue idleShardRetryDelay;
+        private TimeValue pollTimeout;
 
-        public TimeValue getIdleShardRetryDelay() {
-            return idleShardRetryDelay;
+        public TimeValue getPollTimeout() {
+            return pollTimeout;
         }
 
         public Request(
@@ -166,7 +166,7 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
             final Integer maxConcurrentWriteBatches,
             final Integer maxWriteBufferSize,
             final TimeValue maxRetryDelay,
-            final TimeValue idleShardRetryDelay) {
+            final TimeValue pollTimeout) {
 
             if (leaderIndex == null) {
                 throw new IllegalArgumentException(LEADER_INDEX_FIELD.getPreferredName() + " is missing");
@@ -206,7 +206,7 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
             }
 
             final TimeValue actualRetryTimeout = maxRetryDelay == null ? DEFAULT_MAX_RETRY_DELAY : maxRetryDelay;
-            final TimeValue actualIdleShardRetryDelay = idleShardRetryDelay == null ? DEFAULT_IDLE_SHARD_RETRY_DELAY : idleShardRetryDelay;
+            final TimeValue actualPollTimeout = pollTimeout == null ? DEFAULT_POLL_TIMEOUT : pollTimeout;
 
             this.leaderIndex = leaderIndex;
             this.followerIndex = followerIndex;
@@ -216,7 +216,7 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
             this.maxConcurrentWriteBatches = actualMaxConcurrentWriteBatches;
             this.maxWriteBufferSize = actualMaxWriteBufferSize;
             this.maxRetryDelay = actualRetryTimeout;
-            this.idleShardRetryDelay = actualIdleShardRetryDelay;
+            this.pollTimeout = actualPollTimeout;
         }
 
         public Request() {
@@ -252,7 +252,7 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
             maxConcurrentWriteBatches = in.readVInt();
             maxWriteBufferSize = in.readVInt();
             maxRetryDelay = in.readOptionalTimeValue();
-            idleShardRetryDelay = in.readOptionalTimeValue();
+            pollTimeout = in.readOptionalTimeValue();
         }
 
         @Override
@@ -266,7 +266,7 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
             out.writeVInt(maxConcurrentWriteBatches);
             out.writeVInt(maxWriteBufferSize);
             out.writeOptionalTimeValue(maxRetryDelay);
-            out.writeOptionalTimeValue(idleShardRetryDelay);
+            out.writeOptionalTimeValue(pollTimeout);
         }
 
         @Override
@@ -281,7 +281,7 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
                 builder.field(MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches);
                 builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches);
                 builder.field(MAX_RETRY_DELAY_FIELD.getPreferredName(), maxRetryDelay.getStringRep());
-                builder.field(IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep());
+                builder.field(POLL_TIMEOUT.getPreferredName(), pollTimeout.getStringRep());
             }
             builder.endObject();
             return builder;
@@ -298,7 +298,7 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
                 maxConcurrentWriteBatches == request.maxConcurrentWriteBatches &&
                 maxWriteBufferSize == request.maxWriteBufferSize &&
                 Objects.equals(maxRetryDelay, request.maxRetryDelay) &&
-                Objects.equals(idleShardRetryDelay, request.idleShardRetryDelay) &&
+                Objects.equals(pollTimeout, request.pollTimeout) &&
                 Objects.equals(leaderIndex, request.leaderIndex) &&
                 Objects.equals(followerIndex, request.followerIndex);
         }
@@ -314,7 +314,7 @@ public final class FollowIndexAction extends Action<AcknowledgedResponse> {
                 maxConcurrentWriteBatches,
                 maxWriteBufferSize,
                 maxRetryDelay,
-                idleShardRetryDelay
+                pollTimeout
             );
         }
     }