|
@@ -233,9 +233,10 @@ public final class JobModelSnapshotUpgrader {
|
|
|
return;
|
|
|
}
|
|
|
submitOperation(() -> {
|
|
|
+ writeHeader();
|
|
|
String flushId = process.flushJob(FlushJobParams.builder().waitForNormalization(false).build());
|
|
|
return waitFlushToCompletion(flushId);
|
|
|
- }, (aVoid, e) -> {
|
|
|
+ }, (flushAcknowledgement, e) -> {
|
|
|
Runnable nextStep;
|
|
|
if (e != null) {
|
|
|
logger.error(
|
|
@@ -250,6 +251,12 @@ public final class JobModelSnapshotUpgrader {
|
|
|
ActionListener.wrap(t -> shutdown(e), f -> shutdown(e))
|
|
|
);
|
|
|
} else {
|
|
|
+ logger.debug(() -> new ParameterizedMessage(
|
|
|
+ "[{}] [{}] flush [{}] acknowledged requesting state write",
|
|
|
+ jobId,
|
|
|
+ snapshotId,
|
|
|
+ flushAcknowledgement.getId()
|
|
|
+ ));
|
|
|
nextStep = this::requestStateWrite;
|
|
|
}
|
|
|
threadPool.executor(UTILITY_THREAD_POOL_NAME).execute(nextStep);
|
|
@@ -267,7 +274,6 @@ public final class JobModelSnapshotUpgrader {
|
|
|
}
|
|
|
submitOperation(
|
|
|
() -> {
|
|
|
- writeHeader();
|
|
|
process.persistState(
|
|
|
params.modelSnapshot().getTimestamp().getTime(),
|
|
|
params.modelSnapshot().getSnapshotId(),
|