Răsfoiți Sursa

Extract RunOnce into a dedicated class (#35489)

This commit extracts the static inner class RunOnce from 
WorkerBulkByScrollTaskState so that it can be reused at 
other places.
Tanguy Leroux 7 ani în urmă
părinte
comite
c8c8ce2374

+ 9 - 4
modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java

@@ -35,8 +35,6 @@ import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.index.reindex.ScrollableHitSource.Hit;
-import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.delete.DeleteResponse;
 import org.elasticsearch.action.index.IndexRequest;
@@ -65,6 +63,8 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.engine.VersionConflictEngineException;
+import org.elasticsearch.index.reindex.ScrollableHitSource.Hit;
+import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHit;
@@ -323,8 +323,13 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
                 // While we're here we can check that the sleep made it through
                 assertThat(delay.nanos(), greaterThan(0L));
                 assertThat(delay.seconds(), lessThanOrEqualTo(10L));
-                ((AbstractRunnable) command).onRejection(new EsRejectedExecutionException("test"));
-                return null;
+                final EsRejectedExecutionException exception = new EsRejectedExecutionException("test");
+                if (command instanceof AbstractRunnable) {
+                    ((AbstractRunnable) command).onRejection(exception);
+                    return null;
+                } else {
+                    throw exception;
+                }
             }
         });
         ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 0, emptyList(), null);

+ 50 - 0
server/src/main/java/org/elasticsearch/common/util/concurrent/RunOnce.java

