|
@@ -18,6 +18,7 @@ import org.elasticsearch.common.network.NetworkAddress;
|
|
|
import org.elasticsearch.common.time.DateFormatter;
|
|
|
import org.elasticsearch.common.time.FormatNames;
|
|
|
import org.elasticsearch.common.xcontent.XContentHelper;
|
|
|
+import org.elasticsearch.index.mapper.MapperFeatures;
|
|
|
import org.elasticsearch.test.rest.ObjectPath;
|
|
|
import org.elasticsearch.xcontent.XContentType;
|
|
|
|
|
@@ -30,6 +31,7 @@ import java.util.Map;
|
|
|
|
|
|
import static org.elasticsearch.upgrades.StandardToLogsDbIndexModeRollingUpgradeIT.enableLogsdbByDefault;
|
|
|
import static org.elasticsearch.upgrades.StandardToLogsDbIndexModeRollingUpgradeIT.getWriteBackingIndex;
|
|
|
+import static org.elasticsearch.upgrades.TextRollingUpgradeIT.randomAlphasDelimitedBySpace;
|
|
|
import static org.hamcrest.Matchers.containsString;
|
|
|
import static org.hamcrest.Matchers.equalTo;
|
|
|
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
|
@@ -37,8 +39,15 @@ import static org.hamcrest.Matchers.notNullValue;
|
|
|
|
|
|
public class MatchOnlyTextRollingUpgradeIT extends AbstractRollingUpgradeWithSecurityTestCase {
|
|
|
|
|
|
+ private static final String DATA_STREAM = "logs-bwc-test";
|
|
|
+
|
|
|
+ private static final int IGNORE_ABOVE_MAX = 256;
|
|
|
+ private static final int NUM_REQUESTS = 4;
|
|
|
+ private static final int NUM_DOCS_PER_REQUEST = 1024;
|
|
|
+
|
|
|
static String BULK_ITEM_TEMPLATE =
|
|
|
"""
|
|
|
+ { "create": {} }
|
|
|
{"@timestamp": "$now", "host.name": "$host", "method": "$method", "ip": "$ip", "message": "$message", "length": $length, "factor": $factor}
|
|
|
""";
|
|
|
|
|
@@ -53,7 +62,13 @@ public class MatchOnlyTextRollingUpgradeIT extends AbstractRollingUpgradeWithSec
|
|
|
"type": "keyword"
|
|
|
},
|
|
|
"message": {
|
|
|
- "type": "match_only_text"
|
|
|
+ "type": "match_only_text",
|
|
|
+ "fields": {
|
|
|
+ "keyword": {
|
|
|
+ "ignore_above": $IGNORE_ABOVE,
|
|
|
+ "type": "keyword"
|
|
|
+ }
|
|
|
+ }
|
|
|
},
|
|
|
"ip": {
|
|
|
"type": "ip"
|
|
@@ -68,55 +83,82 @@ public class MatchOnlyTextRollingUpgradeIT extends AbstractRollingUpgradeWithSec
|
|
|
}
|
|
|
}""";
|
|
|
|
|
|
+ // when sorted, this message will appear at the top and hence can be used to validate query results
|
|
|
+ private static String smallestMessage;
|
|
|
+
|
|
|
public MatchOnlyTextRollingUpgradeIT(@Name("upgradedNodes") int upgradedNodes) {
|
|
|
super(upgradedNodes);
|
|
|
}
|
|
|
|
|
|
public void testIndexing() throws Exception {
|
|
|
- String dataStreamName = "logs-bwc-test";
|
|
|
+ assumeTrue(
|
|
|
+ "Match only text block loader fix is not present in this cluster",
|
|
|
+ oldClusterHasFeature(MapperFeatures.MATCH_ONLY_TEXT_BLOCK_LOADER_FIX)
|
|
|
+ );
|
|
|
+
|
|
|
if (isOldCluster()) {
|
|
|
+ // given - enable logsdb and create a template
|
|
|
startTrial();
|
|
|
enableLogsdbByDefault();
|
|
|
- createTemplate(dataStreamName, getClass().getSimpleName().toLowerCase(Locale.ROOT), TEMPLATE);
|
|
|
+ String templateId = getClass().getSimpleName().toLowerCase(Locale.ROOT);
|
|
|
+ createTemplate(DATA_STREAM, templateId, prepareTemplate());
|
|
|
|
|
|
- Instant startTime = Instant.now().minusSeconds(60 * 60);
|
|
|
- bulkIndex(dataStreamName, 4, 1024, startTime);
|
|
|
+ // when - index some documents
|
|
|
+ bulkIndex(NUM_REQUESTS, NUM_DOCS_PER_REQUEST);
|
|
|
|
|
|
- String firstBackingIndex = getWriteBackingIndex(client(), dataStreamName, 0);
|
|
|
+ // then - verify that logsdb and synthetic source are both enabled
|
|
|
+ String firstBackingIndex = getWriteBackingIndex(client(), DATA_STREAM, 0);
|
|
|
var settings = (Map<?, ?>) getIndexSettingsWithDefaults(firstBackingIndex).get(firstBackingIndex);
|
|
|
assertThat(((Map<?, ?>) settings.get("settings")).get("index.mode"), equalTo("logsdb"));
|
|
|
assertThat(((Map<?, ?>) settings.get("defaults")).get("index.mapping.source.mode"), equalTo("SYNTHETIC"));
|
|
|
|
|
|
- ensureGreen(dataStreamName);
|
|
|
- search(dataStreamName);
|
|
|
- query(dataStreamName);
|
|
|
+ // when/then - run some queries and verify results
|
|
|
+ ensureGreen(DATA_STREAM);
|
|
|
+ search(DATA_STREAM);
|
|
|
+ query(DATA_STREAM);
|
|
|
+
|
|
|
} else if (isMixedCluster()) {
|
|
|
- Instant startTime = Instant.now().minusSeconds(60 * 30);
|
|
|
- bulkIndex(dataStreamName, 4, 1024, startTime);
|
|
|
+ // when
|
|
|
+ bulkIndex(NUM_REQUESTS, NUM_DOCS_PER_REQUEST);
|
|
|
+
|
|
|
+ // when/then
|
|
|
+ ensureGreen(DATA_STREAM);
|
|
|
+ search(DATA_STREAM);
|
|
|
+ query(DATA_STREAM);
|
|
|
|
|
|
- ensureGreen(dataStreamName);
|
|
|
- search(dataStreamName);
|
|
|
- query(dataStreamName);
|
|
|
} else if (isUpgradedCluster()) {
|
|
|
- ensureGreen(dataStreamName);
|
|
|
- Instant startTime = Instant.now();
|
|
|
- bulkIndex(dataStreamName, 4, 1024, startTime);
|
|
|
- search(dataStreamName);
|
|
|
- query(dataStreamName);
|
|
|
+ // when/then
|
|
|
+ ensureGreen(DATA_STREAM);
|
|
|
+ bulkIndex(NUM_REQUESTS, NUM_DOCS_PER_REQUEST);
|
|
|
+ search(DATA_STREAM);
|
|
|
+ query(DATA_STREAM);
|
|
|
|
|
|
- var forceMergeRequest = new Request("POST", "/" + dataStreamName + "/_forcemerge");
|
|
|
+ // when/then continued - force merge all shard segments into one
|
|
|
+ var forceMergeRequest = new Request("POST", "/" + DATA_STREAM + "/_forcemerge");
|
|
|
forceMergeRequest.addParameter("max_num_segments", "1");
|
|
|
assertOK(client().performRequest(forceMergeRequest));
|
|
|
|
|
|
- ensureGreen(dataStreamName);
|
|
|
- search(dataStreamName);
|
|
|
- query(dataStreamName);
|
|
|
+ // then continued
|
|
|
+ ensureGreen(DATA_STREAM);
|
|
|
+ search(DATA_STREAM);
|
|
|
+ query(DATA_STREAM);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private String prepareTemplate() {
|
|
|
+ boolean shouldSetIgnoreAbove = randomBoolean();
|
|
|
+ if (shouldSetIgnoreAbove) {
|
|
|
+ return TEMPLATE.replace("$IGNORE_ABOVE", String.valueOf(randomInt(IGNORE_ABOVE_MAX)));
|
|
|
}
|
|
|
+
|
|
|
+ // removes the entire line that defines ignore_above
|
|
|
+ return TEMPLATE.replaceAll("(?m)^\\s*\"ignore_above\":\\s*\\$IGNORE_ABOVE\\s*,?\\s*\\n?", "");
|
|
|
}
|
|
|
|
|
|
static void createTemplate(String dataStreamName, String id, String template) throws IOException {
|
|
|
final String INDEX_TEMPLATE = """
|
|
|
{
|
|
|
+ "priority": 500,
|
|
|
"index_patterns": ["$DATASTREAM"],
|
|
|
"template": $TEMPLATE,
|
|
|
"data_stream": {
|
|
@@ -127,46 +169,59 @@ public class MatchOnlyTextRollingUpgradeIT extends AbstractRollingUpgradeWithSec
|
|
|
assertOK(client().performRequest(putIndexTemplateRequest));
|
|
|
}
|
|
|
|
|
|
- static String bulkIndex(String dataStreamName, int numRequest, int numDocs, Instant startTime) throws Exception {
|
|
|
+ private void bulkIndex(int numRequest, int numDocs) throws Exception {
|
|
|
String firstIndex = null;
|
|
|
+ Instant startTime = Instant.now().minusSeconds(60 * 60);
|
|
|
+
|
|
|
for (int i = 0; i < numRequest; i++) {
|
|
|
- var bulkRequest = new Request("POST", "/" + dataStreamName + "/_bulk");
|
|
|
- StringBuilder requestBody = new StringBuilder();
|
|
|
- for (int j = 0; j < numDocs; j++) {
|
|
|
- String hostName = "host" + j % 50; // Not realistic, but makes asserting search / query response easier.
|
|
|
- String methodName = "method" + j % 5;
|
|
|
- String ip = NetworkAddress.format(randomIp(true));
|
|
|
- String param = "chicken" + randomInt(5);
|
|
|
- String message = "the quick brown fox jumps over the " + param;
|
|
|
- long length = randomLong();
|
|
|
- double factor = randomDouble();
|
|
|
-
|
|
|
- requestBody.append("{\"create\": {}}");
|
|
|
- requestBody.append('\n');
|
|
|
- requestBody.append(
|
|
|
- BULK_ITEM_TEMPLATE.replace("$now", formatInstant(startTime))
|
|
|
- .replace("$host", hostName)
|
|
|
- .replace("$method", methodName)
|
|
|
- .replace("$ip", ip)
|
|
|
- .replace("$message", message)
|
|
|
- .replace("$length", Long.toString(length))
|
|
|
- .replace("$factor", Double.toString(factor))
|
|
|
- );
|
|
|
- requestBody.append('\n');
|
|
|
-
|
|
|
- startTime = startTime.plusMillis(1);
|
|
|
- }
|
|
|
- bulkRequest.setJsonEntity(requestBody.toString());
|
|
|
+ var bulkRequest = new Request("POST", "/" + DATA_STREAM + "/_bulk");
|
|
|
+ bulkRequest.setJsonEntity(bulkIndexRequestBody(numDocs, startTime));
|
|
|
bulkRequest.addParameter("refresh", "true");
|
|
|
+
|
|
|
var response = client().performRequest(bulkRequest);
|
|
|
- assertOK(response);
|
|
|
var responseBody = entityAsMap(response);
|
|
|
+
|
|
|
+ assertOK(response);
|
|
|
assertThat("errors in response:\n " + responseBody, responseBody.get("errors"), equalTo(false));
|
|
|
if (firstIndex == null) {
|
|
|
firstIndex = (String) ((Map<?, ?>) ((Map<?, ?>) ((List<?>) responseBody.get("items")).get(0)).get("create")).get("_index");
|
|
|
}
|
|
|
}
|
|
|
- return firstIndex;
|
|
|
+ }
|
|
|
+
|
|
|
+ private String bulkIndexRequestBody(int numDocs, Instant startTime) {
|
|
|
+ StringBuilder requestBody = new StringBuilder();
|
|
|
+
|
|
|
+ for (int j = 0; j < numDocs; j++) {
|
|
|
+ String hostName = "host" + j % 50; // Not realistic, but makes asserting search / query response easier.
|
|
|
+ String methodName = "method" + j % 5;
|
|
|
+ String ip = NetworkAddress.format(randomIp(true));
|
|
|
+ String message = randomAlphasDelimitedBySpace(10, 1, 15);
|
|
|
+ recordSmallestMessage(message);
|
|
|
+ long length = randomLong();
|
|
|
+ double factor = randomDouble();
|
|
|
+
|
|
|
+ requestBody.append(
|
|
|
+ BULK_ITEM_TEMPLATE.replace("$now", formatInstant(startTime))
|
|
|
+ .replace("$host", hostName)
|
|
|
+ .replace("$method", methodName)
|
|
|
+ .replace("$ip", ip)
|
|
|
+ .replace("$message", message)
|
|
|
+ .replace("$length", Long.toString(length))
|
|
|
+ .replace("$factor", Double.toString(factor))
|
|
|
+ );
|
|
|
+ requestBody.append('\n');
|
|
|
+
|
|
|
+ startTime = startTime.plusMillis(1);
|
|
|
+ }
|
|
|
+
|
|
|
+ return requestBody.toString();
|
|
|
+ }
|
|
|
+
|
|
|
+ private void recordSmallestMessage(final String message) {
|
|
|
+ if (smallestMessage == null || message.compareTo(smallestMessage) < 0) {
|
|
|
+ smallestMessage = message;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
void search(String dataStreamName) throws Exception {
|
|
@@ -174,24 +229,19 @@ public class MatchOnlyTextRollingUpgradeIT extends AbstractRollingUpgradeWithSec
|
|
|
searchRequest.addParameter("pretty", "true");
|
|
|
searchRequest.setJsonEntity("""
|
|
|
{
|
|
|
- "size": 500,
|
|
|
- "query": {
|
|
|
- "match_phrase": {
|
|
|
- "message": "chicken"
|
|
|
- }
|
|
|
- }
|
|
|
+ "size": 500
|
|
|
}
|
|
|
- """.replace("chicken", "chicken" + randomInt(5)));
|
|
|
+ """);
|
|
|
var response = client().performRequest(searchRequest);
|
|
|
assertOK(response);
|
|
|
var responseBody = entityAsMap(response);
|
|
|
logger.info("{}", responseBody);
|
|
|
|
|
|
Integer totalCount = ObjectPath.evaluate(responseBody, "hits.total.value");
|
|
|
- assertThat(totalCount, greaterThanOrEqualTo(512));
|
|
|
+ assertThat(totalCount, greaterThanOrEqualTo(NUM_REQUESTS * NUM_DOCS_PER_REQUEST));
|
|
|
}
|
|
|
|
|
|
- void query(String dataStreamName) throws Exception {
|
|
|
+ private void query(String dataStreamName) throws Exception {
|
|
|
var queryRequest = new Request("POST", "/_query");
|
|
|
queryRequest.addParameter("pretty", "true");
|
|
|
queryRequest.setJsonEntity("""
|
|
@@ -205,18 +255,18 @@ public class MatchOnlyTextRollingUpgradeIT extends AbstractRollingUpgradeWithSec
|
|
|
logger.info("{}", responseBody);
|
|
|
|
|
|
String column1 = ObjectPath.evaluate(responseBody, "columns.0.name");
|
|
|
- String column2 = ObjectPath.evaluate(responseBody, "columns.1.name");
|
|
|
- String column3 = ObjectPath.evaluate(responseBody, "columns.2.name");
|
|
|
assertThat(column1, equalTo("max(length)"));
|
|
|
+ String column2 = ObjectPath.evaluate(responseBody, "columns.1.name");
|
|
|
assertThat(column2, equalTo("max(factor)"));
|
|
|
+ String column3 = ObjectPath.evaluate(responseBody, "columns.2.name");
|
|
|
assertThat(column3, equalTo("message"));
|
|
|
|
|
|
- String key = ObjectPath.evaluate(responseBody, "values.0.2");
|
|
|
- assertThat(key, equalTo("the quick brown fox jumps over the chicken0"));
|
|
|
Long maxRx = ObjectPath.evaluate(responseBody, "values.0.0");
|
|
|
assertThat(maxRx, notNullValue());
|
|
|
Double maxTx = ObjectPath.evaluate(responseBody, "values.0.1");
|
|
|
assertThat(maxTx, notNullValue());
|
|
|
+ String key = ObjectPath.evaluate(responseBody, "values.0.2");
|
|
|
+ assertThat(key, equalTo(smallestMessage));
|
|
|
}
|
|
|
|
|
|
protected static void startTrial() throws IOException {
|