Browse Source

Pass index shard's primary term to Engine#addSegmentGenerationListener (#99752)

So the search engine can check that generation listeners are called on the same primary term for the search shard as well as the index shard.
Artem Prigoda 1 year ago
parent
commit
b3db1814a5

+ 5 - 0
docs/changelog/99752.yaml

@@ -0,0 +1,5 @@
+pr: 99752
+summary: Pass shard's primary term to Engine#addSegmentGenerationListener
+area: Store
+type: enhancement
+issues: []

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -147,6 +147,7 @@ public class TransportVersions {
     public static final TransportVersion UNCONTENDED_REGISTER_ANALYSIS_ADDED = def(8_522_00_0);
     public static final TransportVersion TRANSFORM_GET_CHECKPOINT_TIMEOUT_ADDED = def(8_523_00_0);
     public static final TransportVersion IP_ADDRESS_WRITEABLE = def(8_524_00_0);
+    public static final TransportVersion PRIMARY_TERM_ADDED = def(8_525_00_0);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 1 - 0
server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java

@@ -126,6 +126,7 @@ public class TransportShardRefreshAction extends TransportReplicationAction<
             } else {
                 UnpromotableShardRefreshRequest unpromotableReplicaRequest = new UnpromotableShardRefreshRequest(
                     indexShardRoutingTable,
+                    replicaRequest.primaryRefreshResult.primaryTerm(),
                     replicaRequest.primaryRefreshResult.generation(),
                     false
                 );

+ 5 - 1
server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportUnpromotableShardRefreshAction.java

@@ -60,7 +60,11 @@ public class TransportUnpromotableShardRefreshAction extends TransportBroadcastU
     ) {
         ActionListener.run(responseListener, listener -> {
             IndexShard shard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
-            shard.waitForSegmentGeneration(request.getSegmentGeneration(), listener.map(l -> ActionResponse.Empty.INSTANCE));
+            shard.waitForPrimaryTermAndGeneration(
+                request.getPrimaryTerm(),
+                request.getSegmentGeneration(),
+                listener.map(l -> ActionResponse.Empty.INSTANCE)
+            );
         });
     }
 

+ 21 - 1
server/src/main/java/org/elasticsearch/action/admin/indices/refresh/UnpromotableShardRefreshRequest.java

@@ -8,9 +8,11 @@
 
 package org.elasticsearch.action.admin.indices.refresh;
 
+import org.elasticsearch.TransportVersions;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.support.broadcast.unpromotable.BroadcastUnpromotableRequest;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.index.engine.Engine;
@@ -21,20 +23,26 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
 
 public class UnpromotableShardRefreshRequest extends BroadcastUnpromotableRequest {
 
+    private final long primaryTerm;
     private final long segmentGeneration;
 
     public UnpromotableShardRefreshRequest(
         IndexShardRoutingTable indexShardRoutingTable,
+        long primaryTerm,
         long segmentGeneration,
         boolean failShardOnError
     ) {
         super(indexShardRoutingTable, failShardOnError);
+        this.primaryTerm = primaryTerm;
         this.segmentGeneration = segmentGeneration;
     }
 
     public UnpromotableShardRefreshRequest(StreamInput in) throws IOException {
         super(in);
         segmentGeneration = in.readVLong();
+        primaryTerm = in.getTransportVersion().onOrAfter(TransportVersions.PRIMARY_TERM_ADDED)
+            ? in.readVLong()
+            : Engine.UNKNOWN_PRIMARY_TERM;
     }
 
     @Override
@@ -50,14 +58,26 @@ public class UnpromotableShardRefreshRequest extends BroadcastUnpromotableReques
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);
         out.writeVLong(segmentGeneration);
+        if (out.getTransportVersion().onOrAfter(TransportVersions.PRIMARY_TERM_ADDED)) {
+            out.writeVLong(primaryTerm);
+        }
     }
 
     public long getSegmentGeneration() {
         return segmentGeneration;
     }
 
