瀏覽代碼

Some cleanup in IndexShard (#95742)

Cleaning up some unused things, inlining weird private method call in
constructor, removing test only method that was mocked but never
called, shortening a record and inlining single use private methods.
Armin Braun 2 年之前
父節點
當前提交
cd8914039d

+ 46 - 94
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -283,7 +283,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     private final AtomicReference<Translog.Location> pendingRefreshLocation = new AtomicReference<>();
     private final RefreshPendingLocationListener refreshPendingLocationListener;
     private volatile boolean useRetentionLeasesInPeerRecovery;
-    private final boolean isDataStreamIndex; // if a shard is a part of data stream
     private final LongSupplier relativeTimeInNanosSupplier;
     private volatile long startedRelativeTimeInNanos;
     private volatile long indexingTimeBeforeShardStartedInNanos;
@@ -333,14 +332,15 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         this.mapperService = mapperService;
         this.indexCache = indexCache;
         this.internalIndexingStats = new InternalIndexingStats();
-        final List<IndexingOperationListener> listenersList = new ArrayList<>(listeners);
-        listenersList.add(internalIndexingStats);
-        this.indexingOperationListeners = new IndexingOperationListener.CompositeListener(listenersList, logger);
+        this.indexingOperationListeners = new IndexingOperationListener.CompositeListener(
+            CollectionUtils.appendToCopyNoNullElements(listeners, internalIndexingStats),
+            logger
+        );
         this.bulkOperationListener = new ShardBulkStats();
         this.globalCheckpointSyncer = globalCheckpointSyncer;
         this.retentionLeaseSyncer = Objects.requireNonNull(retentionLeaseSyncer);
         this.searchOperationListener = new SearchOperationListener.CompositeListener(
-            CollectionUtils.appendToCopy(searchOperationListener, searchStats),
+            CollectionUtils.appendToCopyNoNullElements(searchOperationListener, searchStats),
             logger
         );
         this.getService = new ShardGetService(indexSettings, this, mapperService);
@@ -384,12 +384,17 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         }
         indexShardOperationPermits = new IndexShardOperationPermits(shardId, threadPool);
         readerWrapper = indexReaderWrapper;
-        refreshListeners = buildRefreshListeners();
+        refreshListeners = new RefreshListeners(
+            indexSettings::getMaxRefreshListeners,
+            () -> refresh("too_many_listeners"),
+            logger,
+            threadPool.getThreadContext(),
+            externalRefreshMetric
+        );
         lastSearcherAccess.set(threadPool.relativeTimeInMillis());
         persistMetadata(path, indexSettings, shardRouting, null, logger);
         this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases();
         this.refreshPendingLocationListener = new RefreshPendingLocationListener();
-        this.isDataStreamIndex = mapperService == null ? false : mapperService.mappingLookup().isDataStreamTimestampFieldEnabled();
         this.relativeTimeInNanosSupplier = relativeTimeInNanosSupplier;
         this.indexCommitListener = indexCommitListener;
     }
@@ -409,14 +414,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         return indexSortSupplier.get();
     }
 
-    /**
-     * Returns if this shard is a part of datastream
-     * @return {@code true} if this shard is a part of datastream, {@code false} otherwise
-     */
-    public boolean isDataStreamIndex() {
-        return isDataStreamIndex;
-    }
-
     public ShardGetService getService() {
         return this.getService;
     }
@@ -491,10 +488,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         return this.shardRouting;
     }
 
-    public QueryCachingPolicy getQueryCachingPolicy() {
-        return cachingPolicy;
-    }
-
     @Override
     public void updateShardState(
         final ShardRouting newRouting,
@@ -1174,8 +1167,23 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         assert opPrimaryTerm <= getOperationPrimaryTerm()
             : "op term [ " + opPrimaryTerm + " ] > shard term [" + getOperationPrimaryTerm() + "]";
         ensureWriteAllowed(origin);
-        final Engine.Delete delete = prepareDelete(id, seqNo, opPrimaryTerm, version, versionType, origin, ifSeqNo, ifPrimaryTerm);
-        return delete(engine, delete);
+        active.set(true);
+        Engine.Delete delete = indexingOperationListeners.preDelete(
+            shardId,
+            prepareDelete(id, seqNo, opPrimaryTerm, version, versionType, origin, ifSeqNo, ifPrimaryTerm)
+        );
+        final Engine.DeleteResult result;
+        try {
+            if (logger.isTraceEnabled()) {
+                logger.trace("delete [{}] (seq no [{}])", delete.uid().text(), delete.seqNo());
+            }
+            result = engine.delete(delete);
+        } catch (Exception e) {
+            indexingOperationListeners.postDelete(shardId, delete, e);
+            throw e;
+        }
+        indexingOperationListeners.postDelete(shardId, delete, result);
+        return result;
     }
 
     public static Engine.Delete prepareDelete(
@@ -1193,23 +1201,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         return new Engine.Delete(id, uid, seqNo, primaryTerm, version, versionType, origin, startTime, ifSeqNo, ifPrimaryTerm);
     }
 
