|
@@ -7,10 +7,13 @@
|
|
|
*/
|
|
|
package org.elasticsearch.cluster.coordination;
|
|
|
|
|
|
+import org.apache.logging.log4j.Level;
|
|
|
import org.elasticsearch.Version;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
+import org.elasticsearch.action.support.PlainActionFuture;
|
|
|
import org.elasticsearch.cluster.ClusterName;
|
|
|
import org.elasticsearch.cluster.ClusterState;
|
|
|
+import org.elasticsearch.cluster.ClusterStateTaskConfig;
|
|
|
import org.elasticsearch.cluster.NotMasterException;
|
|
|
import org.elasticsearch.cluster.block.ClusterBlocks;
|
|
|
import org.elasticsearch.cluster.metadata.DesiredNodeWithStatus;
|
|
@@ -24,16 +27,22 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
|
|
|
import org.elasticsearch.cluster.routing.RerouteService;
|
|
|
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
|
|
import org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils;
|
|
|
+import org.elasticsearch.common.Priority;
|
|
|
import org.elasticsearch.common.UUIDs;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
+import org.elasticsearch.test.ClusterServiceUtils;
|
|
|
import org.elasticsearch.test.ESTestCase;
|
|
|
+import org.elasticsearch.test.MockLogAppender;
|
|
|
import org.elasticsearch.test.VersionUtils;
|
|
|
+import org.elasticsearch.threadpool.TestThreadPool;
|
|
|
+import org.elasticsearch.threadpool.ThreadPool;
|
|
|
|
|
|
import java.util.Collections;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
import java.util.stream.Stream;
|
|
|
|
|
|
import static org.elasticsearch.cluster.metadata.DesiredNodesTestCase.assertDesiredNodesStatusIsCorrect;
|
|
@@ -54,7 +63,7 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
|
|
|
import static org.mockito.Mockito.mock;
|
|
|
import static org.mockito.Mockito.when;
|
|
|
|
|
|
-public class JoinTaskExecutorTests extends ESTestCase {
|
|
|
+public class NodeJoinExecutorTests extends ESTestCase {
|
|
|
|
|
|
private static final ActionListener<Void> NOT_COMPLETED_LISTENER = ActionListener.wrap(
|
|
|
() -> { throw new AssertionError("should not complete publication"); }
|
|
@@ -70,11 +79,11 @@ public class JoinTaskExecutorTests extends ESTestCase {
|
|
|
.build();
|
|
|
metaBuilder.put(indexMetadata, false);
|
|
|
Metadata metadata = metaBuilder.build();
|
|
|
- JoinTaskExecutor.ensureIndexCompatibility(Version.CURRENT, metadata);
|
|
|
+ NodeJoinExecutor.ensureIndexCompatibility(Version.CURRENT, metadata);
|
|
|
|
|
|
expectThrows(
|
|
|
IllegalStateException.class,
|
|
|
- () -> JoinTaskExecutor.ensureIndexCompatibility(VersionUtils.getPreviousVersion(Version.CURRENT), metadata)
|
|
|
+ () -> NodeJoinExecutor.ensureIndexCompatibility(VersionUtils.getPreviousVersion(Version.CURRENT), metadata)
|
|
|
);
|
|
|
}
|
|
|
|
|
@@ -88,7 +97,7 @@ public class JoinTaskExecutorTests extends ESTestCase {
|
|
|
.build();
|
|
|
metaBuilder.put(indexMetadata, false);
|
|
|
Metadata metadata = metaBuilder.build();
|
|
|
- expectThrows(IllegalStateException.class, () -> JoinTaskExecutor.ensureIndexCompatibility(Version.CURRENT, metadata));
|
|
|
+ expectThrows(IllegalStateException.class, () -> NodeJoinExecutor.ensureIndexCompatibility(Version.CURRENT, metadata));
|
|
|
}
|
|
|
|
|
|
public void testPreventJoinClusterWithUnsupportedNodeVersions() {
|
|
@@ -104,9 +113,9 @@ public class JoinTaskExecutorTests extends ESTestCase {
|
|
|
final Version tooLow = Version.fromId(maxNodeVersion.minimumCompatibilityVersion().id - 100);
|
|
|
expectThrows(IllegalStateException.class, () -> {
|
|
|
if (randomBoolean()) {
|
|
|
- JoinTaskExecutor.ensureNodesCompatibility(tooLow, nodes);
|
|
|
+ NodeJoinExecutor.ensureNodesCompatibility(tooLow, nodes);
|
|
|
} else {
|
|
|
- JoinTaskExecutor.ensureNodesCompatibility(tooLow, minNodeVersion, maxNodeVersion);
|
|
|
+ NodeJoinExecutor.ensureNodesCompatibility(tooLow, minNodeVersion, maxNodeVersion);
|
|
|
}
|
|
|
});
|
|
|
|
|
@@ -114,7 +123,7 @@ public class JoinTaskExecutorTests extends ESTestCase {
|
|
|
v -> v.onOrAfter(minNodeVersion),
|
|
|
() -> rarely() ? Version.fromId(minNodeVersion.id - 1) : randomVersion(random())
|
|
|
);
|
|
|
- expectThrows(IllegalStateException.class, () -> JoinTaskExecutor.ensureVersionBarrier(oldVersion, minNodeVersion));
|
|
|
+ expectThrows(IllegalStateException.class, () -> NodeJoinExecutor.ensureVersionBarrier(oldVersion, minNodeVersion));
|
|
|
|
|
|
final Version minGoodVersion = maxNodeVersion.major == minNodeVersion.major ?
|
|
|
// we have to stick with the same major
|
|
@@ -122,9 +131,9 @@ public class JoinTaskExecutorTests extends ESTestCase {
|
|
|
final Version justGood = randomVersionBetween(random(), minGoodVersion, maxCompatibleVersion(minNodeVersion));
|
|
|
|
|
|
if (randomBoolean()) {
|
|
|
- JoinTaskExecutor.ensureNodesCompatibility(justGood, nodes);
|
|
|
+ NodeJoinExecutor.ensureNodesCompatibility(justGood, nodes);
|
|
|
} else {
|
|
|
- JoinTaskExecutor.ensureNodesCompatibility(justGood, minNodeVersion, maxNodeVersion);
|
|
|
+ NodeJoinExecutor.ensureNodesCompatibility(justGood, minNodeVersion, maxNodeVersion);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -144,7 +153,7 @@ public class JoinTaskExecutorTests extends ESTestCase {
|
|
|
.build();
|
|
|
metaBuilder.put(indexMetadata, false);
|
|
|
Metadata metadata = metaBuilder.build();
|
|
|
- JoinTaskExecutor.ensureIndexCompatibility(Version.CURRENT, metadata);
|
|
|
+ NodeJoinExecutor.ensureIndexCompatibility(Version.CURRENT, metadata);
|
|
|
}
|
|
|
|
|
|
public static Settings.Builder randomCompatibleVersionSettings() {
|
|
@@ -165,6 +174,8 @@ public class JoinTaskExecutorTests extends ESTestCase {
|
|
|
return VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumIndexCompatibilityVersion(), Version.CURRENT);
|
|
|
}
|
|
|
|
|
|
+ private static final String TEST_REASON = "test";
|
|
|
+
|
|
|
public void testUpdatesNodeWithNewRoles() throws Exception {
|
|
|
// Node roles vary by version, and new roles are suppressed for BWC. This means we can receive a join from a node that's already
|
|
|
// in the cluster but with a different set of roles: the node didn't change roles, but the cluster state came via an older master.
|
|
@@ -174,7 +185,7 @@ public class JoinTaskExecutorTests extends ESTestCase {
|
|
|
when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]);
|
|
|
final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);
|
|
|
|
|
|
- final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService);
|
|
|
+ final NodeJoinExecutor executor = new NodeJoinExecutor(allocationService, rerouteService);
|
|
|
|
|
|
final DiscoveryNode masterNode = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT);
|
|
|
|
|
@@ -196,8 +207,8 @@ public class JoinTaskExecutorTests extends ESTestCase {
|
|
|
|
|
|
final var resultingState = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(
|
|
|
clusterState,
|
|
|
- joinTaskExecutor,
|
|
|
- List.of(JoinTask.singleNode(actualNode, "test", NOT_COMPLETED_LISTENER, 0L))
|
|
|
+ executor,
|
|
|
+ List.of(JoinTask.singleNode(actualNode, TEST_REASON, NOT_COMPLETED_LISTENER, 0L))
|
|
|
);
|
|
|
|
|
|
assertThat(resultingState.getNodes().get(actualNode.getId()).getRoles(), equalTo(actualNode.getRoles()));
|
|
@@ -208,7 +219,7 @@ public class JoinTaskExecutorTests extends ESTestCase {
|
|
|
final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);
|
|
|
|
|
|
final long executorTerm = randomLongBetween(0L, Long.MAX_VALUE - 1);
|
|
|
- final var joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService);
|
|
|
+ final var executor = new NodeJoinExecutor(allocationService, rerouteService);
|
|
|
|
|
|
final var masterNode = new DiscoveryNode(UUIDs.randomBase64UUID(random()), buildNewFakeTransportAddress(), Version.CURRENT);
|
|
|
final var clusterState = ClusterState.builder(ClusterName.DEFAULT)
|
|
@@ -225,12 +236,12 @@ public class JoinTaskExecutorTests extends ESTestCase {
|
|
|
NotMasterException.class,
|
|
|
() -> ClusterStateTaskExecutorUtils.executeHandlingResults(
|
|
|
clusterState,
|
|
|
- joinTaskExecutor,
|
|
|
+ executor,
|
|
|
randomBoolean()
|
|
|
- ? List.of(JoinTask.singleNode(masterNode, "test", NOT_COMPLETED_LISTENER, executorTerm))
|
|
|
+ ? List.of(JoinTask.singleNode(masterNode, TEST_REASON, NOT_COMPLETED_LISTENER, executorTerm))
|
|
|
: List.of(
|
|
|
JoinTask.completingElection(
|
|
|
- Stream.of(new JoinTask.NodeJoinTask(masterNode, "test", NOT_COMPLETED_LISTENER)),
|
|
|
+ Stream.of(new JoinTask.NodeJoinTask(masterNode, TEST_REASON, NOT_COMPLETED_LISTENER)),
|
|
|
executorTerm
|
|
|
)
|
|
|
),
|
|
@@ -247,7 +258,7 @@ public class JoinTaskExecutorTests extends ESTestCase {
|
|
|
final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);
|
|
|
|
|
|
final long executorTerm = randomNonNegativeLong();
|
|
|
- final var joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService);
|
|
|
+ final var executor = new NodeJoinExecutor(allocationService, rerouteService);
|
|
|
|
|
|
final var masterNode = new DiscoveryNode(UUIDs.randomBase64UUID(random()), buildNewFakeTransportAddress(), Version.CURRENT);
|
|
|
final var localNode = new DiscoveryNode(UUIDs.randomBase64UUID(random()), buildNewFakeTransportAddress(), Version.CURRENT);
|
|
@@ -272,12 +283,12 @@ public class JoinTaskExecutorTests extends ESTestCase {
|
|
|
NotMasterException.class,
|
|
|
() -> ClusterStateTaskExecutorUtils.executeHandlingResults(
|
|
|
clusterState,
|
|
|
- joinTaskExecutor,
|
|
|
+ executor,
|
|
|
randomBoolean()
|
|
|
- ? List.of(JoinTask.singleNode(masterNode, "test", NOT_COMPLETED_LISTENER, executorTerm))
|
|
|
+ ? List.of(JoinTask.singleNode(masterNode, TEST_REASON, NOT_COMPLETED_LISTENER, executorTerm))
|
|
|
: List.of(
|
|
|
JoinTask.completingElection(
|
|
|
- Stream.of(new JoinTask.NodeJoinTask(masterNode, "test", NOT_COMPLETED_LISTENER)),
|
|
|
+ Stream.of(new JoinTask.NodeJoinTask(masterNode, TEST_REASON, NOT_COMPLETED_LISTENER)),
|
|
|
executorTerm
|
|
|
)
|
|
|
),
|
|
@@ -294,7 +305,7 @@ public class JoinTaskExecutorTests extends ESTestCase {
|
|
|
final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);
|
|
|
|
|
|
final long executorTerm = randomNonNegativeLong();
|
|
|
- final var joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService);
|
|
|
+ final var executor = new NodeJoinExecutor(allocationService, rerouteService);
|
|
|
|
|
|
final var masterNode = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT);
|
|
|
final var clusterState = ClusterState.builder(ClusterName.DEFAULT)
|
|
@@ -311,8 +322,8 @@ public class JoinTaskExecutorTests extends ESTestCase {
|
|
|
NotMasterException.class,
|
|
|
() -> ClusterStateTaskExecutorUtils.executeHandlingResults(
|
|
|
clusterState,
|
|
|
- joinTaskExecutor,
|
|
|
- List.of(JoinTask.singleNode(masterNode, "test", NOT_COMPLETED_LISTENER, executorTerm)),
|
|
|
+ executor,
|
|
|
+ List.of(JoinTask.singleNode(masterNode, TEST_REASON, NOT_COMPLETED_LISTENER, executorTerm)),
|
|
|
t -> fail("should not succeed"),
|
|
|
(t, e) -> assertThat(e, instanceOf(NotMasterException.class))
|
|
|
)
|
|
@@ -326,7 +337,7 @@ public class JoinTaskExecutorTests extends ESTestCase {
|
|
|
final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);
|
|
|
|
|
|
final long executorTerm = randomLongBetween(1, Long.MAX_VALUE);
|
|
|
- final var joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService);
|
|
|
+ final var executor = new NodeJoinExecutor(allocationService, rerouteService);
|
|
|
|
|
|
final var masterNode = new DiscoveryNode(UUIDs.randomBase64UUID(random()), buildNewFakeTransportAddress(), Version.CURRENT);
|
|
|
final var otherNodeOld = new DiscoveryNode(UUIDs.randomBase64UUID(random()), buildNewFakeTransportAddress(), Version.CURRENT);
|
|
@@ -352,12 +363,12 @@ public class JoinTaskExecutorTests extends ESTestCase {
|
|
|
.build()
|
|
|
)
|
|
|
.build(),
|
|
|
- joinTaskExecutor,
|
|
|
+ executor,
|
|
|
List.of(
|
|
|
JoinTask.completingElection(
|
|
|
Stream.of(
|
|
|
- new JoinTask.NodeJoinTask(masterNode, "test", NOT_COMPLETED_LISTENER),
|
|
|
- new JoinTask.NodeJoinTask(otherNodeNew, "test", NOT_COMPLETED_LISTENER)
|
|
|
+ new JoinTask.NodeJoinTask(masterNode, TEST_REASON, NOT_COMPLETED_LISTENER),
|
|
|
+ new JoinTask.NodeJoinTask(otherNodeNew, TEST_REASON, NOT_COMPLETED_LISTENER)
|
|
|
),
|
|
|
executorTerm
|
|
|
)
|
|
@@ -376,10 +387,10 @@ public class JoinTaskExecutorTests extends ESTestCase {
|
|
|
"existing node should not be replaced if not completing an election",
|
|
|
ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(
|
|
|
afterElectionClusterState,
|
|
|
- joinTaskExecutor,
|
|
|
+ executor,
|
|
|
List.of(
|
|
|
- JoinTask.singleNode(masterNode, "test", NOT_COMPLETED_LISTENER, executorTerm),
|
|
|
- JoinTask.singleNode(otherNodeOld, "test", NOT_COMPLETED_LISTENER, executorTerm)
|
|
|
+ JoinTask.singleNode(masterNode, TEST_REASON, NOT_COMPLETED_LISTENER, executorTerm),
|
|
|
+ JoinTask.singleNode(otherNodeOld, TEST_REASON, NOT_COMPLETED_LISTENER, executorTerm)
|
|
|
)
|
|
|
).nodes().get(otherNodeNew.getId()).getEphemeralId(),
|
|
|
equalTo(otherNodeNew.getEphemeralId())
|
|
@@ -391,7 +402,7 @@ public class JoinTaskExecutorTests extends ESTestCase {
|
|
|
final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);
|
|
|
|
|
|
final long executorTerm = randomLongBetween(1, Long.MAX_VALUE);
|
|
|
- final var joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService);
|
|
|
+ final var executor = new NodeJoinExecutor(allocationService, rerouteService);
|
|
|
|
|
|
final var masterNode = new DiscoveryNode(UUIDs.randomBase64UUID(random()), buildNewFakeTransportAddress(), Version.CURRENT);
|
|
|
final var otherNode = new DiscoveryNode(
|
|
@@ -429,12 +440,12 @@ public class JoinTaskExecutorTests extends ESTestCase {
|
|
|
if (randomBoolean()) {
|
|
|
clusterState = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(
|
|
|
clusterState,
|
|
|
- joinTaskExecutor,
|
|
|
+ executor,
|
|
|
List.of(
|
|
|
JoinTask.completingElection(
|
|
|
Stream.of(
|
|
|
- new JoinTask.NodeJoinTask(masterNode, "test", NOT_COMPLETED_LISTENER),
|
|
|
- new JoinTask.NodeJoinTask(otherNode, "test", NOT_COMPLETED_LISTENER)
|
|
|
+ new JoinTask.NodeJoinTask(masterNode, TEST_REASON, NOT_COMPLETED_LISTENER),
|
|
|
+ new JoinTask.NodeJoinTask(otherNode, TEST_REASON, NOT_COMPLETED_LISTENER)
|
|
|
),
|
|
|
executorTerm
|
|
|
)
|
|
@@ -443,18 +454,18 @@ public class JoinTaskExecutorTests extends ESTestCase {
|
|
|
} else {
|
|
|
clusterState = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(
|
|
|
clusterState,
|
|
|
- joinTaskExecutor,
|
|
|
+ executor,
|
|
|
List.of(
|
|
|
JoinTask.completingElection(
|
|
|
- Stream.of(new JoinTask.NodeJoinTask(masterNode, "test", NOT_COMPLETED_LISTENER)),
|
|
|
+ Stream.of(new JoinTask.NodeJoinTask(masterNode, TEST_REASON, NOT_COMPLETED_LISTENER)),
|
|
|
executorTerm
|
|
|
)
|
|
|
)
|
|
|
);
|
|
|
clusterState = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(
|
|
|
clusterState,
|
|
|
- joinTaskExecutor,
|
|
|
- List.of(JoinTask.singleNode(otherNode, "test", NOT_COMPLETED_LISTENER, executorTerm))
|
|
|
+ executor,
|
|
|
+ List.of(JoinTask.singleNode(otherNode, TEST_REASON, NOT_COMPLETED_LISTENER, executorTerm))
|
|
|
);
|
|
|
}
|
|
|
|
|
@@ -471,7 +482,7 @@ public class JoinTaskExecutorTests extends ESTestCase {
|
|
|
final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);
|
|
|
|
|
|
final long currentTerm = randomLongBetween(100, 1000);
|
|
|
- final var joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService);
|
|
|
+ final var executor = new NodeJoinExecutor(allocationService, rerouteService);
|
|
|
|
|
|
final var masterNode = new DiscoveryNode(UUIDs.randomBase64UUID(random()), buildNewFakeTransportAddress(), Version.CURRENT);
|
|
|
final var clusterState = ClusterState.builder(ClusterName.DEFAULT)
|
|
@@ -480,14 +491,13 @@ public class JoinTaskExecutorTests extends ESTestCase {
|
|
|
.build();
|
|
|
|
|
|
var tasks = Stream.concat(
|
|
|
- Stream.generate(() -> createRandomTask(masterNode, "outdated", randomLongBetween(0, currentTerm - 1)))
|
|
|
- .limit(randomLongBetween(1, 10)),
|
|
|
- Stream.of(createRandomTask(masterNode, "current", currentTerm))
|
|
|
+ Stream.generate(() -> createRandomTask(masterNode, randomLongBetween(0, currentTerm - 1))).limit(randomLongBetween(1, 10)),
|
|
|
+ Stream.of(createRandomTask(masterNode, currentTerm))
|
|
|
).toList();
|
|
|
|
|
|
ClusterStateTaskExecutorUtils.executeHandlingResults(
|
|
|
clusterState,
|
|
|
- joinTaskExecutor,
|
|
|
+ executor,
|
|
|
tasks,
|
|
|
t -> assertThat(t.term(), equalTo(currentTerm)),
|
|
|
(t, e) -> {
|
|
@@ -500,7 +510,7 @@ public class JoinTaskExecutorTests extends ESTestCase {
|
|
|
public void testDesiredNodesMembershipIsUpgradedWhenNewNodesJoin() throws Exception {
|
|
|
final var allocationService = createAllocationService();
|
|
|
final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);
|
|
|
- final var joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService);
|
|
|
+ final var executor = new NodeJoinExecutor(allocationService, rerouteService);
|
|
|
|
|
|
final var actualizedDesiredNodes = randomList(0, 5, this::createActualizedDesiredNode);
|
|
|
final var pendingDesiredNodes = randomList(0, 5, this::createPendingDesiredNode);
|
|
@@ -519,9 +529,9 @@ public class JoinTaskExecutorTests extends ESTestCase {
|
|
|
);
|
|
|
final var desiredNodes = DesiredNodes.latestFromClusterState(clusterState);
|
|
|
|
|
|
- var tasks = joiningNodes.stream().map(node -> JoinTask.singleNode(node, "join", NOT_COMPLETED_LISTENER, 0L)).toList();
|
|
|
+ var tasks = joiningNodes.stream().map(node -> JoinTask.singleNode(node, TEST_REASON, NOT_COMPLETED_LISTENER, 0L)).toList();
|
|
|
|
|
|
- final var updatedClusterState = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(clusterState, joinTaskExecutor, tasks);
|
|
|
+ final var updatedClusterState = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(clusterState, executor, tasks);
|
|
|
|
|
|
final var updatedDesiredNodes = DesiredNodes.latestFromClusterState(clusterState);
|
|
|
assertThat(updatedDesiredNodes, is(notNullValue()));
|
|
@@ -537,7 +547,7 @@ public class JoinTaskExecutorTests extends ESTestCase {
|
|
|
public void testDesiredNodesMembershipIsUpgradedWhenANewMasterIsElected() throws Exception {
|
|
|
final var allocationService = createAllocationService();
|
|
|
final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);
|
|
|
- final var joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService);
|
|
|
+ final var executor = new NodeJoinExecutor(allocationService, rerouteService);
|
|
|
|
|
|
final var actualizedDesiredNodes = randomList(1, 5, this::createPendingDesiredNode);
|
|
|
final var pendingDesiredNodes = randomList(0, 5, this::createPendingDesiredNode);
|
|
@@ -552,13 +562,13 @@ public class JoinTaskExecutorTests extends ESTestCase {
|
|
|
final var desiredNodes = DesiredNodes.latestFromClusterState(clusterState);
|
|
|
|
|
|
final var completingElectionTask = JoinTask.completingElection(
|
|
|
- clusterState.nodes().stream().map(node -> new JoinTask.NodeJoinTask(node, "test", NOT_COMPLETED_LISTENER)),
|
|
|
+ clusterState.nodes().stream().map(node -> new JoinTask.NodeJoinTask(node, TEST_REASON, NOT_COMPLETED_LISTENER)),
|
|
|
1L
|
|
|
);
|
|
|
|
|
|
final var updatedClusterState = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(
|
|
|
clusterState,
|
|
|
- joinTaskExecutor,
|
|
|
+ executor,
|
|
|
List.of(completingElectionTask)
|
|
|
);
|
|
|
|
|
@@ -573,6 +583,50 @@ public class JoinTaskExecutorTests extends ESTestCase {
|
|
|
);
|
|
|
}
|
|
|
|
|
|
+ public void testPerNodeLogging() {
|
|
|
+ final AllocationService allocationService = createAllocationService();
|
|
|
+ when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]);
|
|
|
+ final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);
|
|
|
+
|
|
|
+ final NodeJoinExecutor executor = new NodeJoinExecutor(allocationService, rerouteService);
|
|
|
+
|
|
|
+ final DiscoveryNode masterNode = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT);
|
|
|
+ final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
|
|
|
+ .nodes(DiscoveryNodes.builder().add(masterNode).localNodeId(masterNode.getId()).masterNodeId(masterNode.getId()))
|
|
|
+ .build();
|
|
|
+
|
|
|
+ final MockLogAppender appender = new MockLogAppender();
|
|
|
+ final ThreadPool threadPool = new TestThreadPool("test");
|
|
|
+ try (
|
|
|
+ var ignored = appender.capturing(NodeJoinExecutor.class);
|
|
|
+ var clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool)
|
|
|
+ ) {
|
|
|
+ final var node1 = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT);
|
|
|
+ appender.addExpectation(
|
|
|
+ new MockLogAppender.SeenEventExpectation(
|
|
|
+ "info message",
|
|
|
+ LOGGER_NAME,
|
|
|
+ Level.INFO,
|
|
|
+ "node-join: [" + node1.descriptionWithoutAttributes() + "] with reason [" + TEST_REASON + "]"
|
|
|
+ )
|
|
|
+ );
|
|
|
+ assertNull(
|
|
|
+ PlainActionFuture.<Void, RuntimeException>get(
|
|
|
+ future -> clusterService.getMasterService()
|
|
|
+ .submitStateUpdateTask(
|
|
|
+ "test",
|
|
|
+ JoinTask.singleNode(node1, TEST_REASON, future, 0L),
|
|
|
+ ClusterStateTaskConfig.build(Priority.NORMAL),
|
|
|
+ executor
|
|
|
+ )
|
|
|
+ )
|
|
|
+ );
|
|
|
+ appender.assertAllExpectationsMatched();
|
|
|
+ } finally {
|
|
|
+ TestThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private DesiredNodeWithStatus createActualizedDesiredNode() {
|
|
|
return new DesiredNodeWithStatus(randomDesiredNode(), DesiredNodeWithStatus.Status.ACTUALIZED);
|
|
|
}
|
|
@@ -581,10 +635,10 @@ public class JoinTaskExecutorTests extends ESTestCase {
|
|
|
return new DesiredNodeWithStatus(randomDesiredNode(), DesiredNodeWithStatus.Status.PENDING);
|
|
|
}
|
|
|
|
|
|
- private static JoinTask createRandomTask(DiscoveryNode node, String reason, long term) {
|
|
|
+ private static JoinTask createRandomTask(DiscoveryNode node, long term) {
|
|
|
return randomBoolean()
|
|
|
- ? JoinTask.singleNode(node, reason, NOT_COMPLETED_LISTENER, term)
|
|
|
- : JoinTask.completingElection(Stream.of(new JoinTask.NodeJoinTask(node, reason, NOT_COMPLETED_LISTENER)), term);
|
|
|
+ ? JoinTask.singleNode(node, TEST_REASON, NOT_COMPLETED_LISTENER, term)
|
|
|
+ : JoinTask.completingElection(Stream.of(new JoinTask.NodeJoinTask(node, TEST_REASON, NOT_COMPLETED_LISTENER)), term);
|
|
|
}
|
|
|
|
|
|
private static AllocationService createAllocationService() {
|
|
@@ -595,4 +649,8 @@ public class JoinTaskExecutorTests extends ESTestCase {
|
|
|
);
|
|
|
return allocationService;
|
|
|
}
|
|
|
+
|
|
|
+ // Hard-coding the class name here because it is also mentioned in the troubleshooting docs, so should not be renamed without care.
|
|
|
+ private static final String LOGGER_NAME = "org.elasticsearch.cluster.coordination.NodeJoinExecutor";
|
|
|
+
|
|
|
}
|