|
@@ -13,26 +13,19 @@ import org.springframework.util.CollectionUtils;
|
|
|
|
|
|
import com.alibaba.otter.canal.client.CanalConnector;
|
|
|
import com.alibaba.otter.canal.protocol.CanalEntry;
|
|
|
-import com.alibaba.otter.canal.protocol.CanalEntry.Column;
|
|
|
-import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
|
|
|
-import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
|
|
|
-import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
|
|
|
-import com.alibaba.otter.canal.protocol.CanalEntry.Pair;
|
|
|
-import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
|
|
|
-import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
|
|
|
-import com.alibaba.otter.canal.protocol.CanalEntry.TransactionBegin;
|
|
|
-import com.alibaba.otter.canal.protocol.CanalEntry.TransactionEnd;
|
|
|
import com.alibaba.otter.canal.protocol.Message;
|
|
|
+import com.alibaba.otter.canal.protocol.CanalEntry.*;
|
|
|
import com.google.protobuf.InvalidProtocolBufferException;
|
|
|
|
|
|
public class BaseCanalClientTest {
|
|
|
|
|
|
- protected final static Logger logger = LoggerFactory.getLogger(AbstractCanalClientTest.class);
|
|
|
+ protected final static Logger logger = LoggerFactory
|
|
|
+ .getLogger(AbstractCanalClientTest.class);
|
|
|
protected static final String SEP = SystemUtils.LINE_SEPARATOR;
|
|
|
protected static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
|
|
|
protected volatile boolean running = false;
|
|
|
protected Thread.UncaughtExceptionHandler handler = (t, e) -> logger.error("parse events has an error",
|
|
|
- e);
|
|
|
+ e);
|
|
|
protected Thread thread = null;
|
|
|
protected CanalConnector connector;
|
|
|
protected static String context_format = null;
|
|
@@ -51,8 +44,7 @@ public class BaseCanalClientTest {
|
|
|
+ "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {}({}) , gtid : ({}) , delay : {} ms"
|
|
|
+ SEP;
|
|
|
|
|
|
- transaction_format = SEP
|
|
|
- + "================> binlog[{}:{}] , executeTime : {}({}) , gtid : ({}) , delay : {}ms"
|
|
|
+ transaction_format = SEP + "================> binlog[{}:{}] , executeTime : {}({}) , gtid : ({}) , delay : {}ms"
|
|
|
+ SEP;
|
|
|
|
|
|
}
|
|
@@ -71,8 +63,8 @@ public class BaseCanalClientTest {
|
|
|
}
|
|
|
|
|
|
SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
|
|
|
- logger.info(context_format, new Object[] { batchId, size, memsize, format.format(new Date()), startPosition,
|
|
|
- endPosition });
|
|
|
+ logger.info(context_format,
|
|
|
+ new Object[] { batchId, size, memsize, format.format(new Date()), startPosition, endPosition });
|
|
|
}
|
|
|
|
|
|
protected String buildPositionForDump(Entry entry) {
|
|
@@ -94,7 +86,8 @@ public class BaseCanalClientTest {
|
|
|
Date date = new Date(entry.getHeader().getExecuteTime());
|
|
|
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
|
|
|
|
|
- if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
|
|
|
+ if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
|
|
|
+ || entry.getEntryType() == EntryType.TRANSACTIONEND) {
|
|
|
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {
|
|
|
TransactionBegin begin = null;
|
|
|
try {
|
|
@@ -105,9 +98,10 @@ public class BaseCanalClientTest {
|
|
|
// 打印事务头信息,执行的线程id,事务耗时
|
|
|
logger.info(transaction_format,
|
|
|
new Object[] { entry.getHeader().getLogfileName(),
|
|
|
- String.valueOf(entry.getHeader().getLogfileOffset()),
|
|
|
- String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
|
|
|
- entry.getHeader().getGtid(), String.valueOf(delayTime) });
|
|
|
+ String.valueOf(entry.getHeader().getLogfileOffset()),
|
|
|
+ String.valueOf(entry.getHeader().getExecuteTime()),
|
|
|
+ simpleDateFormat.format(date), entry.getHeader().getGtid(),
|
|
|
+ String.valueOf(delayTime) });
|
|
|
logger.info(" BEGIN ----> Thread id: {}", begin.getThreadId());
|
|
|
printXAInfo(begin.getPropsList());
|
|
|
} else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
|
|
@@ -123,9 +117,10 @@ public class BaseCanalClientTest {
|
|
|
printXAInfo(end.getPropsList());
|
|
|
logger.info(transaction_format,
|
|
|
new Object[] { entry.getHeader().getLogfileName(),
|
|
|
- String.valueOf(entry.getHeader().getLogfileOffset()),
|
|
|
- String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
|
|
|
- entry.getHeader().getGtid(), String.valueOf(delayTime) });
|
|
|
+ String.valueOf(entry.getHeader().getLogfileOffset()),
|
|
|
+ String.valueOf(entry.getHeader().getExecuteTime()),
|
|
|
+ simpleDateFormat.format(date), entry.getHeader().getGtid(),
|
|
|
+ String.valueOf(delayTime) });
|
|
|
}
|
|
|
|
|
|
continue;
|
|
@@ -143,10 +138,10 @@ public class BaseCanalClientTest {
|
|
|
|
|
|
logger.info(row_format,
|
|
|
new Object[] { entry.getHeader().getLogfileName(),
|
|
|
- String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
|
|
|
- entry.getHeader().getTableName(), eventType,
|
|
|
- String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
|
|
|
- entry.getHeader().getGtid(), String.valueOf(delayTime) });
|
|
|
+ String.valueOf(entry.getHeader().getLogfileOffset()),
|
|
|
+ entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType,
|
|
|
+ String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
|
|
|
+ entry.getHeader().getGtid(), String.valueOf(delayTime) });
|
|
|
|
|
|
if (eventType == EventType.QUERY || rowChange.getIsDdl()) {
|
|
|
logger.info("ddl : " + rowChange.getIsDdl() + " , sql ----> " + rowChange.getSql() + SEP);
|
|
@@ -174,8 +169,8 @@ public class BaseCanalClientTest {
|
|
|
if (StringUtils.containsIgnoreCase(column.getMysqlType(), "BLOB")
|
|
|
|| StringUtils.containsIgnoreCase(column.getMysqlType(), "BINARY")) {
|
|
|
// get value bytes
|
|
|
- builder.append(column.getName() + " : "
|
|
|
- + new String(column.getValue().getBytes("ISO-8859-1"), "UTF-8"));
|
|
|
+ builder.append(
|
|
|
+ column.getName() + " : " + new String(column.getValue().getBytes("ISO-8859-1"), "UTF-8"));
|
|
|
} else {
|
|
|
builder.append(column.getName() + " : " + column.getValue());
|
|
|
}
|