Browse Source

Merge pull request #15428 from jasontedor/cluster-state-batch-logging

Explicitly log cluster state update failures
Jason Tedor 10 years ago
parent
commit
89c960b00a

+ 50 - 5
core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java

@@ -20,8 +20,19 @@
 package org.elasticsearch.cluster.service;
 
 import org.elasticsearch.Version;
-import org.elasticsearch.cluster.*;
+import org.elasticsearch.cluster.AckedClusterStateTaskListener;
+import org.elasticsearch.cluster.ClusterChangedEvent;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterService;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterState.Builder;
+import org.elasticsearch.cluster.ClusterStateListener;
+import org.elasticsearch.cluster.ClusterStateTaskConfig;
+import org.elasticsearch.cluster.ClusterStateTaskExecutor;
+import org.elasticsearch.cluster.ClusterStateTaskListener;
+import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.cluster.LocalNodeMasterListener;
+import org.elasticsearch.cluster.TimeoutClusterStateListener;
 import org.elasticsearch.cluster.block.ClusterBlock;
 import org.elasticsearch.cluster.block.ClusterBlocks;
 import org.elasticsearch.cluster.metadata.MetaData;
@@ -42,7 +53,13 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.text.StringText;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.concurrent.*;
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.common.util.concurrent.CountDown;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+import org.elasticsearch.common.util.concurrent.FutureUtils;
+import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
+import org.elasticsearch.common.util.concurrent.PrioritizedRunnable;
 import org.elasticsearch.common.util.iterable.Iterables;
 import org.elasticsearch.discovery.Discovery;
 import org.elasticsearch.discovery.DiscoveryService;
@@ -50,8 +67,20 @@ import org.elasticsearch.node.settings.NodeSettingsService;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
@@ -292,6 +321,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
             if (config.timeout() != null) {
                 updateTasksExecutor.execute(updateTask, threadPool.scheduler(), config.timeout(), () -> threadPool.generic().execute(() -> {
                     if (updateTask.processed.getAndSet(true) == false) {
+                        logger.debug("cluster state update task [{}] timed out after [{}]", source, config.timeout());
                         listener.onFailure(source, new ProcessClusterEventTimeoutException(config.timeout(), source));
                     }}));
             } else {
@@ -413,6 +443,15 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
         }
 
         assert batchResult.executionResults != null;
+        assert batchResult.executionResults.size() == toExecute.size()
+            : String.format(Locale.ROOT, "expected [%d] task result%s but was [%d]", toExecute.size(), toExecute.size() == 1 ? "" : "s", batchResult.executionResults.size());
+        boolean assertsEnabled = false;
+        assert (assertsEnabled = true);
+        if (assertsEnabled) {
+            for (UpdateTask<T> updateTask : toExecute) {
+                assert batchResult.executionResults.containsKey(updateTask.task) : "missing task result for [" + updateTask.task + "]";
+            }
+        }
 
         ClusterState newClusterState = batchResult.resultingState;
         final ArrayList<UpdateTask<T>> proccessedListeners = new ArrayList<>();
@@ -421,7 +460,13 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
             assert batchResult.executionResults.containsKey(updateTask.task) : "missing " + updateTask.task.toString();
             final ClusterStateTaskExecutor.TaskResult executionResult =
                     batchResult.executionResults.get(updateTask.task);
-            executionResult.handle(() -> proccessedListeners.add(updateTask), ex -> updateTask.listener.onFailure(updateTask.source, ex));
+            executionResult.handle(
+                () -> proccessedListeners.add(updateTask),
+                ex -> {
+                    logger.debug("cluster state update task [{}] failed", ex, updateTask.source);
+                    updateTask.listener.onFailure(updateTask.source, ex);
+                }
+            );
         }
 
         if (previousClusterState == newClusterState) {