|  | @@ -8,8 +8,10 @@
 | 
	
		
			
				|  |  |   */
 | 
	
		
			
				|  |  |  package org.elasticsearch.logstashbridge.ingest;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +import org.elasticsearch.cluster.metadata.ProjectId;
 | 
	
		
			
				|  |  |  import org.elasticsearch.core.FixForMultiProject;
 | 
	
		
			
				|  |  |  import org.elasticsearch.core.TimeValue;
 | 
	
		
			
				|  |  | +import org.elasticsearch.ingest.IngestDocument;
 | 
	
		
			
				|  |  |  import org.elasticsearch.ingest.IngestService;
 | 
	
		
			
				|  |  |  import org.elasticsearch.ingest.Processor;
 | 
	
		
			
				|  |  |  import org.elasticsearch.logstashbridge.StableBridgeAPI;
 | 
	
	
		
			
				|  | @@ -20,7 +22,11 @@ import org.elasticsearch.logstashbridge.threadpool.ThreadPoolBridge;
 | 
	
		
			
				|  |  |  import java.util.Map;
 | 
	
		
			
				|  |  |  import java.util.function.BiConsumer;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +/**
 | 
	
		
			
				|  |  | + * An external bridge for {@link Processor}
 | 
	
		
			
				|  |  | + */
 | 
	
		
			
				|  |  |  public interface ProcessorBridge extends StableBridgeAPI<Processor> {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      String getType();
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      String getTag();
 | 
	
	
		
			
				|  | @@ -29,48 +35,108 @@ public interface ProcessorBridge extends StableBridgeAPI<Processor> {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      boolean isAsync();
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    void execute(IngestDocumentBridge ingestDocumentBridge, BiConsumer<IngestDocumentBridge, Exception> handler) throws Exception;
 | 
	
		
			
				|  |  | +    void execute(IngestDocumentBridge ingestDocumentBridge, BiConsumer<IngestDocumentBridge, Exception> handler);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    static ProcessorBridge fromInternal(final Processor internalProcessor) {
 | 
	
		
			
				|  |  | +        if (internalProcessor instanceof AbstractExternal.ProxyExternal externalProxy) {
 | 
	
		
			
				|  |  | +            return externalProxy.getProcessorBridge();
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        return new ProxyInternal(internalProcessor);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    /**
 | 
	
		
			
				|  |  | +     * The {@code ProcessorBridge.AbstractExternal} is an abstract base class for implementing
 | 
	
		
			
				|  |  | +     * the {@link ProcessorBridge} externally to the Elasticsearch code-base. It takes care of
 | 
	
		
			
				|  |  | +     * the details of maintaining a singular internal-form implementation of {@link Processor}
 | 
	
		
			
				|  |  | +     * that proxies calls through the external implementation.
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  | +    abstract class AbstractExternal implements ProcessorBridge {
 | 
	
		
			
				|  |  | +        private ProxyExternal internalProcessor;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        public Processor toInternal() {
 | 
	
		
			
				|  |  | +            if (internalProcessor == null) {
 | 
	
		
			
				|  |  | +                internalProcessor = new ProxyExternal();
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +            return internalProcessor;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        private class ProxyExternal implements Processor {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    static ProcessorBridge wrap(final Processor delegate) {
 | 
	
		
			
				|  |  | -        return new Wrapped(delegate);
 | 
	
		
			
				|  |  | +            @Override
 | 
	
		
			
				|  |  | +            public String getType() {
 | 
	
		
			
				|  |  | +                return AbstractExternal.this.getType();
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            @Override
 | 
	
		
			
				|  |  | +            public String getTag() {
 | 
	
		
			
				|  |  | +                return AbstractExternal.this.getTag();
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            @Override
 | 
	
		
			
				|  |  | +            public String getDescription() {
 | 
	
		
			
				|  |  | +                return AbstractExternal.this.getDescription();
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            @Override
 | 
	
		
			
				|  |  | +            public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
 | 
	
		
			
				|  |  | +                AbstractExternal.this.execute(
 | 
	
		
			
				|  |  | +                    IngestDocumentBridge.fromInternalNullable(ingestDocument),
 | 
	
		
			
				|  |  | +                    (idb, e) -> handler.accept(idb.toInternal(), e)
 | 
	
		
			
				|  |  | +                );
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            @Override
 | 
	
		
			
				|  |  | +            public boolean isAsync() {
 | 
	
		
			
				|  |  | +                return AbstractExternal.this.isAsync();
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            private AbstractExternal getProcessorBridge() {
 | 
	
		
			
				|  |  | +                return AbstractExternal.this;
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    class Wrapped extends StableBridgeAPI.Proxy<Processor> implements ProcessorBridge {
 | 
	
		
			
				|  |  | -        public Wrapped(final Processor delegate) {
 | 
	
		
			
				|  |  | +    /**
 | 
	
		
			
				|  |  | +     * An implementation of {@link ProcessorBridge} that proxies to an internal {@link Processor}
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  | +    class ProxyInternal extends StableBridgeAPI.ProxyInternal<Processor> implements ProcessorBridge {
 | 
	
		
			
				|  |  | +        public ProxyInternal(final Processor delegate) {
 | 
	
		
			
				|  |  |              super(delegate);
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          @Override
 | 
	
		
			
				|  |  |          public String getType() {
 | 
	
		
			
				|  |  | -            return unwrap().getType();
 | 
	
		
			
				|  |  | +            return toInternal().getType();
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          @Override
 | 
	
		
			
				|  |  |          public String getTag() {
 | 
	
		
			
				|  |  | -            return unwrap().getTag();
 | 
	
		
			
				|  |  | +            return toInternal().getTag();
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          @Override
 | 
	
		
			
				|  |  |          public String getDescription() {
 | 
	
		
			
				|  |  | -            return unwrap().getDescription();
 | 
	
		
			
				|  |  | +            return toInternal().getDescription();
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          @Override
 | 
	
		
			
				|  |  |          public boolean isAsync() {
 | 
	
		
			
				|  |  | -            return unwrap().isAsync();
 | 
	
		
			
				|  |  | +            return toInternal().isAsync();
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          @Override
 | 
	
		
			
				|  |  | -        public void execute(final IngestDocumentBridge ingestDocumentBridge, final BiConsumer<IngestDocumentBridge, Exception> handler)
 | 
	
		
			
				|  |  | -            throws Exception {
 | 
	
		
			
				|  |  | -            delegate.execute(
 | 
	
		
			
				|  |  | -                StableBridgeAPI.unwrapNullable(ingestDocumentBridge),
 | 
	
		
			
				|  |  | -                (id, e) -> handler.accept(IngestDocumentBridge.wrap(id), e)
 | 
	
		
			
				|  |  | +        public void execute(final IngestDocumentBridge ingestDocumentBridge, final BiConsumer<IngestDocumentBridge, Exception> handler) {
 | 
	
		
			
				|  |  | +            internalDelegate.execute(
 | 
	
		
			
				|  |  | +                StableBridgeAPI.toInternalNullable(ingestDocumentBridge),
 | 
	
		
			
				|  |  | +                (id, e) -> handler.accept(IngestDocumentBridge.fromInternalNullable(id), e)
 | 
	
		
			
				|  |  |              );
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    class Parameters extends StableBridgeAPI.Proxy<Processor.Parameters> {
 | 
	
		
			
				|  |  | +    /**
 | 
	
		
			
				|  |  | +     * An external bridge for {@link Processor.Parameters}
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  | +    class Parameters extends StableBridgeAPI.ProxyInternal<Processor.Parameters> {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          public Parameters(
 | 
	
		
			
				|  |  |              final EnvironmentBridge environmentBridge,
 | 
	
	
		
			
				|  | @@ -79,17 +145,17 @@ public interface ProcessorBridge extends StableBridgeAPI<Processor> {
 | 
	
		
			
				|  |  |          ) {
 | 
	
		
			
				|  |  |              this(
 | 
	
		
			
				|  |  |                  new Processor.Parameters(
 | 
	
		
			
				|  |  | -                    environmentBridge.unwrap(),
 | 
	
		
			
				|  |  | -                    scriptServiceBridge.unwrap(),
 | 
	
		
			
				|  |  | +                    environmentBridge.toInternal(),
 | 
	
		
			
				|  |  | +                    scriptServiceBridge.toInternal(),
 | 
	
		
			
				|  |  |                      null,
 | 
	
		
			
				|  |  | -                    threadPoolBridge.unwrap().getThreadContext(),
 | 
	
		
			
				|  |  | -                    threadPoolBridge.unwrap()::relativeTimeInMillis,
 | 
	
		
			
				|  |  | -                    (delay, command) -> threadPoolBridge.unwrap()
 | 
	
		
			
				|  |  | -                        .schedule(command, TimeValue.timeValueMillis(delay), threadPoolBridge.unwrap().generic()),
 | 
	
		
			
				|  |  | +                    threadPoolBridge.toInternal().getThreadContext(),
 | 
	
		
			
				|  |  | +                    threadPoolBridge.toInternal()::relativeTimeInMillis,
 | 
	
		
			
				|  |  | +                    (delay, command) -> threadPoolBridge.toInternal()
 | 
	
		
			
				|  |  | +                        .schedule(command, TimeValue.timeValueMillis(delay), threadPoolBridge.toInternal().generic()),
 | 
	
		
			
				|  |  |                      null,
 | 
	
		
			
				|  |  |                      null,
 | 
	
		
			
				|  |  | -                    threadPoolBridge.unwrap().generic()::execute,
 | 
	
		
			
				|  |  | -                    IngestService.createGrokThreadWatchdog(environmentBridge.unwrap(), threadPoolBridge.unwrap())
 | 
	
		
			
				|  |  | +                    threadPoolBridge.toInternal().generic()::execute,
 | 
	
		
			
				|  |  | +                    IngestService.createGrokThreadWatchdog(environmentBridge.toInternal(), threadPoolBridge.toInternal())
 | 
	
		
			
				|  |  |                  )
 | 
	
		
			
				|  |  |              );
 | 
	
		
			
				|  |  |          }
 | 
	
	
		
			
				|  | @@ -99,11 +165,14 @@ public interface ProcessorBridge extends StableBridgeAPI<Processor> {
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          @Override
 | 
	
		
			
				|  |  | -        public Processor.Parameters unwrap() {
 | 
	
		
			
				|  |  | -            return this.delegate;
 | 
	
		
			
				|  |  | +        public Processor.Parameters toInternal() {
 | 
	
		
			
				|  |  | +            return this.internalDelegate;
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    /**
 | 
	
		
			
				|  |  | +     * An external bridge for {@link Processor.Factory}
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  |      interface Factory extends StableBridgeAPI<Processor.Factory> {
 | 
	
		
			
				|  |  |          ProcessorBridge create(
 | 
	
		
			
				|  |  |              Map<String, ProcessorBridge.Factory> registry,
 | 
	
	
		
			
				|  | @@ -112,23 +181,26 @@ public interface ProcessorBridge extends StableBridgeAPI<Processor> {
 | 
	
		
			
				|  |  |              Map<String, Object> config
 | 
	
		
			
				|  |  |          ) throws Exception;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        static Factory wrap(final Processor.Factory delegate) {
 | 
	
		
			
				|  |  | -            return new Wrapped(delegate);
 | 
	
		
			
				|  |  | +        static Factory fromInternal(final Processor.Factory delegate) {
 | 
	
		
			
				|  |  | +            return new ProxyInternal(delegate);
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          @Override
 | 
	
		
			
				|  |  | -        default Processor.Factory unwrap() {
 | 
	
		
			
				|  |  | +        default Processor.Factory toInternal() {
 | 
	
		
			
				|  |  |              final Factory stableAPIFactory = this;
 | 
	
		
			
				|  |  |              return (registry, tag, description, config, projectId) -> stableAPIFactory.create(
 | 
	
		
			
				|  |  | -                StableBridgeAPI.wrap(registry, Factory::wrap),
 | 
	
		
			
				|  |  | +                StableBridgeAPI.fromInternal(registry, Factory::fromInternal),
 | 
	
		
			
				|  |  |                  tag,
 | 
	
		
			
				|  |  |                  description,
 | 
	
		
			
				|  |  |                  config
 | 
	
		
			
				|  |  | -            ).unwrap();
 | 
	
		
			
				|  |  | +            ).toInternal();
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        class Wrapped extends StableBridgeAPI.Proxy<Processor.Factory> implements Factory {
 | 
	
		
			
				|  |  | -            private Wrapped(final Processor.Factory delegate) {
 | 
	
		
			
				|  |  | +        /**
 | 
	
		
			
				|  |  | +         * An implementation of {@link ProcessorBridge.Factory} that proxies to an internal {@link Processor.Factory}
 | 
	
		
			
				|  |  | +         */
 | 
	
		
			
				|  |  | +        class ProxyInternal extends StableBridgeAPI.ProxyInternal<Processor.Factory> implements Factory {
 | 
	
		
			
				|  |  | +            private ProxyInternal(final Processor.Factory delegate) {
 | 
	
		
			
				|  |  |                  super(delegate);
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -140,14 +212,14 @@ public interface ProcessorBridge extends StableBridgeAPI<Processor> {
 | 
	
		
			
				|  |  |                  final String description,
 | 
	
		
			
				|  |  |                  final Map<String, Object> config
 | 
	
		
			
				|  |  |              ) throws Exception {
 | 
	
		
			
				|  |  | -                return ProcessorBridge.wrap(
 | 
	
		
			
				|  |  | -                    this.delegate.create(StableBridgeAPI.unwrap(registry), processorTag, description, config, null)
 | 
	
		
			
				|  |  | +                return ProcessorBridge.fromInternal(
 | 
	
		
			
				|  |  | +                    this.internalDelegate.create(StableBridgeAPI.toInternal(registry), processorTag, description, config, ProjectId.DEFAULT)
 | 
	
		
			
				|  |  |                  );
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              @Override
 | 
	
		
			
				|  |  | -            public Processor.Factory unwrap() {
 | 
	
		
			
				|  |  | -                return this.delegate;
 | 
	
		
			
				|  |  | +            public Processor.Factory toInternal() {
 | 
	
		
			
				|  |  | +                return this.internalDelegate;
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |      }
 |