|
@@ -5,23 +5,27 @@
|
|
|
*/
|
|
|
package org.elasticsearch.upgrades;
|
|
|
|
|
|
+import org.apache.http.HttpHost;
|
|
|
import org.apache.http.entity.ContentType;
|
|
|
import org.apache.http.entity.StringEntity;
|
|
|
import org.elasticsearch.Version;
|
|
|
import org.elasticsearch.client.Request;
|
|
|
import org.elasticsearch.client.Response;
|
|
|
+import org.elasticsearch.client.RestClient;
|
|
|
+import org.elasticsearch.client.RestClientBuilder;
|
|
|
import org.elasticsearch.client.core.IndexerState;
|
|
|
import org.elasticsearch.client.transform.GetTransformStatsResponse;
|
|
|
-import org.elasticsearch.client.transform.transforms.TransformConfig;
|
|
|
-import org.elasticsearch.client.transform.transforms.TransformStats;
|
|
|
import org.elasticsearch.client.transform.transforms.DestConfig;
|
|
|
import org.elasticsearch.client.transform.transforms.SourceConfig;
|
|
|
import org.elasticsearch.client.transform.transforms.TimeSyncConfig;
|
|
|
+import org.elasticsearch.client.transform.transforms.TransformConfig;
|
|
|
+import org.elasticsearch.client.transform.transforms.TransformStats;
|
|
|
import org.elasticsearch.client.transform.transforms.pivot.GroupConfig;
|
|
|
import org.elasticsearch.client.transform.transforms.pivot.PivotConfig;
|
|
|
import org.elasticsearch.client.transform.transforms.pivot.TermsGroupSource;
|
|
|
import org.elasticsearch.common.Booleans;
|
|
|
import org.elasticsearch.common.Strings;
|
|
|
+import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.unit.TimeValue;
|
|
|
import org.elasticsearch.common.xcontent.DeprecationHandler;
|
|
|
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
|
@@ -51,12 +55,13 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
|
|
import static org.hamcrest.Matchers.hasSize;
|
|
|
import static org.hamcrest.Matchers.oneOf;
|
|
|
|
|
|
-public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase {
|
|
|
+public class TransformSurvivesUpgradeIT extends AbstractUpgradeTestCase {
|
|
|
|
|
|
private static final Version UPGRADE_FROM_VERSION = Version.fromString(System.getProperty("tests.upgrade_from_version"));
|
|
|
- private static final String DATAFRAME_ENDPOINT = "/_data_frame/transforms/";
|
|
|
- private static final String CONTINUOUS_DATA_FRAME_ID = "continuous-data-frame-upgrade-job";
|
|
|
- private static final String CONTINUOUS_DATA_FRAME_SOURCE = "data-frame-upgrade-continuous-source";
|
|
|
+ private static final String DATAFRAME_ENDPOINT = "/_transform/";
|
|
|
+ private static final String DATAFRAME_ENDPOINT_DEPRECATED = "/_data_frame/transforms/";
|
|
|
+ private static final String CONTINUOUS_TRANSFORM_ID = "continuous-transform-upgrade-job";
|
|
|
+ private static final String CONTINUOUS_TRANSFORM_SOURCE = "transform-upgrade-continuous-source";
|
|
|
private static final List<String> ENTITIES = Stream.iterate(1, n -> n + 1)
|
|
|
.limit(5)
|
|
|
.map(v -> "user_" + v)
|
|
@@ -76,6 +81,14 @@ public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase {
|
|
|
waitForPendingTasks(adminClient(), taskName -> taskName.startsWith("data_frame/transforms") == false);
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOException {
|
|
|
+ RestClientBuilder builder = RestClient.builder(hosts);
|
|
|
+ configureClient(builder, settings);
|
|
|
+ builder.setStrictDeprecationMode(false);
|
|
|
+ return builder.build();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* The purpose of this test is to ensure that when a job is open through a rolling upgrade we upgrade the results
|
|
|
* index mappings when it is assigned to an upgraded node even if no other ML endpoint is called after the upgrade
|
|
@@ -86,7 +99,9 @@ public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase {
|
|
|
adjustLoggingLevels.setJsonEntity(
|
|
|
"{\"transient\": {" +
|
|
|
"\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\"," +
|
|
|
- "\"logger.org.elasticsearch.xpack.dataframe\": \"trace\"}}");
|
|
|
+ "\"logger.org.elasticsearch.xpack.dataframe\": \"trace\"," +
|
|
|
+ "\"logger.org.elasticsearch.xpack.transform\": \"trace\"" +
|
|
|
+ "}}");
|
|
|
client().performRequest(adjustLoggingLevels);
|
|
|
Request waitForYellow = new Request("GET", "/_cluster/health");
|
|
|
waitForYellow.addParameter("wait_for_nodes", "3");
|
|
@@ -115,17 +130,17 @@ public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase {
|
|
|
}
|
|
|
|
|
|
private void cleanUpTransforms() throws Exception {
|
|
|
- stopTransform(CONTINUOUS_DATA_FRAME_ID);
|
|
|
- deleteTransform(CONTINUOUS_DATA_FRAME_ID);
|
|
|
+ stopTransform(CONTINUOUS_TRANSFORM_ID);
|
|
|
+ deleteTransform(CONTINUOUS_TRANSFORM_ID);
|
|
|
waitForPendingDataFrameTasks();
|
|
|
}
|
|
|
|
|
|
private void createAndStartContinuousDataFrame() throws Exception {
|
|
|
- createIndex(CONTINUOUS_DATA_FRAME_SOURCE);
|
|
|
+ createIndex(CONTINUOUS_TRANSFORM_SOURCE);
|
|
|
long totalDocsWrittenSum = 0;
|
|
|
for (TimeValue bucket : BUCKETS) {
|
|
|
int docs = randomIntBetween(1, 25);
|
|
|
- putData(CONTINUOUS_DATA_FRAME_SOURCE, docs, bucket, ENTITIES);
|
|
|
+ putData(CONTINUOUS_TRANSFORM_SOURCE, docs, bucket, ENTITIES);
|
|
|
totalDocsWrittenSum += docs * ENTITIES.size();
|
|
|
}
|
|
|
long totalDocsWritten = totalDocsWrittenSum;
|
|
@@ -135,18 +150,18 @@ public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase {
|
|
|
.setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("stars").field("stars")))
|
|
|
.setGroups(GroupConfig.builder().groupBy("user_id", TermsGroupSource.builder().setField("user_id").build()).build())
|
|
|
.build())
|
|
|
- .setDest(DestConfig.builder().setIndex(CONTINUOUS_DATA_FRAME_ID + "_idx").build())
|
|
|
- .setSource(SourceConfig.builder().setIndex(CONTINUOUS_DATA_FRAME_SOURCE).build())
|
|
|
- .setId(CONTINUOUS_DATA_FRAME_ID)
|
|
|
+ .setDest(DestConfig.builder().setIndex(CONTINUOUS_TRANSFORM_ID + "_idx").build())
|
|
|
+ .setSource(SourceConfig.builder().setIndex(CONTINUOUS_TRANSFORM_SOURCE).build())
|
|
|
+ .setId(CONTINUOUS_TRANSFORM_ID)
|
|
|
.setFrequency(TimeValue.timeValueSeconds(1))
|
|
|
.build();
|
|
|
- putTransform(CONTINUOUS_DATA_FRAME_ID, config);
|
|
|
+ putTransform(CONTINUOUS_TRANSFORM_ID, config);
|
|
|
|
|
|
- startTransform(CONTINUOUS_DATA_FRAME_ID);
|
|
|
- waitUntilAfterCheckpoint(CONTINUOUS_DATA_FRAME_ID, 0L);
|
|
|
+ startTransform(CONTINUOUS_TRANSFORM_ID);
|
|
|
+ waitUntilAfterCheckpoint(CONTINUOUS_TRANSFORM_ID, 0L);
|
|
|
|
|
|
assertBusy(() -> {
|
|
|
- TransformStats stateAndStats = getTransformStats(CONTINUOUS_DATA_FRAME_ID);
|
|
|
+ TransformStats stateAndStats = getTransformStats(CONTINUOUS_TRANSFORM_ID);
|
|
|
assertThat(stateAndStats.getIndexerStats().getOutputDocuments(), equalTo((long)ENTITIES.size()));
|
|
|
assertThat(stateAndStats.getIndexerStats().getNumDocuments(), equalTo(totalDocsWritten));
|
|
|
// Even if we get back to started, we may periodically get set back to `indexing` when triggered.
|
|
@@ -156,7 +171,7 @@ public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase {
|
|
|
|
|
|
|
|
|
// We want to make sure our latest state is written before we turn the node off, this makes the testing more reliable
|
|
|
- awaitWrittenIndexerState(CONTINUOUS_DATA_FRAME_ID, IndexerState.STARTED.value());
|
|
|
+ awaitWrittenIndexerState(CONTINUOUS_TRANSFORM_ID, IndexerState.STARTED.value());
|
|
|
}
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
@@ -165,13 +180,13 @@ public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase {
|
|
|
// A continuous data frame should automatically become started when it gets assigned to a node
|
|
|
// if it was assigned to the node that was removed from the cluster
|
|
|
assertBusy(() -> {
|
|
|
- TransformStats stateAndStats = getTransformStats(CONTINUOUS_DATA_FRAME_ID);
|
|
|
+ TransformStats stateAndStats = getTransformStats(CONTINUOUS_TRANSFORM_ID);
|
|
|
assertThat(stateAndStats.getState(), oneOf(TransformStats.State.STARTED, TransformStats.State.INDEXING));
|
|
|
},
|
|
|
120,
|
|
|
TimeUnit.SECONDS);
|
|
|
|
|
|
- TransformStats previousStateAndStats = getTransformStats(CONTINUOUS_DATA_FRAME_ID);
|
|
|
+ TransformStats previousStateAndStats = getTransformStats(CONTINUOUS_TRANSFORM_ID);
|
|
|
|
|
|
// Add a new user and write data to it
|
|
|
// This is so we can have more reliable data counts, as writing to existing entities requires
|
|
@@ -181,20 +196,20 @@ public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase {
|
|
|
int docs = 5;
|
|
|
// Index the data
|
|
|
// The frequency and delay should see the data once its indexed
|
|
|
- putData(CONTINUOUS_DATA_FRAME_SOURCE, docs, TimeValue.timeValueSeconds(0), entities);
|
|
|
+ putData(CONTINUOUS_TRANSFORM_SOURCE, docs, TimeValue.timeValueSeconds(0), entities);
|
|
|
|
|
|
- waitUntilAfterCheckpoint(CONTINUOUS_DATA_FRAME_ID, expectedLastCheckpoint);
|
|
|
+ waitUntilAfterCheckpoint(CONTINUOUS_TRANSFORM_ID, expectedLastCheckpoint);
|
|
|
|
|
|
assertBusy(() -> assertThat(
|
|
|
- getTransformStats(CONTINUOUS_DATA_FRAME_ID).getIndexerStats().getNumDocuments(),
|
|
|
+ getTransformStats(CONTINUOUS_TRANSFORM_ID).getIndexerStats().getNumDocuments(),
|
|
|
greaterThanOrEqualTo(docs + previousStateAndStats.getIndexerStats().getNumDocuments())),
|
|
|
120,
|
|
|
TimeUnit.SECONDS);
|
|
|
- TransformStats stateAndStats = getTransformStats(CONTINUOUS_DATA_FRAME_ID);
|
|
|
+ TransformStats stateAndStats = getTransformStats(CONTINUOUS_TRANSFORM_ID);
|
|
|
|
|
|
assertThat(stateAndStats.getState(),
|
|
|
oneOf(TransformStats.State.STARTED, TransformStats.State.INDEXING));
|
|
|
- awaitWrittenIndexerState(CONTINUOUS_DATA_FRAME_ID, (responseBody) -> {
|
|
|
+ awaitWrittenIndexerState(CONTINUOUS_TRANSFORM_ID, (responseBody) -> {
|
|
|
Map<String, Object> indexerStats = (Map<String,Object>)((List<?>)XContentMapValues.extractValue("hits.hits._source.stats",
|
|
|
responseBody))
|
|
|
.get(0);
|
|
@@ -245,33 +260,37 @@ public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
+ private String getTransformEndpoint() {
|
|
|
+ return CLUSTER_TYPE == ClusterType.UPGRADED ? DATAFRAME_ENDPOINT : DATAFRAME_ENDPOINT_DEPRECATED;
|
|
|
+ }
|
|
|
+
|
|
|
private void putTransform(String id, TransformConfig config) throws IOException {
|
|
|
- final Request createDataframeTransformRequest = new Request("PUT", DATAFRAME_ENDPOINT + id);
|
|
|
+ final Request createDataframeTransformRequest = new Request("PUT", getTransformEndpoint() + id);
|
|
|
createDataframeTransformRequest.setJsonEntity(Strings.toString(config));
|
|
|
Response response = client().performRequest(createDataframeTransformRequest);
|
|
|
assertEquals(200, response.getStatusLine().getStatusCode());
|
|
|
}
|
|
|
|
|
|
private void deleteTransform(String id) throws IOException {
|
|
|
- Response response = client().performRequest(new Request("DELETE", DATAFRAME_ENDPOINT + id));
|
|
|
+ Response response = client().performRequest(new Request("DELETE", getTransformEndpoint() + id));
|
|
|
assertEquals(200, response.getStatusLine().getStatusCode());
|
|
|
}
|
|
|
|
|
|
private void startTransform(String id) throws IOException {
|
|
|
- final Request startDataframeTransformRequest = new Request("POST", DATAFRAME_ENDPOINT + id + "/_start");
|
|
|
+ final Request startDataframeTransformRequest = new Request("POST", getTransformEndpoint() + id + "/_start");
|
|
|
Response response = client().performRequest(startDataframeTransformRequest);
|
|
|
assertEquals(200, response.getStatusLine().getStatusCode());
|
|
|
}
|
|
|
|
|
|
private void stopTransform(String id) throws IOException {
|
|
|
final Request stopDataframeTransformRequest = new Request("POST",
|
|
|
- DATAFRAME_ENDPOINT + id + "/_stop?wait_for_completion=true");
|
|
|
+ getTransformEndpoint() + id + "/_stop?wait_for_completion=true");
|
|
|
Response response = client().performRequest(stopDataframeTransformRequest);
|
|
|
assertEquals(200, response.getStatusLine().getStatusCode());
|
|
|
}
|
|
|
|
|
|
private TransformStats getTransformStats(String id) throws IOException {
|
|
|
- final Request getStats = new Request("GET", DATAFRAME_ENDPOINT + id + "/_stats");
|
|
|
+ final Request getStats = new Request("GET", getTransformEndpoint() + id + "/_stats");
|
|
|
Response response = client().performRequest(getStats);
|
|
|
assertEquals(200, response.getStatusLine().getStatusCode());
|
|
|
XContentType xContentType = XContentType.fromMediaTypeOrFormat(response.getEntity().getContentType().getValue());
|