Browse Source

`CoordinatorTests` sometimes needs three term bumps (#79574)

Today we require the cluster to stabilise in a time period that allows
time for the first election to encounter conflicts. However on very rare
occasions there might be an election conflict in the second election
too. This commit extends the stabilisation timeout to allow for this.
Similar to #64462 Closes #78370
David Turner 4 years ago
parent
commit
cca3a4cdde

+ 1 - 1
server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java

@@ -59,7 +59,7 @@ public class InternalClusterInfoServiceSchedulingTests extends ESTestCase {
         final ClusterApplierService clusterApplierService = new ClusterApplierService("test", settings, clusterSettings, threadPool) {
         final ClusterApplierService clusterApplierService = new ClusterApplierService("test", settings, clusterSettings, threadPool) {
             @Override
             @Override
             protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
             protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
-                return new MockSinglePrioritizingExecutor("mock-executor", deterministicTaskQueue, threadPool);
+                return new MockSinglePrioritizingExecutor("mock-executor", "", deterministicTaskQueue, threadPool);
             }
             }
         };
         };
 
 

+ 1 - 1
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -1628,7 +1628,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                     new ClusterApplierService(node.getName(), settings, clusterSettings, threadPool) {
                     new ClusterApplierService(node.getName(), settings, clusterSettings, threadPool) {
                         @Override
                         @Override
                         protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
                         protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
-                            return new MockSinglePrioritizingExecutor(node.getName(), deterministicTaskQueue, threadPool);
+                            return new MockSinglePrioritizingExecutor(node.getName(), node.getId(), deterministicTaskQueue, threadPool);
                         }
                         }
 
 
                         @Override
                         @Override

+ 7 - 1
test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java

@@ -236,6 +236,8 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
             + DEFAULT_ELECTION_DELAY
             + DEFAULT_ELECTION_DELAY
             // perhaps there is an election collision requiring another publication (which times out) and a term bump
             // perhaps there is an election collision requiring another publication (which times out) and a term bump
             + defaultMillis(PUBLISH_TIMEOUT_SETTING) + DEFAULT_ELECTION_DELAY
             + defaultMillis(PUBLISH_TIMEOUT_SETTING) + DEFAULT_ELECTION_DELAY