-    private Engine.DeleteResult delete(Engine engine, Engine.Delete delete) throws IOException {
-        active.set(true);
-        final Engine.DeleteResult result;
-        delete = indexingOperationListeners.preDelete(shardId, delete);
-        try {
-            if (logger.isTraceEnabled()) {
-                logger.trace("delete [{}] (seq no [{}])", delete.uid().text(), delete.seqNo());
-            }
-            result = engine.delete(delete);
-        } catch (Exception e) {
-            indexingOperationListeners.postDelete(shardId, delete, e);
-            throw e;
-        }
-        indexingOperationListeners.postDelete(shardId, delete, result);
-        return result;
-    }
-
     public Engine.GetResult get(Engine.Get get) {
         readAllowed();
         MappingLookup mappingLookup = mapperService.mappingLookup();
@@ -1227,9 +1218,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
      */
     public Engine.RefreshResult refresh(String source) {
         verifyNotClosed();
-        if (logger.isTraceEnabled()) {
-            logger.trace("refresh with source [{}]", source);
-        }
+        logger.trace("refresh with source [{}]", source);
         return getEngine().refresh(source);
     }
 
@@ -1409,10 +1398,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     }
 
     public void forceMerge(ForceMergeRequest forceMerge) throws IOException {
-        verifyActive();
-        if (logger.isTraceEnabled()) {
-            logger.trace("force merge with {}", forceMerge);
+        IndexShardState state = this.state; // one time volatile read
+        if (state != IndexShardState.STARTED) {
+            throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when shard is active");
         }
+        logger.trace("force merge with {}", forceMerge);
         Engine engine = getEngine();
         engine.forceMerge(forceMerge.flush(), forceMerge.maxNumSegments(), forceMerge.onlyExpungeDeletes(), forceMerge.forceMergeUUID());
     }
@@ -1620,7 +1610,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         }
 
         @Override
-        protected void doClose() throws IOException {
+        protected void doClose() {
             // don't close here - mimic the MultiReader#doClose = false behavior that FilterDirectoryReader doesn't have
         }
 
@@ -2185,13 +2175,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         }
     }
 
-    protected final void verifyActive() throws IllegalIndexShardStateException {
-        IndexShardState state = this.state; // one time volatile read
-        if (state != IndexShardState.STARTED) {
-            throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when shard is active");
-        }
-    }
-
     /**
      * Returns number of heap bytes used by the indexing buffer for this shard, or 0 if the shard is closed
      */
@@ -3051,11 +3034,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         // }
         assert recoveryState.getRecoverySource().equals(shardRouting.recoverySource());
         switch (recoveryState.getRecoverySource().getType()) {
-            case EMPTY_STORE:
-            case EXISTING_STORE:
-                executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore);
-                break;
-            case PEER:
+            case EMPTY_STORE, EXISTING_STORE -> executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore);
+            case PEER -> {
                 try {
                     markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState);
                     recoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener);
@@ -3063,8 +3043,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                     failShard("corrupted preexisting index", e);
                     recoveryListener.onRecoveryFailure(new RecoveryFailedException(recoveryState, null, e), true);
                 }
-                break;
-            case SNAPSHOT:
+            }
+            case SNAPSHOT -> {
                 final String repo = ((SnapshotRecoverySource) recoveryState.getRecoverySource()).snapshot().getRepository();
                 executeRecovery(
                     "from snapshot",
@@ -3072,8 +3052,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                     recoveryListener,
                     l -> restoreFromRepository(repositoriesService.repository(repo), l)
                 );
-                break;
-            case LOCAL_SHARDS:
+            }
+            case LOCAL_SHARDS -> {
                 final IndexMetadata indexMetadata = indexSettings().getIndexMetadata();
                 final Index resizeSourceIndex = indexMetadata.getResizeSourceIndex();
                 final List<IndexShard> startedShards = new ArrayList<>();
@@ -3096,7 +3076,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                     numShards = -1;
                     requiredShards = Collections.emptySet();
                 }
-
                 if (numShards == startedShards.size()) {
                     assert requiredShards.isEmpty() == false;
                     executeRecovery(
@@ -3127,9 +3106,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                     }
                     throw e;
                 }
