Переглянути джерело

Allow listeners for the segment generation (#92818)

This change allows adding generation listeners which are needed for
Stateless Refresh. The changes include:

- Aligning Engine#refresh and Engine#maybeRefresh to always return a
  RefreshResult.
- Add addSegmentGenerationListener to Engine, which by default throws
  UnsupportedOperationException.
- Adapt TransportShardRefreshAction to work with Stateless.

Relates: ES-4862
Pooya Salehi 2 роки тому
батько
коміт
a907d3d8ff

+ 58 - 0
server/src/main/java/org/elasticsearch/action/admin/indices/refresh/ReplicaShardRefreshRequest.java

@@ -0,0 +1,58 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.action.admin.indices.refresh;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.action.support.replication.ReplicationRequest;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.tasks.TaskId;
+
+import java.io.IOException;
+
+/**
+ * Request sent to the replica copies of a shard by the shard refresh action. Besides the shard id
+ * it carries an optional segment generation.
+ * <p>
+ * The generation is only written to and read from streams on version 8.7.0 or later; when the
+ * request is read from an older stream the generation is {@code null}.
+ */
+public class ReplicaShardRefreshRequest extends ReplicationRequest<ReplicaShardRefreshRequest> {
+
+    @Nullable
+    private final Long segmentGeneration;
+
+    /**
+     * @param shardId           the shard this request targets
+     * @param parentTaskId      the task to set as this request's parent
+     * @param segmentGeneration the segment generation to carry, or {@code null} if there is none
+     */
+    public ReplicaShardRefreshRequest(ShardId shardId, TaskId parentTaskId, @Nullable Long segmentGeneration) {
+        super(shardId);
+        setParentTask(parentTaskId);
+        this.segmentGeneration = segmentGeneration;
+    }
+
+    /**
+     * Reads the request from the wire. Streams older than 8.7.0 do not contain the generation,
+     * in which case it is set to {@code null}.
+     */
+    public ReplicaShardRefreshRequest(StreamInput in) throws IOException {
+        super(in);
+        if (in.getVersion().onOrAfter(Version.V_8_7_0)) {
+            this.segmentGeneration = in.readOptionalVLong();
+        } else {
+            this.segmentGeneration = null;
+        }
+    }
+
+    /**
+     * Writes the request to the wire; the generation is skipped for streams older than 8.7.0.
+     */
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+        if (out.getVersion().onOrAfter(Version.V_8_7_0)) {
+            out.writeOptionalVLong(segmentGeneration);
+        }
+    }
+
+    /**
+     * @return the segment generation carried by this request, or {@code null} if none was set or read
+     */
+    @Nullable
+    public Long getSegmentGeneration() {
+        return segmentGeneration;
+    }
+
+    @Override
+    public String toString() {
+        // include the generation: it is the only state this request adds on top of the shard id
+        return "ReplicaShardRefreshRequest{" + shardId + ", segmentGeneration=" + segmentGeneration + '}';
+    }
+}

+ 26 - 11
server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java

@@ -15,10 +15,12 @@ import org.elasticsearch.action.support.replication.BasicReplicationRequest;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
 import org.elasticsearch.action.support.replication.TransportReplicationAction;
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.logging.LogManager;
@@ -30,7 +32,7 @@ import java.io.IOException;
 
 public class TransportShardRefreshAction extends TransportReplicationAction<
     BasicReplicationRequest,
-    BasicReplicationRequest,
+    ReplicaShardRefreshRequest,
     ReplicationResponse> {
 
     private static final Logger logger = LogManager.getLogger(TransportShardRefreshAction.class);
@@ -39,6 +41,8 @@ public class TransportShardRefreshAction extends TransportReplicationAction<
     public static final ActionType<ReplicationResponse> TYPE = new ActionType<>(NAME, ReplicationResponse::new);
     public static final String SOURCE_API = "api";
 
+    private final Settings settings;
+
     @Inject
     public TransportShardRefreshAction(
         Settings settings,
@@ -59,9 +63,10 @@ public class TransportShardRefreshAction extends TransportReplicationAction<
             shardStateAction,
             actionFilters,
             BasicReplicationRequest::new,
-            BasicReplicationRequest::new,
+            ReplicaShardRefreshRequest::new,
             ThreadPool.Names.REFRESH
         );
+        this.settings = settings;
     }
 
     @Override
