瀏覽代碼

Searches & mgets on fast refresh promotable shards (#96740)

Relates ES-6150
Iraklis Psaroudakis 2 年之前
父節點
當前提交
658c9ba359

+ 9 - 4
server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java

@@ -20,9 +20,9 @@ import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.OperationRouting;
 import org.elasticsearch.cluster.routing.PlainShardIterator;
 import org.elasticsearch.cluster.routing.ShardIterator;
-import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -96,7 +96,10 @@ public class TransportGetAction extends TransportSingleShardAction<GetRequest, G
         if (iterator == null) {
             return null;
         }
-        return new PlainShardIterator(iterator.shardId(), iterator.getShardRoutings().stream().filter(ShardRouting::isSearchable).toList());
+        return new PlainShardIterator(
+            iterator.shardId(),
+            iterator.getShardRoutings().stream().filter(shardRouting -> OperationRouting.canSearchShard(shardRouting, state)).toList()
+        );
     }
 
     @Override
@@ -110,11 +113,13 @@ public class TransportGetAction extends TransportSingleShardAction<GetRequest, G
         IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
         IndexShard indexShard = indexService.getShard(shardId.id());
         if (indexShard.routingEntry().isPromotableToPrimary() == false) {
+            assert indexShard.indexSettings().isFastRefresh() == false
+                : "a search shard should not receive a TransportGetAction for an index with fast refresh";
             handleGetOnUnpromotableShard(request, indexShard, listener);
             return;
         }
-        assert DiscoveryNode.isStateless(clusterService.getSettings()) == false
-            : "A TransportGetAction should always be handled by a search shard in Stateless";
+        assert DiscoveryNode.isStateless(clusterService.getSettings()) == false || indexShard.indexSettings().isFastRefresh()
+            : "in Stateless a promotable to primary shard can receive a TransportGetAction only if an index has the fast refresh setting";
         if (request.realtime()) { // we are not tied to a refresh cycle here anyway
             asyncGet(request, shardId, listener);
         } else {

+ 15 - 2
server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java

@@ -30,6 +30,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;
+
 public class OperationRouting {
 
     public static final Setting<Boolean> USE_ADAPTIVE_REPLICA_SELECTION_SETTING = Setting.boolSetting(
@@ -122,8 +124,11 @@ public class OperationRouting {
                 nodeCounts
             );
             if (iterator != null) {
-                var searchableShards = iterator.getShardRoutings().stream().filter(ShardRouting::isSearchable).toList();
-                set.add(new PlainShardIterator(iterator.shardId(), searchableShards));
+                var shardsThatCanHandleSearches = iterator.getShardRoutings()
+                    .stream()
+                    .filter(shardRouting -> canSearchShard(shardRouting, clusterState))
+                    .toList();
+                set.add(new PlainShardIterator(iterator.shardId(), shardsThatCanHandleSearches));
             }
         }
         return GroupShardsIterator.sortAndCreate(new ArrayList<>(set));
@@ -262,4 +267,12 @@ public class OperationRouting {
         IndexMetadata indexMetadata = indexMetadata(clusterState, index);
         return new ShardId(indexMetadata.getIndex(), IndexRouting.fromIndexMetadata(indexMetadata).getShard(id, routing));
     }
+
+    public static boolean canSearchShard(ShardRouting shardRouting, ClusterState clusterState) {
+        if (INDEX_FAST_REFRESH_SETTING.get(clusterState.metadata().index(shardRouting.index()).getSettings())) {
+            return shardRouting.isPromotableToPrimary();
+        } else {
+            return shardRouting.isSearchable();
+        }
+    }
 }

+ 10 - 1
server/src/main/java/org/elasticsearch/index/IndexSettings.java

@@ -638,6 +638,7 @@ public final class IndexSettings {
     private volatile Translog.Durability durability;
     private volatile TimeValue syncInterval;
     private volatile TimeValue refreshInterval;
+    private final boolean fastRefresh;
     private volatile ByteSizeValue flushThresholdSize;
     private volatile TimeValue flushThresholdAge;
     private volatile ByteSizeValue generationThresholdSize;
@@ -787,7 +788,8 @@ public final class IndexSettings {
         defaultFields = scopedSettings.get(DEFAULT_FIELD_SETTING);
         syncInterval = INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.get(settings);
         refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING);
-        if (scopedSettings.get(INDEX_FAST_REFRESH_SETTING) && DiscoveryNode.isStateless(nodeSettings) == false) {
+        fastRefresh = scopedSettings.get(INDEX_FAST_REFRESH_SETTING);
+        if (fastRefresh && DiscoveryNode.isStateless(nodeSettings) == false) {
             throw new IllegalArgumentException(INDEX_FAST_REFRESH_SETTING.getKey() + " is allowed only in stateless");
         }
         flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING);
@@ -1119,6 +1121,13 @@ public final class IndexSettings {
         return refreshInterval;
     }
 
+    /**
+     * Only intended for stateless.
+     */
+    public boolean isFastRefresh() {
+        return fastRefresh;
+    }
+
     /**
      * Returns the transaction log threshold size when to forcefully flush the index and clear the transaction log.
      */