|
@@ -564,7 +564,10 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
|
protected void afterFinishOrFailure() {
|
|
protected void afterFinishOrFailure() {
|
|
|
- finishIndexerThreadShutdown();
|
|
|
|
|
|
|
+ finishIndexerThreadShutdown(() -> {
|
|
|
|
|
+ auditor.info(transformConfig.getId(), "Transform has stopped.");
|
|
|
|
|
+ logger.info("[{}] transform has stopped.", transformConfig.getId());
|
|
|
|
|
+ });
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -646,12 +649,6 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- @Override
|
|
|
|
|
- protected void onStop() {
|
|
|
|
|
- auditor.info(transformConfig.getId(), "Transform has stopped.");
|
|
|
|
|
- logger.info("[{}] transform has stopped.", transformConfig.getId());
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
@Override
|
|
@Override
|
|
|
protected void onAbort() {
|
|
protected void onAbort() {
|
|
|
auditor.info(transformConfig.getId(), "Received abort request, stopping transform.");
|
|
auditor.info(transformConfig.getId(), "Received abort request, stopping transform.");
|
|
@@ -1203,7 +1200,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- private void finishIndexerThreadShutdown() {
|
|
|
|
|
|
|
+ private void finishIndexerThreadShutdown(Runnable next) {
|
|
|
synchronized (context) {
|
|
synchronized (context) {
|
|
|
indexerThreadShuttingDown = false;
|
|
indexerThreadShuttingDown = false;
|
|
|
if (saveStateRequestedDuringIndexerThreadShutdown) {
|
|
if (saveStateRequestedDuringIndexerThreadShutdown) {
|
|
@@ -1212,7 +1209,9 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
|
|
|
if (context.shouldStopAtCheckpoint() && nextCheckpoint == null) {
|
|
if (context.shouldStopAtCheckpoint() && nextCheckpoint == null) {
|
|
|
stop();
|
|
stop();
|
|
|
}
|
|
}
|
|
|
- doSaveState(getState(), getPosition(), () -> {});
|
|
|
|
|
|
|
+ doSaveState(getState(), getPosition(), next);
|
|
|
|
|
+ } else {
|
|
|
|
|
+ next.run();
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|