+    public long getPrimaryTerm() {
+        return primaryTerm;
+    }
+
     @Override
     public String toString() {
-        return "UnpromotableShardRefreshRequest{" + "shardId=" + shardId() + ", segmentGeneration=" + segmentGeneration + '}';
+        return Strings.format(
+            "UnpromotableShardRefreshRequest{shardId=%s, primaryTerm=%d, segmentGeneration=%d}",
+            shardId(),
+            primaryTerm,
+            segmentGeneration
+        );
     }
 }

+ 4 - 1
server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java

@@ -27,6 +27,7 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.get.GetResult;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
@@ -213,7 +214,9 @@ public class TransportGetAction extends TransportSingleShardAction<GetRequest, G
                             ActionRunnable.supply(l, () -> shardOperation(request, shardId)).run();
                         } else {
                             assert r.segmentGeneration() > -1L;
-                            indexShard.waitForSegmentGeneration(
+                            assert r.primaryTerm() > Engine.UNKNOWN_PRIMARY_TERM;
+                            indexShard.waitForPrimaryTermAndGeneration(
+                                r.primaryTerm(),
                                 r.segmentGeneration(),
                                 listener.delegateFailureAndWrap((ll, aLong) -> super.asyncShardOperation(request, shardId, ll))
                             );

+ 27 - 6
server/src/main/java/org/elasticsearch/action/get/TransportGetFromTranslogAction.java

@@ -10,6 +10,7 @@ package org.elasticsearch.action.get;
 
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.TransportVersions;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionRequestValidationException;
@@ -18,6 +19,7 @@ import org.elasticsearch.action.IndicesRequest;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.HandledTransportAction;
 import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -81,7 +83,7 @@ public class TransportGetFromTranslogAction extends HandledTransportAction<
                 }
                 segmentGeneration = ((InternalEngine) engine).getLastUnsafeSegmentGenerationForGets();
             }
-            return new Response(result, segmentGeneration);
+            return new Response(result, indexShard.getOperationPrimaryTerm(), segmentGeneration);
         });
     }
 
@@ -140,23 +142,31 @@ public class TransportGetFromTranslogAction extends HandledTransportAction<
     public static class Response extends ActionResponse {
         @Nullable
         private final GetResult getResult;
+        private final long primaryTerm;
         private final long segmentGeneration;
 
-        public Response(GetResult getResult, long segmentGeneration) {
+        public Response(GetResult getResult, long primaryTerm, long segmentGeneration) {
             this.getResult = getResult;
             this.segmentGeneration = segmentGeneration;
+            this.primaryTerm = primaryTerm;
         }
 
         public Response(StreamInput in) throws IOException {
             super(in);
             segmentGeneration = in.readZLong();
             getResult = in.readOptionalWriteable(GetResult::new);
+            primaryTerm = in.getTransportVersion().onOrAfter(TransportVersions.PRIMARY_TERM_ADDED)
+                ? in.readVLong()
+                : Engine.UNKNOWN_PRIMARY_TERM;
         }
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             out.writeZLong(segmentGeneration);
             out.writeOptionalWriteable(getResult);
+            if (out.getTransportVersion().onOrAfter(TransportVersions.PRIMARY_TERM_ADDED)) {
+                out.writeVLong(primaryTerm);
+            }
         }
 
         @Nullable
@@ -173,22 +183,33 @@ public class TransportGetFromTranslogAction extends HandledTransportAction<
             return segmentGeneration;
         }
 
+        public long primaryTerm() {
+            return primaryTerm;
+        }
+
         @Override
         public boolean equals(Object o) {
             if (this == o) return true;
             if (o instanceof Response == false) return false;
-            Response other = (Response) o;
-            return segmentGeneration == other.segmentGeneration && Objects.equals(getResult, other.getResult);
+            Response response = (Response) o;
+            return segmentGeneration == response.segmentGeneration
+                && Objects.equals(getResult, response.getResult)
+                && primaryTerm == response.primaryTerm;
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(segmentGeneration, getResult);
+            return Objects.hash(segmentGeneration, getResult, primaryTerm);
         }
 
         @Override
         public String toString() {
-            return "Response{" + "getResult=" + getResult + ", segmentGeneration=" + segmentGeneration + "}";
+            return Strings.format(
+                "Response{getResult=%s, primaryTerm=%d, segmentGeneration=%d}",
+                getResult,
+                primaryTerm,
+                segmentGeneration
+            );
         }
     }
 }

