|
@@ -208,19 +208,21 @@ public class DataStreamsUpgradeIT extends AbstractUpgradeTestCase {
|
|
|
createDataStreamFromNonDataStreamIndices(dataStreamFromNonDataStreamIndices);
|
|
|
} else if (CLUSTER_TYPE == ClusterType.UPGRADED) {
|
|
|
Map<String, Map<String, Object>> oldIndicesMetadata = getIndicesMetadata(dataStreamName);
|
|
|
+ String oldWriteIndex = getDataStreamBackingIndexNames(dataStreamName).getLast();
|
|
|
upgradeDataStream(dataStreamName, numRollovers, numRollovers + 1, 0, ilmEnabled);
|
|
|
cancelReindexTask(dataStreamName);
|
|
|
upgradeDataStream(dataStreamFromNonDataStreamIndices, 0, 1, 0, ilmEnabled);
|
|
|
cancelReindexTask(dataStreamFromNonDataStreamIndices);
|
|
|
Map<String, Map<String, Object>> upgradedIndicesMetadata = getIndicesMetadata(dataStreamName);
|
|
|
+ String newWriteIndex = getDataStreamBackingIndexNames(dataStreamName).getLast();
|
|
|
|
|
|
if (ilmEnabled) {
|
|
|
- checkILMPhase(dataStreamName, upgradedIndicesMetadata);
|
|
|
+ checkILMPhase(dataStreamName, newWriteIndex);
|
|
|
// Delete the data streams to avoid ILM continuously running cluster state tasks, see
|
|
|
// https://github.com/elastic/elasticsearch/issues/129097#issuecomment-3016122739
|
|
|
deleteDataStream(dataStreamName);
|
|
|
} else {
|
|
|
- compareIndexMetadata(oldIndicesMetadata, upgradedIndicesMetadata);
|
|
|
+ compareIndexMetadata(oldIndicesMetadata, oldWriteIndex, upgradedIndicesMetadata);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -262,9 +264,9 @@ public class DataStreamsUpgradeIT extends AbstractUpgradeTestCase {
|
|
|
|
|
|
private void compareIndexMetadata(
|
|
|
Map<String, Map<String, Object>> oldIndicesMetadata,
|
|
|
+ String oldWriteIndex,
|
|
|
Map<String, Map<String, Object>> upgradedIndicesMetadata
|
|
|
) {
|
|
|
- String oldWriteIndex = getWriteIndexFromDataStreamIndexMetadata(oldIndicesMetadata);
|
|
|
for (Map.Entry<String, Map<String, Object>> upgradedIndexEntry : upgradedIndicesMetadata.entrySet()) {
|
|
|
String upgradedIndexName = upgradedIndexEntry.getKey();
|
|
|
if (upgradedIndexName.startsWith(".migrated-")) {
|
|
@@ -287,10 +289,8 @@ public class DataStreamsUpgradeIT extends AbstractUpgradeTestCase {
|
|
|
}
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- private void checkILMPhase(String dataStreamName, Map<String, Map<String, Object>> upgradedIndicesMetadata) throws Exception {
|
|
|
- var writeIndex = getWriteIndexFromDataStreamIndexMetadata(upgradedIndicesMetadata);
|
|
|
+ private void checkILMPhase(String dataStreamName, String writeIndex) throws Exception {
|
|
|
assertBusy(() -> {
|
|
|
-
|
|
|
Request request = new Request("GET", dataStreamName + "/_ilm/explain");
|
|
|
Response response = client().performRequest(request);
|
|
|
Map<String, Object> responseMap = XContentHelper.convertToMap(
|
|
@@ -302,21 +302,12 @@ public class DataStreamsUpgradeIT extends AbstractUpgradeTestCase {
|
|
|
for (var index : indices.keySet()) {
|
|
|
if (index.equals(writeIndex) == false) {
|
|
|
Map<String, Object> ilmInfo = (Map<String, Object>) indices.get(index);
|
|
|
- assertThat("Index has not moved to cold ILM phase", ilmInfo.get("phase"), equalTo("cold"));
|
|
|
+ assertThat("Index [" + index + "] has not moved to cold ILM phase, " + indices, ilmInfo.get("phase"), equalTo("cold"));
|
|
|
}
|
|
|
}
|
|
|
}, 30, TimeUnit.SECONDS);
|
|
|
}
|
|
|
|
|
|
- private String getWriteIndexFromDataStreamIndexMetadata(Map<String, Map<String, Object>> indexMetadataForDataStream) {
|
|
|
- return indexMetadataForDataStream.entrySet()
|
|
|
- .stream()
|
|
|
- .sorted((o1, o2) -> Long.compare(getCreationDate(o2.getValue()), getCreationDate(o1.getValue())))
|
|
|
- .map(Map.Entry::getKey)
|
|
|
- .findFirst()
|
|
|
- .get();
|
|
|
- }
|
|
|
-
|
|
|
private void startILM() throws IOException {
|
|
|
setILMInterval();
|
|
|
var request = new Request("POST", "/_ilm/start");
|