|
|
@@ -8,12 +8,19 @@
|
|
|
package org.elasticsearch.datastreams;
|
|
|
|
|
|
import org.elasticsearch.action.DocWriteRequest;
|
|
|
+import org.elasticsearch.action.DocWriteResponse;
|
|
|
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
|
|
|
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
|
|
|
+import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
|
|
|
+import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
|
|
|
+import org.elasticsearch.action.admin.indices.shrink.ResizeType;
|
|
|
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
|
|
|
+import org.elasticsearch.action.bulk.BulkRequest;
|
|
|
+import org.elasticsearch.action.delete.DeleteRequest;
|
|
|
import org.elasticsearch.action.get.GetRequest;
|
|
|
import org.elasticsearch.action.index.IndexRequest;
|
|
|
import org.elasticsearch.action.search.SearchRequest;
|
|
|
+import org.elasticsearch.action.support.WriteRequest;
|
|
|
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
|
|
|
import org.elasticsearch.cluster.metadata.Template;
|
|
|
import org.elasticsearch.common.compress.CompressedXContent;
|
|
|
@@ -151,6 +158,10 @@ public class TSDBPassthroughIndexingIT extends ESSingleNodeTestCase {
|
|
|
assertHitCount(searchResponse, 1);
|
|
|
assertThat(searchResponse.getHits().getHits()[0].getId(), equalTo(id));
|
|
|
});
|
|
|
+ var deleteResponse = client().delete(new DeleteRequest(index, id)).actionGet();
|
|
|
+ assertThat(deleteResponse.getIndex(), equalTo(index));
|
|
|
+ assertThat(deleteResponse.getId(), equalTo(id));
|
|
|
+ assertThat(deleteResponse.getResult(), equalTo(DocWriteResponse.Result.DELETED));
|
|
|
time = time.plusMillis(1);
|
|
|
}
|
|
|
|
|
|
@@ -187,6 +198,89 @@ public class TSDBPassthroughIndexingIT extends ESSingleNodeTestCase {
|
|
|
);
|
|
|
}
|
|
|
|
|
|
+ public void testIndexingGettingAndSearchingShrunkIndex() throws Exception {
|
|
|
+ String dataStreamName = "k8s";
|
|
|
+ var templateSettings = Settings.builder()
|
|
|
+ .put("index.mode", "time_series")
|
|
|
+ .put("index.number_of_shards", 8)
|
|
|
+ .put("index.number_of_replicas", 0);
|
|
|
+
|
|
|
+ var request = new TransportPutComposableIndexTemplateAction.Request("id");
|
|
|
+ request.indexTemplate(
|
|
|
+ ComposableIndexTemplate.builder()
|
|
|
+ .indexPatterns(List.of("k8s*"))
|
|
|
+ .template(new Template(templateSettings.build(), new CompressedXContent(MAPPING_TEMPLATE), null))
|
|
|
+ .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false))
|
|
|
+ .build()
|
|
|
+ );
|
|
|
+ client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet();
|
|
|
+
|
|
|
+ Instant time = Instant.now();
|
|
|
+ int numBulkItems = randomIntBetween(16, 128);
|
|
|
+ var bulkRequest = new BulkRequest(dataStreamName);
|
|
|
+ for (int i = 0; i < numBulkItems; i++) {
|
|
|
+ var indexRequest = new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.CREATE);
|
|
|
+ indexRequest.source(
|
|
|
+ DOC.replace("$time", formatInstant(time))
|
|
|
+ .replace("$uid", randomUUID())
|
|
|
+ .replace("$name", randomAlphaOfLength(4))
|
|
|
+ .replace("$ip", InetAddresses.toAddrString(randomIp(randomBoolean()))),
|
|
|
+ XContentType.JSON
|
|
|
+ );
|
|
|
+ bulkRequest.add(indexRequest);
|
|
|
+ time = time.plusMillis(1);
|
|
|
+ }
|
|
|
+
|
|
|
+ bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
|
|
|
+ var bulkResponse = client().bulk(bulkRequest).actionGet();
|
|
|
+ for (var itemResponse : bulkResponse) {
|
|
|
+ String id = itemResponse.getId();
|
|
|
+ String index = itemResponse.getIndex();
|
|
|
+ var getResponse = client().get(new GetRequest(index, id)).actionGet();
|
|
|
+ assertThat(getResponse.isExists(), is(true));
|
|
|
+
|
|
|
+ var searchRequest = new SearchRequest(index);
|
|
|
+ searchRequest.source(new SearchSourceBuilder().query(new TermQueryBuilder("_id", id)));
|
|
|
+ assertResponse(client().search(searchRequest), searchResponse -> {
|
|
|
+ assertHitCount(searchResponse, 1);
|
|
|
+ assertThat(searchResponse.getHits().getHits()[0].getId(), equalTo(id));
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ var rolloverResponse = client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null)).actionGet();
|
|
|
+ assertThat(rolloverResponse.isRolledOver(), is(true));
|
|
|
+ String sourceIndex = rolloverResponse.getOldIndex();
|
|
|
+
|
|
|
+ var updateSettingsResponse = client().admin()
|
|
|
+ .indices()
|
|
|
+ .updateSettings(new UpdateSettingsRequest(sourceIndex).settings(Settings.builder().put("index.blocks.write", true)))
|
|
|
+ .actionGet();
|
|
|
+ assertThat(updateSettingsResponse.isAcknowledged(), is(true));
|
|
|
+
|
|
|
+ String shrunkenTarget = "k8s-shrunken";
|
|
|
+ var shrinkIndexResponse = client().admin()
|
|
|
+ .indices()
|
|
|
+ .prepareResizeIndex(sourceIndex, shrunkenTarget)
|
|
|
+ .setResizeType(ResizeType.SHRINK)
|
|
|
+ .setSettings(indexSettings(2, 0).build())
|
|
|
+ .get();
|
|
|
+ assertThat(shrinkIndexResponse.isAcknowledged(), is(true));
|
|
|
+ assertThat(shrinkIndexResponse.index(), equalTo(shrunkenTarget));
|
|
|
+
|
|
|
+ for (var itemResponse : bulkResponse) {
|
|
|
+ String id = itemResponse.getId();
|
|
|
+ var getResponse = client().get(new GetRequest(shrunkenTarget, id)).actionGet();
|
|
|
+ assertThat(getResponse.isExists(), is(true));
|
|
|
+
|
|
|
+ var searchRequest = new SearchRequest(shrunkenTarget);
|
|
|
+ searchRequest.source(new SearchSourceBuilder().query(new TermQueryBuilder("_id", id)));
|
|
|
+ assertResponse(client().search(searchRequest), searchResponse -> {
|
|
|
+ assertHitCount(searchResponse, 1);
|
|
|
+ assertThat(searchResponse.getHits().getHits()[0].getId(), equalTo(id));
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
static String formatInstant(Instant instant) {
|
|
|
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
|
|
|
}
|