|
@@ -13,6 +13,7 @@ import org.elasticsearch.action.admin.indices.diskusage.AnalyzeIndexDiskUsageReq
|
|
|
import org.elasticsearch.action.admin.indices.diskusage.TransportAnalyzeIndexDiskUsageAction;
|
|
|
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
|
|
|
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
|
|
|
+import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
|
|
|
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
|
|
|
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
|
|
|
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
|
|
@@ -20,6 +21,8 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
|
|
|
import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction;
|
|
|
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
|
|
|
import org.elasticsearch.action.bulk.BulkRequest;
|
|
|
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
|
|
+import org.elasticsearch.action.bulk.BulkResponse;
|
|
|
import org.elasticsearch.action.bulk.IndexDocFailureStoreStatus;
|
|
|
import org.elasticsearch.action.get.GetRequest;
|
|
|
import org.elasticsearch.action.index.IndexRequest;
|
|
@@ -27,6 +30,7 @@ import org.elasticsearch.action.search.SearchRequest;
|
|
|
import org.elasticsearch.action.support.WriteRequest;
|
|
|
import org.elasticsearch.cluster.metadata.ComponentTemplate;
|
|
|
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
|
|
|
+import org.elasticsearch.cluster.metadata.IndexMetadata;
|
|
|
import org.elasticsearch.cluster.metadata.Template;
|
|
|
import org.elasticsearch.common.Strings;
|
|
|
import org.elasticsearch.common.compress.CompressedXContent;
|
|
@@ -34,10 +38,15 @@ import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.time.DateFormatter;
|
|
|
import org.elasticsearch.common.time.FormatNames;
|
|
|
import org.elasticsearch.common.xcontent.XContentHelper;
|
|
|
+import org.elasticsearch.index.IndexMode;
|
|
|
import org.elasticsearch.index.IndexSettings;
|
|
|
import org.elasticsearch.index.query.RangeQueryBuilder;
|
|
|
+import org.elasticsearch.index.reindex.BulkByScrollResponse;
|
|
|
+import org.elasticsearch.index.reindex.ReindexAction;
|
|
|
+import org.elasticsearch.index.reindex.ReindexRequest;
|
|
|
import org.elasticsearch.indices.InvalidIndexTemplateException;
|
|
|
import org.elasticsearch.plugins.Plugin;
|
|
|
+import org.elasticsearch.reindex.ReindexPlugin;
|
|
|
import org.elasticsearch.rest.RestStatus;
|
|
|
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
|
|
import org.elasticsearch.test.ESSingleNodeTestCase;
|
|
@@ -53,6 +62,7 @@ import java.util.concurrent.CountDownLatch;
|
|
|
|
|
|
import static org.elasticsearch.test.MapMatcher.assertMap;
|
|
|
import static org.elasticsearch.test.MapMatcher.matchesMap;
|
|
|
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
|
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
|
|
|
import static org.hamcrest.Matchers.containsString;
|
|
|
import static org.hamcrest.Matchers.equalTo;
|
|
@@ -98,7 +108,7 @@ public class TSDBIndexingIT extends ESSingleNodeTestCase {
|
|
|
|
|
|
@Override
|
|
|
protected Collection<Class<? extends Plugin>> getPlugins() {
|
|
|
- return List.of(DataStreamsPlugin.class, InternalSettingsPlugin.class);
|
|
|
+ return List.of(DataStreamsPlugin.class, InternalSettingsPlugin.class, ReindexPlugin.class);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -557,6 +567,60 @@ public class TSDBIndexingIT extends ESSingleNodeTestCase {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
+ public void testReindexing() throws Exception {
|
|
|
+ String dataStreamName = "my-ds";
|
|
|
+ String reindexedDataStreamName = "my-reindexed-ds";
|
|
|
+ var putTemplateRequest = new TransportPutComposableIndexTemplateAction.Request("id");
|
|
|
+ putTemplateRequest.indexTemplate(
|
|
|
+ ComposableIndexTemplate.builder()
|
|
|
+ .indexPatterns(List.of(dataStreamName, reindexedDataStreamName))
|
|
|
+ .template(
|
|
|
+ new Template(
|
|
|
+ Settings.builder().put("index.mode", "time_series").build(),
|
|
|
+ new CompressedXContent(MAPPING_TEMPLATE),
|
|
|
+ null
|
|
|
+ )
|
|
|
+ )
|
|
|
+ .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false))
|
|
|
+ .build()
|
|
|
+ );
|
|
|
+ assertAcked(client().execute(TransportPutComposableIndexTemplateAction.TYPE, putTemplateRequest));
|
|
|
+
|
|
|
+ // index doc
|
|
|
+ long docCount = randomLongBetween(10, 50);
|
|
|
+ Instant startTime = Instant.now();
|
|
|
+ BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
|
|
|
+ bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
|
|
|
+ for (int i = 0; i < docCount; i++) {
|
|
|
+ IndexRequest indexRequest = new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.CREATE);
|
|
|
+ indexRequest.source(DOC.replace("$time", formatInstant(startTime.plusSeconds(i))), XContentType.JSON);
|
|
|
+ bulkRequestBuilder.add(indexRequest);
|
|
|
+ }
|
|
|
+ BulkResponse bulkResponse = bulkRequestBuilder.get();
|
|
|
+ assertThat(bulkResponse.hasFailures(), is(false));
|
|
|
+
|
|
|
+ BulkByScrollResponse reindexResponse = safeGet(
|
|
|
+ client().execute(
|
|
|
+ ReindexAction.INSTANCE,
|
|
|
+ new ReindexRequest().setSourceIndices(dataStreamName).setDestIndex(reindexedDataStreamName).setDestOpType("create")
|
|
|
+ )
|
|
|
+ );
|
|
|
+ assertThat(reindexResponse.getCreated(), equalTo(docCount));
|
|
|
+
|
|
|
+ GetIndexResponse getIndexResponse = safeGet(
|
|
|
+ indicesAdmin().getIndex(new GetIndexRequest(TEST_REQUEST_TIMEOUT).indices(dataStreamName, reindexedDataStreamName))
|
|
|
+ );
|
|
|
+ assertThat(getIndexResponse.getIndices().length, equalTo(2));
|
|
|
+ var index1 = getIndexResponse.getIndices()[0];
|
|
|
+ var index2 = getIndexResponse.getIndices()[1];
|
|
|
+ assertThat(getIndexResponse.getSetting(index1, IndexSettings.MODE.getKey()), equalTo(IndexMode.TIME_SERIES.getName()));
|
|
|
+ assertThat(getIndexResponse.getSetting(index2, IndexSettings.MODE.getKey()), equalTo(IndexMode.TIME_SERIES.getName()));
|
|
|
+ assertThat(
|
|
|
+ getIndexResponse.getSetting(index2, IndexMetadata.INDEX_ROUTING_PATH.getKey()),
|
|
|
+ equalTo(getIndexResponse.getSetting(index1, IndexMetadata.INDEX_ROUTING_PATH.getKey()))
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
static String formatInstant(Instant instant) {
|
|
|
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
|
|
|
}
|