|
|
@@ -207,7 +207,21 @@ public class IngestService implements ClusterStateApplier {
|
|
|
*/
|
|
|
public void putPipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineRequest request,
|
|
|
ActionListener<AcknowledgedResponse> listener) throws Exception {
|
|
|
- put(clusterService, ingestInfos, request, listener);
|
|
|
+ // validates the pipeline and processor configuration before submitting a cluster update task:
|
|
|
+ validatePipeline(ingestInfos, request);
|
|
|
+ clusterService.submitStateUpdateTask("put-pipeline-" + request.getId(),
|
|
|
+ new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected AcknowledgedResponse newResponse(boolean acknowledged) {
|
|
|
+ return new AcknowledgedResponse(acknowledged);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public ClusterState execute(ClusterState currentState) {
|
|
|
+ return innerPut(request, currentState);
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -280,28 +294,6 @@ public class IngestService implements ClusterStateApplier {
|
|
|
return newState.build();
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Stores the specified pipeline definition in the request.
|
|
|
- */
|
|
|
- public void put(ClusterService clusterService, Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineRequest request,
|
|
|
- ActionListener<AcknowledgedResponse> listener) throws Exception {
|
|
|
- // validates the pipeline and processor configuration before submitting a cluster update task:
|
|
|
- validatePipeline(ingestInfos, request);
|
|
|
- clusterService.submitStateUpdateTask("put-pipeline-" + request.getId(),
|
|
|
- new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
|
|
|
-
|
|
|
- @Override
|
|
|
- protected AcknowledgedResponse newResponse(boolean acknowledged) {
|
|
|
- return new AcknowledgedResponse(acknowledged);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public ClusterState execute(ClusterState currentState) {
|
|
|
- return innerPut(request, currentState);
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineRequest request) throws Exception {
|
|
|
if (ingestInfos.isEmpty()) {
|
|
|
throw new IllegalStateException("Ingest info is empty");
|