Browse Source

[ML] Fix job ID in C++ logs for normalize and memory estimation (#63874)

The changes of #54636 and #60395 were incorrect in their assertion
that "the job ID passed to the process pipes is only used to make
the file names unique".  In fact it is also passed to the C++ log
handler and gets logged with every message logged by the C++
processes.

This PR splits the job ID and unique IDs (added in #54636 and #60395)
so that the correct job ID is passed to the log handler.

Nothing really bad happened as a result of this problem - it was
just cosmetic.
David Roberts 5 years ago
parent
commit
d613a1a1d1

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java

@@ -73,7 +73,7 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory<An
         String jobId = config.getId();
         List<Path> filesToDelete = new ArrayList<>();
         ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, processConnectTimeout, AnalyticsBuilder.ANALYTICS, jobId,
-                false, true, true, hasState, config.getAnalysis().persistsState());
+            null, false, true, true, hasState, config.getAnalysis().persistsState());
 
         // The extra 2 are for the checksum and the control field
         int numberOfFields = analyticsProcessConfig.cols() + 2;

+ 4 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java

@@ -62,12 +62,11 @@ public class NativeMemoryUsageEstimationProcessFactory implements AnalyticsProce
             ExecutorService executorService,
             Consumer<String> onProcessCrash) {
         List<Path> filesToDelete = new ArrayList<>();
-        // The config ID passed to the process pipes is only used to make the file names unique.  Since memory estimation can be
-        // called many times in quick succession for the same config the config ID alone is not sufficient to guarantee that the
-        // memory estimation process pipe names are unique.  Therefore an increasing counter value is appended to the config ID
-        // to ensure uniqueness between calls.
+        // Since memory estimation can be called many times in quick succession for the same config the config ID alone is not
+        // sufficient to guarantee that the memory estimation process pipe names are unique.  Therefore an increasing counter
+        // value is passed as well as the config ID to ensure uniqueness between calls.
         ProcessPipes processPipes = new ProcessPipes(
-            env, NAMED_PIPE_HELPER, processConnectTimeout, AnalyticsBuilder.ANALYTICS, config.getId() + "_" + counter.incrementAndGet(),
+            env, NAMED_PIPE_HELPER, processConnectTimeout, AnalyticsBuilder.ANALYTICS, config.getId(), counter.incrementAndGet(),
             false, false, true, false, false);
 
         createNativeProcess(config.getId(), analyticsProcessConfig, filesToDelete, processPipes);

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java

@@ -77,7 +77,7 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
                                                      Consumer<String> onProcessCrash) {
         List<Path> filesToDelete = new ArrayList<>();
         ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, processConnectTimeout, AutodetectBuilder.AUTODETECT,
-            job.getId(), false, true, true, params.modelSnapshot() != null,
+            job.getId(), null, false, true, true, params.modelSnapshot() != null,
             AutodetectBuilder.DONT_PERSIST_MODEL_STATE_SETTING.get(settings) == false);
         createNativeProcess(job, params, processPipes, filesToDelete);
         boolean includeTokensField = MachineLearning.CATEGORIZATION_TOKENIZATION_IN_JAVA

+ 4 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java

@@ -51,11 +51,11 @@ public class NativeNormalizerProcessFactory implements NormalizerProcessFactory
     @Override
     public NormalizerProcess createNormalizerProcess(String jobId, String quantilesState, Integer bucketSpan,
                                                      ExecutorService executorService) {
-        // The job ID passed to the process pipes is only used to make the file names unique.  Since normalize can get run many times
-        // in quick succession for the same job the job ID alone is not sufficient to guarantee that the normalizer process pipe names
-        // are unique.  Therefore an increasing counter value is appended to the job ID to ensure uniqueness between calls.
+        // Since normalize can get run many times in quick succession for the same job the job ID alone is not sufficient to
+        // guarantee that the normalizer process pipe names are unique.  Therefore an increasing counter value is passed as
+        // well as the job ID to ensure uniqueness between calls.
         ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, processConnectTimeout, NormalizerBuilder.NORMALIZE,
-            jobId + "_" + counter.incrementAndGet(), false, true, true, false, false);
+            jobId, counter.incrementAndGet(), false, true, true, false, false);
         createNativeProcess(jobId, quantilesState, processPipes, bucketSpan);
 
         NativeNormalizerProcess normalizerProcess = new NativeNormalizerProcess(jobId, nativeController, processPipes);

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeController.java

@@ -69,7 +69,7 @@ public class NativeController implements MlController {
     NativeController(String localNodeName, Environment env, NamedPipeHelper namedPipeHelper, NamedXContentRegistry xContentRegistry)
         throws IOException {
         this.localNodeName = localNodeName;
-        ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, CONTROLLER_CONNECT_TIMEOUT, CONTROLLER, null,
+        ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, CONTROLLER_CONNECT_TIMEOUT, CONTROLLER, null, null,
                 true, false, true, false, false);
         processPipes.connectLogStream();
         this.cppLogHandler = processPipes.getLogStreamHandler();

+ 4 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ProcessPipes.java

@@ -76,7 +76,7 @@ public class ProcessPipes {
      *              May be null or empty for processes not associated with a specific job.
      */
     public ProcessPipes(Environment env, NamedPipeHelper namedPipeHelper, Duration timeout, String processName, String jobId,
-                        boolean wantCommandPipe, boolean wantProcessInPipe, boolean wantProcessOutPipe,
+                        Long uniqueId, boolean wantCommandPipe, boolean wantProcessInPipe, boolean wantProcessOutPipe,
                         boolean wantRestorePipe, boolean wantPersistPipe) {
         this.namedPipeHelper = namedPipeHelper;
         this.jobId = jobId;
@@ -91,6 +91,9 @@ public class ProcessPipes {
         if (!Strings.isNullOrEmpty(jobId)) {
             prefixBuilder.append(jobId).append('_');
         }
+        if (uniqueId != null) {
+            prefixBuilder.append(uniqueId).append('_');
+        }
         String prefix = prefixBuilder.toString();
         String suffix = String.format(Locale.ROOT, "_%d", JvmInfo.jvmInfo().getPid());
         logPipeName = String.format(Locale.ROOT, "%slog%s", prefix, suffix);

+ 3 - 3
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ProcessPipesTests.java

@@ -62,7 +62,7 @@ public class ProcessPipesTests extends ESTestCase {
 
         int timeoutSeconds = randomIntBetween(5, 100);
         ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, Duration.ofSeconds(timeoutSeconds), AutodetectBuilder.AUTODETECT,
-            "my_job", false, true, true, true, true);
+            "my_job", null, false, true, true, true, true);
 
         List<String> command = new ArrayList<>();
         processPipes.addArgs(command);
@@ -110,7 +110,7 @@ public class ProcessPipesTests extends ESTestCase {
         Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
         Environment env = TestEnvironment.newEnvironment(settings);
 
-        new ProcessPipes(env, namedPipeHelper, Duration.ofSeconds(2), AutodetectBuilder.AUTODETECT, "my_job",
+        new ProcessPipes(env, namedPipeHelper, Duration.ofSeconds(2), AutodetectBuilder.AUTODETECT, "my_job", null,
                 true, true, true, true, true);
     }
 
@@ -138,7 +138,7 @@ public class ProcessPipesTests extends ESTestCase {
         Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
         Environment env = TestEnvironment.newEnvironment(settings);
         ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, Duration.ofSeconds(2), AutodetectBuilder.AUTODETECT, "my_job",
-                true, true, true, true, true);
+            null, true, true, true, true, true);
 
         processPipes.connectLogStream();
         expectThrows(IOException.class, processPipes::connectOtherStreams);