|
@@ -76,16 +76,19 @@ public abstract class AbstractCanalAdapterWorker {
|
|
|
|
|
|
// 等待所有适配器写入完成
|
|
|
// 由于是组间并发操作,所以将阻塞直到耗时最久的工作组操作完成
|
|
|
- futures.forEach(future -> {
|
|
|
+ RuntimeException exception = null;
|
|
|
+ for (Future<Boolean> future : futures) {
|
|
|
try {
|
|
|
if (!future.get()) {
|
|
|
- throw new RuntimeException("Outer adapter sync failed! ");
|
|
|
+ exception = new RuntimeException("Outer adapter sync failed! ");
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
- future.cancel(true);
|
|
|
- throw new RuntimeException(e);
|
|
|
+ exception = new RuntimeException(e);
|
|
|
}
|
|
|
- });
|
|
|
+ }
|
|
|
+ if (exception != null) {
|
|
|
+ throw exception;
|
|
|
+ }
|
|
|
});
|
|
|
}
|
|
|
|
|
@@ -116,16 +119,19 @@ public abstract class AbstractCanalAdapterWorker {
|
|
|
|
|
|
// 等待所有适配器写入完成
|
|
|
// 由于是组间并发操作,所以将阻塞直到耗时最久的工作组操作完成
|
|
|
- futures.forEach(future -> {
|
|
|
+ RuntimeException exception = null;
|
|
|
+ for (Future<Boolean> future : futures) {
|
|
|
try {
|
|
|
if (!future.get()) {
|
|
|
- throw new RuntimeException("Outer adapter sync failed! ");
|
|
|
+ exception = new RuntimeException("Outer adapter sync failed! ");
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
- future.cancel(true);
|
|
|
- throw new RuntimeException(e);
|
|
|
+ exception = new RuntimeException(e);
|
|
|
}
|
|
|
- });
|
|
|
+ }
|
|
|
+ if (exception != null) {
|
|
|
+ throw exception;
|
|
|
+ }
|
|
|
});
|
|
|
}
|
|
|
|