1
0
Эх сурвалжийг харах

Add LifecycleRunnable

I've noticed throughout the code that we have a need to remove the boilerplate lifecycle check when starting/rescheduling certain runnables. This provides a simpler implementation to get this functionality without duplicating it.
Chris Earle 9 жил өмнө
parent
commit
8fa0d8e905

+ 108 - 0
core/src/main/java/org/elasticsearch/common/util/concurrent/AbstractLifecycleRunnable.java

@@ -0,0 +1,108 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.common.util.concurrent;
+
+import org.elasticsearch.common.component.Lifecycle;
+import org.elasticsearch.common.logging.ESLogger;
+
+import java.util.Objects;
+
+/**
+ * {@code AbstractLifecycleRunnable} is a service-lifecycle aware {@link AbstractRunnable}.
+ * <p>
+ * This simplifies the running and rescheduling of {@link Lifecycle}-based {@code Runnable}s.
+ */
+public abstract class AbstractLifecycleRunnable extends AbstractRunnable {
+    /**
+     * The monitored lifecycle for the associated service.
+     */
+    private final Lifecycle lifecycle;
+    /**
+     * The service's logger (note: this is passed in!).
+     */
+    private final ESLogger logger;
+
+    /**
+     * {@link AbstractLifecycleRunnable} must be aware of the actual {@code lifecycle} to react properly.
+     *
+     * @param lifecycle The lifecycle to react too
+     * @param logger The logger to use when logging
+     * @throws NullPointerException if any parameter is {@code null}
+     */
+    public AbstractLifecycleRunnable(Lifecycle lifecycle, ESLogger logger) {
+        Objects.requireNonNull(lifecycle, "lifecycle must not be null");
+        Objects.requireNonNull(logger, "logger must not be null");
+
+        this.lifecycle = lifecycle;
+        this.logger = logger;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * This invokes {@link #doRunInLifecycle()} <em>only</em> if the {@link #lifecycle} is not stopped or closed. Otherwise it exits
+     * immediately.
+     */
+    @Override
+    protected final void doRun() throws Exception {
+        // prevent execution if the service is stopped
+        if (lifecycle.stoppedOrClosed()) {
+            logger.trace("service is stopping. exiting");
+            return;
+        }
+
+        doRunInLifecycle();
+    }
+
+    /**
+     * Perform runnable logic, but only if the {@link #lifecycle} is <em>not</em> stopped or closed.
+     *
+     * @throws InterruptedException if the run method throws an {@link InterruptedException}
+     */
+    protected abstract void doRunInLifecycle() throws Exception;
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * This overrides the default behavior of {@code onAfter} to add the caveat that it only runs if the {@link #lifecycle} is <em>not</em>
+     * stopped or closed.
+     * <p>
+     * Note: this does not guarantee that it won't be stopped concurrently as it invokes {@link #onAfterInLifecycle()},
+     * but it's a solid attempt at preventing it. For those that use this for rescheduling purposes, the next invocation would be
+     * effectively cancelled immediately if that's the case.
+     *
+     * @see #onAfterInLifecycle()
+     */
+    @Override
+    public final void onAfter() {
+        if (lifecycle.stoppedOrClosed() == false) {
+            onAfterInLifecycle();
+        }
+    }
+
+    /**
+     * This method is invoked in the finally block of the run method, but it is only executed if the {@link #lifecycle} is <em>not</em>
+     * stopped or closed.
+     * <p>
+     * This method is most useful for rescheduling the next iteration of the current runnable.
+     */
+    protected void onAfterInLifecycle() {
+        // nothing by default
+    }
+}

+ 11 - 6
core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java

@@ -53,7 +53,7 @@ import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.common.util.concurrent.AbstractLifecycleRunnable;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.KeyedLock;
 import org.elasticsearch.monitor.jvm.JvmInfo;
@@ -1342,16 +1342,17 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
         }
     }
 
-    class ScheduledPing extends AbstractRunnable {
+    class ScheduledPing extends AbstractLifecycleRunnable {
 
         final CounterMetric successfulPings = new CounterMetric();
         final CounterMetric failedPings = new CounterMetric();
 
+        public ScheduledPing() {
+            super(lifecycle, logger);
+        }
+
         @Override
-        protected void doRun() throws Exception {
-            if (lifecycle.stoppedOrClosed()) {
-                return;
-            }
+        protected void doRunInLifecycle() throws Exception {
             for (Map.Entry<DiscoveryNode, NodeChannels> entry : connectedNodes.entrySet()) {
                 DiscoveryNode node = entry.getKey();
                 NodeChannels channels = entry.getValue();
@@ -1374,6 +1375,10 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
                     }
                 }
             }
+        }
+
+        @Override
+        protected void onAfterInLifecycle() {
             threadPool.schedule(pingSchedule, ThreadPool.Names.GENERIC, this);
         }
 

+ 168 - 0
core/src/test/java/org/elasticsearch/common/util/concurrent/AbstractLifecycleRunnableTests.java

@@ -0,0 +1,168 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.common.util.concurrent;
+
+import org.elasticsearch.common.component.Lifecycle;
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.test.ESTestCase;
+
+import org.mockito.InOrder;
+
+import java.util.concurrent.Callable;
+
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests {@link AbstractLifecycleRunnable}.
+ */
+public class AbstractLifecycleRunnableTests extends ESTestCase {
+    private final Lifecycle lifecycle = mock(Lifecycle.class);
+    private final ESLogger logger = mock(ESLogger.class);
+
+    public void testDoRunOnlyRunsWhenNotStoppedOrClosed() throws Exception {
+        Callable<?> runCallable = mock(Callable.class);
+
+        // it's "not stopped or closed"
+        when(lifecycle.stoppedOrClosed()).thenReturn(false);
+
+        AbstractLifecycleRunnable runnable = new AbstractLifecycleRunnable(lifecycle, logger) {
+            @Override
+            public void onFailure(Throwable t) {
+                fail("It should not fail");
+            }
+
+            @Override
+            protected void doRunInLifecycle() throws Exception {
+                runCallable.call();
+            }
+        };
+
+        runnable.run();
+
+        InOrder inOrder = inOrder(lifecycle, logger, runCallable);
+
+        inOrder.verify(lifecycle).stoppedOrClosed();
+        inOrder.verify(runCallable).call();
+        inOrder.verify(lifecycle).stoppedOrClosed(); // onAfter uses it too, but we're not testing it here
+        inOrder.verifyNoMoreInteractions();
+    }
+
+    public void testDoRunDoesNotRunWhenStoppedOrClosed() throws Exception {
+        Callable<?> runCallable = mock(Callable.class);
+
+        // it's stopped or closed
+        when(lifecycle.stoppedOrClosed()).thenReturn(true);
+
+        AbstractLifecycleRunnable runnable = new AbstractLifecycleRunnable(lifecycle, logger) {
+            @Override
+            public void onFailure(Throwable t) {
+                fail("It should not fail");
+            }
+
+            @Override
+            protected void doRunInLifecycle() throws Exception {
+                fail("Should not run with lifecycle stopped or closed.");
+            }
+        };
+
+        runnable.run();
+
+        InOrder inOrder = inOrder(lifecycle, logger, runCallable);
+
+        inOrder.verify(lifecycle).stoppedOrClosed();
+        inOrder.verify(logger).trace(anyString());
+        inOrder.verify(lifecycle).stoppedOrClosed(); // onAfter uses it too, but we're not testing it here
+        inOrder.verifyNoMoreInteractions();
+    }
+
+    public void testOnAfterOnlyWhenNotStoppedOrClosed() throws Exception {
+        Callable<?> runCallable = mock(Callable.class);
+        Callable<?> afterCallable = mock(Callable.class);
+
+        // it's "not stopped or closed"
+        when(lifecycle.stoppedOrClosed()).thenReturn(false);
+
+        AbstractLifecycleRunnable runnable = new AbstractLifecycleRunnable(lifecycle, logger) {
+            @Override
+            public void onFailure(Throwable t) {
+                fail("It should not fail");
+            }
+
+            @Override
+            protected void doRunInLifecycle() throws Exception {
+                runCallable.call();
+            }
+
+            @Override
+            protected void onAfterInLifecycle() {
+                try {
+                    afterCallable.call();
+                }
+                catch (Exception e) {
+                    fail("Unexpected for mock.");
+                }
+            }
+        };
+
+        runnable.run();
+
+        InOrder inOrder = inOrder(lifecycle, logger, runCallable, afterCallable);
+
+        inOrder.verify(lifecycle).stoppedOrClosed();
+        inOrder.verify(runCallable).call();
+        inOrder.verify(lifecycle).stoppedOrClosed();
+        inOrder.verify(afterCallable).call();
+        inOrder.verifyNoMoreInteractions();
+    }
+
+    public void testOnAfterDoesNotHappenWhenStoppedOrClosed() throws Exception {
+        Callable<?> runCallable = mock(Callable.class);
+
+        // it's stopped or closed
+        when(lifecycle.stoppedOrClosed()).thenReturn(true);
+
+        AbstractLifecycleRunnable runnable = new AbstractLifecycleRunnable(lifecycle, logger) {
+            @Override
+            public void onFailure(Throwable t) {
+                fail("It should not fail");
+            }
+
+            @Override
+            protected void doRunInLifecycle() throws Exception {
+                fail("Should not run with lifecycle stopped or closed.");
+            }
+
+            @Override
+            protected void onAfterInLifecycle() {
+                fail("Should not run with lifecycle stopped or closed.");
+            }
+        };
+
+        runnable.run();
+
+        InOrder inOrder = inOrder(lifecycle, runCallable);
+
+        inOrder.verify(lifecycle, times(2)).stoppedOrClosed();
+        inOrder.verifyNoMoreInteractions();
+    }
+}

+ 180 - 0
core/src/test/java/org/elasticsearch/common/util/concurrent/AbstractRunnableTests.java

@@ -0,0 +1,180 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.common.util.concurrent;
+
+import org.elasticsearch.test.ESTestCase;
+
+import org.mockito.InOrder;
+
+import java.util.concurrent.Callable;
+
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests {@link AbstractRunnable}
+ */
+public class AbstractRunnableTests extends ESTestCase {
+    public void testRunSuccess() throws Exception {
+        Callable<?> runCallable = mock(Callable.class);
+
+        AbstractRunnable runnable = new AbstractRunnable() {
+            @Override
+            public void onFailure(Throwable t) {
+                fail("It should not fail");
+            }
+
+            @Override
+            protected void doRun() throws Exception {
+                runCallable.call();
+            }
+        };
+
+        runnable.run();
+
+        verify(runCallable).call();
+    }
+
+    public void testRunFailure() throws Exception {
+        RuntimeException exception = new RuntimeException();
+
+        AbstractRunnable runnable = new AbstractRunnable() {
+            @Override
+            public void onFailure(Throwable t) {
+                assertSame(exception, t);
+            }
+
+            @Override
+            protected void doRun() throws Exception {
+                throw exception;
+            }
+        };
+
+        runnable.run();
+    }
+
+    public void testOnAfterSuccess() throws Exception {
+        Callable<?> runCallable = mock(Callable.class);
+        Callable<?> afterCallable = mock(Callable.class);
+
+        AbstractRunnable runnable = new AbstractRunnable() {
+            @Override
+            public void onFailure(Throwable t) {
+                fail("It should not fail");
+            }
+
+            @Override
+            protected void doRun() throws Exception {
+                runCallable.call();
+            }
+
+            @Override
+            public void onAfter() {
+                try {
+                    afterCallable.call();
+                }
+                catch (Exception e) {
+                    fail("Unexpected for mock.");
+                }
+            }
+        };
+
+        runnable.run();
+
+        InOrder inOrder = inOrder(runCallable, afterCallable);
+
+        inOrder.verify(runCallable).call();
+        inOrder.verify(afterCallable).call();
+
+    }
+
+    public void testOnAfterFailure() throws Exception {
+        RuntimeException exception = new RuntimeException();
+        Callable<?> afterCallable = mock(Callable.class);
+
+        AbstractRunnable runnable = new AbstractRunnable() {
+            @Override
+            public void onFailure(Throwable t) {
+                assertSame(exception, t);
+            }
+
+            @Override
+            protected void doRun() throws Exception {
+                throw exception;
+            }
+
+            @Override
+            public void onAfter() {
+                try {
+                    afterCallable.call();
+                }
+                catch (Exception e) {
+                    fail("Unexpected for mock.");
+                }
+            }
+        };
+
+        runnable.run();
+
+        verify(afterCallable).call();
+    }
+
+    public void testOnRejection() throws Exception {
+        RuntimeException exception = new RuntimeException();
+        Callable<?> failureCallable = mock(Callable.class);
+
+        AbstractRunnable runnable = new AbstractRunnable() {
+            @Override
+            public void onFailure(Throwable t) {
+                assertSame(exception, t);
+
+                try {
+                    failureCallable.call();
+                }
+                catch (Exception e) {
+                    fail("Unexpected for mock.");
+                }
+            }
+
+            @Override
+            protected void doRun() throws Exception {
+                fail("Not tested");
+            }
+        };
+
+        runnable.onRejection(exception);
+    }
+
+    public void testIsForceExecutuonDefaultsFalse() {
+        AbstractRunnable runnable = new AbstractRunnable() {
+            @Override
+            public void onFailure(Throwable t) {
+                fail("Not tested");
+            }
+
+            @Override
+            protected void doRun() throws Exception {
+                fail("Not tested");
+            }
+        };
+
+        assertFalse(runnable.isForceExecution());
+    }
+}