|
@@ -1,5 +1,17 @@
|
|
|
package com.alibaba.otter.canal.client.adapter.phoenix.service;
|
|
|
|
|
|
+import java.sql.*;
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
+import java.util.concurrent.atomic.AtomicLong;
|
|
|
+
|
|
|
+import javax.sql.DataSource;
|
|
|
+
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+
|
|
|
import com.alibaba.otter.canal.client.adapter.phoenix.PhoenixAdapter;
|
|
|
import com.alibaba.otter.canal.client.adapter.phoenix.config.MappingConfig;
|
|
|
import com.alibaba.otter.canal.client.adapter.phoenix.config.MappingConfig.DbMapping;
|
|
@@ -10,16 +22,6 @@ import com.alibaba.otter.canal.client.adapter.support.DatasourceConfig;
|
|
|
import com.alibaba.otter.canal.client.adapter.support.EtlResult;
|
|
|
import com.alibaba.otter.canal.client.adapter.support.Util;
|
|
|
import com.google.common.base.Joiner;
|
|
|
-import org.slf4j.Logger;
|
|
|
-import org.slf4j.LoggerFactory;
|
|
|
-
|
|
|
-import javax.sql.DataSource;
|
|
|
-import java.sql.*;
|
|
|
-import java.util.*;
|
|
|
-import java.util.concurrent.ExecutorService;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
-import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
-import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
|
/**
|
|
|
* Phoenix ETL 操作业务类
|
|
@@ -41,16 +43,17 @@ public class PhoenixEtlService {
|
|
|
if (srcDataSource == null) {
|
|
|
return false;
|
|
|
}
|
|
|
- try {
|
|
|
- return syncSchema(srcDataSource.getConnection(), targetDSConnection, config);
|
|
|
+
|
|
|
+ try (Connection conn = srcDataSource.getConnection()) {
|
|
|
+ return syncSchema(conn, targetDSConnection, config);
|
|
|
} catch (SQLException e) {
|
|
|
throw new RuntimeException(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private static boolean syncSchema(DataSource srcDS,Connection targetDSConnection, MappingConfig config) {
|
|
|
- try {
|
|
|
- return syncSchema(srcDS.getConnection(),targetDSConnection, config);
|
|
|
+ try (Connection conn = srcDS.getConnection()) {
|
|
|
+ return syncSchema(conn, targetDSConnection, config);
|
|
|
} catch (SQLException e) {
|
|
|
throw new RuntimeException(e);
|
|
|
}
|