Răsfoiți Sursa

Fix CacheServiceTests.testProcessShardEviction (#91855)

Slightly adjust the mechanics of the map of futures so that `waitForCacheFilesEvictionIfNeeded` and
a future obtained from `pendingShardsEvictions` will always be
consistent since both the future and the map removal happen under the
same mutex. The existing approach of using the future from the executor
submit and removing it from the map during the execution of its task
would lead to a race where the future is always removed from the map
before it completes, so `waitForCacheFilesEvictionIfNeeded` would
briefly not see the future in the map and return while the future has
not yet been completed.

closes #91814
Armin Braun 2 ani în urmă
părinte
comite
1c6d9a90be

+ 19 - 14
x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/CacheService.java

@@ -9,6 +9,7 @@ package org.elasticsearch.xpack.searchablesnapshots.cache.full;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.store.AlreadyClosedException;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.cache.Cache;
@@ -140,7 +141,7 @@ public class CacheService extends AbstractLifecycleComponent {
     private final Cache<CacheKey, CacheFile> cache;
     private final ByteSizeValue rangeSize;
     private final ByteSizeValue recoveryRangeSize;
-    private final Map<ShardEviction, Future<?>> pendingShardsEvictions;
+    private final Map<ShardEviction, PlainActionFuture<?>> pendingShardsEvictions;
     private final ReadWriteLock shardsEvictionsLock;
     private final Object shardsEvictionsMutex;
 
@@ -345,18 +346,22 @@ public class CacheService extends AbstractLifecycleComponent {
         synchronized (shardsEvictionsMutex) {
             if (allowShardsEvictions) {
                 final ShardEviction shardEviction = new ShardEviction(snapshotUUID, snapshotIndexName, shardId);
-                pendingShardsEvictions.computeIfAbsent(shardEviction, shard -> threadPool.generic().submit(new AbstractRunnable() {
-                    @Override
-                    protected void doRun() {
-                        processShardEviction(shardEviction);
-                    }
+                pendingShardsEvictions.computeIfAbsent(shardEviction, shard -> {
+                    final PlainActionFuture<?> future = PlainActionFuture.newFuture();
+                    threadPool.generic().execute(new AbstractRunnable() {
+                        @Override
+                        protected void doRun() {
+                            processShardEviction(shardEviction);
+                        }
 
-                    @Override
-                    public void onFailure(Exception e) {
-                        logger.warn(() -> format("failed to evict cache files associated with shard %s", shardEviction), e);
-                        assert false : e;
-                    }
-                }));
+                        @Override
+                        public void onFailure(Exception e) {
+                            logger.warn(() -> format("failed to evict cache files associated with shard %s", shardEviction), e);
+                            assert false : e;
+                        }
+                    });
+                    return future;
+                });
             }
         }
     }
@@ -423,8 +428,8 @@ public class CacheService extends AbstractLifecycleComponent {
                 }
             } finally {
                 synchronized (shardsEvictionsMutex) {
-                    final Future<?> removedFuture = pendingShardsEvictions.remove(shardEviction);
-                    assert removedFuture != null;
+                    final PlainActionFuture<?> removedFuture = pendingShardsEvictions.remove(shardEviction);
+                    removedFuture.onResponse(null);
                 }
             }
         } finally {