@@ -73,21 +78,31 @@ public class TransportShardRefreshAction extends TransportReplicationAction<
     protected void shardOperationOnPrimary(
         BasicReplicationRequest shardRequest,
         IndexShard primary,
-        ActionListener<PrimaryResult<BasicReplicationRequest, ReplicationResponse>> listener
+        ActionListener<PrimaryResult<ReplicaShardRefreshRequest, ReplicationResponse>> listener
     ) {
         ActionListener.completeWith(listener, () -> {
-            primary.refresh(SOURCE_API);
+            // keep the refresh result: its generation is handed to the replica request below
+            var refreshResult = primary.refresh(SOURCE_API);
             logger.trace("{} refresh request executed on primary", primary.shardId());
-            return new PrimaryResult<>(shardRequest, new ReplicationResponse());
+            // the replica request is a new object built from the primary's shard id, the incoming request's
+            // parent task and the generation reported by the refresh above
+            var shardRefreshRequest = new ReplicaShardRefreshRequest(
+                primary.shardId(),
+                shardRequest.getParentTask(),
+                refreshResult.generation()
+            );
+            return new PrimaryResult<>(shardRefreshRequest, new ReplicationResponse());
         });
     }
 
     @Override
-    protected void shardOperationOnReplica(BasicReplicationRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
-        ActionListener.completeWith(listener, () -> {
-            replica.refresh(SOURCE_API);
-            logger.trace("{} refresh request executed on replica", replica.shardId());
-            return new ReplicaResult();
-        });
+    protected void shardOperationOnReplica(ReplicaShardRefreshRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
+        // Stateless node and a copy that is not promotable to primary: do not refresh locally, instead complete the
+        // listener once the shard reports a segment generation >= the one carried by the request.
+        // NOTE(review): getSegmentGeneration() is @Nullable (null when read from a pre-8.7.0 stream) and is unboxed
+        // by both statements below - confirm a null generation can never reach this branch.
+        if (DiscoveryNode.isStateless(settings) && replica.routingEntry().isPromotableToPrimary() == false) {
+            assert request.getSegmentGeneration() != Engine.RefreshResult.UNKNOWN_GENERATION;
+            replica.waitForSegmentGeneration(request.getSegmentGeneration(), listener.map(l -> new ReplicaResult()));
+        } else {
+            // every other copy refreshes locally, as before this change
+            ActionListener.completeWith(listener, () -> {
+                replica.refresh(SOURCE_API);
+                logger.trace("{} refresh request executed on replica", replica.shardId());
+                return new ReplicaResult();
+            });
+        }
     }
 }

+ 27 - 4
server/src/main/java/org/elasticsearch/index/engine/Engine.java

@@ -82,6 +82,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
+import java.util.function.LongConsumer;
 import java.util.stream.Stream;
 
 import static org.elasticsearch.core.Strings.format;