+ 4 - 1
server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java

@@ -28,6 +28,7 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.get.GetResult;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
@@ -203,7 +204,9 @@ public class TransportShardMultiGetAction extends TransportSingleShardAction<Mul
                             ActionRunnable.supply(l, () -> handleLocalGets(request, r.multiGetShardResponse(), shardId)).run();
                         } else {
                             assert r.segmentGeneration() > -1L;
-                            indexShard.waitForSegmentGeneration(
+                            assert r.primaryTerm() > Engine.UNKNOWN_PRIMARY_TERM;
+                            indexShard.waitForPrimaryTermAndGeneration(
+                                r.primaryTerm(),
                                 r.segmentGeneration(),
                                 listener.delegateFailureAndWrap(
                                     (ll, aLong) -> getExecutor(request, shardId).execute(

+ 27 - 11
server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetFomTranslogAction.java

@@ -9,6 +9,7 @@
 package org.elasticsearch.action.get;
 
 import org.apache.lucene.store.AlreadyClosedException;
+import org.elasticsearch.TransportVersions;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionRequestValidationException;
@@ -16,6 +17,7 @@ import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.HandledTransportAction;
 import org.elasticsearch.action.support.TransportActions;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.index.IndexService;
@@ -102,7 +104,7 @@ public class TransportShardMultiGetFomTranslogAction extends HandledTransportAct
                 }
                 segmentGeneration = ((InternalEngine) engine).getLastUnsafeSegmentGenerationForGets();
             }
-            return new Response(multiGetShardResponse, segmentGeneration);
+            return new Response(multiGetShardResponse, indexShard.getOperationPrimaryTerm(), segmentGeneration);
         });
     }
 
@@ -164,9 +166,11 @@ public class TransportShardMultiGetFomTranslogAction extends HandledTransportAct
     public static class Response extends ActionResponse {
 
         private final MultiGetShardResponse multiGetShardResponse;
+        private final long primaryTerm;
         private final long segmentGeneration;
 
-        public Response(MultiGetShardResponse response, long segmentGeneration) {
+        public Response(MultiGetShardResponse response, long primaryTerm, long segmentGeneration) {
+            this.primaryTerm = primaryTerm;
             this.segmentGeneration = segmentGeneration;
             this.multiGetShardResponse = response;
         }
@@ -175,43 +179,55 @@ public class TransportShardMultiGetFomTranslogAction extends HandledTransportAct
             super(in);
             segmentGeneration = in.readZLong();
             multiGetShardResponse = new MultiGetShardResponse(in);
+            primaryTerm = in.getTransportVersion().onOrAfter(TransportVersions.PRIMARY_TERM_ADDED)
+                ? in.readVLong()
+                : Engine.UNKNOWN_PRIMARY_TERM;
         }
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             out.writeZLong(segmentGeneration);
             multiGetShardResponse.writeTo(out);
+            if (out.getTransportVersion().onOrAfter(TransportVersions.PRIMARY_TERM_ADDED)) {
+                out.writeVLong(primaryTerm);
+            }
         }
 
         public long segmentGeneration() {
             return segmentGeneration;
         }
 
