|
@@ -57,12 +57,15 @@ import org.elasticsearch.common.transport.TransportAddress;
|
|
|
import org.elasticsearch.common.unit.TimeValue;
|
|
|
import org.elasticsearch.discovery.zen.PublishClusterStateStats;
|
|
|
import org.elasticsearch.discovery.zen.UnicastHostsProvider.HostsResolver;
|
|
|
+import org.elasticsearch.env.NodeEnvironment;
|
|
|
+import org.elasticsearch.gateway.MockGatewayMetaState;
|
|
|
import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService;
|
|
|
import org.elasticsearch.test.ESTestCase;
|
|
|
import org.elasticsearch.test.disruption.DisruptableMockTransport;
|
|
|
import org.elasticsearch.test.disruption.DisruptableMockTransport.ConnectionStatus;
|
|
|
import org.elasticsearch.transport.TransportService;
|
|
|
import org.hamcrest.Matcher;
|
|
|
+import org.junit.After;
|
|
|
import org.junit.Before;
|
|
|
|
|
|
import java.io.IOException;
|
|
@@ -131,6 +134,16 @@ import static org.hamcrest.Matchers.startsWith;
|
|
|
|
|
|
public class CoordinatorTests extends ESTestCase {
|
|
|
|
|
|
+ private final List<NodeEnvironment> nodeEnvironments = new ArrayList<>();
|
|
|
+
|
|
|
+ @After
|
|
|
+ public void closeNodeEnvironmentsAfterEachTest() {
|
|
|
+ for (NodeEnvironment nodeEnvironment : nodeEnvironments) {
|
|
|
+ nodeEnvironment.close();
|
|
|
+ }
|
|
|
+ nodeEnvironments.clear();
|
|
|
+ }
|
|
|
+
|
|
|
@Before
|
|
|
public void resetPortCounterBeforeEachTest() {
|
|
|
resetPortCounter();
|
|
@@ -1102,8 +1115,8 @@ public class CoordinatorTests extends ESTestCase {
|
|
|
private final Set<String> blackholedNodes = new HashSet<>();
|
|
|
private final Map<Long, ClusterState> committedStatesByVersion = new HashMap<>();
|
|
|
|
|
|
- private final Function<DiscoveryNode, PersistedState> defaultPersistedStateSupplier = localNode -> new MockPersistedState(0L,
|
|
|
- clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L));
|
|
|
+ private final Function<DiscoveryNode, MockPersistedState> defaultPersistedStateSupplier =
|
|
|
+ localNode -> new MockPersistedState(localNode);
|
|
|
|
|
|
Cluster(int initialNodeCount) {
|
|
|
this(initialNodeCount, true);
|
|
@@ -1490,35 +1503,77 @@ public class CoordinatorTests extends ESTestCase {
|
|
|
return getAnyNode();
|
|
|
}
|
|
|
|
|
|
- class MockPersistedState extends InMemoryPersistedState {
|
|
|
- MockPersistedState(long term, ClusterState acceptedState) {
|
|
|
- super(term, acceptedState);
|
|
|
+ class MockPersistedState implements PersistedState {
|
|
|
+ private final PersistedState delegate;
|
|
|
+ private final NodeEnvironment nodeEnvironment;
|
|
|
+
|
|
|
+ MockPersistedState(DiscoveryNode localNode) {
|
|
|
+ try {
|
|
|
+ if (rarely()) {
|
|
|
+ nodeEnvironment = newNodeEnvironment();
|
|
|
+ nodeEnvironments.add(nodeEnvironment);
|
|
|
+ delegate = new MockGatewayMetaState(Settings.EMPTY, nodeEnvironment, xContentRegistry(), localNode)
|
|
|
+ .getPersistedState(Settings.EMPTY, null);
|
|
|
+ } else {
|
|
|
+ nodeEnvironment = null;
|
|
|
+ delegate = new InMemoryPersistedState(0L,
|
|
|
+ clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L));
|
|
|
+ }
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new UncheckedIOException("Unable to create MockPersistedState", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ MockPersistedState(DiscoveryNode newLocalNode, MockPersistedState oldState) {
|
|
|
+ try {
|
|
|
+ if (oldState.nodeEnvironment != null) {
|
|
|
+ nodeEnvironment = oldState.nodeEnvironment;
|
|
|
+ delegate = new MockGatewayMetaState(Settings.EMPTY, nodeEnvironment, xContentRegistry(), newLocalNode)
|
|
|
+ .getPersistedState(Settings.EMPTY, null);
|
|
|
+ } else {
|
|
|
+ nodeEnvironment = null;
|
|
|
+ BytesStreamOutput outStream = new BytesStreamOutput();
|
|
|
+ outStream.setVersion(Version.CURRENT);
|
|
|
+ oldState.getLastAcceptedState().writeTo(outStream);
|
|
|
+ StreamInput inStream = new NamedWriteableAwareStreamInput(outStream.bytes().streamInput(),
|
|
|
+ new NamedWriteableRegistry(ClusterModule.getNamedWriteables()));
|
|
|
+ delegate = new InMemoryPersistedState(oldState.getCurrentTerm(), ClusterState.readFrom(inStream,
|
|
|
+ newLocalNode)); // adapts it to new localNode instance
|
|
|
+ }
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new UncheckedIOException("Unable to create MockPersistedState", e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private void possiblyFail(String description) {
|
|
|
if (disruptStorage && rarely()) {
|
|
|
- // TODO revisit this when we've decided how PersistedState should throw exceptions
|
|
|
logger.trace("simulating IO exception [{}]", description);
|
|
|
- if (randomBoolean()) {
|
|
|
- throw new UncheckedIOException(new IOException("simulated IO exception [" + description + ']'));
|
|
|
- } else {
|
|
|
- throw new CoordinationStateRejectedException("simulated IO exception [" + description + ']');
|
|
|
- }
|
|
|
+ // In the real-life IOError might be thrown, for example if state fsync fails.
|
|
|
+ // This will require node restart and we're not emulating it here.
|
|
|
+ throw new UncheckedIOException(new IOException("simulated IO exception [" + description + ']'));
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public long getCurrentTerm() {
|
|
|
+ return delegate.getCurrentTerm();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public ClusterState getLastAcceptedState() {
|
|
|
+ return delegate.getLastAcceptedState();
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public void setCurrentTerm(long currentTerm) {
|
|
|
possiblyFail("before writing term of " + currentTerm);
|
|
|
- super.setCurrentTerm(currentTerm);
|
|
|
- // TODO possiblyFail() here if that's a failure mode of the storage layer
|
|
|
+ delegate.setCurrentTerm(currentTerm);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void setLastAcceptedState(ClusterState clusterState) {
|
|
|
possiblyFail("before writing last-accepted state of term=" + clusterState.term() + ", version=" + clusterState.version());
|
|
|
- super.setLastAcceptedState(clusterState);
|
|
|
- // TODO possiblyFail() here if that's a failure mode of the storage layer
|
|
|
+ delegate.setLastAcceptedState(clusterState);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1528,7 +1583,7 @@ public class CoordinatorTests extends ESTestCase {
|
|
|
private final int nodeIndex;
|
|
|
private Coordinator coordinator;
|
|
|
private final DiscoveryNode localNode;
|
|
|
- private final PersistedState persistedState;
|
|
|
+ private final MockPersistedState persistedState;
|
|
|
private FakeClusterApplier clusterApplier;
|
|
|
private AckedFakeThreadPoolMasterService masterService;
|
|
|
private TransportService transportService;
|
|
@@ -1540,7 +1595,7 @@ public class CoordinatorTests extends ESTestCase {
|
|
|
this(nodeIndex, createDiscoveryNode(nodeIndex, masterEligible), defaultPersistedStateSupplier);
|
|
|
}
|
|
|
|
|
|
- ClusterNode(int nodeIndex, DiscoveryNode localNode, Function<DiscoveryNode, PersistedState> persistedStateSupplier) {
|
|
|
+ ClusterNode(int nodeIndex, DiscoveryNode localNode, Function<DiscoveryNode, MockPersistedState> persistedStateSupplier) {
|
|
|
this.nodeIndex = nodeIndex;
|
|
|
this.localNode = localNode;
|
|
|
persistedState = persistedStateSupplier.apply(localNode);
|
|
@@ -1608,19 +1663,7 @@ public class CoordinatorTests extends ESTestCase {
|
|
|
UUIDs.randomBase64UUID(random()), // generated deterministically for repeatable tests
|
|
|
address.address().getHostString(), address.getAddress(), address, Collections.emptyMap(),
|
|
|
localNode.isMasterNode() ? EnumSet.allOf(Role.class) : emptySet(), Version.CURRENT);
|
|
|
- final PersistedState newPersistedState;
|
|
|
- try {
|
|
|
- BytesStreamOutput outStream = new BytesStreamOutput();
|
|
|
- outStream.setVersion(Version.CURRENT);
|
|
|
- persistedState.getLastAcceptedState().writeTo(outStream);
|
|
|
- StreamInput inStream = new NamedWriteableAwareStreamInput(outStream.bytes().streamInput(),
|
|
|
- new NamedWriteableRegistry(ClusterModule.getNamedWriteables()));
|
|
|
- newPersistedState = new MockPersistedState(persistedState.getCurrentTerm(),
|
|
|
- ClusterState.readFrom(inStream, newLocalNode)); // adapts it to new localNode instance
|
|
|
- } catch (IOException e) {
|
|
|
- throw new UncheckedIOException(e);
|
|
|
- }
|
|
|
- return new ClusterNode(nodeIndex, newLocalNode, node -> newPersistedState);
|
|
|
+ return new ClusterNode(nodeIndex, newLocalNode, node -> new MockPersistedState(newLocalNode, persistedState));
|
|
|
}
|
|
|
|
|
|
private PersistedState getPersistedState() {
|