|
@@ -65,7 +65,6 @@ import static org.hamcrest.Matchers.oneOf;
|
|
|
|
|
|
public class TransformSurvivesUpgradeIT extends AbstractUpgradeTestCase {
|
|
|
|
|
|
- private static final Version UPGRADE_FROM_VERSION = Version.fromString(System.getProperty("tests.upgrade_from_version"));
|
|
|
private static final String TRANSFORM_ENDPOINT = "/_transform/";
|
|
|
private static final String TRANSFORM_ENDPOINT_DEPRECATED = "/_data_frame/transforms/";
|
|
|
private static final String CONTINUOUS_TRANSFORM_ID = "continuous-transform-upgrade-job";
|
|
@@ -89,12 +88,12 @@ public class TransformSurvivesUpgradeIT extends AbstractUpgradeTestCase {
|
|
|
|
|
|
// match notifications index templates, independent of the version, at least 1 should exist
|
|
|
SortedSet<String> notificationsDeprecated = templates
|
|
|
- .tailSet(TRANSFORM_NOTIFICATIONS_INDEX_PREFIX_DEPRECATED);
|
|
|
+ .tailSet(TRANSFORM_NOTIFICATIONS_INDEX_PREFIX_DEPRECATED);
|
|
|
SortedSet<String> notifications = templates.tailSet(TRANSFORM_NOTIFICATIONS_INDEX_PREFIX);
|
|
|
|
|
|
int foundTemplates = 0;
|
|
|
foundTemplates += notificationsDeprecated.isEmpty() ? 0
|
|
|
- : notificationsDeprecated.first().startsWith(TRANSFORM_NOTIFICATIONS_INDEX_PREFIX_DEPRECATED) ? 1 : 0;
|
|
|
+ : notificationsDeprecated.first().startsWith(TRANSFORM_NOTIFICATIONS_INDEX_PREFIX_DEPRECATED) ? 1 : 0;
|
|
|
foundTemplates += notifications.isEmpty() ? 0 : notifications.first().startsWith(TRANSFORM_NOTIFICATIONS_INDEX_PREFIX) ? 1 : 0;
|
|
|
|
|
|
if (foundTemplates < 1) {
|
|
@@ -120,7 +119,6 @@ public class TransformSurvivesUpgradeIT extends AbstractUpgradeTestCase {
|
|
|
* index mappings when it is assigned to an upgraded node even if no other ML endpoint is called after the upgrade
|
|
|
*/
|
|
|
public void testTransformRollingUpgrade() throws Exception {
|
|
|
- assumeTrue("Continuous transform not supported until 7.3", UPGRADE_FROM_VERSION.onOrAfter(Version.V_7_3_0));
|
|
|
Request adjustLoggingLevels = new Request("PUT", "/_cluster/settings");
|
|
|
adjustLoggingLevels.setJsonEntity(
|
|
|
"{\"transient\": {" +
|
|
@@ -188,7 +186,7 @@ public class TransformSurvivesUpgradeIT extends AbstractUpgradeTestCase {
|
|
|
|
|
|
assertBusy(() -> {
|
|
|
TransformStats stateAndStats = getTransformStats(CONTINUOUS_TRANSFORM_ID);
|
|
|
- assertThat(stateAndStats.getIndexerStats().getDocumentsIndexed(), equalTo((long)ENTITIES.size()));
|
|
|
+ assertThat(stateAndStats.getIndexerStats().getDocumentsIndexed(), equalTo((long) ENTITIES.size()));
|
|
|
assertThat(stateAndStats.getIndexerStats().getDocumentsProcessed(), equalTo(totalDocsWritten));
|
|
|
// Even if we get back to started, we may periodically get set back to `indexing` when triggered.
|
|
|
// Though short lived due to no changes on the source indices, it could result in flaky test behavior
|
|
@@ -206,11 +204,11 @@ public class TransformSurvivesUpgradeIT extends AbstractUpgradeTestCase {
|
|
|
// A continuous transform should automatically become started when it gets assigned to a node
|
|
|
// if it was assigned to the node that was removed from the cluster
|
|
|
assertBusy(() -> {
|
|
|
- TransformStats stateAndStats = getTransformStats(CONTINUOUS_TRANSFORM_ID);
|
|
|
- assertThat(stateAndStats.getState(), oneOf(TransformStats.State.STARTED, TransformStats.State.INDEXING));
|
|
|
- },
|
|
|
- 120,
|
|
|
- TimeUnit.SECONDS);
|
|
|
+ TransformStats stateAndStats = getTransformStats(CONTINUOUS_TRANSFORM_ID);
|
|
|
+ assertThat(stateAndStats.getState(), oneOf(TransformStats.State.STARTED, TransformStats.State.INDEXING));
|
|
|
+ },
|
|
|
+ 120,
|
|
|
+ TimeUnit.SECONDS);
|
|
|
|
|
|
TransformStats previousStateAndStats = getTransformStats(CONTINUOUS_TRANSFORM_ID);
|
|
|
|
|
@@ -236,12 +234,12 @@ public class TransformSurvivesUpgradeIT extends AbstractUpgradeTestCase {
|
|
|
assertThat(stateAndStats.getState(),
|
|
|
oneOf(TransformStats.State.STARTED, TransformStats.State.INDEXING));
|
|
|
awaitWrittenIndexerState(CONTINUOUS_TRANSFORM_ID, (responseBody) -> {
|
|
|
- Map<String, Object> indexerStats = (Map<String,Object>)((List<?>)XContentMapValues.extractValue("hits.hits._source.stats",
|
|
|
+ Map<String, Object> indexerStats = (Map<String, Object>) ((List<?>) XContentMapValues.extractValue("hits.hits._source.stats",
|
|
|
responseBody))
|
|
|
.get(0);
|
|
|
- assertThat((Integer)indexerStats.get("documents_indexed"),
|
|
|
+ assertThat((Integer) indexerStats.get("documents_indexed"),
|
|
|
greaterThan(Long.valueOf(previousStateAndStats.getIndexerStats().getDocumentsIndexed()).intValue()));
|
|
|
- assertThat((Integer)indexerStats.get("documents_processed"),
|
|
|
+ assertThat((Integer) indexerStats.get("documents_processed"),
|
|
|
greaterThan(Long.valueOf(previousStateAndStats.getIndexerStats().getDocumentsProcessed()).intValue()));
|
|
|
});
|
|
|
}
|
|
@@ -249,8 +247,8 @@ public class TransformSurvivesUpgradeIT extends AbstractUpgradeTestCase {
|
|
|
private void awaitWrittenIndexerState(String id, Consumer<Map<?, ?>> responseAssertion) throws Exception {
|
|
|
Request getStatsDocsRequest = new Request("GET",
|
|
|
TRANSFORM_INTERNAL_INDEX_PREFIX + "*," +
|
|
|
- TRANSFORM_INTERNAL_INDEX_PREFIX_DEPRECATED + "*" +
|
|
|
- "/_search");
|
|
|
+ TRANSFORM_INTERNAL_INDEX_PREFIX_DEPRECATED + "*" +
|
|
|
+ "/_search");
|
|
|
|
|
|
getStatsDocsRequest.setJsonEntity("{\n" +
|
|
|
" \"query\": {\n" +
|
|
@@ -284,7 +282,7 @@ public class TransformSurvivesUpgradeIT extends AbstractUpgradeTestCase {
|
|
|
|
|
|
private void awaitWrittenIndexerState(String id, String indexerState) throws Exception {
|
|
|
awaitWrittenIndexerState(id, (responseBody) -> {
|
|
|
- String storedState = ((List<?>)XContentMapValues.extractValue("hits.hits._source.state.indexer_state", responseBody))
|
|
|
+ String storedState = ((List<?>) XContentMapValues.extractValue("hits.hits._source.state.indexer_state", responseBody))
|
|
|
.get(0)
|
|
|
.toString();
|
|
|
assertThat(storedState, equalTo(indexerState));
|
|
@@ -292,7 +290,10 @@ public class TransformSurvivesUpgradeIT extends AbstractUpgradeTestCase {
|
|
|
}
|
|
|
|
|
|
private String getTransformEndpoint() {
|
|
|
- return CLUSTER_TYPE == ClusterType.UPGRADED ? TRANSFORM_ENDPOINT : TRANSFORM_ENDPOINT_DEPRECATED;
|
|
|
+ // always hit the destination cluster on the non-deprecated endpoint, sometimes hit the source cluster on the non-deprecated
|
|
|
+ // endpoint, unless we're upgrading from 8.0.0, in which case always hit the non-deprecated endpoint
|
|
|
+ return (CLUSTER_TYPE == ClusterType.UPGRADED) || randomBoolean() || UPGRADE_FROM_VERSION.onOrAfter(Version.V_8_0_0) ?
|
|
|
+ TRANSFORM_ENDPOINT : TRANSFORM_ENDPOINT_DEPRECATED;
|
|
|
}
|
|
|
|
|
|
private void putTransform(String id, TransformConfig config) throws IOException {
|
|
@@ -307,13 +308,13 @@ public class TransformSurvivesUpgradeIT extends AbstractUpgradeTestCase {
|
|
|
assertEquals(200, response.getStatusLine().getStatusCode());
|
|
|
}
|
|
|
|
|
|
- private void startTransform(String id) throws IOException {
|
|
|
+ private void startTransform(String id) throws IOException {
|
|
|
final Request startDataframeTransformRequest = new Request("POST", getTransformEndpoint() + id + "/_start");
|
|
|
Response response = client().performRequest(startDataframeTransformRequest);
|
|
|
assertEquals(200, response.getStatusLine().getStatusCode());
|
|
|
}
|
|
|
|
|
|
- private void stopTransform(String id) throws IOException {
|
|
|
+ private void stopTransform(String id) throws IOException {
|
|
|
final Request stopDataframeTransformRequest = new Request("POST",
|
|
|
getTransformEndpoint() + id + "/_stop?wait_for_completion=true");
|
|
|
Response response = client().performRequest(stopDataframeTransformRequest);
|