|
@@ -196,7 +196,7 @@ public class FileSettingsServiceIT extends ESIntegTestCase {
|
|
|
return new Tuple<>(savedClusterState, metadataVersion);
|
|
|
}
|
|
|
|
|
|
- private Tuple<CountDownLatch, AtomicLong> setupClusterStateListener(String node) {
|
|
|
+ private Tuple<CountDownLatch, AtomicLong> setupClusterStateListener(String node, long version) {
|
|
|
ClusterService clusterService = internalCluster().clusterService(node);
|
|
|
CountDownLatch savedClusterState = new CountDownLatch(1);
|
|
|
AtomicLong metadataVersion = new AtomicLong(-1);
|
|
@@ -204,13 +204,10 @@ public class FileSettingsServiceIT extends ESIntegTestCase {
|
|
|
@Override
|
|
|
public void clusterChanged(ClusterChangedEvent event) {
|
|
|
ReservedStateMetadata reservedState = event.state().metadata().reservedStateMetadata().get(FileSettingsService.NAMESPACE);
|
|
|
- if (reservedState != null) {
|
|
|
- ReservedStateHandlerMetadata handlerMetadata = reservedState.handlers().get(ReservedClusterSettingsAction.NAME);
|
|
|
- if (handlerMetadata != null && handlerMetadata.keys().contains("indices.recovery.max_bytes_per_sec")) {
|
|
|
- clusterService.removeListener(this);
|
|
|
- metadataVersion.set(event.state().metadata().version());
|
|
|
- savedClusterState.countDown();
|
|
|
- }
|
|
|
+ if (reservedState != null && reservedState.version() == version) {
|
|
|
+ clusterService.removeListener(this);
|
|
|
+ metadataVersion.set(event.state().metadata().version());
|
|
|
+ savedClusterState.countDown();
|
|
|
}
|
|
|
}
|
|
|
});
|
|
@@ -258,14 +255,14 @@ public class FileSettingsServiceIT extends ESIntegTestCase {
|
|
|
logger.info("--> start master node");
|
|
|
final String masterNode = internalCluster().startMasterOnlyNode();
|
|
|
assertMasterNode(internalCluster().nonMasterClient(), masterNode);
|
|
|
- var savedClusterState = setupClusterStateListener(masterNode);
|
|
|
+ var savedClusterState = setupClusterStateListener(masterNode, versionCounter.incrementAndGet());
|
|
|
|
|
|
FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode);
|
|
|
|
|
|
assertBusy(() -> assertTrue(masterFileSettingsService.watching()));
|
|
|
assertFalse(dataFileSettingsService.watching());
|
|
|
|
|
|
- writeJSONFile(masterNode, testJSON, logger, versionCounter.incrementAndGet());
|
|
|
+ writeJSONFile(masterNode, testJSON, logger, versionCounter.get());
|
|
|
assertClusterStateSaveOK(savedClusterState.v1(), savedClusterState.v2(), "50mb");
|
|
|
}
|
|
|
|
|
@@ -276,11 +273,11 @@ public class FileSettingsServiceIT extends ESIntegTestCase {
|
|
|
FileSettingsService dataFileSettingsService = internalCluster().getInstance(FileSettingsService.class, dataNode);
|
|
|
|
|
|
assertFalse(dataFileSettingsService.watching());
|
|
|
- var savedClusterState = setupClusterStateListener(dataNode);
|
|
|
+ var savedClusterState = setupClusterStateListener(dataNode, versionCounter.incrementAndGet());
|
|
|
|
|
|
// In internal cluster tests, the nodes share the config directory, so when we write with the data node path
|
|
|
// the master will pick it up on start
|
|
|
- writeJSONFile(dataNode, testJSON, logger, versionCounter.incrementAndGet());
|
|
|
+ writeJSONFile(dataNode, testJSON, logger, versionCounter.get());
|
|
|
|
|
|
logger.info("--> start master node");
|
|
|
final String masterNode = internalCluster().startMasterOnlyNode();
|
|
@@ -301,14 +298,14 @@ public class FileSettingsServiceIT extends ESIntegTestCase {
|
|
|
Settings.builder().put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s").build()
|
|
|
);
|
|
|
assertMasterNode(internalCluster().masterClient(), masterNode);
|
|
|
- var savedClusterState = setupClusterStateListener(masterNode);
|
|
|
+ var savedClusterState = setupClusterStateListener(masterNode, versionCounter.incrementAndGet());
|
|
|
|
|
|
FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode);
|
|
|
|
|
|
assertBusy(() -> assertTrue(masterFileSettingsService.watching()));
|
|
|
|
|
|
logger.info("--> write some settings");
|
|
|
- writeJSONFile(masterNode, testJSON, logger, versionCounter.incrementAndGet());
|
|
|
+ writeJSONFile(masterNode, testJSON, logger, versionCounter.get());
|
|
|
assertClusterStateSaveOK(savedClusterState.v1(), savedClusterState.v2(), "50mb");
|
|
|
|
|
|
logger.info("--> restart master");
|
|
@@ -476,12 +473,12 @@ public class FileSettingsServiceIT extends ESIntegTestCase {
|
|
|
assertFalse(master1FS.watching());
|
|
|
assertFalse(master2FS.watching());
|
|
|
|
|
|
- var savedClusterState = setupClusterStateListener(masterNode);
|
|
|
+ var savedClusterState = setupClusterStateListener(masterNode, versionCounter.incrementAndGet());
|
|
|
FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode);
|
|
|
|
|
|
assertBusy(() -> assertTrue(masterFileSettingsService.watching()));
|
|
|
|
|
|
- writeJSONFile(masterNode, testJSON, logger, versionCounter.incrementAndGet());
|
|
|
+ writeJSONFile(masterNode, testJSON, logger, versionCounter.get());
|
|
|
assertClusterStateSaveOK(savedClusterState.v1(), savedClusterState.v2(), "50mb");
|
|
|
|
|
|
internalCluster().stopCurrentMasterNode();
|
|
@@ -501,8 +498,8 @@ public class FileSettingsServiceIT extends ESIntegTestCase {
|
|
|
boolean awaitSuccessful = savedClusterState.v1().await(20, TimeUnit.SECONDS);
|
|
|
assertTrue(awaitSuccessful);
|
|
|
|
|
|
- savedClusterState = setupClusterStateListener(internalCluster().getMasterName());
|
|
|
- writeJSONFile(internalCluster().getMasterName(), testJSON43mb, logger, versionCounter.incrementAndGet());
|
|
|
+ savedClusterState = setupClusterStateListener(internalCluster().getMasterName(), versionCounter.incrementAndGet());
|
|
|
+ writeJSONFile(internalCluster().getMasterName(), testJSON43mb, logger, versionCounter.get());
|
|
|
|
|
|
assertClusterStateSaveOK(savedClusterState.v1(), savedClusterState.v2(), "43mb");
|
|
|
}
|
|
@@ -515,21 +512,21 @@ public class FileSettingsServiceIT extends ESIntegTestCase {
|
|
|
assertBusy(() -> assertTrue(masterFileSettingsService.watching()));
|
|
|
|
|
|
{
|
|
|
- var savedClusterState = setupClusterStateListener(masterNode);
|
|
|
+ var savedClusterState = setupClusterStateListener(masterNode, versionCounter.incrementAndGet());
|
|
|
// Create the settings.json as a symlink to simulate k8 setup
|
|
|
// settings.json -> ..data/settings.json
|
|
|
// ..data -> ..TIMESTAMP_TEMP_FOLDER_1
|
|
|
var fileDir = Files.createDirectories(baseDir.resolve("..TIMESTAMP_TEMP_FOLDER_1"));
|
|
|
- writeJSONFile(masterNode, testJSON, logger, versionCounter.incrementAndGet(), fileDir.resolve("settings.json"));
|
|
|
+ writeJSONFile(masterNode, testJSON, logger, versionCounter.get(), fileDir.resolve("settings.json"));
|
|
|
var dataDir = Files.createSymbolicLink(baseDir.resolve("..data"), fileDir.getFileName());
|
|
|
Files.createSymbolicLink(baseDir.resolve("settings.json"), dataDir.getFileName().resolve("settings.json"));
|
|
|
assertClusterStateSaveOK(savedClusterState.v1(), savedClusterState.v2(), "50mb");
|
|
|
}
|
|
|
{
|
|
|
- var savedClusterState = setupClusterStateListener(masterNode);
|
|
|
+ var savedClusterState = setupClusterStateListener(masterNode, versionCounter.incrementAndGet());
|
|
|
// Update ..data symlink to ..data -> ..TIMESTAMP_TEMP_FOLDER_2 to simulate kubernetes secret update
|
|
|
var fileDir = Files.createDirectories(baseDir.resolve("..TIMESTAMP_TEMP_FOLDER_2"));
|
|
|
- writeJSONFile(masterNode, testJSON43mb, logger, versionCounter.incrementAndGet(), fileDir.resolve("settings.json"));
|
|
|
+ writeJSONFile(masterNode, testJSON43mb, logger, versionCounter.get(), fileDir.resolve("settings.json"));
|
|
|
Files.deleteIfExists(baseDir.resolve("..data"));
|
|
|
Files.createSymbolicLink(baseDir.resolve("..data"), fileDir.getFileName());
|
|
|
assertClusterStateSaveOK(savedClusterState.v1(), savedClusterState.v2(), "43mb");
|