+        public long primaryTerm() {
+            return primaryTerm;
+        }
+
         public MultiGetShardResponse multiGetShardResponse() {
             return multiGetShardResponse;
         }
 
         @Override
         public String toString() {
-            return "ShardMultiGetFomTranslogResponse{"
-                + "multiGetShardResponse="
-                + multiGetShardResponse
-                + ", segmentGeneration="
-                + segmentGeneration
-                + "}";
+            return Strings.format(
+                "ShardMultiGetFomTranslogResponse{multiGetShardResponse=%s, primaryTerm=%d, segmentGeneration=%d}",
+                multiGetShardResponse,
+                primaryTerm,
+                segmentGeneration
+            );
         }
 
         @Override
         public boolean equals(Object o) {
             if (this == o) return true;
             if (o instanceof Response == false) return false;
-            Response other = (Response) o;
-            return segmentGeneration == other.segmentGeneration && Objects.equals(multiGetShardResponse, other.multiGetShardResponse);
+            Response response = (Response) o;
+            return segmentGeneration == response.segmentGeneration
+                && Objects.equals(multiGetShardResponse, response.multiGetShardResponse)
+                && primaryTerm == response.primaryTerm;
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(segmentGeneration, multiGetShardResponse);
+            return Objects.hash(segmentGeneration, multiGetShardResponse, primaryTerm);
         }
     }
 }

+ 1 - 0
server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java

