|
|
@@ -444,7 +444,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
|
|
|
private final DataFrameTransformsCheckpointService transformsCheckpointService;
|
|
|
private final String transformId;
|
|
|
private final DataFrameTransformTask transformTask;
|
|
|
- private volatile DataFrameIndexerTransformStats previouslyPersistedStats = null;
|
|
|
private final AtomicInteger failureCount;
|
|
|
// Keeps track of the last exception that was written to our audit, keeps us from spamming the audit index
|
|
|
private volatile String lastAuditedExceptionMessage = null;
|
|
|
@@ -552,25 +551,18 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
|
|
|
// only every-so-often when doing the bulk indexing calls. See AsyncTwoPhaseIndexer#onBulkResponse for current periodicity
|
|
|
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> updateClusterStateListener = ActionListener.wrap(
|
|
|
task -> {
|
|
|
- // Only persist the stats if something has actually changed
|
|
|
- if (previouslyPersistedStats == null || previouslyPersistedStats.equals(getStats()) == false) {
|
|
|
- transformsConfigManager.putOrUpdateTransformStats(
|
|
|
- new DataFrameTransformStateAndStats(transformId, state, getStats(),
|
|
|
- DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null
|
|
|
+ transformsConfigManager.putOrUpdateTransformStats(
|
|
|
+ new DataFrameTransformStateAndStats(transformId, state, getStats(),
|
|
|
+ DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null
|
|
|
ActionListener.wrap(
|
|
|
- r -> {
|
|
|
- previouslyPersistedStats = getStats();
|
|
|
- next.run();
|
|
|
- },
|
|
|
- statsExc -> {
|
|
|
- logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc);
|
|
|
- next.run();
|
|
|
- }
|
|
|
+ r -> {
|
|
|
+ next.run();
|
|
|
+ },
|
|
|
+ statsExc -> {
|
|
|
+ logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc);
|
|
|
+ next.run();
|
|
|
+ }
|
|
|
));
|
|
|
- // The stats that we have previously written to the doc is the same as as it is now, no need to update it
|
|
|
- } else {
|
|
|
- next.run();
|
|
|
- }
|
|
|
},
|
|
|
exc -> {
|
|
|
logger.error("Updating persistent state of transform [" + transformConfig.getId() + "] failed", exc);
|