Selaa lähdekoodia

adapter 分组间线程统一抛异常

mcy 6 vuotta sitten
vanhempi
commit
7ea1468a8d

+ 20 - 14
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

@@ -73,16 +73,19 @@ public abstract class AbstractCanalAdapterWorker {
 
             // 等待所有适配器写入完成
             // 由于是组间并发操作,所以将阻塞直到耗时最久的工作组操作完成
-            futures.forEach(future -> {
+            RuntimeException exception = null;
+            for (Future<Boolean> future : futures) {
                 try {
                     if (!future.get()) {
-                        throw new RuntimeException("Outer adapter sync failed! ");
+                        exception = new RuntimeException("Outer adapter sync failed! ");
                     }
                 } catch (Exception e) {
-                    future.cancel(true);
-                    throw new RuntimeException(e);
+                    exception = new RuntimeException(e);
                 }
-            });
+            }
+            if (exception != null) {
+                throw exception;
+            }
         });
     }
 
@@ -113,22 +116,25 @@ public abstract class AbstractCanalAdapterWorker {
 
             // 等待所有适配器写入完成
             // 由于是组间并发操作,所以将阻塞直到耗时最久的工作组操作完成
-            futures.forEach(future -> {
+            RuntimeException exception = null;
+            for (Future<Boolean> future : futures) {
                 try {
                     if (!future.get()) {
-                        throw new RuntimeException("Outer adapter sync failed! ");
+                        exception = new RuntimeException("Outer adapter sync failed! ");
                     }
                 } catch (Exception e) {
-                    future.cancel(true);
-                    throw new RuntimeException(e);
+                    exception = new RuntimeException(e);
                 }
-            });
+            }
+            if (exception != null) {
+                throw exception;
+            }
         });
     }
 
     @SuppressWarnings("unchecked")
-    protected boolean mqWriteOutData(int retry, long timeout, int i, final boolean flatMessage, CanalMQConnector connector,
-                                  ExecutorService workerExecutor) {
+    protected boolean mqWriteOutData(int retry, long timeout, int i, final boolean flatMessage,
+                                     CanalMQConnector connector, ExecutorService workerExecutor) {
         try {
             List<?> messages;
             if (!flatMessage) {
@@ -157,7 +163,7 @@ public abstract class AbstractCanalAdapterWorker {
                 }
                 connector.ack();
             }
-           return true;
+            return true;
         } catch (Throwable e) {
             if (i == retry - 1) {
                 connector.ack();
@@ -173,7 +179,7 @@ public abstract class AbstractCanalAdapterWorker {
                 // ignore
             }
         }
-        return  false;
+        return false;
     }
 
     /**