@@ -1013,16 +1014,14 @@ public abstract class Engine implements Closeable {
      * changes.
      */
     @Nullable
-    public abstract void refresh(String source) throws EngineException;
+    public abstract RefreshResult refresh(String source) throws EngineException;
 
     /**
      * Synchronously refreshes the engine for new search operations to reflect the latest
      * changes unless another thread is already refreshing the engine concurrently.
-     *
-     * @return <code>true</code> if the a refresh happened. Otherwise <code>false</code>
+     *
+     * @return a {@link RefreshResult} whose <code>refreshed</code> flag is true if a refresh happened
      */
     @Nullable
-    public abstract boolean maybeRefresh(String source) throws EngineException;
+    public abstract RefreshResult maybeRefresh(String source) throws EngineException;
 
     /**
      * Called when our engine is using too much heap and should move buffered indexed/deleted documents to disk.
@@ -1956,4 +1955,28 @@ public abstract class Engine implements Closeable {
     public final EngineConfig getEngineConfig() {
         return engineConfig;
     }
+
+    /**
+     * Allows registering a callback for when the index shard is on a segment generation >= minGeneration.
+     * The provided consumer is called back with the specific segment generation number.
+     * <p>
+     * This base implementation does not support generation listeners and always throws.
+     *
+     * @param minGeneration the minimum segment generation to wait for
+     * @param consumer      the callback to invoke with the segment generation
+     * @throws UnsupportedOperationException always, in this base implementation
+     */
+    public void addSegmentGenerationListener(long minGeneration, LongConsumer consumer) {
+        // name the concrete engine so the failure points at the engine type that lacks support
+        throw new UnsupportedOperationException(getClass().getName() + " does not support segment generation listeners");
+    }
+
+    /**
+     * Outcome of a refresh operation on the index shard.
+     * <p>
+     * <code>refreshed</code> tells whether a refresh happened. When it did, <code>generation</code>
+     * is the generation of the index commit that the reader has opened upon refresh.
+     *
+     * @param refreshed  true if a refresh happened
+     * @param generation the index commit generation, or {@link #UNKNOWN_GENERATION} when none was supplied
+     */
+    public record RefreshResult(boolean refreshed, long generation) {
+
+        /** Generation used by results that were created without an explicit generation. */
+        public static final long UNKNOWN_GENERATION = -1L;
+
+        /** Shared result for "no refresh happened"; carries {@link #UNKNOWN_GENERATION}. */
+        public static final RefreshResult NO_REFRESH = new RefreshResult(false, UNKNOWN_GENERATION);
+
+        /**
+         * Creates a result with the given flag and {@link #UNKNOWN_GENERATION} as generation.
+         */
+        public RefreshResult(boolean refreshed) {
+            this(refreshed, UNKNOWN_GENERATION);
+        }
+    }
 }

+ 14 - 5
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -1816,20 +1816,21 @@ public class InternalEngine extends Engine {
     }
 
     @Override
