Explorar o código

Merge pull request #15777 from jasontedor/safer-cluster-state-task-notifications

Safe cluster state task notifications
Jason Tedor hai 9 anos
pai
achega
d032dabed5

+ 96 - 0
core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java

@@ -313,6 +313,13 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
                                           final ClusterStateTaskExecutor<T> executor,
                                           final ClusterStateTaskListener listener
     ) {
+        innerSubmitStateUpdateTask(source, task, config, executor, safe(listener, logger));
+    }
+
+    private <T> void innerSubmitStateUpdateTask(final String source, final T task,
+                                           final ClusterStateTaskConfig config,
+                                           final ClusterStateTaskExecutor executor,
+                                           final SafeClusterStateTaskListener listener) {
         if (!lifecycle.started()) {
             return;
         }
@@ -640,6 +647,95 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
 
     }
 
+    private static SafeClusterStateTaskListener safe(ClusterStateTaskListener listener, ESLogger logger) {
+        if (listener instanceof AckedClusterStateTaskListener) {
+            return new SafeAckedClusterStateTaskListener((AckedClusterStateTaskListener) listener, logger);
+        } else {
+            return new SafeClusterStateTaskListener(listener, logger);
+        }
+    }
+
+    private static class SafeClusterStateTaskListener implements ClusterStateTaskListener {
+        private final ClusterStateTaskListener listener;
+        private final ESLogger logger;
+
+        public SafeClusterStateTaskListener(ClusterStateTaskListener listener, ESLogger logger) {
+            this.listener = listener;
+            this.logger = logger;
+        }
+
+        @Override
+        public void onFailure(String source, Throwable t) {
+            try {
+                listener.onFailure(source, t);
+            } catch (Exception e) {
+                logger.error("exception thrown by listener notifying of failure [{}] from [{}]", e, t, source);
+            }
+        }
+
+        @Override
+        public void onNoLongerMaster(String source) {
+            try {
+                listener.onNoLongerMaster(source);
+            } catch (Exception e) {
+                logger.error("exception thrown by listener while notifying no longer master from [{}]", e, source);
+            }
+        }
+
+        @Override
+        public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+            try {
+                listener.clusterStateProcessed(source, oldState, newState);
+            } catch (Exception e) {
+                logger.error(
+                    "exception thrown by listener while notifying of cluster state processed from [{}], old cluster state:\n{}\nnew cluster state:\n{}",
+                    e,
+                    source,
+                    oldState.prettyPrint(),
+                    newState.prettyPrint());
+            }
+        }
+    }
+
+    /**
+     * Variant of {@link SafeClusterStateTaskListener} for listeners that also implement
+     * {@link AckedClusterStateTaskListener}. The ack notification callbacks are guarded the same way: an
+     * {@link Exception} thrown by the wrapped listener is logged at error level rather than propagated.
+     * {@link #mustAck} and {@link #ackTimeout} are plain delegations and are not guarded.
+     */
+    private static class SafeAckedClusterStateTaskListener extends SafeClusterStateTaskListener implements AckedClusterStateTaskListener {
+        // the superclass fields are private, so this class keeps its own typed references
+        private final AckedClusterStateTaskListener listener;
+        private final ESLogger logger;
+
+        /**
+         * @param listener the acked listener whose callbacks are guarded
+         * @param logger   where exceptions thrown by {@code listener} are reported
+         */
+        public SafeAckedClusterStateTaskListener(AckedClusterStateTaskListener listener, ESLogger logger) {
+            super(listener, logger);
+            this.listener = listener;
+            this.logger = logger;
+        }
+
+        @Override
+        public boolean mustAck(DiscoveryNode discoveryNode) {
+            return listener.mustAck(discoveryNode);
+        }
+
+        @Override
+        public void onAllNodesAcked(@Nullable Throwable t) {
+            try {
+                listener.onAllNodesAcked(t);
+            } catch (Exception e) {
+                // t is nullable and the message placeholder only renders its toString(); when present, attach
+                // it so its stack trace is logged too. addSuppressed rejects null and self-suppression.
+                if (t != null && t != e) {
+                    e.addSuppressed(t);
+                }
+                logger.error("exception thrown by listener while notifying on all nodes acked [{}]", e, t);
+            }
+        }
+
+        @Override
+        public void onAckTimeout() {
+            try {
+                listener.onAckTimeout();
+            } catch (Exception e) {
+                logger.error("exception thrown by listener while notifying on ack timeout", e);
+            }
+        }
+
+        @Override
+        public TimeValue ackTimeout() {
+            return listener.ackTimeout();
+        }
+    }
+
     class UpdateTask<T> extends SourcePrioritizedRunnable {
 
         public final T task;

+ 53 - 0
core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java

@@ -743,6 +743,59 @@ public class ClusterServiceIT extends ESIntegTestCase {
         }
     }
 
+    /*
+     * test that a listener throwing an exception while handling a
+     * notification does not prevent publication notification to the
+     * executor
+     */
+    public void testClusterStateTaskListenerThrowingExceptionIsOkay() throws InterruptedException {
+        Settings settings = settingsBuilder()
+            .put("discovery.type", "local")
+            .build();
+        internalCluster().startNode(settings);
+        ClusterService clusterService = internalCluster().getInstance(ClusterService.class);
+
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicBoolean published = new AtomicBoolean();
+
+        clusterService.submitStateUpdateTask(
+            "testClusterStateTaskListenerThrowingExceptionIsOkay",
+            new Object(),
+            ClusterStateTaskConfig.build(Priority.NORMAL),
+            new ClusterStateTaskExecutor<Object>() {
+                @Override
+                public boolean runOnlyOnMaster() {
+                    return false;
+                }
+
+                @Override
+                public BatchResult<Object> execute(ClusterState currentState, List<Object> tasks) throws Exception {
+                    ClusterState newClusterState = ClusterState.builder(currentState).build();
+                    return BatchResult.builder().successes(tasks).build(newClusterState);
+                }
+
+                @Override
+                public void clusterStatePublished(ClusterState newClusterState) {
+                    published.set(true);
+                    latch.countDown();
+                }
+            },
+            new ClusterStateTaskListener() {
+                @Override
+                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                    // the misbehaving listener under test
+                    throw new IllegalStateException(source);
+                }
+
+                @Override
+                public void onFailure(String source, Throwable t) {
+                }
+            }
+        );
+
+        // bounded wait: if the listener's exception ever prevents the executor from being notified, fail the
+        // test instead of blocking the whole suite forever
+        assertTrue(
+            "timed out waiting for the executor to be notified of publication",
+            latch.await(10, java.util.concurrent.TimeUnit.SECONDS));
+        assertTrue(published.get());
+    }
+
     public void testClusterStateBatchedUpdates() throws InterruptedException {
         Settings settings = settingsBuilder()
                 .put("discovery.type", "local")