[ML][Transforms] unifying logging, adding some more logging (#45788)

* [ML][Transforms] unifying logging, adding some more logging

* using ParameterizedMessage instead of string concatenation

* fixing bracket closure
Benjamin Trent · 6 years ago · commit 36d859fa7c
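
The core pattern the commit applies throughout DataFrameTransformTask: when a log statement needs both placeholder substitution and an attached exception, wrap the format string in a ParameterizedMessage instead of concatenating the id into the message. A minimal sketch of the before/after styles, with the wrapper class and handler method assumed for illustration:

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;

class TransformLoggingSketch {
    private static final Logger logger = LogManager.getLogger(TransformLoggingSketch.class);

    void onStateUpdateFailure(String transformId, Exception exc) {
        // Before: the message string is built eagerly even if the level is
        // disabled, and the transform id sits mid-sentence.
        logger.error("Failed to update state for data frame transform [" + transformId + "]", exc);

        // After: formatting is deferred until the event is actually appended,
        // the id leads the line in the unified "[id] ..." form, and the
        // Throwable is passed explicitly rather than as a trailing vararg.
        logger.error(new ParameterizedMessage("[{}] failed to update state.", transformId), exc);
    }
}

Passing the Throwable as its own parameter also keeps the placeholder count unambiguous; with the varargs overload, an exception can be consumed as a format argument when the counts happen to line up.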

+ 61 - 38
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.dataframe.transforms;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchStatusException;
@@ -21,7 +22,6 @@ import org.elasticsearch.action.search.SearchAction;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Client;
-import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.logging.LoggerMessageFormat;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.unit.TimeValue;
@@ -248,7 +248,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
      * @param listener Started listener
      */
     public synchronized void start(Long startingCheckpoint, boolean force, ActionListener<Response> listener) {
-        logger.debug("[{}] start called with force [{}] and state [{}]", getTransformId(), force, getState());
+        logger.debug("[{}] start called with force [{}] and state [{}].", getTransformId(), force, getState());
         if (taskState.get() == DataFrameTransformTaskState.FAILED && force == false) {
             listener.onFailure(new ElasticsearchStatusException(
                 DataFrameMessages.getMessage(DATA_FRAME_CANNOT_START_FAILED_TRANSFORM,
@@ -290,7 +290,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
             null,
             getIndexer().getProgress());
 
-        logger.info("Updating state for data frame transform [{}] to [{}]", transform.getId(), state.toString());
+        logger.info("[{}] updating state for data frame transform to [{}].", transform.getId(), state.toString());
         // Even though the indexer information is persisted to an index, we still need DataFrameTransformTaskState in the clusterstate
         // This keeps track of STARTED, FAILED, STOPPED
         // This is because a FAILED state can occur because we cannot read the config from the internal index, which would imply that
@@ -306,6 +306,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                 listener.onResponse(new StartDataFrameTransformTaskAction.Response(true));
             },
             exc -> {
+                logger.error(new ParameterizedMessage("[{}] failed updating state to [{}].", getTransformId(), state), exc);
                 getIndexer().stop();
                 listener.onFailure(new ElasticsearchException("Error while updating state for data frame transform ["
                                     + transform.getId() + "] to [" + state.getIndexerState() + "].", exc));
@@ -354,12 +355,12 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         }
 
         if (getIndexer() == null) {
-            logger.warn("Data frame task [{}] triggered with an unintialized indexer", getTransformId());
+            logger.warn("[{}] data frame task triggered with an unintialized indexer.", getTransformId());
             return;
         }
 
         if (taskState.get() == DataFrameTransformTaskState.FAILED) {
-            logger.debug("Schedule was triggered for transform [{}] but task is failed. Ignoring trigger.", getTransformId());
+            logger.debug("[{}] schedule was triggered for transform but task is failed. Ignoring trigger.", getTransformId());
             return;
         }
 
@@ -368,15 +369,15 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         if (IndexerState.INDEXING.equals(indexerState) ||
             IndexerState.STOPPING.equals(indexerState) ||
             IndexerState.STOPPED.equals(indexerState)) {
-            logger.debug("Indexer for transform [{}] has state [{}], ignoring trigger", getTransformId(), indexerState);
+            logger.debug("[{}] indexer for transform has state [{}]. Ignoring trigger.", getTransformId(), indexerState);
             return;
         }
 
-        logger.debug("Data frame indexer [{}] schedule has triggered, state: [{}]", event.getJobName(), indexerState);
+        logger.debug("[{}] data frame indexer schedule has triggered, state: [{}].", event.getJobName(), indexerState);
 
         // if it runs for the 1st time we just do it, if not we check for changes
         if (currentCheckpoint.get() == 0) {
-            logger.debug("Trigger initial run");
+            logger.debug("Trigger initial run.");
             getIndexer().maybeTriggerAsyncJob(System.currentTimeMillis());
         } else if (getIndexer().isContinuous()) {
             getIndexer().maybeTriggerAsyncJob(System.currentTimeMillis());
@@ -407,12 +408,14 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                                     ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
         updatePersistentTaskState(state, ActionListener.wrap(
             success -> {
-                logger.debug("Successfully updated state for data frame transform [{}] to [{}]", transform.getId(), state.toString());
+                logger.debug("[{}] successfully updated state for data frame transform to [{}].", transform.getId(), state.toString());
                 listener.onResponse(success);
             },
             failure -> {
                auditor.warning(transform.getId(), "Failed to persist state to cluster state: " + failure.getMessage());
-                logger.error("Failed to update state for data frame transform [" + transform.getId() + "]", failure);
+                logger.error(new ParameterizedMessage("[{}] failed to update cluster state for data frame transform.",
+                    transform.getId()),
+                    failure);
                 listener.onFailure(failure);
             }
         ));
@@ -422,7 +425,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         // If we are already flagged as failed, this probably means that a second trigger started firing while we were attempting to
         // flag the previously triggered indexer as failed. Exit early as we are already flagged as failed.
         if (taskState.get() == DataFrameTransformTaskState.FAILED) {
-            logger.warn("[{}] is already failed but encountered new failure; reason [{}] ", getTransformId(), reason);
+            logger.warn("[{}] is already failed but encountered new failure; reason [{}].", getTransformId(), reason);
             listener.onResponse(null);
             return;
         }
@@ -430,7 +433,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         // the indexer to fail. Since `ClientDataFrameIndexer#doSaveState` will persist the state to the index once the indexer stops,
         // it is probably best to NOT change the internal state of the task and allow the normal stopping logic to continue.
         if (getIndexer() != null && getIndexer().getState() == IndexerState.STOPPING) {
-            logger.info("Attempt to fail transform [" + getTransformId() + "] with reason [" + reason + "] while it was stopping.");
+            logger.info("[{}] attempt to fail transform with reason [{}] while it was stopping.", getTransformId(), reason);
             auditor.info(getTransformId(), "Attempted to fail transform with reason [" + reason + "] while in STOPPING state.");
             listener.onResponse(null);
             return;
@@ -438,7 +441,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         // If we are stopped, this means that between the failure occurring and being handled, somebody called stop
         // We should just allow that stop to continue
         if (getIndexer() != null && getIndexer().getState() == IndexerState.STOPPED) {
-            logger.info("[{}] encountered a failure but indexer is STOPPED; reason [{}]", getTransformId(), reason);
+            logger.info("[{}] encountered a failure but indexer is STOPPED; reason [{}].", getTransformId(), reason);
             listener.onResponse(null);
             return;
         }
@@ -456,7 +459,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         persistStateToClusterState(newState, ActionListener.wrap(
             r -> listener.onResponse(null),
             e -> {
-                logger.error("Failed to set task state as failed to cluster state", e);
+                logger.error(new ParameterizedMessage("[{}] failed to set task state as failed to cluster state.", getTransformId()),
+                    e);
                 listener.onFailure(e);
             }
         ));
@@ -469,8 +473,9 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
      */
     @Override
     public synchronized void onCancelled() {
-        logger.info(
-                "Received cancellation request for data frame transform [" + transform.getId() + "], state: [" + taskState.get() + "]");
+        logger.info("[{}] received cancellation request for data frame transform, state: [{}].",
+            getTransformId(),
+            taskState.get());
         if (getIndexer() != null && getIndexer().abort()) {
             // there is no background transform running, we can shutdown safely
             shutdown();
@@ -695,13 +700,15 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                             }
                             TransformProgressGatherer.getInitialProgress(this.client, buildFilterQuery(), getConfig(), ActionListener.wrap(
                                 newProgress -> {
-                                    logger.trace("[{}] reset the progress from [{}] to [{}]", transformId, progress, newProgress);
+                                    logger.trace("[{}] reset the progress from [{}] to [{}].", transformId, progress, newProgress);
                                     progress = newProgress;
                                     super.onStart(now, listener);
                                 },
                                 failure -> {
                                     progress = null;
-                                    logger.warn("Unable to load progress information for task [" + transformId + "]", failure);
+                                    logger.warn(new ParameterizedMessage("[{}] unable to load progress information for task.",
+                                        transformId),
+                                        failure);
                                     super.onStart(now, listener);
                                 }
                             ));
@@ -778,14 +785,14 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         @Override
         public synchronized boolean maybeTriggerAsyncJob(long now) {
             if (transformTask.taskState.get() == DataFrameTransformTaskState.FAILED) {
-                logger.debug("Schedule was triggered for transform [{}] but task is failed. Ignoring trigger.", getJobId());
+                logger.debug("[{}] schedule was triggered for transform but task is failed. Ignoring trigger.", getJobId());
                 return false;
             }
 
             // ignore trigger if indexer is running, prevents log spam in A2P indexer
             IndexerState indexerState = getState();
             if (IndexerState.INDEXING.equals(indexerState) || IndexerState.STOPPING.equals(indexerState)) {
-                logger.debug("Indexer for transform [{}] has state [{}], ignoring trigger", getJobId(), indexerState);
+                logger.debug("[{}] indexer for transform has state [{}]. Ignoring trigger.", getJobId(), indexerState);
                 return false;
             }
 
@@ -876,7 +883,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                 indexerState = IndexerState.STOPPED;
 
                 auditor.info(transformConfig.getId(), "Data frame finished indexing all data, initiating stop");
-                logger.info("Data frame [{}] finished indexing all data, initiating stop", transformConfig.getId());
+                logger.info("[{}] data frame transform finished indexing all data, initiating stop.", transformConfig.getId());
             }
 
             final DataFrameTransformState state = new DataFrameTransformState(
@@ -886,7 +893,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                 transformTask.currentCheckpoint.get(),
                 transformTask.stateReason.get(),
                 getProgress());
-            logger.debug("Updating persistent state of transform [{}] to [{}]", transformConfig.getId(), state.toString());
+            logger.debug("[{}] updating persistent state of transform to [{}].", transformConfig.getId(), state.toString());
 
             // Persist the current state and stats in the internal index. The interval of this method being
             // called is controlled by AsyncTwoPhaseIndexer#onBulkResponse which calls doSaveState every so
@@ -900,7 +907,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                                     transformTask.shutdown();
                                 }
                                 // Only do this clean up once, if it succeeded, no reason to do the query again.
-                                if (oldStatsCleanedUp.compareAndExchange(false, true) == false) {
+                                if (oldStatsCleanedUp.compareAndSet(false, true)) {
                                     transformsConfigManager.deleteOldTransformStoredDocuments(transformId, ActionListener.wrap(
                                         nil -> {
                                             logger.trace("[{}] deleted old transform stats and state document", transformId);
@@ -918,7 +925,9 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                                 }
                             },
                             statsExc -> {
-                                logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc);
+                                logger.error(new ParameterizedMessage("[{}] updating stats of transform failed.",
+                                    transformConfig.getId()),
+                                    statsExc);
                                 auditor.warning(getJobId(),
                                     "Failure updating stats of transform: " + statsExc.getMessage());
                                 // for auto stop shutdown the task
@@ -942,7 +951,9 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                 }
                 handleFailure(exc);
             } catch (Exception e) {
-                logger.error("Data frame transform encountered an unexpected internal exception: " ,e);
+                logger.error(
+                    new ParameterizedMessage("[{}] data frame transform encountered an unexpected internal exception: ", transformId),
+                    e);
             }
         }
 
@@ -967,7 +978,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                 if (progress != null && progress.getPercentComplete() != null && progress.getPercentComplete() < 100.0) {
                     progress.incrementDocsProcessed(progress.getTotalDocs() - progress.getDocumentsProcessed());
                 }
-                logger.info("Last checkpoint for {} {}", getJobId(), Strings.toString(lastCheckpoint));
                 // If the last checkpoint is now greater than 1, that means that we have just processed the first
                 // continuous checkpoint and should start recording the exponential averages
                 if (lastCheckpoint != null && lastCheckpoint.getCheckpoint() > 1) {
@@ -987,7 +997,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                         "Finished indexing for data frame transform checkpoint [" + checkpoint + "].");
                 }
                 logger.debug(
-                    "Finished indexing for data frame transform [" + transformTask.getTransformId() + "] checkpoint [" + checkpoint + "]");
+                    "[{}] finished indexing for data frame transform checkpoint [{}].", getJobId(), checkpoint);
                 auditBulkFailures = true;
                 listener.onResponse(null);
             } catch (Exception e) {
@@ -1009,7 +1019,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
             if (++logCount % logEvery != 0) {
                 return false;
             }
-            int log10Checkpoint = (int) Math.floor(Math.log10(completedCheckpoint + 1));
+            if (completedCheckpoint == 0) {
+                return true;
+            }
+            int log10Checkpoint = (int) Math.floor(Math.log10(completedCheckpoint));
             logEvery = log10Checkpoint >= 3  ? 1_000 : (int)Math.pow(10.0, log10Checkpoint);
             logCount = 0;
             return true;
@@ -1018,13 +1031,13 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         @Override
         protected void onStop() {
             auditor.info(transformConfig.getId(), "Data frame transform has stopped.");
-            logger.info("Data frame transform [{}] has stopped", transformConfig.getId());
+            logger.info("[{}] data frame transform has stopped.", transformConfig.getId());
         }
 
         @Override
         protected void onAbort() {
             auditor.info(transformConfig.getId(), "Received abort request, stopping data frame transform.");
-            logger.info("Data frame transform [" + transformConfig.getId() + "] received abort request, stopping indexer");
+            logger.info("[{}] data frame transform received abort request. Stopping indexer.", transformConfig.getId());
             transformTask.shutdown();
         }
 
@@ -1034,11 +1047,17 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                     checkpoint -> transformsConfigManager.putTransformCheckpoint(checkpoint,
                         ActionListener.wrap(
                             putCheckPointResponse -> listener.onResponse(checkpoint),
-                            createCheckpointException ->
-                                listener.onFailure(new RuntimeException("Failed to create checkpoint", createCheckpointException))
+                            createCheckpointException -> {
+                                logger.warn(new ParameterizedMessage("[{}] failed to create checkpoint.", transformId),
+                                    createCheckpointException);
+                                listener.onFailure(new RuntimeException("Failed to create checkpoint", createCheckpointException));
+                            }
                     )),
-                    getCheckPointException ->
-                        listener.onFailure(new RuntimeException("Failed to retrieve checkpoint", getCheckPointException))
+                    getCheckPointException -> {
+                        logger.warn(new ParameterizedMessage("[{}] failed to retrieve checkpoint.", transformId),
+                            getCheckPointException);
+                        listener.onFailure(new RuntimeException("Failed to retrieve checkpoint", getCheckPointException));
+                    }
             ));
         }
 
@@ -1047,12 +1066,14 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
             checkpointProvider.sourceHasChanged(getLastCheckpoint(),
                 ActionListener.wrap(
                     hasChanged -> {
-                        logger.trace("[{}] change detected [{}]", transformId, hasChanged);
+                        logger.trace("[{}] change detected [{}].", transformId, hasChanged);
                         hasChangedListener.onResponse(hasChanged);
                     },
                     e -> {
                         logger.warn(
-                            "Failed to detect changes for data frame transform [" + transformId + "], skipping update till next check.",
+                            new ParameterizedMessage(
+                                "[{}] failed to detect changes for data frame transform. Skipping update till next check.",
+                                transformId),
                             e);
                         auditor.warning(transformId,
                             "Failed to detect changes for data frame transform, skipping update till next check. Exception: "
@@ -1068,7 +1089,9 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         }
 
         synchronized void handleFailure(Exception e) {
-            logger.warn("Data frame transform [" + transformTask.getTransformId() + "] encountered an exception: ", e);
+            logger.warn(new ParameterizedMessage("[{}] data frame transform encountered an exception: ",
+                transformTask.getTransformId()),
+                e);
             if (handleCircuitBreakingException(e)) {
                 return;
             }
@@ -1083,7 +1106,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
 
         @Override
         protected void failIndexer(String failureMessage) {
-            logger.error("Data frame transform [" + getJobId() + "]: " + failureMessage);
+            logger.error("[{}] transform has failed; experienced: [{}].", getJobId(), failureMessage);
             auditor.error(transformTask.getTransformId(), failureMessage);
             transformTask.markAsFailed(failureMessage, ActionListener.wrap(
                 r -> {

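The subtlest changes in this file are in the audit throttling near the end of the diff: compareAndExchange(false, true) == false becomes the equivalent but plainer compareAndSet(false, true), and the log10 cadence computation drops the + 1 and special-cases checkpoint 0, so the audit interval now rolls over exactly at powers of ten. A self-contained sketch of the corrected throttle, with the class and method names assumed for illustration (the body mirrors the added lines):

class AuditThrottleSketch {
    // Audit every logEvery-th completed checkpoint; logCount tracks how many
    // completions have been seen since the last audited one.
    private int logEvery = 1;
    private int logCount = 0;

    boolean shouldAuditOnFinish(long completedCheckpoint) {
        if (++logCount % logEvery != 0) {
            return false;
        }
        if (completedCheckpoint == 0) {
            // log10(0) is undefined; the very first checkpoint is always audited.
            return true;
        }
        // Checkpoints 1-9: audit every one; 10-99: every 10th;
        // 100-999: every 100th; 1000 and above: every 1000th.
        int log10Checkpoint = (int) Math.floor(Math.log10(completedCheckpoint));
        logEvery = log10Checkpoint >= 3 ? 1_000 : (int) Math.pow(10.0, log10Checkpoint);
        logCount = 0;
        return true;
    }
}

With the old completedCheckpoint + 1, the interval was widened one checkpoint early (Math.log10(9 + 1) already yields 1), which is why the test expectations below shift from boundaries like 9, 19, and 99 to exact powers of ten like 10, 20, and 100.
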
+ 14 - 14
x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/ClientDataFrameIndexerTests.java

@@ -74,30 +74,30 @@ public class ClientDataFrameIndexerTests extends ESTestCase {
         // Audit every checkpoint for the first 10
         assertTrue(shouldAudit.get(0));
         assertTrue(shouldAudit.get(1));
-        assertTrue(shouldAudit.get(9));
+        assertTrue(shouldAudit.get(10));
 
         // Then audit every 10 while < 100
-        assertFalse(shouldAudit.get(10));
         assertFalse(shouldAudit.get(11));
-        assertTrue(shouldAudit.get(19));
-        assertTrue(shouldAudit.get(29));
-        assertFalse(shouldAudit.get(30));
-        assertTrue(shouldAudit.get(99));
+        assertTrue(shouldAudit.get(20));
+        assertFalse(shouldAudit.get(29));
+        assertTrue(shouldAudit.get(30));
+        assertFalse(shouldAudit.get(99));
 
         // Then audit every 100 < 1000
-        assertFalse(shouldAudit.get(100));
+        assertTrue(shouldAudit.get(100));
         assertFalse(shouldAudit.get(109));
         assertFalse(shouldAudit.get(110));
-        assertTrue(shouldAudit.get(199));
+        assertFalse(shouldAudit.get(199));
 
         // Then audit every 1000 for the rest of time
-        assertTrue(shouldAudit.get(1999));
+        assertFalse(shouldAudit.get(1999));
         assertFalse(shouldAudit.get(2199));
-        assertTrue(shouldAudit.get(2999));
-        assertTrue(shouldAudit.get(9999));
-        assertTrue(shouldAudit.get(10_999));
-        assertFalse(shouldAudit.get(11_000));
-        assertTrue(shouldAudit.get(11_999));
+        assertTrue(shouldAudit.get(3000));
+        assertTrue(shouldAudit.get(10_000));
+        assertFalse(shouldAudit.get(10_999));
+        assertTrue(shouldAudit.get(11_000));
+        assertFalse(shouldAudit.get(11_001));
+        assertFalse(shouldAudit.get(11_999));
     }
 
 }
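
Read together, the updated assertions pin down the intended cadence: audit every checkpoint through 10, every 10th through 100, every 100th through 1000, and every 1000th thereafter. A hypothetical harness over the AuditThrottleSketch above reproduces them; the real test's shouldAudit lookup is approximated here with a Map:

import java.util.HashMap;
import java.util.Map;

class AuditThrottleHarness {
    public static void main(String[] args) {
        AuditThrottleSketch throttle = new AuditThrottleSketch();
        Map<Long, Boolean> shouldAudit = new HashMap<>();
        // The throttle is stateful, so checkpoints must be fed in order.
        for (long checkpoint = 0; checkpoint <= 12_000; checkpoint++) {
            shouldAudit.put(checkpoint, throttle.shouldAuditOnFinish(checkpoint));
        }
        assert shouldAudit.get(10L) && !shouldAudit.get(11L);         // rolls over at 10
        assert shouldAudit.get(100L) && !shouldAudit.get(199L);       // rolls over at 100
        assert shouldAudit.get(11_000L) && !shouldAudit.get(11_999L); // every 1000th from 1000
    }
}

(Run with assertions enabled: java -ea AuditThrottleHarness.)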