+            // very rarely there is another election collision requiring another publication (which times out) and a term bump
+            + defaultMillis(PUBLISH_TIMEOUT_SETTING) + DEFAULT_ELECTION_DELAY
             // then wait for the new leader to notice that the old leader is unresponsive
             // then wait for the new leader to notice that the old leader is unresponsive
             + (defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING)) * defaultInt(
             + (defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING)) * defaultInt(
                 FOLLOWER_CHECK_RETRY_COUNT_SETTING
                 FOLLOWER_CHECK_RETRY_COUNT_SETTING
@@ -1165,6 +1167,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
                 final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
                 final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
                 clusterApplierService = new DisruptableClusterApplierService(
                 clusterApplierService = new DisruptableClusterApplierService(
                     localNode.getId(),
                     localNode.getId(),
+                    localNode.getEphemeralId(),
                     settings,
                     settings,
                     clusterSettings,
                     clusterSettings,
                     deterministicTaskQueue,
                     deterministicTaskQueue,
@@ -1612,6 +1615,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
 
 
     static class DisruptableClusterApplierService extends ClusterApplierService {
     static class DisruptableClusterApplierService extends ClusterApplierService {
         private final String nodeName;
         private final String nodeName;
+        private final String nodeId;
         private final DeterministicTaskQueue deterministicTaskQueue;
         private final DeterministicTaskQueue deterministicTaskQueue;
         private final ThreadPool threadPool;
         private final ThreadPool threadPool;
         ClusterStateApplyResponse clusterStateApplyResponse = ClusterStateApplyResponse.SUCCEED;
         ClusterStateApplyResponse clusterStateApplyResponse = ClusterStateApplyResponse.SUCCEED;
@@ -1619,6 +1623,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
 
 
         DisruptableClusterApplierService(
         DisruptableClusterApplierService(
             String nodeName,
             String nodeName,
+            String nodeId,
             Settings settings,
             Settings settings,
             ClusterSettings clusterSettings,
             ClusterSettings clusterSettings,
             DeterministicTaskQueue deterministicTaskQueue,
             DeterministicTaskQueue deterministicTaskQueue,
@@ -1626,6 +1631,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
         ) {
         ) {
             super(nodeName, settings, clusterSettings, threadPool);
             super(nodeName, settings, clusterSettings, threadPool);
             this.nodeName = nodeName;
             this.nodeName = nodeName;
+            this.nodeId = nodeId;
             this.deterministicTaskQueue = deterministicTaskQueue;
             this.deterministicTaskQueue = deterministicTaskQueue;
             this.threadPool = threadPool;
             this.threadPool = threadPool;
             addStateApplier(event -> {
             addStateApplier(event -> {
@@ -1648,7 +1654,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
 
 
         @Override
         @Override
         protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
         protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
-            return new MockSinglePrioritizingExecutor(nodeName, deterministicTaskQueue, threadPool);
+            return new MockSinglePrioritizingExecutor(nodeName, nodeId, deterministicTaskQueue, threadPool);
         }
         }
 
 
         @Override
         @Override

+ 16 - 3
test/framework/src/main/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutor.java

@@ -7,6 +7,7 @@
  */
  */
 package org.elasticsearch.cluster.coordination;
 package org.elasticsearch.cluster.coordination;
 
 
+import org.apache.logging.log4j.CloseableThreadContext;
 import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
 import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
 import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
@@ -14,20 +15,32 @@ import org.elasticsearch.threadpool.ThreadPool;
 
 
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
 
 
+import static org.elasticsearch.common.util.concurrent.DeterministicTaskQueue.NODE_ID_LOG_CONTEXT_KEY;
+
 /**
 /**
  * Mock single threaded {@link PrioritizedEsThreadPoolExecutor} based on {@link DeterministicTaskQueue},
  * Mock single threaded {@link PrioritizedEsThreadPoolExecutor} based on {@link DeterministicTaskQueue},
  * simulating the behaviour of an executor returned by {@link EsExecutors#newSinglePrioritizing}.
  * simulating the behaviour of an executor returned by {@link EsExecutors#newSinglePrioritizing}.
  */
  */
 public class MockSinglePrioritizingExecutor extends PrioritizedEsThreadPoolExecutor {
 public class MockSinglePrioritizingExecutor extends PrioritizedEsThreadPoolExecutor {
 
 
-    public MockSinglePrioritizingExecutor(String name, DeterministicTaskQueue deterministicTaskQueue, ThreadPool threadPool) {
-        super(name, 0, 1, 0L, TimeUnit.MILLISECONDS, r -> new Thread() {
+    public MockSinglePrioritizingExecutor(
+        String nodeName,
+        String nodeId,
+        DeterministicTaskQueue deterministicTaskQueue,
+        ThreadPool threadPool
+    ) {
+        super(nodeName, 0, 1, 0L, TimeUnit.MILLISECONDS, r -> new Thread() {
             @Override
             @Override
             public void start() {
             public void start() {
                 deterministicTaskQueue.scheduleNow(new Runnable() {
                 deterministicTaskQueue.scheduleNow(new Runnable() {
                     @Override
                     @Override
                     public void run() {
                     public void run() {
-                        try {
+                        try (
+                            CloseableThreadContext.Instance ignored = CloseableThreadContext.put(
+                                NODE_ID_LOG_CONTEXT_KEY,
+                                '{' + nodeName + "}{" + nodeId + '}'
+                            )
+                        ) {
                             r.run();
                             r.run();
                         } catch (KillWorkerError kwe) {
                         } catch (KillWorkerError kwe) {
                             // hacks everywhere
                             // hacks everywhere

+ 6 - 1
test/framework/src/test/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutorTests.java

@@ -20,7 +20,12 @@ public class MockSinglePrioritizingExecutorTests extends ESTestCase {
     public void testPrioritizedEsThreadPoolExecutor() {
     public void testPrioritizedEsThreadPoolExecutor() {
         final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue();
         final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue();
 
 
-        final PrioritizedEsThreadPoolExecutor executor = new MockSinglePrioritizingExecutor("test", taskQueue, taskQueue.getThreadPool());
+        final PrioritizedEsThreadPoolExecutor executor = new MockSinglePrioritizingExecutor(
+            "test",
+            "",
+            taskQueue,
+            taskQueue.getThreadPool()
+        );
         final AtomicBoolean called1 = new AtomicBoolean();
         final AtomicBoolean called1 = new AtomicBoolean();
         final AtomicBoolean called2 = new AtomicBoolean();
         final AtomicBoolean called2 = new AtomicBoolean();
         executor.execute(new PrioritizedRunnable(Priority.NORMAL) {
         executor.execute(new PrioritizedRunnable(Priority.NORMAL) {