Browse Source

Merge pull request #15494 from jasontedor/cluster-state-published-callback

Add callback for publication of new cluster state
Jason Tedor 9 years ago
parent
commit
1bc196c387

+ 7 - 0
core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java

@@ -37,6 +37,13 @@ public interface ClusterStateTaskExecutor<T> {
         return true;
         return true;
     }
     }
 
 
+    /**
+     * Callback invoked after new cluster state is published. Note that
+     * this method is not invoked if the cluster state was not updated.
+     */
+    default void clusterStatePublished(ClusterState newClusterState) {
+    }
+
     /**
     /**
      * Represents the result of a batched execution of cluster state update tasks
      * Represents the result of a batched execution of cluster state update tasks
      * @param <T> the type of the cluster state update task
      * @param <T> the type of the cluster state update task

+ 2 - 0
core/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java

@@ -605,6 +605,8 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
                 task.listener.clusterStateProcessed(task.source, previousClusterState, newClusterState);
                 task.listener.clusterStateProcessed(task.source, previousClusterState, newClusterState);
             }
             }
 
 
+            executor.clusterStatePublished(newClusterState);
+
             TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
             TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
             logger.debug("processing [{}]: took {} done applying updated cluster_state (version: {}, uuid: {})", source, executionTime, newClusterState.version(), newClusterState.stateUUID());
             logger.debug("processing [{}]: took {} done applying updated cluster_state (version: {}, uuid: {})", source, executionTime, newClusterState.version(), newClusterState.stateUUID());
             warnAboutSlowTaskIfNeeded(executionTime, source);
             warnAboutSlowTaskIfNeeded(executionTime, source);

+ 27 - 3
core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java

@@ -43,7 +43,14 @@ import org.elasticsearch.test.MockLogAppender;
 import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CountDownLatch;
@@ -53,7 +60,11 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 
 import static org.elasticsearch.common.settings.Settings.settingsBuilder;
 import static org.elasticsearch.common.settings.Settings.settingsBuilder;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
-import static org.hamcrest.Matchers.*;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
 
 
 /**
 /**
  *
  *
@@ -753,18 +764,30 @@ public class ClusterServiceIT extends ESIntegTestCase {
 
 
         class TaskExecutor implements ClusterStateTaskExecutor<Task> {
         class TaskExecutor implements ClusterStateTaskExecutor<Task> {
             private AtomicInteger counter = new AtomicInteger();
             private AtomicInteger counter = new AtomicInteger();
+            private AtomicInteger batches = new AtomicInteger();
+            private AtomicInteger published = new AtomicInteger();
 
 
             @Override
             @Override
             public BatchResult<Task> execute(ClusterState currentState, List<Task> tasks) throws Exception {
             public BatchResult<Task> execute(ClusterState currentState, List<Task> tasks) throws Exception {
                 tasks.forEach(task -> task.execute());
                 tasks.forEach(task -> task.execute());
                 counter.addAndGet(tasks.size());
                 counter.addAndGet(tasks.size());
-                return BatchResult.<Task>builder().successes(tasks).build(currentState);
+                ClusterState maybeUpdatedClusterState = currentState;
+                if (randomBoolean()) {
+                    maybeUpdatedClusterState = ClusterState.builder(currentState).build();
+                    batches.incrementAndGet();
+                }
+                return BatchResult.<Task>builder().successes(tasks).build(maybeUpdatedClusterState);
             }
             }
 
 
             @Override
             @Override
             public boolean runOnlyOnMaster() {
             public boolean runOnlyOnMaster() {
                 return false;
                 return false;
             }
             }
+
+            @Override
+            public void clusterStatePublished(ClusterState newClusterState) {
+                published.incrementAndGet();
+            }
         }
         }
         int numberOfThreads = randomIntBetween(2, 8);
         int numberOfThreads = randomIntBetween(2, 8);
         int tasksSubmittedPerThread = randomIntBetween(1, 1024);
         int tasksSubmittedPerThread = randomIntBetween(1, 1024);
@@ -838,6 +861,7 @@ public class ClusterServiceIT extends ESIntegTestCase {
         for (TaskExecutor executor : executors) {
         for (TaskExecutor executor : executors) {
             if (counts.containsKey(executor)) {
             if (counts.containsKey(executor)) {
                 assertEquals((int) counts.get(executor), executor.counter.get());
                 assertEquals((int) counts.get(executor), executor.counter.get());
+                assertEquals(executor.batches.get(), executor.published.get());
             }
             }
         }
         }