|
@@ -52,7 +52,6 @@ import org.elasticsearch.common.time.DateFormatter;
|
|
|
import org.elasticsearch.common.util.Maps;
|
|
|
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
|
|
import org.elasticsearch.common.xcontent.XContentHelper;
|
|
|
-import org.elasticsearch.core.FixForMultiProject;
|
|
|
import org.elasticsearch.core.Predicates;
|
|
|
import org.elasticsearch.core.Strings;
|
|
|
import org.elasticsearch.core.TimeValue;
|
|
@@ -2132,19 +2131,22 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
// n.b. this 'pipeline' processor will always run the '_id3' pipeline, see the mocking/plumbing above and below
|
|
|
PutPipelineRequest putRequest2 = putJsonPipelineRequest("_id2", "{\"processors\": [{\"pipeline\" : {}}]}");
|
|
|
PutPipelineRequest putRequest3 = putJsonPipelineRequest("_id3", "{\"processors\": [{\"mock\" : {}}]}");
|
|
|
- @FixForMultiProject(description = "Do not use default project id once stats are project aware")
|
|
|
- var projectId = DEFAULT_PROJECT_ID;
|
|
|
+ var projectId1 = randomProjectIdOrDefault();
|
|
|
+ var projectId2 = randomProjectIdOrDefault();
|
|
|
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
|
|
|
- .putProjectMetadata(ProjectMetadata.builder(projectId).build())
|
|
|
+ .putProjectMetadata(ProjectMetadata.builder(projectId1).build())
|
|
|
.build();
|
|
|
+ if (projectId2.equals(projectId1) == false) {
|
|
|
+ clusterState = ClusterState.builder(clusterState).putProjectMetadata(ProjectMetadata.builder(projectId2).build()).build();
|
|
|
+ }
|
|
|
ClusterState previousClusterState = clusterState;
|
|
|
- clusterState = executePut(projectId, putRequest1, clusterState);
|
|
|
- clusterState = executePut(projectId, putRequest2, clusterState);
|
|
|
- clusterState = executePut(projectId, putRequest3, clusterState);
|
|
|
+ clusterState = executePut(projectId1, putRequest1, clusterState);
|
|
|
+ clusterState = executePut(projectId1, putRequest2, clusterState);
|
|
|
+ clusterState = executePut(projectId2, putRequest3, clusterState);
|
|
|
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
|
|
|
|
|
|
// hook up the mock ingest service to return pipeline3 when asked by the pipeline processor
|
|
|
- pipelineToReturn[0] = ingestService.getPipeline(projectId, "_id3");
|
|
|
+ pipelineToReturn[0] = ingestService.getPipeline(projectId2, "_id3");
|
|
|
|
|
|
{
|
|
|
final IngestStats ingestStats = ingestService.stats();
|
|
@@ -2153,13 +2155,13 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
// total
|
|
|
assertStats(ingestStats.totalStats(), 0, 0, 0);
|
|
|
// pipeline
|
|
|
- assertPipelineStats(ingestStats.pipelineStats(), "_id1", 0, 0, 0, 0, 0);
|
|
|
- assertPipelineStats(ingestStats.pipelineStats(), "_id2", 0, 0, 0, 0, 0);
|
|
|
- assertPipelineStats(ingestStats.pipelineStats(), "_id3", 0, 0, 0, 0, 0);
|
|
|
+ assertPipelineStats(ingestStats.pipelineStats(), projectId1, "_id1", 0, 0, 0, 0, 0);
|
|
|
+ assertPipelineStats(ingestStats.pipelineStats(), projectId1, "_id2", 0, 0, 0, 0, 0);
|
|
|
+ assertPipelineStats(ingestStats.pipelineStats(), projectId2, "_id3", 0, 0, 0, 0, 0);
|
|
|
// processor
|
|
|
- assertProcessorStats(0, ingestStats, "_id1", 0, 0, 0);
|
|
|
- assertProcessorStats(0, ingestStats, "_id2", 0, 0, 0);
|
|
|
- assertProcessorStats(0, ingestStats, "_id3", 0, 0, 0);
|
|
|
+ assertProcessorStats(0, ingestStats, projectId1, "_id1", 0, 0, 0);
|
|
|
+ assertProcessorStats(0, ingestStats, projectId1, "_id2", 0, 0, 0);
|
|
|
+ assertProcessorStats(0, ingestStats, projectId2, "_id3", 0, 0, 0);
|
|
|
}
|
|
|
|
|
|
// put a single document through ingest processing
|
|
@@ -2168,7 +2170,7 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10));
|
|
|
var startSize = indexRequest.ramBytesUsed();
|
|
|
ingestService.executeBulkRequest(
|
|
|
- projectId,
|
|
|
+ projectId1,
|
|
|
1,
|
|
|
List.of(indexRequest),
|
|
|
indexReq -> {},
|
|
@@ -2186,13 +2188,13 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
// total
|
|
|
assertStats(ingestStats.totalStats(), 1, 0, 0);
|
|
|
// pipeline
|
|
|
- assertPipelineStats(ingestStats.pipelineStats(), "_id1", 1, 0, 0, startSize, indexRequest.ramBytesUsed());
|
|
|
- assertPipelineStats(ingestStats.pipelineStats(), "_id2", 1, 0, 0, 0, 0);
|
|
|
- assertPipelineStats(ingestStats.pipelineStats(), "_id3", 1, 0, 0, 0, 0);
|
|
|
+ assertPipelineStats(ingestStats.pipelineStats(), projectId1, "_id1", 1, 0, 0, startSize, indexRequest.ramBytesUsed());
|
|
|
+ assertPipelineStats(ingestStats.pipelineStats(), projectId1, "_id2", 1, 0, 0, 0, 0);
|
|
|
+ assertPipelineStats(ingestStats.pipelineStats(), projectId2, "_id3", 1, 0, 0, 0, 0);
|
|
|
// processor
|
|
|
- assertProcessorStats(0, ingestStats, "_id1", 1, 0, 0);
|
|
|
- assertProcessorStats(0, ingestStats, "_id2", 1, 0, 0);
|
|
|
- assertProcessorStats(0, ingestStats, "_id3", 1, 0, 0);
|
|
|
+ assertProcessorStats(0, ingestStats, projectId1, "_id1", 1, 0, 0);
|
|
|
+ assertProcessorStats(0, ingestStats, projectId1, "_id2", 1, 0, 0);
|
|
|
+ assertProcessorStats(0, ingestStats, projectId2, "_id3", 1, 0, 0);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -2228,17 +2230,20 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
assertStats(initialStats.totalStats(), 0, 0, 0);
|
|
|
|
|
|
PutPipelineRequest putRequest = putJsonPipelineRequest("_id1", "{\"processors\": [{\"mock\" : {}}]}");
|
|
|
- @FixForMultiProject(description = "Do not use default project id once stats are project aware")
|
|
|
- var projectId = DEFAULT_PROJECT_ID;
|
|
|
+ var projectId1 = randomProjectIdOrDefault();
|
|
|
+ var projectId2 = randomProjectIdOrDefault();
|
|
|
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
|
|
|
- .putProjectMetadata(ProjectMetadata.builder(projectId).build())
|
|
|
+ .putProjectMetadata(ProjectMetadata.builder(projectId1).build())
|
|
|
.build();
|
|
|
+ if (projectId2.equals(projectId1) == false) {
|
|
|
+ clusterState = ClusterState.builder(clusterState).putProjectMetadata(ProjectMetadata.builder(projectId2).build()).build();
|
|
|
+ }
|
|
|
ClusterState previousClusterState = clusterState;
|
|
|
- clusterState = executePut(projectId, putRequest, clusterState);
|
|
|
+ clusterState = executePut(projectId1, putRequest, clusterState);
|
|
|
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
|
|
|
putRequest = putJsonPipelineRequest("_id2", "{\"processors\": [{\"mock\" : {}}]}");
|
|
|
previousClusterState = clusterState;
|
|
|
- clusterState = executePut(projectId, putRequest, clusterState);
|
|
|
+ clusterState = executePut(projectId2, putRequest, clusterState);
|
|
|
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
@@ -2251,7 +2256,7 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10));
|
|
|
var startSize1 = indexRequest.ramBytesUsed();
|
|
|
ingestService.executeBulkRequest(
|
|
|
- projectId,
|
|
|
+ projectId1,
|
|
|
1,
|
|
|
List.of(indexRequest),
|
|
|
indexReq -> {},
|
|
@@ -2265,22 +2270,22 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
var endSize1 = indexRequest.ramBytesUsed();
|
|
|
assertThat(afterFirstRequestStats.pipelineStats().size(), equalTo(2));
|
|
|
|
|
|
- afterFirstRequestStats.processorStats().get("_id1").forEach(p -> assertEquals(p.name(), "mock:mockTag"));
|
|
|
- afterFirstRequestStats.processorStats().get("_id2").forEach(p -> assertEquals(p.name(), "mock:mockTag"));
|
|
|
+ afterFirstRequestStats.processorStats().get(projectId1).get("_id1").forEach(p -> assertEquals(p.name(), "mock:mockTag"));
|
|
|
+ afterFirstRequestStats.processorStats().get(projectId2).get("_id2").forEach(p -> assertEquals(p.name(), "mock:mockTag"));
|
|
|
|
|
|
// total
|
|
|
assertStats(afterFirstRequestStats.totalStats(), 1, 0, 0);
|
|
|
// pipeline
|
|
|
- assertPipelineStats(afterFirstRequestStats.pipelineStats(), "_id1", 1, 0, 0, startSize1, endSize1);
|
|
|
- assertPipelineStats(afterFirstRequestStats.pipelineStats(), "_id2", 0, 0, 0, 0, 0);
|
|
|
+ assertPipelineStats(afterFirstRequestStats.pipelineStats(), projectId1, "_id1", 1, 0, 0, startSize1, endSize1);
|
|
|
+ assertPipelineStats(afterFirstRequestStats.pipelineStats(), projectId2, "_id2", 0, 0, 0, 0, 0);
|
|
|
// processor
|
|
|
- assertProcessorStats(0, afterFirstRequestStats, "_id1", 1, 0, 0);
|
|
|
- assertProcessorStats(0, afterFirstRequestStats, "_id2", 0, 0, 0);
|
|
|
+ assertProcessorStats(0, afterFirstRequestStats, projectId1, "_id1", 1, 0, 0);
|
|
|
+ assertProcessorStats(0, afterFirstRequestStats, projectId2, "_id2", 0, 0, 0);
|
|
|
|
|
|
indexRequest.setPipeline("_id2");
|
|
|
var startSize2 = indexRequest.ramBytesUsed();
|
|
|
ingestService.executeBulkRequest(
|
|
|
- projectId,
|
|
|
+ projectId2,
|
|
|
1,
|
|
|
List.of(indexRequest),
|
|
|
indexReq -> {},
|
|
@@ -2296,21 +2301,21 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
// total
|
|
|
assertStats(afterSecondRequestStats.totalStats(), 2, 0, 0);
|
|
|
// pipeline
|
|
|
- assertPipelineStats(afterSecondRequestStats.pipelineStats(), "_id1", 1, 0, 0, startSize1, endSize1);
|
|
|
- assertPipelineStats(afterSecondRequestStats.pipelineStats(), "_id2", 1, 0, 0, startSize2, endSize2);
|
|
|
+ assertPipelineStats(afterSecondRequestStats.pipelineStats(), projectId1, "_id1", 1, 0, 0, startSize1, endSize1);
|
|
|
+ assertPipelineStats(afterSecondRequestStats.pipelineStats(), projectId2, "_id2", 1, 0, 0, startSize2, endSize2);
|
|
|
// processor
|
|
|
- assertProcessorStats(0, afterSecondRequestStats, "_id1", 1, 0, 0);
|
|
|
- assertProcessorStats(0, afterSecondRequestStats, "_id2", 1, 0, 0);
|
|
|
+ assertProcessorStats(0, afterSecondRequestStats, projectId1, "_id1", 1, 0, 0);
|
|
|
+ assertProcessorStats(0, afterSecondRequestStats, projectId2, "_id2", 1, 0, 0);
|
|
|
|
|
|
// update cluster state and ensure that new stats are added to old stats
|
|
|
putRequest = putJsonPipelineRequest("_id1", "{\"processors\": [{\"mock\" : {}}, {\"mock\" : {}}]}");
|
|
|
previousClusterState = clusterState;
|
|
|
- clusterState = executePut(projectId, putRequest, clusterState);
|
|
|
+ clusterState = executePut(projectId1, putRequest, clusterState);
|
|
|
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
|
|
|
indexRequest.setPipeline("_id1");
|
|
|
startSize1 += indexRequest.ramBytesUsed();
|
|
|
ingestService.executeBulkRequest(
|
|
|
- projectId,
|
|
|
+ projectId1,
|
|
|
1,
|
|
|
List.of(indexRequest),
|
|
|
indexReq -> {},
|
|
@@ -2326,26 +2331,26 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
// total
|
|
|
assertStats(afterThirdRequestStats.totalStats(), 3, 0, 0);
|
|
|
// pipeline
|
|
|
- assertPipelineStats(afterThirdRequestStats.pipelineStats(), "_id1", 2, 0, 0, startSize1, endSize1);
|
|
|
- assertPipelineStats(afterThirdRequestStats.pipelineStats(), "_id2", 1, 0, 0, startSize2, endSize2);
|
|
|
+ assertPipelineStats(afterThirdRequestStats.pipelineStats(), projectId1, "_id1", 2, 0, 0, startSize1, endSize1);
|
|
|
+ assertPipelineStats(afterThirdRequestStats.pipelineStats(), projectId2, "_id2", 1, 0, 0, startSize2, endSize2);
|
|
|
// The number of processors for the "id1" pipeline changed, so the per-processor metrics are not carried forward. This is
|
|
|
// due to the parallel array's used to identify which metrics to carry forward. Without unique ids or semantic equals for each
|
|
|
// processor, parallel arrays are the best option for of carrying forward metrics between pipeline changes. However, in some cases,
|
|
|
// like this one it may not be readily obvious why the metrics were not carried forward.
|
|
|
- assertProcessorStats(0, afterThirdRequestStats, "_id1", 1, 0, 0);
|
|
|
- assertProcessorStats(1, afterThirdRequestStats, "_id1", 1, 0, 0);
|
|
|
- assertProcessorStats(0, afterThirdRequestStats, "_id2", 1, 0, 0);
|
|
|
+ assertProcessorStats(0, afterThirdRequestStats, projectId1, "_id1", 1, 0, 0);
|
|
|
+ assertProcessorStats(1, afterThirdRequestStats, projectId1, "_id1", 1, 0, 0);
|
|
|
+ assertProcessorStats(0, afterThirdRequestStats, projectId2, "_id2", 1, 0, 0);
|
|
|
|
|
|
// test a failure, and that the processor stats are added from the old stats
|
|
|
putRequest = putJsonPipelineRequest("_id1", """
|
|
|
{"processors": [{"failure-mock" : { "on_failure": [{"mock" : {}}]}}, {"mock" : {}}]}""");
|
|
|
previousClusterState = clusterState;
|
|
|
- clusterState = executePut(projectId, putRequest, clusterState);
|
|
|
+ clusterState = executePut(projectId1, putRequest, clusterState);
|
|
|
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
|
|
|
indexRequest.setPipeline("_id1");
|
|
|
startSize1 += indexRequest.ramBytesUsed();
|
|
|
ingestService.executeBulkRequest(
|
|
|
- projectId,
|
|
|
+ projectId1,
|
|
|
1,
|
|
|
List.of(indexRequest),
|
|
|
indexReq -> {},
|
|
@@ -2361,22 +2366,22 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
// total
|
|
|
assertStats(afterForthRequestStats.totalStats(), 4, 0, 0);
|
|
|
// pipeline
|
|
|
- assertPipelineStats(afterForthRequestStats.pipelineStats(), "_id1", 3, 0, 0, startSize1, endSize1);
|
|
|
- assertPipelineStats(afterForthRequestStats.pipelineStats(), "_id2", 1, 0, 0, startSize2, endSize2);
|
|
|
+ assertPipelineStats(afterForthRequestStats.pipelineStats(), projectId1, "_id1", 3, 0, 0, startSize1, endSize1);
|
|
|
+ assertPipelineStats(afterForthRequestStats.pipelineStats(), projectId2, "_id2", 1, 0, 0, startSize2, endSize2);
|
|
|
// processor
|
|
|
- assertProcessorStats(0, afterForthRequestStats, "_id1", 1, 1, 0); // not carried forward since type changed
|
|
|
- assertProcessorStats(1, afterForthRequestStats, "_id1", 2, 0, 0); // carried forward and added from old stats
|
|
|
- assertProcessorStats(0, afterForthRequestStats, "_id2", 1, 0, 0);
|
|
|
+ assertProcessorStats(0, afterForthRequestStats, projectId1, "_id1", 1, 1, 0); // not carried forward since type changed
|
|
|
+ assertProcessorStats(1, afterForthRequestStats, projectId1, "_id1", 2, 0, 0); // carried forward and added from old stats
|
|
|
+ assertProcessorStats(0, afterForthRequestStats, projectId2, "_id2", 1, 0, 0);
|
|
|
|
|
|
// test with drop processor
|
|
|
putRequest = putJsonPipelineRequest("_id3", "{\"processors\": [{\"drop\" : {}}]}");
|
|
|
previousClusterState = clusterState;
|
|
|
- clusterState = executePut(projectId, putRequest, clusterState);
|
|
|
+ clusterState = executePut(projectId1, putRequest, clusterState);
|
|
|
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
|
|
|
indexRequest.setPipeline("_id3");
|
|
|
long startSize3 = indexRequest.ramBytesUsed();
|
|
|
ingestService.executeBulkRequest(
|
|
|
- projectId,
|
|
|
+ projectId1,
|
|
|
1,
|
|
|
List.of(indexRequest),
|
|
|
indexReq -> {},
|
|
@@ -2391,13 +2396,13 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
// total
|
|
|
assertStats(afterFifthRequestStats.totalStats(), 5, 0, 0);
|
|
|
// pipeline
|
|
|
- assertPipelineStats(afterFifthRequestStats.pipelineStats(), "_id1", 3, 0, 0, startSize1, endSize1);
|
|
|
- assertPipelineStats(afterFifthRequestStats.pipelineStats(), "_id2", 1, 0, 0, startSize2, endSize2);
|
|
|
- assertPipelineStats(afterFifthRequestStats.pipelineStats(), "_id3", 1, 0, 0, startSize3, 0);
|
|
|
+ assertPipelineStats(afterFifthRequestStats.pipelineStats(), projectId1, "_id1", 3, 0, 0, startSize1, endSize1);
|
|
|
+ assertPipelineStats(afterFifthRequestStats.pipelineStats(), projectId2, "_id2", 1, 0, 0, startSize2, endSize2);
|
|
|
+ assertPipelineStats(afterFifthRequestStats.pipelineStats(), projectId1, "_id3", 1, 0, 0, startSize3, 0);
|
|
|
// processor
|
|
|
- assertProcessorStats(0, afterFifthRequestStats, "_id1", 1, 1, 0);
|
|
|
- assertProcessorStats(1, afterFifthRequestStats, "_id1", 2, 0, 0);
|
|
|
- assertProcessorStats(0, afterFifthRequestStats, "_id2", 1, 0, 0);
|
|
|
+ assertProcessorStats(0, afterFifthRequestStats, projectId1, "_id1", 1, 1, 0);
|
|
|
+ assertProcessorStats(1, afterFifthRequestStats, projectId1, "_id1", 2, 0, 0);
|
|
|
+ assertProcessorStats(0, afterFifthRequestStats, projectId2, "_id2", 1, 0, 0);
|
|
|
}
|
|
|
|
|
|
public void testStatName() {
|
|
@@ -3405,12 +3410,21 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void assertProcessorStats(int processor, IngestStats stats, String pipelineId, long count, long failed, long time) {
|
|
|
- assertStats(stats.processorStats().get(pipelineId).get(processor).stats(), count, failed, time);
|
|
|
+ private void assertProcessorStats(
|
|
|
+ int processor,
|
|
|
+ IngestStats stats,
|
|
|
+ ProjectId projectId,
|
|
|
+ String pipelineId,
|
|
|
+ long count,
|
|
|
+ long failed,
|
|
|
+ long time
|
|
|
+ ) {
|
|
|
+ assertStats(stats.processorStats().get(projectId).get(pipelineId).get(processor).stats(), count, failed, time);
|
|
|
}
|
|
|
|
|
|
private void assertPipelineStats(
|
|
|
List<IngestStats.PipelineStat> pipelineStats,
|
|
|
+ ProjectId projectId,
|
|
|
String pipelineId,
|
|
|
long count,
|
|
|
long failed,
|
|
@@ -3418,7 +3432,7 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
long ingested,
|
|
|
long produced
|
|
|
) {
|
|
|
- var pipeline = getPipeline(pipelineStats, pipelineId);
|
|
|
+ var pipeline = getPipeline(pipelineStats, projectId, pipelineId);
|
|
|
assertStats(pipeline.stats(), count, failed, time);
|
|
|
assertByteStats(pipeline.byteStats(), ingested, produced);
|
|
|
}
|
|
@@ -3435,8 +3449,8 @@ public class IngestServiceTests extends ESTestCase {
|
|
|
assertThat(byteStats.bytesProduced(), equalTo(produced));
|
|
|
}
|
|
|
|
|
|
- private IngestStats.PipelineStat getPipeline(List<IngestStats.PipelineStat> pipelineStats, String id) {
|
|
|
- return pipelineStats.stream().filter(p1 -> p1.pipelineId().equals(id)).findFirst().orElse(null);
|
|
|
+ private IngestStats.PipelineStat getPipeline(List<IngestStats.PipelineStat> pipelineStats, ProjectId projectId, String id) {
|
|
|
+ return pipelineStats.stream().filter(p1 -> p1.projectId().equals(projectId) && p1.pipelineId().equals(id)).findFirst().orElse(null);
|
|
|
}
|
|
|
|
|
|
private static List<IngestService.PipelineClusterStateUpdateTask> oneTask(ProjectId projectId, DeletePipelineRequest request) {
|