Selaa lähdekoodia

[ML] Use unique named pipes for data frame analytics (#70918)

When a data frame analytics job is stopped because of
a call to the _stop API, if the process is running it is killed.
Depending on the OS, it may take some time to delete all the
used named pipes. This means that in a scenario where the job
is restarted immediately afterwards, it is possible that the old
named pipes are reused, which results in the new process not
communicating properly with Java.

This was the underlying cause of #70698 and #67581.

This commit fixes it by using unique identifiers for the named
pipes.

Closes #70698
Dimitris Athanasiou 4 vuotta sitten
vanhempi
commit
e083862572

+ 7 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java

@@ -32,6 +32,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
 import java.util.Objects;
 import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Consumer;
 
 
 public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory<AnalyticsResult> {
 public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory<AnalyticsResult> {
@@ -45,6 +46,7 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory<An
     private final NamedXContentRegistry namedXContentRegistry;
     private final NamedXContentRegistry namedXContentRegistry;
     private final ResultsPersisterService resultsPersisterService;
     private final ResultsPersisterService resultsPersisterService;
     private final DataFrameAnalyticsAuditor auditor;
     private final DataFrameAnalyticsAuditor auditor;
+    private final AtomicLong counter;
     private volatile Duration processConnectTimeout;
     private volatile Duration processConnectTimeout;
 
 
     public NativeAnalyticsProcessFactory(Environment env,
     public NativeAnalyticsProcessFactory(Environment env,
@@ -58,6 +60,7 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory<An
         this.namedXContentRegistry = Objects.requireNonNull(namedXContentRegistry);
         this.namedXContentRegistry = Objects.requireNonNull(namedXContentRegistry);
         this.auditor = auditor;
         this.auditor = auditor;
         this.resultsPersisterService = resultsPersisterService;
         this.resultsPersisterService = resultsPersisterService;
+        this.counter = new AtomicLong(0);
         setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(env.settings()));
         setProcessConnectTimeout(MachineLearning.PROCESS_CONNECT_TIMEOUT.get(env.settings()));
         clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT,
         clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.PROCESS_CONNECT_TIMEOUT,
             this::setProcessConnectTimeout);
             this::setProcessConnectTimeout);
@@ -73,8 +76,11 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory<An
                                                          Consumer<String> onProcessCrash) {
                                                          Consumer<String> onProcessCrash) {
         String jobId = config.getId();
         String jobId = config.getId();
         List<Path> filesToDelete = new ArrayList<>();
         List<Path> filesToDelete = new ArrayList<>();
+        // When the stop API is called the process is killed. As it may take some time for the OS (especially Windows)
+        // to delete the named pipes, we use a unique identifier to avoid reusing an older named pipe if the task
+        // gets restarted immediately after stopping.
         ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, processConnectTimeout, AnalyticsBuilder.ANALYTICS, jobId,
         ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, processConnectTimeout, AnalyticsBuilder.ANALYTICS, jobId,
-            null, false, true, true, hasState, config.getAnalysis().persistsState());
+            counter.incrementAndGet(), false, true, true, hasState, config.getAnalysis().persistsState());
 
 
         // The extra 2 are for the checksum and the control field
         // The extra 2 are for the checksum and the control field
         int numberOfFields = analyticsProcessConfig.cols() + 2;
         int numberOfFields = analyticsProcessConfig.cols() + 2;