Răsfoiți Sursa

[ML][Data Frame] adjust onFinish audit frequency (#44450)

Benjamin Trent 6 ani în urmă
părinte
comite
f131560aa8

+ 23 - 3
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

@@ -482,8 +482,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
 
     static class ClientDataFrameIndexer extends DataFrameIndexer {
 
-        private static final int ON_FINISH_AUDIT_FREQUENCY = 1000;
-
+        private long logEvery = 1;
+        private long logCount = 0;
         private final Client client;
         private final DataFrameTransformsConfigManager transformsConfigManager;
         private final DataFrameTransformsCheckpointService transformsCheckpointService;
@@ -711,7 +711,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                 nextCheckpoint = null;
                 // Reset our failure count as we have finished and may start again with a new checkpoint
                 failureCount.set(0);
-                if (checkpoint % ON_FINISH_AUDIT_FREQUENCY == 0) {
+                if (shouldAuditOnFinish(checkpoint)) {
                     auditor.info(transformTask.getTransformId(),
                         "Finished indexing for data frame transform checkpoint [" + checkpoint + "].");
                 }
@@ -724,6 +724,26 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
             }
         }
 
+        /**
+         * Indicates if an audit message should be written when onFinish is called for the given checkpoint
+         * We audit the first checkpoint, and then every 10 checkpoints until completedCheckpoint == 99
+         * Then we audit every 100, until completedCheckpoint == 999
+         *
+         * Then we always audit every 1_000 checkpoints
+         *
+         * @param completedCheckpoint The checkpoint that was just completed
+         * @return {@code true} if an audit message should be written
+         */
+        protected boolean shouldAuditOnFinish(long completedCheckpoint) {
+            if (++logCount % logEvery != 0) {
+                return false;
+            }
+            int log10Checkpoint = (int) Math.floor(Math.log10(completedCheckpoint + 1));
+            logEvery = log10Checkpoint >= 3  ? 1_000 : (int)Math.pow(10.0, log10Checkpoint);
+            logCount = 0;
+            return true;
+        }
+
         @Override
         protected void onStop() {
             auditor.info(transformConfig.getId(), "Data frame transform has stopped.");

+ 103 - 0
x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/ClientDataFrameIndexerTests.java

@@ -0,0 +1,103 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.dataframe.transforms;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
+import org.elasticsearch.xpack.core.indexing.IndexerState;
+import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
+import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService;
+import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
+import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
+
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ClientDataFrameIndexerTests extends ESTestCase {
+
+    /** Replays checkpoints 0..99_999 in order and spot-checks which calls asked for an audit. */
+    public void testAudiOnFinishFrequency() {
+        ThreadPool pool = mock(ThreadPool.class);
+        when(pool.executor("generic")).thenReturn(mock(ExecutorService.class));
+        DataFrameTransformTask task = new DataFrameTransformTask(1,
+            "dataframe",
+            "ptask",
+            new TaskId("dataframe:1"),
+            mock(DataFrameTransform.class),
+            null,
+            mock(SchedulerEngine.class),
+            mock(DataFrameAuditor.class),
+            pool,
+            Collections.emptyMap());
+        DataFrameTransformTask.ClientDataFrameIndexer indexer = new DataFrameTransformTask.ClientDataFrameIndexer(randomAlphaOfLength(10),
+            mock(DataFrameTransformsConfigManager.class),
+            mock(DataFrameTransformsCheckpointService.class),
+            new AtomicReference<>(IndexerState.STOPPED),
+            null,
+            mock(Client.class),
+            mock(DataFrameAuditor.class),
+            mock(DataFrameIndexerTransformStats.class),
+            mock(DataFrameTransformConfig.class),
+            Collections.emptyMap(),
+            null,
+            new DataFrameTransformCheckpoint("transform",
+                Instant.now().toEpochMilli(),
+                0L,
+                Collections.emptyMap(),
+                Instant.now().toEpochMilli()),
+            new DataFrameTransformCheckpoint("transform",
+                Instant.now().toEpochMilli(),
+                2L,
+                Collections.emptyMap(),
+                Instant.now().toEpochMilli()),
+            task);
+
+        List<Boolean> audited = IntStream.range(0, 100_000).mapToObj(indexer::shouldAuditOnFinish).collect(Collectors.toList());
+
+        // Phase 1: each of the first 10 checkpoints is audited
+        for (int checkpoint : new int[] {0, 1, 9}) {
+            assertTrue(audited.get(checkpoint));
+        }
+
+        // Phase 2: below 100, only every 10th checkpoint is audited
+        for (int checkpoint : new int[] {19, 29, 99}) {
+            assertTrue(audited.get(checkpoint));
+        }
+        for (int checkpoint : new int[] {10, 11, 30}) {
+            assertFalse(audited.get(checkpoint));
+        }
+
+        // Phase 3: below 1000, only every 100th checkpoint is audited
+        for (int checkpoint : new int[] {100, 109, 110}) {
+            assertFalse(audited.get(checkpoint));
+        }
+        assertTrue(audited.get(199));
+
+        // Phase 4: from there on, only every 1000th checkpoint is audited
+        for (int checkpoint : new int[] {1999, 2999, 9999, 10_999, 11_999}) {
+            assertTrue(audited.get(checkpoint));
+        }
+        for (int checkpoint : new int[] {2199, 11_000}) {
+            assertFalse(audited.get(checkpoint));
+        }
+    }
+
+}