-    public void refresh(String source) throws EngineException {
-        refresh(source, SearcherScope.EXTERNAL, true);
+    public RefreshResult refresh(String source) throws EngineException {
+        // external-scope refresh with block == true; the result is now handed back to the caller
+        return refresh(source, SearcherScope.EXTERNAL, true);
     }
 
     @Override
-    public boolean maybeRefresh(String source) throws EngineException {
+    public RefreshResult maybeRefresh(String source) throws EngineException {
+        // external-scope refresh with block == false
         return refresh(source, SearcherScope.EXTERNAL, false);
     }
 
-    final boolean refresh(String source, SearcherScope scope, boolean block) throws EngineException {
+    final RefreshResult refresh(String source, SearcherScope scope, boolean block) throws EngineException {
         // both refresh types will result in an internal refresh but only the external will also
         // pass the new reader reference to the external reader manager.
         final long localCheckpointBeforeRefresh = localCheckpointTracker.getProcessedCheckpoint();
         boolean refreshed;
+        long segmentGeneration = RefreshResult.UNKNOWN_GENERATION;
         try {
             // refresh does not need to hold readLock as ReferenceManager can handle correctly if the engine is closed in mid-way.
             if (store.tryIncRef()) {
@@ -1845,6 +1846,14 @@ public class InternalEngine extends Engine {
                     } else {
                         refreshed = referenceManager.maybeRefresh();
                     }
+                    if (refreshed) {
+                        final ElasticsearchDirectoryReader current = referenceManager.acquire();
+                        try {
+                            segmentGeneration = current.getIndexCommit().getGeneration();
+                        } finally {
+                            referenceManager.release(current);
+                        }
+                    }
                 } finally {
                     store.decRef();
                 }
@@ -1876,7 +1885,7 @@ public class InternalEngine extends Engine {
         // for a long time:
         maybePruneDeletes();
         mergeScheduler.refreshConfig();
-        return refreshed;
+        return new RefreshResult(refreshed, segmentGeneration);
     }
 
     @Override

+ 4 - 3
server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

@@ -418,14 +418,15 @@ public class ReadOnlyEngine extends Engine {
     }
 
     @Override
-    public void refresh(String source) {
+    public RefreshResult refresh(String source) {
         // we could allow refreshes if we want down the road the reader manager will then reflect changes to a rw-engine
         // opened side-by-side
+        // nothing is refreshed here, so the shared non-refreshed result is returned
+        return RefreshResult.NO_REFRESH;
     }
 
     @Override
-    public boolean maybeRefresh(String source) throws EngineException {
-        return false;
+    public RefreshResult maybeRefresh(String source) throws EngineException {
+        // no refresh is attempted; NO_REFRESH is the result whose refreshed flag is false
+        return RefreshResult.NO_REFRESH;
     }
 
     @Override

+ 7 - 3
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -1217,12 +1217,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     /**
      * Writes all indexing changes to disk and opens a new searcher reflecting all changes.  This can throw {@link AlreadyClosedException}.
+     *
+     * @return the {@link Engine.RefreshResult} returned by the engine for this refresh
      */
-    public void refresh(String source) {
+    public Engine.RefreshResult refresh(String source) {
         verifyNotClosed();
         if (logger.isTraceEnabled()) {
             logger.trace("refresh with source [{}]", source);
         }
-        getEngine().refresh(source);
+        return getEngine().refresh(source);
     }
 
     /**
@@ -3803,7 +3803,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                 if (logger.isTraceEnabled()) {
                     logger.trace("refresh with source [schedule]");
                 }
-                return getEngine().maybeRefresh("schedule");
+                return getEngine().maybeRefresh("schedule").refreshed();
             }
         }
         final Engine engine = getEngine();
@@ -4120,4 +4120,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     public String toString() {
         return "IndexShard(shardRouting=" + shardRouting + ")";
     }
+
+    /**
+     * Registers the listener as a segment generation listener on the current engine, using
+     * <code>segmentGeneration</code> as the minimum generation. The listener's <code>onResponse</code>
+     * receives the generation the engine passes to the callback.
+     *
+     * @param segmentGeneration the minimum segment generation to wait for
+     * @param listener          completed with the segment generation reported by the engine
+     */
+    public void waitForSegmentGeneration(long segmentGeneration, ActionListener<Long> listener) {
+        final Engine engine = getEngine();
+        engine.addSegmentGenerationListener(segmentGeneration, listener::onResponse);
+    }
 }

+ 27 - 1
server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -5429,7 +5429,9 @@ public class InternalEngineTests extends EngineTestCase {
                 engine.index(primaryResponse);
             }
             assertTrue(engine.refreshNeeded());
-            engine.refresh("test", Engine.SearcherScope.INTERNAL, true);
+            var refreshResult = engine.refresh("test", Engine.SearcherScope.INTERNAL, true);
+            assertTrue(refreshResult.refreshed());
+            assertNotEquals(refreshResult.generation(), Engine.RefreshResult.UNKNOWN_GENERATION);
             try (
                 Engine.Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
                 Engine.Searcher searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)
@@ -7586,6 +7588,30 @@ public class InternalEngineTests extends EngineTestCase {
         }
     }
 
+    /**
+     * Checks the {@link Engine.RefreshResult} returned by refreshes: a refresh that happened must carry a
+     * known generation, indexing followed by a refresh must not lower the generation, and a flush followed
+     * by a refresh must raise it.
+     */
+    public void testRefreshResult() throws IOException {
+        try (
+            Store store = createStore();
+            InternalEngine engine =
+                // disable merges to make sure that the reader doesn't change unexpectedly during the test
+                createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)
+        ) {
+            var refresh1Result = engine.refresh("warm_up");
+            assertTrue(refresh1Result.refreshed());
+            // assertNotEquals takes (message, unexpected, actual): the sentinel is the unexpected value
+            assertNotEquals("when refreshed, generation must be set", Engine.RefreshResult.UNKNOWN_GENERATION, refresh1Result.generation());
+            for (int i = 0; i < 10; i++) {
+                engine.index(indexForDoc(createParsedDoc(String.valueOf(i), EngineTestCase.randomIdFieldType(), null)));
+            }
+            assertTrue(engine.refreshNeeded());
+            var refresh2Result = engine.refresh("test", Engine.SearcherScope.INTERNAL, true);
+            assertTrue(refresh2Result.refreshed());
+            assertThat(refresh2Result.generation(), greaterThanOrEqualTo(refresh1Result.generation()));
+            engine.flush(true, true);
+            var refresh3Result = engine.refresh("test");
+            assertTrue(refresh3Result.refreshed());
+            assertThat(refresh3Result.generation(), greaterThan(refresh2Result.generation()));
+        }
+    }
+
     private static void assertCommitGenerations(Map<IndexCommit, Engine.IndexCommitRef> commits, List<Long> expectedGenerations) {
         assertCommitGenerations(commits.values().stream().map(Engine.IndexCommitRef::getIndexCommit).toList(), expectedGenerations);
     }