|
@@ -76,7 +76,13 @@ public class CategorizeBlockHashTests extends BlockHashTestCase {
|
|
|
).getAnalysisRegistry();
|
|
|
}
|
|
|
|
|
|
+ private BlockHash.CategorizeDef getCategorizeDef() {
|
|
|
+ return new BlockHash.CategorizeDef(null, randomFrom(BlockHash.CategorizeDef.OutputFormat.values()), 70);
|
|
|
+ }
|
|
|
+
|
|
|
public void testCategorizeRaw() {
|
|
|
+ BlockHash.CategorizeDef categorizeDef = getCategorizeDef();
|
|
|
+
|
|
|
final Page page;
|
|
|
boolean withNull = randomBoolean();
|
|
|
final int positions = 7 + (withNull ? 1 : 0);
|
|
@@ -98,7 +104,7 @@ public class CategorizeBlockHashTests extends BlockHashTestCase {
|
|
|
page = new Page(builder.build());
|
|
|
}
|
|
|
|
|
|
- try (var hash = new CategorizeBlockHash(blockFactory, 0, AggregatorMode.SINGLE, analysisRegistry)) {
|
|
|
+ try (var hash = new CategorizeBlockHash(blockFactory, 0, AggregatorMode.SINGLE, categorizeDef, analysisRegistry)) {
|
|
|
for (int i = randomInt(2); i < 3; i++) {
|
|
|
hash.add(page, new GroupingAggregatorFunction.AddInput() {
|
|
|
private void addBlock(int positionOffset, IntBlock groupIds) {
|
|
@@ -137,7 +143,10 @@ public class CategorizeBlockHashTests extends BlockHashTestCase {
|
|
|
}
|
|
|
});
|
|
|
|
|
|
- assertHashState(hash, withNull, ".*?Connected.+?to.*?", ".*?Connection.+?error.*?", ".*?Disconnected.*?");
|
|
|
+ switch (categorizeDef.outputFormat()) {
|
|
|
+ case REGEX -> assertHashState(hash, withNull, ".*?Connected.+?to.*?", ".*?Connection.+?error.*?", ".*?Disconnected.*?");
|
|
|
+ case TOKENS -> assertHashState(hash, withNull, "Connected to", "Connection error", "Disconnected");
|
|
|
+ }
|
|
|
}
|
|
|
} finally {
|
|
|
page.releaseBlocks();
|
|
@@ -145,6 +154,8 @@ public class CategorizeBlockHashTests extends BlockHashTestCase {
|
|
|
}
|
|
|
|
|
|
public void testCategorizeRawMultivalue() {
|
|
|
+ BlockHash.CategorizeDef categorizeDef = getCategorizeDef();
|
|
|
+
|
|
|
final Page page;
|
|
|
boolean withNull = randomBoolean();
|
|
|
final int positions = 3 + (withNull ? 1 : 0);
|
|
@@ -170,7 +181,7 @@ public class CategorizeBlockHashTests extends BlockHashTestCase {
|
|
|
page = new Page(builder.build());
|
|
|
}
|
|
|
|
|
|
- try (var hash = new CategorizeBlockHash(blockFactory, 0, AggregatorMode.SINGLE, analysisRegistry)) {
|
|
|
+ try (var hash = new CategorizeBlockHash(blockFactory, 0, AggregatorMode.SINGLE, categorizeDef, analysisRegistry)) {
|
|
|
for (int i = randomInt(2); i < 3; i++) {
|
|
|
hash.add(page, new GroupingAggregatorFunction.AddInput() {
|
|
|
private void addBlock(int positionOffset, IntBlock groupIds) {
|
|
@@ -216,7 +227,10 @@ public class CategorizeBlockHashTests extends BlockHashTestCase {
|
|
|
}
|
|
|
});
|
|
|
|
|
|
- assertHashState(hash, withNull, ".*?Connected.+?to.*?", ".*?Connection.+?error.*?", ".*?Disconnected.*?");
|
|
|
+ switch (categorizeDef.outputFormat()) {
|
|
|
+ case REGEX -> assertHashState(hash, withNull, ".*?Connected.+?to.*?", ".*?Connection.+?error.*?", ".*?Disconnected.*?");
|
|
|
+ case TOKENS -> assertHashState(hash, withNull, "Connected to", "Connection error", "Disconnected");
|
|
|
+ }
|
|
|
}
|
|
|
} finally {
|
|
|
page.releaseBlocks();
|
|
@@ -224,6 +238,8 @@ public class CategorizeBlockHashTests extends BlockHashTestCase {
|
|
|
}
|
|
|
|
|
|
public void testCategorizeIntermediate() {
|
|
|
+ BlockHash.CategorizeDef categorizeDef = getCategorizeDef();
|
|
|
+
|
|
|
Page page1;
|
|
|
boolean withNull = randomBoolean();
|
|
|
int positions1 = 7 + (withNull ? 1 : 0);
|
|
@@ -259,8 +275,8 @@ public class CategorizeBlockHashTests extends BlockHashTestCase {
|
|
|
|
|
|
// Fill intermediatePages with the intermediate state from the raw hashes
|
|
|
try (
|
|
|
- BlockHash rawHash1 = new CategorizeBlockHash(blockFactory, 0, AggregatorMode.INITIAL, analysisRegistry);
|
|
|
- BlockHash rawHash2 = new CategorizeBlockHash(blockFactory, 0, AggregatorMode.INITIAL, analysisRegistry);
|
|
|
+ BlockHash rawHash1 = new CategorizeBlockHash(blockFactory, 0, AggregatorMode.INITIAL, categorizeDef, analysisRegistry);
|
|
|
+ BlockHash rawHash2 = new CategorizeBlockHash(blockFactory, 0, AggregatorMode.INITIAL, categorizeDef, analysisRegistry);
|
|
|
) {
|
|
|
rawHash1.add(page1, new GroupingAggregatorFunction.AddInput() {
|
|
|
private void addBlock(int positionOffset, IntBlock groupIds) {
|
|
@@ -335,7 +351,7 @@ public class CategorizeBlockHashTests extends BlockHashTestCase {
|
|
|
page2.releaseBlocks();
|
|
|
}
|
|
|
|
|
|
- try (var intermediateHash = new CategorizeBlockHash(blockFactory, 0, AggregatorMode.FINAL, null)) {
|
|
|
+ try (var intermediateHash = new CategorizeBlockHash(blockFactory, 0, AggregatorMode.FINAL, categorizeDef, null)) {
|
|
|
intermediateHash.add(intermediatePage1, new GroupingAggregatorFunction.AddInput() {
|
|
|
private void addBlock(int positionOffset, IntBlock groupIds) {
|
|
|
List<Integer> values = IntStream.range(0, groupIds.getPositionCount())
|
|
@@ -403,14 +419,24 @@ public class CategorizeBlockHashTests extends BlockHashTestCase {
|
|
|
}
|
|
|
});
|
|
|
|
|
|
- assertHashState(
|
|
|
- intermediateHash,
|
|
|
- withNull,
|
|
|
- ".*?Connected.+?to.*?",
|
|
|
- ".*?Connection.+?error.*?",
|
|
|
- ".*?Disconnected.*?",
|
|
|
- ".*?System.+?shutdown.*?"
|
|
|
- );
|
|
|
+ switch (categorizeDef.outputFormat()) {
|
|
|
+ case REGEX -> assertHashState(
|
|
|
+ intermediateHash,
|
|
|
+ withNull,
|
|
|
+ ".*?Connected.+?to.*?",
|
|
|
+ ".*?Connection.+?error.*?",
|
|
|
+ ".*?Disconnected.*?",
|
|
|
+ ".*?System.+?shutdown.*?"
|
|
|
+ );
|
|
|
+ case TOKENS -> assertHashState(
|
|
|
+ intermediateHash,
|
|
|
+ withNull,
|
|
|
+ "Connected to",
|
|
|
+ "Connection error",
|
|
|
+ "Disconnected",
|
|
|
+ "System shutdown"
|
|
|
+ );
|
|
|
+ }
|
|
|
}
|
|
|
} finally {
|
|
|
intermediatePage1.releaseBlocks();
|
|
@@ -419,6 +445,9 @@ public class CategorizeBlockHashTests extends BlockHashTestCase {
|
|
|
}
|
|
|
|
|
|
public void testCategorize_withDriver() {
|
|
|
+ BlockHash.CategorizeDef categorizeDef = getCategorizeDef();
|
|
|
+ BlockHash.GroupSpec groupSpec = new BlockHash.GroupSpec(0, ElementType.BYTES_REF, categorizeDef);
|
|
|
+
|
|
|
BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.ofMb(256)).withCircuitBreaking();
|
|
|
CircuitBreaker breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST);
|
|
|
DriverContext driverContext = new DriverContext(bigArrays, new BlockFactory(breaker, bigArrays));
|
|
@@ -477,7 +506,7 @@ public class CategorizeBlockHashTests extends BlockHashTestCase {
|
|
|
new LocalSourceOperator(input1),
|
|
|
List.of(
|
|
|
new HashAggregationOperator.HashAggregationOperatorFactory(
|
|
|
- List.of(makeGroupSpec()),
|
|
|
+ List.of(groupSpec),
|
|
|
AggregatorMode.INITIAL,
|
|
|
List.of(
|
|
|
new SumLongAggregatorFunctionSupplier().groupingAggregatorFactory(AggregatorMode.INITIAL, List.of(1)),
|
|
@@ -496,7 +525,7 @@ public class CategorizeBlockHashTests extends BlockHashTestCase {
|
|
|
new LocalSourceOperator(input2),
|
|
|
List.of(
|
|
|
new HashAggregationOperator.HashAggregationOperatorFactory(
|
|
|
- List.of(makeGroupSpec()),
|
|
|
+ List.of(groupSpec),
|
|
|
AggregatorMode.INITIAL,
|
|
|
List.of(
|
|
|
new SumLongAggregatorFunctionSupplier().groupingAggregatorFactory(AggregatorMode.INITIAL, List.of(1)),
|
|
@@ -517,7 +546,7 @@ public class CategorizeBlockHashTests extends BlockHashTestCase {
|
|
|
new CannedSourceOperator(intermediateOutput.iterator()),
|
|
|
List.of(
|
|
|
new HashAggregationOperator.HashAggregationOperatorFactory(
|
|
|
- List.of(makeGroupSpec()),
|
|
|
+ List.of(groupSpec),
|
|
|
AggregatorMode.FINAL,
|
|
|
List.of(
|
|
|
new SumLongAggregatorFunctionSupplier().groupingAggregatorFactory(AggregatorMode.FINAL, List.of(1, 2)),
|
|
@@ -544,23 +573,36 @@ public class CategorizeBlockHashTests extends BlockHashTestCase {
|
|
|
sums.put(outputTexts.getBytesRef(i, new BytesRef()).utf8ToString(), outputSums.getLong(i));
|
|
|
maxs.put(outputTexts.getBytesRef(i, new BytesRef()).utf8ToString(), outputMaxs.getLong(i));
|
|
|
}
|
|
|
+ List<String> keys = switch (categorizeDef.outputFormat()) {
|
|
|
+ case REGEX -> List.of(
|
|
|
+ ".*?aaazz.*?",
|
|
|
+ ".*?bbbzz.*?",
|
|
|
+ ".*?ccczz.*?",
|
|
|
+ ".*?dddzz.*?",
|
|
|
+ ".*?eeezz.*?",
|
|
|
+ ".*?words.+?words.+?words.+?goodbye.*?",
|
|
|
+ ".*?words.+?words.+?words.+?hello.*?"
|
|
|
+ );
|
|
|
+ case TOKENS -> List.of("aaazz", "bbbzz", "ccczz", "dddzz", "eeezz", "words words words goodbye", "words words words hello");
|
|
|
+ };
|
|
|
+
|
|
|
assertThat(
|
|
|
sums,
|
|
|
equalTo(
|
|
|
Map.of(
|
|
|
- ".*?aaazz.*?",
|
|
|
+ keys.get(0),
|
|
|
1L,
|
|
|
- ".*?bbbzz.*?",
|
|
|
+ keys.get(1),
|
|
|
2L,
|
|
|
- ".*?ccczz.*?",
|
|
|
+ keys.get(2),
|
|
|
33L,
|
|
|
- ".*?dddzz.*?",
|
|
|
+ keys.get(3),
|
|
|
44L,
|
|
|
- ".*?eeezz.*?",
|
|
|
+ keys.get(4),
|
|
|
5L,
|
|
|
- ".*?words.+?words.+?words.+?goodbye.*?",
|
|
|
+ keys.get(5),
|
|
|
8888L,
|
|
|
- ".*?words.+?words.+?words.+?hello.*?",
|
|
|
+ keys.get(6),
|
|
|
999L
|
|
|
)
|
|
|
)
|
|
@@ -569,19 +611,19 @@ public class CategorizeBlockHashTests extends BlockHashTestCase {
|
|
|
maxs,
|
|
|
equalTo(
|
|
|
Map.of(
|
|
|
- ".*?aaazz.*?",
|
|
|
+ keys.get(0),
|
|
|
1L,
|
|
|
- ".*?bbbzz.*?",
|
|
|
+ keys.get(1),
|
|
|
2L,
|
|
|
- ".*?ccczz.*?",
|
|
|
+ keys.get(2),
|
|
|
30L,
|
|
|
- ".*?dddzz.*?",
|
|
|
+ keys.get(3),
|
|
|
40L,
|
|
|
- ".*?eeezz.*?",
|
|
|
+ keys.get(4),
|
|
|
5L,
|
|
|
- ".*?words.+?words.+?words.+?goodbye.*?",
|
|
|
+ keys.get(5),
|
|
|
8000L,
|
|
|
- ".*?words.+?words.+?words.+?hello.*?",
|
|
|
+ keys.get(6),
|
|
|
900L
|
|
|
)
|
|
|
)
|
|
@@ -589,10 +631,6 @@ public class CategorizeBlockHashTests extends BlockHashTestCase {
|
|
|
Releasables.close(() -> Iterators.map(finalOutput.iterator(), (Page p) -> p::releaseBlocks));
|
|
|
}
|
|
|
|
|
|
- private BlockHash.GroupSpec makeGroupSpec() {
|
|
|
- return new BlockHash.GroupSpec(0, ElementType.BYTES_REF, true);
|
|
|
- }
|
|
|
-
|
|
|
private void assertHashState(CategorizeBlockHash hash, boolean withNull, String... expectedKeys) {
|
|
|
// Check the keys
|
|
|
Block[] blocks = null;
|