-                break;
-            default:
-                throw new IllegalArgumentException("Unknown recovery source " + recoveryState.getRecoverySource());
+            }
+            default -> throw new IllegalArgumentException("Unknown recovery source " + recoveryState.getRecoverySource());
         }
     }
 
@@ -3704,36 +3682,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         }
     }
 
-    /**
-     * Build {@linkplain RefreshListeners} for this shard.
-     */
-    private RefreshListeners buildRefreshListeners() {
-        return new RefreshListeners(
-            indexSettings::getMaxRefreshListeners,
-            () -> refresh("too_many_listeners"),
-            logger,
-            threadPool.getThreadContext(),
-            externalRefreshMetric
-        );
-    }
-
     /**
      * Simple struct encapsulating a shard failure
      *
      * @see IndexShard#addShardFailureCallback(Consumer)
      */
-    public static final class ShardFailure {
-        public final ShardRouting routing;
-        public final String reason;
-        @Nullable
-        public final Exception cause;
-
-        public ShardFailure(ShardRouting routing, String reason, @Nullable Exception cause) {
-            this.routing = routing;
-            this.reason = reason;
-            this.cause = cause;
-        }
-    }
+    public record ShardFailure(ShardRouting routing, String reason, @Nullable Exception cause) {}
 
     EngineFactory getEngineFactory() {
         return engineFactory;
@@ -3765,9 +3719,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                 setRefreshPending(engine);
                 return false;
             } else {
-                if (logger.isTraceEnabled()) {
-                    logger.trace("refresh with source [schedule]");
-                }
+                logger.trace("refresh with source [schedule]");
                 return getEngine().maybeRefresh("schedule").refreshed();
             }
         }
@@ -3919,7 +3871,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         }
 
         @Override
-        public void beforeRefresh() throws IOException {
+        public void beforeRefresh() {
             if (Assertions.ENABLED) {
                 assert callingThread == null
                     : "beforeRefresh was called by " + callingThread.getName() + " without a corresponding call to afterRefresh";

+ 3 - 3
server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java

@@ -929,14 +929,14 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
     private class FailedShardHandler implements Consumer<IndexShard.ShardFailure> {
         @Override
         public void accept(final IndexShard.ShardFailure shardFailure) {
-            final ShardRouting shardRouting = shardFailure.routing;
+            final ShardRouting shardRouting = shardFailure.routing();
             threadPool.generic().execute(() -> {
                 synchronized (IndicesClusterStateService.this) {
                     failAndRemoveShard(
                         shardRouting,
                         true,
-                        "shard failure, reason [" + shardFailure.reason + "]",
-                        shardFailure.cause,
+                        "shard failure, reason [" + shardFailure.reason() + "]",
+                        shardFailure.cause(),
                         clusterService.state()
                     );
                 }

+ 0 - 5
server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java

@@ -12,7 +12,6 @@ import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.MatchNoDocsQuery;
 import org.apache.lucene.search.Query;
-import org.apache.lucene.search.QueryCachingPolicy;
 import org.apache.lucene.search.Sort;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.tests.index.RandomIndexWriter;
@@ -74,8 +73,6 @@ public class DefaultSearchContextTests extends ESTestCase {
 
         ThreadPool threadPool = new TestThreadPool(this.getClass().getName());
         IndexShard indexShard = mock(IndexShard.class);
-        QueryCachingPolicy queryCachingPolicy = mock(QueryCachingPolicy.class);
-        when(indexShard.getQueryCachingPolicy()).thenReturn(queryCachingPolicy);
         when(indexShard.getThreadPool()).thenReturn(threadPool);
 
         int maxResultWindow = randomIntBetween(50, 100);
@@ -303,8 +300,6 @@ public class DefaultSearchContextTests extends ESTestCase {
 
         ThreadPool threadPool = new TestThreadPool(this.getClass().getName());
         IndexShard indexShard = mock(IndexShard.class);
-        QueryCachingPolicy queryCachingPolicy = mock(QueryCachingPolicy.class);
-        when(indexShard.getQueryCachingPolicy()).thenReturn(queryCachingPolicy);
         when(indexShard.getThreadPool()).thenReturn(threadPool);
 
         IndexService indexService = mock(IndexService.class);

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

@@ -115,7 +115,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
 
     private static final Consumer<IndexShard.ShardFailure> DEFAULT_SHARD_FAILURE_HANDLER = failure -> {
         if (failOnShardFailures.get()) {
-            throw new AssertionError(failure.reason, failure.cause);
+            throw new AssertionError(failure.reason(), failure.cause());
         }
     };