|
@@ -22,9 +22,12 @@ import org.elasticsearch.xpack.transform.TransformSingleNodeTestCase;
|
|
|
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Comparator;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Objects;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.stream.Collectors;
|
|
|
+import java.util.stream.IntStream;
|
|
|
|
|
|
import static org.hamcrest.Matchers.anEmptyMap;
|
|
|
import static org.hamcrest.Matchers.instanceOf;
|
|
@@ -39,10 +42,10 @@ public class TransformGetCheckpointIT extends TransformSingleNodeTestCase {
|
|
|
public void testGetCheckpoint() throws Exception {
|
|
|
final String indexNamePrefix = "test_index-";
|
|
|
final int shards = randomIntBetween(1, 5);
|
|
|
- final int indices = randomIntBetween(1, 5);
|
|
|
+ var indices = indices(indexNamePrefix, randomIntBetween(1, 5));
|
|
|
|
|
|
- for (int i = 0; i < indices; ++i) {
|
|
|
- indicesAdmin().prepareCreate(indexNamePrefix + i).setSettings(indexSettings(shards, 1)).get();
|
|
|
+ for (var index : indices) {
|
|
|
+ indicesAdmin().prepareCreate(index).setSettings(indexSettings(shards, 1)).get();
|
|
|
}
|
|
|
|
|
|
final GetCheckpointAction.Request request = new GetCheckpointAction.Request(
|
|
@@ -54,7 +57,7 @@ public class TransformGetCheckpointIT extends TransformSingleNodeTestCase {
|
|
|
);
|
|
|
|
|
|
final GetCheckpointAction.Response response = client().execute(GetCheckpointAction.INSTANCE, request).get();
|
|
|
- assertEquals(indices, response.getCheckpoints().size());
|
|
|
+ assertEquals(indices.size(), response.getCheckpoints().size());
|
|
|
|
|
|
// empty indices should report -1 as sequence id
|
|
|
assertFalse(
|
|
@@ -63,9 +66,9 @@ public class TransformGetCheckpointIT extends TransformSingleNodeTestCase {
|
|
|
|
|
|
final int docsToCreatePerShard = randomIntBetween(0, 10);
|
|
|
for (int d = 0; d < docsToCreatePerShard; ++d) {
|
|
|
- for (int i = 0; i < indices; ++i) {
|
|
|
+ for (var index : indices) {
|
|
|
for (int j = 0; j < shards; ++j) {
|
|
|
- prepareIndex(indexNamePrefix + i).setSource("{" + "\"field\":" + j + "}", XContentType.JSON).get();
|
|
|
+ prepareIndex(index).setSource("{" + "\"field\":" + j + "}", XContentType.JSON).get();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -73,20 +76,20 @@ public class TransformGetCheckpointIT extends TransformSingleNodeTestCase {
|
|
|
indicesAdmin().refresh(new RefreshRequest(indexNamePrefix + "*"));
|
|
|
|
|
|
final GetCheckpointAction.Response response2 = client().execute(GetCheckpointAction.INSTANCE, request).get();
|
|
|
- assertEquals(indices, response2.getCheckpoints().size());
|
|
|
+ assertEquals(indices.size(), response2.getCheckpoints().size());
|
|
|
|
|
|
// check the sum, counting starts with 0, so we have to take docsToCreatePerShard - 1
|
|
|
long checkpointSum = response2.getCheckpoints().values().stream().map(l -> Arrays.stream(l).sum()).mapToLong(Long::valueOf).sum();
|
|
|
assertEquals(
|
|
|
"Expected "
|
|
|
- + (docsToCreatePerShard - 1) * shards * indices
|
|
|
+ + (docsToCreatePerShard - 1) * shards * indices.size()
|
|
|
+ " as sum of "
|
|
|
+ response2.getCheckpoints()
|
|
|
.entrySet()
|
|
|
.stream()
|
|
|
.map(e -> e.getKey() + ": {" + Strings.arrayToCommaDelimitedString(Arrays.stream(e.getValue()).boxed().toArray()) + "}")
|
|
|
.collect(Collectors.joining(",")),
|
|
|
- (docsToCreatePerShard - 1) * shards * indices,
|
|
|
+ (docsToCreatePerShard - 1) * shards * indices.size(),
|
|
|
checkpointSum
|
|
|
);
|
|
|
|
|
@@ -98,25 +101,28 @@ public class TransformGetCheckpointIT extends TransformSingleNodeTestCase {
|
|
|
.filter(i -> i.getShardRouting().primary())
|
|
|
.sorted(Comparator.comparingInt(value -> value.getShardRouting().id()))
|
|
|
.mapToLong(s -> s.getSeqNoStats().getGlobalCheckpoint())
|
|
|
+ .filter(Objects::nonNull)
|
|
|
.sum(),
|
|
|
checkpointSum
|
|
|
);
|
|
|
+ deleteIndices(indices);
|
|
|
}
|
|
|
|
|
|
public void testGetCheckpointWithQueryThatFiltersOutEverything() throws Exception {
|
|
|
final String indexNamePrefix = "test_index-";
|
|
|
- final int indices = randomIntBetween(1, 5);
|
|
|
+ var indices = indices(indexNamePrefix, randomIntBetween(1, 5));
|
|
|
final int shards = randomIntBetween(1, 5);
|
|
|
final int docsToCreatePerShard = randomIntBetween(0, 10);
|
|
|
|
|
|
- for (int i = 0; i < indices; ++i) {
|
|
|
- indicesAdmin().prepareCreate(indexNamePrefix + i)
|
|
|
+ for (int i = 0; i < indices.size(); ++i) {
|
|
|
+ var index = indices.get(i);
|
|
|
+ indicesAdmin().prepareCreate(index)
|
|
|
.setSettings(indexSettings(shards, 1))
|
|
|
.setMapping("field", "type=long", "@timestamp", "type=date")
|
|
|
.get();
|
|
|
for (int j = 0; j < shards; ++j) {
|
|
|
for (int d = 0; d < docsToCreatePerShard; ++d) {
|
|
|
- client().prepareIndex(indexNamePrefix + i)
|
|
|
+ client().prepareIndex(index)
|
|
|
.setSource(Strings.format("{ \"field\":%d, \"@timestamp\": %d }", j, 10_000_000 + d + i + j), XContentType.JSON)
|
|
|
.get();
|
|
|
}
|
|
@@ -135,6 +141,7 @@ public class TransformGetCheckpointIT extends TransformSingleNodeTestCase {
|
|
|
|
|
|
final GetCheckpointAction.Response response = client().execute(GetCheckpointAction.INSTANCE, request).get();
|
|
|
assertThat("Response was: " + response.getCheckpoints(), response.getCheckpoints(), is(anEmptyMap()));
|
|
|
+ deleteIndices(indices);
|
|
|
}
|
|
|
|
|
|
public void testGetCheckpointWithMissingIndex() throws Exception {
|
|
@@ -163,11 +170,11 @@ public class TransformGetCheckpointIT extends TransformSingleNodeTestCase {
|
|
|
|
|
|
public void testGetCheckpointTimeoutExceeded() throws Exception {
|
|
|
final String indexNamePrefix = "test_index-";
|
|
|
- final int indices = 100;
|
|
|
+ var indices = indices(indexNamePrefix, 100);
|
|
|
final int shards = 5;
|
|
|
|
|
|
- for (int i = 0; i < indices; ++i) {
|
|
|
- indicesAdmin().prepareCreate(indexNamePrefix + i).setSettings(indexSettings(shards, 0)).get();
|
|
|
+ for (var index : indices) {
|
|
|
+ indicesAdmin().prepareCreate(index).setSettings(indexSettings(shards, 0)).get();
|
|
|
}
|
|
|
|
|
|
final GetCheckpointAction.Request request = new GetCheckpointAction.Request(
|
|
@@ -184,7 +191,7 @@ public class TransformGetCheckpointIT extends TransformSingleNodeTestCase {
|
|
|
finalException.set(e);
|
|
|
latch.countDown();
|
|
|
}));
|
|
|
- latch.await(10, TimeUnit.SECONDS);
|
|
|
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
|
|
|
|
|
|
Exception e = finalException.get();
|
|
|
if (e != null) {
|
|
@@ -198,5 +205,19 @@ public class TransformGetCheckpointIT extends TransformSingleNodeTestCase {
|
|
|
// Due to system clock usage, the timeout does not always occur where it should.
|
|
|
// We cannot mock the clock so we just have to live with it.
|
|
|
}
|
|
|
+ deleteIndices(indices);
|
|
|
+ }
|
|
|
+
|
|
|
+ private List<String> indices(String prefix, int numberOfIndices) {
|
|
|
+ return IntStream.range(0, numberOfIndices).mapToObj(i -> prefix + i).toList();
|
|
|
+ }
|
|
|
+
|
|
|
+ private void deleteIndices(List<String> indices) {
|
|
|
+ try {
|
|
|
+ indicesAdmin().prepareDelete(indices.toArray(new String[0])).get();
|
|
|
+ } catch (Exception e) {
|
|
|
+ // we can fail to clean up the indices, but this wouldn't impact other tests since the node gets torn down anyway
|
|
|
+ // the index delete is to help the node tear down go smoother
|
|
|
+ }
|
|
|
}
|
|
|
}
|