|
@@ -10,7 +10,6 @@ package org.elasticsearch.xpack.transform.transforms.scheduling;
|
|
|
import org.apache.logging.log4j.LogManager;
|
|
|
import org.apache.logging.log4j.Logger;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
-import org.elasticsearch.core.Strings;
|
|
|
import org.elasticsearch.core.TimeValue;
|
|
|
import org.elasticsearch.threadpool.Scheduler;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
@@ -24,6 +23,8 @@ import java.util.List;
|
|
|
import java.util.Objects;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
|
+import static org.elasticsearch.core.Strings.format;
|
|
|
+
|
|
|
/**
|
|
|
* {@link TransformScheduler} class is responsible for scheduling transform tasks according to their configured frequency as well as
|
|
|
* retrying policy.
|
|
@@ -115,10 +116,7 @@ public final class TransformScheduler {
|
|
|
if (isTraceEnabled) {
|
|
|
Instant processingFinished = clock.instant();
|
|
|
logger.trace(
|
|
|
- Strings.format(
|
|
|
- "Processing scheduled tasks finished, took {}ms",
|
|
|
- Duration.between(processingStarted, processingFinished).toMillis()
|
|
|
- )
|
|
|
+ format("Processing scheduled tasks finished, took %dms", Duration.between(processingStarted, processingFinished).toMillis())
|
|
|
);
|
|
|
}
|
|
|
if (taskWasProcessed == false) {
|
|
@@ -152,8 +150,8 @@ public final class TransformScheduler {
|
|
|
scheduledTasks.update(scheduledTask.getTransformId(), task -> {
|
|
|
if (task.equals(scheduledTask) == false) {
|
|
|
logger.debug(
|
|
|
- () -> Strings.format(
|
|
|
- "[{}] task object got modified while processing. Expected: {}, was: {}",
|
|
|
+ () -> format(
|
|
|
+ "[%s] task object got modified while processing. Expected: %s, was: %s",
|
|
|
scheduledTask.getTransformId(),
|
|
|
scheduledTask,
|
|
|
task
|
|
@@ -191,7 +189,7 @@ public final class TransformScheduler {
|
|
|
*/
|
|
|
public void registerTransform(TransformTaskParams transformTaskParams, Listener listener) {
|
|
|
String transformId = transformTaskParams.getId();
|
|
|
- logger.trace(() -> Strings.format("[{}] register the transform", transformId));
|
|
|
+ logger.trace(() -> format("[%s] register the transform", transformId));
|
|
|
long currentTimeMillis = clock.millis();
|
|
|
TransformScheduledTask transformScheduledTask = new TransformScheduledTask(
|
|
|
transformId,
|
|
@@ -214,7 +212,7 @@ public final class TransformScheduler {
|
|
|
* @param failureCount new value of transform task's failure count
|
|
|
*/
|
|
|
public void handleTransformFailureCountChanged(String transformId, int failureCount) {
|
|
|
- logger.trace(() -> Strings.format("[{}] handle transform failure count change to {}", transformId, failureCount));
|
|
|
+ logger.trace(() -> format("[%s] handle transform failure count change to %d", transformId, failureCount));
|
|
|
// Update the task's failure count (next_scheduled_time gets automatically re-calculated)
|
|
|
scheduledTasks.update(
|
|
|
transformId,
|
|
@@ -235,7 +233,7 @@ public final class TransformScheduler {
|
|
|
*/
|
|
|
public void deregisterTransform(String transformId) {
|
|
|
Objects.requireNonNull(transformId);
|
|
|
- logger.trace(() -> Strings.format("[{}] de-register the transform", transformId));
|
|
|
+ logger.trace(() -> format("[%s] de-register the transform", transformId));
|
|
|
scheduledTasks.remove(transformId);
|
|
|
}
|
|
|
|