mcy vor 6 Jahren
Ursprung
Commit
60441d8b31

+ 8 - 3
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/service/RdbSyncService.java

@@ -73,10 +73,10 @@ public class RdbSyncService {
             this.executorThreads = new ExecutorService[this.threads];
             for (int i = 0; i < this.threads; i++) {
                 dmlsPartition[i] = new ArrayList<>();
-                batchExecutors[i] = new BatchExecutor(dataSource.getConnection());
+                batchExecutors[i] = new BatchExecutor(dataSource);
                 executorThreads[i] = Executors.newSingleThreadExecutor();
             }
-        } catch (SQLException e) {
+        } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
@@ -122,6 +122,12 @@ public class RdbSyncService {
                 }
             });
         }
+
+        for (BatchExecutor batchExecutor : batchExecutors) {
+            if (batchExecutor != null) {
+                batchExecutor.close();
+            }
+        }
     }
 
     /**
@@ -475,7 +481,6 @@ public class RdbSyncService {
 
     public void close() {
         for (int i = 0; i < threads; i++) {
-            batchExecutors[i].close();
             executorThreads[i].shutdown();
         }
     }

+ 27 - 10
client-adapter/rdb/src/main/java/com/alibaba/otter/canal/client/adapter/rdb/support/BatchExecutor.java

@@ -9,22 +9,38 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.sql.DataSource;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * sql批量执行器
+ *
+ * @author rewerma 2018-11-7 下午06:45:49
+ * @version 1.0.0
+ */
 public class BatchExecutor implements Closeable {
 
     private static final Logger logger = LoggerFactory.getLogger(BatchExecutor.class);
 
+    private DataSource          dataSource;
     private Connection          conn;
     private AtomicInteger       idx    = new AtomicInteger(0);
 
-    public BatchExecutor(Connection conn) throws SQLException{
-        this.conn = conn;
-        this.conn.setAutoCommit(false);
+    public BatchExecutor(DataSource dataSource){
+        this.dataSource = dataSource;
     }
 
     public Connection getConn() {
+        if (conn == null) {
+            try {
+                conn = dataSource.getConnection();
+                this.conn.setAutoCommit(false);
+            } catch (SQLException e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
         return conn;
     }
 
@@ -36,7 +52,7 @@ public class BatchExecutor implements Closeable {
     }
 
     public void execute(String sql, List<Map<String, ?>> values) throws SQLException {
-        PreparedStatement pstmt = conn.prepareStatement(sql);
+        PreparedStatement pstmt = getConn().prepareStatement(sql);
         int len = values.size();
         for (int i = 0; i < len; i++) {
             int type = (Integer) values.get(i).get("type");
@@ -49,7 +65,7 @@ public class BatchExecutor implements Closeable {
     }
 
     public void commit() throws SQLException {
-        conn.commit();
+        getConn().commit();
         if (logger.isTraceEnabled()) {
             logger.trace("Batch executor commit " + idx.get() + " rows");
         }
@@ -57,7 +73,7 @@ public class BatchExecutor implements Closeable {
     }
 
     public void rollback() throws SQLException {
-        conn.rollback();
+        getConn().rollback();
         if (logger.isTraceEnabled()) {
             logger.trace("Batch executor rollback " + idx.get() + " rows");
         }
@@ -68,10 +84,11 @@ public class BatchExecutor implements Closeable {
     public void close() {
         if (conn != null) {
             try {
-                if (conn != null) {
-                    conn.close();
-                }
-            } catch (SQLException ioe) {
+                conn.close();
+            } catch (SQLException e) {
+                logger.error(e.getMessage(), e);
+            } finally {
+                conn = null;
             }
         }
     }