@@ -0,0 +1,50 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.common.util.concurrent;
+
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Runnable that can only be run one time.
+ */
+public class RunOnce implements Runnable {
+
+    private final Runnable delegate;
+    private final AtomicBoolean hasRun;
+
+    public RunOnce(final Runnable delegate) {
+        this.delegate = Objects.requireNonNull(delegate);
+        this.hasRun = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void run() {
+        if (hasRun.compareAndSet(false, true)) {
+            delegate.run();
+        }
+    }
+
+    /**
+     * {@code true} if the {@link RunOnce} has been executed once.
+     */
+    public boolean hasRun() {
+        return hasRun.get();
+    }
+}

+ 13 - 41
server/src/main/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskState.java

@@ -23,12 +23,13 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.common.util.concurrent.FutureUtils;
+import org.elasticsearch.common.util.concurrent.RunOnce;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -188,8 +189,12 @@ public class WorkerBulkByScrollTaskState implements SuccessfullyProcessed {
         synchronized (delayedPrepareBulkRequestReference) {
             TimeValue delay = throttleWaitTime(lastBatchStartTime, timeValueNanos(System.nanoTime()), lastBatchSize);
             logger.debug("[{}]: preparing bulk request for [{}]", task.getId(), delay);
-            delayedPrepareBulkRequestReference.set(new DelayedPrepareBulkRequest(threadPool, getRequestsPerSecond(),
-                delay, new RunOnce(prepareBulkRequestRunnable)));
+            try {
+                delayedPrepareBulkRequestReference.set(new DelayedPrepareBulkRequest(threadPool, getRequestsPerSecond(),
+                    delay, new RunOnce(prepareBulkRequestRunnable)));
+            } catch (EsRejectedExecutionException e) {
+                prepareBulkRequestRunnable.onRejection(e);
+            }
         }
     }
 
@@ -242,25 +247,17 @@ public class WorkerBulkByScrollTaskState implements SuccessfullyProcessed {
 
     class DelayedPrepareBulkRequest {
         private final ThreadPool threadPool;
-        private final AbstractRunnable command;
+        private final Runnable command;
         private final float requestsPerSecond;
         private final ScheduledFuture<?> future;
 
-        DelayedPrepareBulkRequest(ThreadPool threadPool, float requestsPerSecond, TimeValue delay, AbstractRunnable command) {
+        DelayedPrepareBulkRequest(ThreadPool threadPool, float requestsPerSecond, TimeValue delay, Runnable command) {
             this.threadPool = threadPool;
             this.requestsPerSecond = requestsPerSecond;
             this.command = command;
-            this.future = threadPool.schedule(delay, ThreadPool.Names.GENERIC, new AbstractRunnable() {
-                @Override
-                protected void doRun() throws Exception {
-                    throttledNanos.addAndGet(delay.nanos());
-                    command.run();
-                }
-
-                @Override
-                public void onFailure(Exception e) {
-                    command.onFailure(e);
-                }
+            this.future = threadPool.schedule(delay, ThreadPool.Names.GENERIC, () -> {
+                throttledNanos.addAndGet(delay.nanos());
+                command.run();
             });
         }
 
@@ -302,29 +299,4 @@ public class WorkerBulkByScrollTaskState implements SuccessfullyProcessed {
             return timeValueNanos(round(remainingDelay * requestsPerSecond / newRequestsPerSecond));
         }
     }
-
-    /**
-     * Runnable that can only be run one time. This is paranoia to prevent furiously rethrottling from running the command multiple times.
-     * Without it the command would be run multiple times.
-     */
-    private static class RunOnce extends AbstractRunnable {
-        private final AtomicBoolean hasRun = new AtomicBoolean(false);
-        private final AbstractRunnable delegate;
-
-        RunOnce(AbstractRunnable delegate) {
-            this.delegate = delegate;
-        }
-
-        @Override
-        protected void doRun() throws Exception {
-            if (hasRun.compareAndSet(false, true)) {
-                delegate.run();
-            }
-        }
-
-        @Override
-        public void onFailure(Exception e) {
-            delegate.onFailure(e);
-        }
-    }
 }

+ 101 - 0
server/src/test/java/org/elasticsearch/common/util/concurrent/RunOnceTests.java

@@ -0,0 +1,101 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.util.concurrent;
+
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class RunOnceTests extends ESTestCase {
+
+    public void testRunOnce() {
+        final AtomicInteger counter = new AtomicInteger(0);
+        final RunOnce runOnce = new RunOnce(counter::incrementAndGet);
+        assertFalse(runOnce.hasRun());
+
+        runOnce.run();
+        assertTrue(runOnce.hasRun());
+        assertEquals(1, counter.get());
+
+        runOnce.run();
+        assertTrue(runOnce.hasRun());
+        assertEquals(1, counter.get());
+    }
+
+    public void testRunOnceConcurrently() throws InterruptedException {
+        final AtomicInteger counter = new AtomicInteger(0);
+        final RunOnce runOnce = new RunOnce(counter::incrementAndGet);
+
+        final Thread[] threads = new Thread[between(3, 10)];
+        final CountDownLatch latch = new CountDownLatch(1);
+        for (int i = 0; i < threads.length; i++) {
+            threads[i] = new Thread(() -> {
+                try {
+                    latch.await();
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+                runOnce.run();
+            });
+            threads[i].start();
+        }
+
+        latch.countDown();
+        for (Thread thread : threads) {
+            thread.join();
+        }
+        assertTrue(runOnce.hasRun());
+        assertEquals(1, counter.get());
+    }
+
+    public void testRunOnceWithAbstractRunnable() {
+        final AtomicInteger onRun = new AtomicInteger(0);
+        final AtomicInteger onFailure = new AtomicInteger(0);
+        final AtomicInteger onAfter = new AtomicInteger(0);
+
+        final RunOnce runOnce = new RunOnce(new AbstractRunnable() {
+            @Override
+            protected void doRun() throws Exception {
+                onRun.incrementAndGet();
+                throw new RuntimeException("failure");
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                onFailure.incrementAndGet();
+            }
+
+            @Override
+            public void onAfter() {
+                onAfter.incrementAndGet();
+            }
+        });
+
+        final int iterations = randomIntBetween(1, 10);
+        for (int i = 0; i < iterations; i++) {
+            runOnce.run();
+            assertEquals(1, onRun.get());
+            assertEquals(1, onFailure.get());
+            assertEquals(1, onAfter.get());
+            assertTrue(runOnce.hasRun());
+        }
+    }
+}