|
@@ -49,7 +49,7 @@ import org.elasticsearch.test.transport.MockTransportService;
|
|
|
import org.elasticsearch.xcontent.XContentType;
|
|
|
import org.elasticsearch.xpack.migrate.MigratePlugin;
|
|
|
import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry;
|
|
|
-import org.junit.After;
|
|
|
+import org.junit.Before;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.time.Instant;
|
|
@@ -67,9 +67,11 @@ import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
|
|
|
import static org.hamcrest.Matchers.equalTo;
|
|
|
|
|
|
public class ReindexDatastreamIndexTransportActionIT extends ESIntegTestCase {
|
|
|
- @After
|
|
|
- private void cleanup() {
|
|
|
+
|
|
|
+ @Before
|
|
|
+ private void setup() throws Exception {
|
|
|
deletePipeline(MigrateTemplateRegistry.REINDEX_DATA_STREAM_PIPELINE_NAME);
|
|
|
+ assertBusy(() -> { assertTrue(getPipelines(MigrateTemplateRegistry.REINDEX_DATA_STREAM_PIPELINE_NAME).isFound()); });
|
|
|
}
|
|
|
|
|
|
private static final String MAPPING = """
|
|
@@ -114,6 +116,9 @@ public class ReindexDatastreamIndexTransportActionIT extends ESIntegTestCase {
|
|
|
// add doc without timestamp
|
|
|
addDoc(sourceIndex, "{\"foo\":\"baz\"}");
|
|
|
|
|
|
+ // wait until doc is written to all shards before adding mapping
|
|
|
+ ensureHealth(sourceIndex);
|
|
|
+
|
|
|
// add timestamp to source mapping
|
|
|
indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get();
|
|
|
|
|
@@ -129,6 +134,7 @@ public class ReindexDatastreamIndexTransportActionIT extends ESIntegTestCase {
|
|
|
}
|
|
|
|
|
|
public void testTimestampNotAddedIfExists() {
|
|
|
+
|
|
|
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
|
|
|
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));
|
|
|
|
|
@@ -137,6 +143,9 @@ public class ReindexDatastreamIndexTransportActionIT extends ESIntegTestCase {
|
|
|
var doc = String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, time);
|
|
|
addDoc(sourceIndex, doc);
|
|
|
|
|
|
+ // wait until doc is written to all shards before adding mapping
|
|
|
+ ensureHealth(sourceIndex);
|
|
|
+
|
|
|
// add timestamp to source mapping
|
|
|
indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get();
|
|
|
|
|
@@ -184,6 +193,9 @@ public class ReindexDatastreamIndexTransportActionIT extends ESIntegTestCase {
|
|
|
var doc = String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, time);
|
|
|
addDoc(sourceIndex, doc);
|
|
|
|
|
|
+ // wait until doc is written to all shards before adding mapping
|
|
|
+ ensureHealth(sourceIndex);
|
|
|
+
|
|
|
// add timestamp to source mapping
|
|
|
indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get();
|
|
|
|
|
@@ -293,7 +305,7 @@ public class ReindexDatastreamIndexTransportActionIT extends ESIntegTestCase {
|
|
|
);
|
|
|
}
|
|
|
|
|
|
- public void testSettingsAddedBeforeReindex() throws Exception {
|
|
|
+ public void testSettingsAddedBeforeReindex() {
|
|
|
// start with a static setting
|
|
|
var numShards = randomIntBetween(1, 10);
|
|
|
var staticSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards).build();
|
|
@@ -604,4 +616,12 @@ public class ReindexDatastreamIndexTransportActionIT extends ESIntegTestCase {
|
|
|
bulkRequest.add(new IndexRequest(index).opType(DocWriteRequest.OpType.CREATE).source(doc, XContentType.JSON));
|
|
|
safeGet(client().bulk(bulkRequest));
|
|
|
}
|
|
|
+
|
|
|
+ private void ensureHealth(String index) {
|
|
|
+ if (cluster().numDataNodes() > 1) {
|
|
|
+ ensureGreen(index);
|
|
|
+ } else {
|
|
|
+ ensureYellow(index);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|