Browse Source

Cleanup Some releaseOnce/RunOnce Usage (#72217)

Just a few obvious spots I found debugging recent test failures where we use the
wrong "once" wrapper that needlessly causes indirection/allocation/longer stack-traces.
Armin Braun 4 years ago
parent
commit
6fac5a45e3

+ 2 - 3
server/src/internalClusterTest/java/org/elasticsearch/indices/state/ReopenWhileClosingIT.java

@@ -17,8 +17,8 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Glob;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.concurrent.RunOnce;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.transport.MockTransportService;
@@ -148,8 +148,7 @@ public class ReopenWhileClosingIT extends ESIntegTestCase {
                     connection.sendRequest(requestId, action, request, options);
                 });
         }
-        final RunOnce releaseOnce = new RunOnce(release::countDown);
-        return releaseOnce::run;
+        return Releasables.releaseOnce(release::countDown);
     }
 
     private static void assertIndexIsBlocked(final String... indices) {

+ 2 - 7
server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java

@@ -11,6 +11,7 @@ package org.elasticsearch.index.engine;
 import org.apache.lucene.document.LongPoint;
 import org.apache.lucene.search.Query;
 import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.index.seqno.RetentionLease;
 import org.elasticsearch.index.seqno.RetentionLeases;
@@ -18,7 +19,6 @@ import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.translog.Translog;
 
 import java.util.Objects;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 
@@ -77,12 +77,7 @@ final class SoftDeletesPolicy {
     synchronized Releasable acquireRetentionLock() {
         assert retentionLockCount >= 0 : "Invalid number of retention locks [" + retentionLockCount + "]";
         retentionLockCount++;
-        final AtomicBoolean released = new AtomicBoolean();
-        return () -> {
-            if (released.compareAndSet(false, true)) {
-                releaseRetentionLock();
-            }
-        };
+        return Releasables.releaseOnce(this::releaseRetentionLock);
     }
 
     private synchronized void releaseRetentionLock() {

+ 3 - 4
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -65,7 +65,6 @@ import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.CollectionUtils;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.AsyncIOProcessor;
-import org.elasticsearch.common.util.concurrent.RunOnce;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.core.internal.io.IOUtils;
@@ -2990,7 +2989,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
 
             @Override
             public void onResponse(final Releasable releasable) {
-                final RunOnce releaseOnce = new RunOnce(releasable::close);
+                final Releasable releaseOnce = Releasables.releaseOnce(releasable);
                 try {
                     assert getOperationPrimaryTerm() <= pendingPrimaryTerm;
                     termUpdated.await();
@@ -3003,14 +3002,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                 } catch (final Exception e) {
                     if (combineWithAction == null) {
                         // otherwise leave it to combineWithAction to release the permit
-                        releaseOnce.run();
+                        releaseOnce.close();
                     }
                     innerFail(e);
                 } finally {
                     if (combineWithAction != null) {
                         combineWithAction.onResponse(releasable);
                     } else {
-                        releaseOnce.run();
+                        releaseOnce.close();
                     }
                 }
             }

+ 6 - 12
server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java

@@ -15,8 +15,8 @@ import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.support.ContextPreservingActionListener;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
-import org.elasticsearch.common.util.concurrent.RunOnce;
 import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -92,12 +92,12 @@ final class IndexShardOperationPermits implements Closeable {
         delayOperations();
         threadPool.executor(executor).execute(new AbstractRunnable() {
 
-            final RunOnce released = new RunOnce(() -> releaseDelayedOperations());
+            final Releasable released = Releasables.releaseOnce(() -> releaseDelayedOperations());
 
             @Override
             public void onFailure(final Exception e) {
                 try {
-                    released.run(); // resume delayed operations as soon as possible
+                    released.close(); // resume delayed operations as soon as possible
                 } finally {
                     onAcquired.onFailure(e);
                 }
@@ -106,13 +106,7 @@ final class IndexShardOperationPermits implements Closeable {
             @Override
             protected void doRun() throws Exception {
                 final Releasable releasable = acquireAll(timeout, timeUnit);
-                onAcquired.onResponse(() -> {
-                    try {
-                        releasable.close();
-                    } finally {
-                        released.run();
-                    }
-                });
+                onAcquired.onResponse(() -> Releasables.close(releasable, released));
             }
         });
     }
@@ -135,11 +129,11 @@ final class IndexShardOperationPermits implements Closeable {
             }
         }
         if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) {
-            final RunOnce release = new RunOnce(() -> {
+            final Releasable release = Releasables.releaseOnce(() -> {
                 assert semaphore.availablePermits() == 0;
                 semaphore.release(TOTAL_PERMITS);
             });
-            return release::run;
+            return release;
         } else {
             throw new TimeoutException("timeout while blocking operations");
         }

+ 4 - 4
server/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java

@@ -12,8 +12,8 @@ import org.apache.logging.log4j.Logger;
 import org.apache.lucene.search.ReferenceManager;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.metrics.MeanMetric;
-import org.elasticsearch.common.util.concurrent.RunOnce;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.index.translog.Translog;
 
@@ -91,7 +91,7 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
             assert refreshForcers >= 0;
             refreshForcers += 1;
         }
-        final RunOnce runOnce = new RunOnce(() -> {
+        final Releasable releaseOnce = Releasables.releaseOnce(() -> {
             synchronized (RefreshListeners.this) {
                 assert refreshForcers > 0;
                 refreshForcers -= 1;
@@ -101,12 +101,12 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
             try {
                 forceRefresh.run();
             } catch (Exception e) {
-                runOnce.run();
+                releaseOnce.close();
                 throw e;
             }
         }
         assert refreshListeners == null;
-        return () -> runOnce.run();
+        return releaseOnce;
     }
 
     /**

+ 1 - 7
server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

@@ -74,7 +74,6 @@ import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
@@ -396,12 +395,7 @@ public class RecoverySourceHandler {
      */
     private Engine.IndexCommitRef acquireSafeCommit(IndexShard shard) {
         final Engine.IndexCommitRef commitRef = shard.acquireSafeIndexCommit();
-        final AtomicBoolean closed = new AtomicBoolean(false);
-        return new Engine.IndexCommitRef(commitRef.getIndexCommit(), () -> {
-            if (closed.compareAndSet(false, true)) {
-                runWithGenericThreadPool(commitRef::close);
-            }
-        });
+        return new Engine.IndexCommitRef(commitRef.getIndexCommit(), () -> runWithGenericThreadPool(commitRef::close));
     }
 
     private void runWithGenericThreadPool(CheckedRunnable<Exception> task) {