@@ -146,6 +146,7 @@ public class PostWriteRefresh {
     ) {
         UnpromotableShardRefreshRequest unpromotableReplicaRequest = new UnpromotableShardRefreshRequest(
             indexShard.getReplicationGroup().getRoutingTable(),
+            indexShard.getOperationPrimaryTerm(),
             generation,
             true
         );

+ 2 - 2
server/src/main/java/org/elasticsearch/index/engine/Engine.java

@@ -2141,13 +2141,13 @@ public abstract class Engine implements Closeable {
      * <code>refreshed</code> is true if a refresh happened. If refreshed, <code>generation</code>
      * contains the generation of the index commit that the reader has opened upon refresh.
      */
-    public record RefreshResult(boolean refreshed, long generation) {
+    public record RefreshResult(boolean refreshed, long primaryTerm, long generation) {
 
         public static final long UNKNOWN_GENERATION = -1L;
         public static final RefreshResult NO_REFRESH = new RefreshResult(false);
 
         public RefreshResult(boolean refreshed) {
-            this(refreshed, UNKNOWN_GENERATION);
+            this(refreshed, UNKNOWN_PRIMARY_TERM, UNKNOWN_GENERATION);
         }
     }
 

+ 2 - 1
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -2075,7 +2075,8 @@ public class InternalEngine extends Engine {
         // for a long time:
         maybePruneDeletes();
         mergeScheduler.refreshConfig();
-        return new RefreshResult(refreshed, segmentGeneration);
+        long primaryTerm = config().getPrimaryTermSupplier().getAsLong();
+        return new RefreshResult(refreshed, primaryTerm, segmentGeneration);
     }
 
     @Override

+ 20 - 7
server/src/test/java/org/elasticsearch/action/get/GetFromTranslogResponseSerializationTests.java

@@ -24,26 +24,39 @@ public class GetFromTranslogResponseSerializationTests extends AbstractWireSeria
 
     @Override
     protected TransportGetFromTranslogAction.Response createTestInstance() {
-        return new TransportGetFromTranslogAction.Response(randomGetResult(), randomSegmentGeneration());
+        return new TransportGetFromTranslogAction.Response(randomGetResult(), randomPrimaryTerm(), randomSegmentGeneration());
     }
 
     @Override
     protected TransportGetFromTranslogAction.Response mutateInstance(TransportGetFromTranslogAction.Response instance) throws IOException {
-        return randomBoolean()
-            ? new TransportGetFromTranslogAction.Response(
-                instance.getResult(),
-                randomValueOtherThan(instance.segmentGeneration(), this::randomSegmentGeneration)
-            )
-            : new TransportGetFromTranslogAction.Response(
+        return switch (randomInt(2)) {
+            case 0 -> new TransportGetFromTranslogAction.Response(
                 randomValueOtherThan(instance.getResult(), this::randomGetResult),
+                instance.primaryTerm(),
+                instance.segmentGeneration()
+            );
+            case 1 -> new TransportGetFromTranslogAction.Response(
+                instance.getResult(),
+                randomValueOtherThan(instance.primaryTerm(), this::randomPrimaryTerm),
                 instance.segmentGeneration()
             );
+            case 2 -> new TransportGetFromTranslogAction.Response(
+                instance.getResult(),
+                instance.primaryTerm(),
+                randomValueOtherThan(instance.segmentGeneration(), this::randomSegmentGeneration)
+            );
+            default -> randomValueOtherThan(instance, this::createTestInstance);
+        };
     }
 
     private long randomSegmentGeneration() {
         return randomBoolean() ? -1L : randomNonNegativeLong();
     }
 
+    private long randomPrimaryTerm() {
+        return randomNonNegativeLong();
+    }
+
     private GetResult randomGetResult() {
         return randomBoolean() ? null : GetResultTests.randomGetResult(randomFrom(XContentType.values())).v1();
     }

+ 20 - 7
server/src/test/java/org/elasticsearch/action/get/ShardMultiGetFromTranslogResponseSerializationTests.java

@@ -28,26 +28,39 @@ public class ShardMultiGetFromTranslogResponseSerializationTests extends Abstrac
 
     @Override
     protected Response createTestInstance() {
-        return new Response(randomMultiGetShardResponse(), randomSegmentGeneration());
+        return new Response(randomMultiGetShardResponse(), randomPrimaryTerm(), randomSegmentGeneration());
     }
 
     @Override
     protected Response mutateInstance(Response instance) throws IOException {
-        return randomBoolean()
-            ? new Response(
-                instance.multiGetShardResponse(),
-                randomValueOtherThan(instance.segmentGeneration(), this::randomSegmentGeneration)
-            )
-            : new Response(
+        return switch (randomInt(2)) {
+            case 0 -> new Response(
                 randomValueOtherThan(instance.multiGetShardResponse(), this::randomMultiGetShardResponse),
+                instance.primaryTerm(),
+                instance.segmentGeneration()
+            );
+            case 1 -> new Response(
+                instance.multiGetShardResponse(),
+                randomValueOtherThan(instance.primaryTerm(), this::randomPrimaryTerm),
                 instance.segmentGeneration()
             );
+            case 2 -> new Response(
+                instance.multiGetShardResponse(),
+                instance.primaryTerm(),
+                randomValueOtherThan(instance.segmentGeneration(), this::randomSegmentGeneration)
+            );
+            default -> randomValueOtherThan(instance, this::createTestInstance);
+        };
     }
 
     private long randomSegmentGeneration() {
         return randomBoolean() ? -1L : randomNonNegativeLong();
     }
 
+    private long randomPrimaryTerm() {
+        return randomNonNegativeLong();
+    }
+
     private GetResponse randomGetResponse() {
         return randomBoolean() ? null : new GetResponse(GetResultTests.randomGetResult(randomFrom(XContentType.values())).v1());
     }

+ 2 - 2
server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java

@@ -103,7 +103,7 @@ public class TransportWriteActionTests extends ESTestCase {
         indexShard = mock(IndexShard.class);
         location = mock(Translog.Location.class);
         clusterService = createClusterService(threadPool);
-        when(indexShard.refresh(any())).thenReturn(new Engine.RefreshResult(true, 1));
+        when(indexShard.refresh(any())).thenReturn(new Engine.RefreshResult(true, randomNonNegativeLong(), 1));
         ReplicationGroup replicationGroup = mock(ReplicationGroup.class);
         when(indexShard.getReplicationGroup()).thenReturn(replicationGroup);
         when(replicationGroup.getReplicationTargets()).thenReturn(Collections.emptyList());
@@ -199,7 +199,7 @@ public class TransportWriteActionTests extends ESTestCase {
         verify(indexShard).externalRefresh(eq(PostWriteRefresh.FORCED_REFRESH_AFTER_INDEX), refreshListener.capture());
         verify(indexShard, never()).addRefreshListener(any(), any());
         // Fire the listener manually
-        refreshListener.getValue().onResponse(new Engine.RefreshResult(randomBoolean(), randomNonNegativeLong()));
+        refreshListener.getValue().onResponse(new Engine.RefreshResult(randomBoolean(), randomNonNegativeLong(), randomNonNegativeLong()));
         assertNotNull(listener.response);
         assertNull(listener.failure);
     }