Browse Source

fixed issue #126, support gtid

七锋 7 years ago
parent
commit
3c913d1e22

+ 3 - 3
dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/GtidLogEvent.java

@@ -1,11 +1,11 @@
 package com.taobao.tddl.dbsync.binlog.event;
 
-import com.taobao.tddl.dbsync.binlog.LogBuffer;
-import com.taobao.tddl.dbsync.binlog.LogEvent;
-
 import java.nio.ByteBuffer;
 import java.util.UUID;
 
+import com.taobao.tddl.dbsync.binlog.LogBuffer;
+import com.taobao.tddl.dbsync.binlog.LogEvent;
+
 /**
  * @author jianghang 2013-4-8 上午12:36:29
  * @version 1.0.3

+ 3 - 2
deployer/src/main/resources/example/instance.properties

@@ -1,15 +1,16 @@
 #################################################
 ## mysql serverId
 canal.instance.mysql.slaveId=0
+
 # position info
 canal.instance.master.address=127.0.0.1:3306
+# enable gtid use true/false
+canal.instance.gtidon=false
 canal.instance.master.journal.name=
 canal.instance.master.position=
 canal.instance.master.timestamp=
 canal.instance.master.gtid=
 
-canal.instance.gtidon=true
-
 # table meta tsdb info
 canal.instance.tsdb.enable=true
 canal.instance.tsdb.dir=${canal.file.data.dir:../conf}/${canal.instance.destination:}

+ 5 - 0
deployer/src/main/resources/spring/default-instance.xml

@@ -166,6 +166,7 @@
 				<property name="journalName" value="${canal.instance.master.journal.name}" />
 				<property name="position" value="${canal.instance.master.position}" />
 				<property name="timestamp" value="${canal.instance.master.timestamp}" />
+				<property name="gtid" value="${canal.instance.master.gtid}" />
 			</bean>
 		</property>
 		<property name="standbyPosition">
@@ -173,6 +174,7 @@
 				<property name="journalName" value="${canal.instance.standby.journal.name}" />
 				<property name="position" value="${canal.instance.standby.position}" />
 				<property name="timestamp" value="${canal.instance.standby.timestamp}" />
+				<property name="gtid" value="${canal.instance.standby.gtid}" />
 			</bean>
 		</property>
 		<property name="filterQueryDml" value="${canal.instance.filter.query.dml:false}" />
@@ -187,5 +189,8 @@
 		<!--表结构相关-->
 		<property name="enableTsdb" value="${canal.instance.tsdb.enable:true}"/>
 		<property name="tsdbSpringXml" value="${canal.instance.tsdb.spring.xml:}"/>
+		
+		<!--是否启用GTID模式-->
+		<property name="isGTIDMode" value="${canal.instance.gtidon}"/>
 	</bean>
 </beans>

+ 4 - 0
deployer/src/main/resources/spring/group-instance.xml

@@ -148,6 +148,7 @@
 				<property name="journalName" value="${canal.instance.master1.journal.name}" />
 				<property name="position" value="${canal.instance.master1.position}" />
 				<property name="timestamp" value="${canal.instance.master1.timestamp}" />
+				<property name="gtid" value="${canal.instance.master1.gtid}" />
 			</bean>
 		</property>
 		<property name="standbyPosition">
@@ -155,6 +156,7 @@
 				<property name="journalName" value="${canal.instance.standby1.journal.name}" />
 				<property name="position" value="${canal.instance.standby1.position}" />
 				<property name="timestamp" value="${canal.instance.standby1.timestamp}" />
+				<property name="gtid" value="${canal.instance.standby1.gtid}" />
 			</bean>
 		</property>
 		<property name="filterQueryDml" value="${canal.instance.filter.query.dml:false}" />
@@ -239,6 +241,7 @@
 				<property name="journalName" value="${canal.instance.master2.journal.name}" />
 				<property name="position" value="${canal.instance.master2.position}" />
 				<property name="timestamp" value="${canal.instance.master2.timestamp}" />
+				<property name="gtid" value="${canal.instance.master2.gtid}" />
 			</bean>
 		</property>
 		<property name="standbyPosition">
@@ -246,6 +249,7 @@
 				<property name="journalName" value="${canal.instance.standby2.journal.name}" />
 				<property name="position" value="${canal.instance.standby2.position}" />
 				<property name="timestamp" value="${canal.instance.standby2.timestamp}" />
+				<property name="gtid" value="${canal.instance.standby2.gtid}" />
 			</bean>
 		</property>
 		<property name="filterQueryDml" value="${canal.instance.filter.query.dml:false}" />

+ 5 - 0
deployer/src/main/resources/spring/memory-instance.xml

@@ -139,6 +139,7 @@
 				<property name="journalName" value="${canal.instance.master.journal.name}" />
 				<property name="position" value="${canal.instance.master.position}" />
 				<property name="timestamp" value="${canal.instance.master.timestamp}" />
+				<property name="gtid" value="${canal.instance.master.gtid}" />
 			</bean>
 		</property>
 		<property name="standbyPosition">
@@ -146,6 +147,7 @@
 				<property name="journalName" value="${canal.instance.standby.journal.name}" />
 				<property name="position" value="${canal.instance.standby.position}" />
 				<property name="timestamp" value="${canal.instance.standby.timestamp}" />
+				<property name="gtid" value="${canal.instance.standby.gtid}" />
 			</bean>
 		</property>
 		<property name="filterQueryDml" value="${canal.instance.filter.query.dml:false}" />
@@ -160,5 +162,8 @@
 		<!--表结构相关-->
 		<property name="enableTsdb" value="${canal.instance.tsdb.enable:false}"/>
 		<property name="tsdbSpringXml" value="${canal.instance.tsdb.spring.xml:}"/>
+		
+		<!--是否启用GTID模式-->
+		<property name="isGTIDMode" value="${canal.instance.gtidon}"/>
 	</bean>
 </beans>

+ 14 - 7
example/src/main/java/com/alibaba/otter/canal/example/AbstractCanalClientTest.java

@@ -4,6 +4,7 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.List;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.SystemUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,10 +59,12 @@ public class AbstractCanalClientTest {
         context_format += "****************************************************" + SEP;
 
         row_format = SEP
-                     + "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {}({}) , delay : {} ms"
+                     + "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {}({}) , gtid : ({}) , delay : {} ms"
                      + SEP;
 
-        transaction_format = SEP + "================> binlog[{}:{}] , executeTime : {}({}) , delay : {}ms" + SEP;
+        transaction_format = SEP
+                             + "================> binlog[{}:{}] , executeTime : {}({}) , gtid : ({}) , delay : {}ms"
+                             + SEP;
 
     }
 
@@ -166,8 +169,12 @@ public class AbstractCanalClientTest {
         long time = entry.getHeader().getExecuteTime();
         Date date = new Date(time);
         SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
-        return entry.getHeader().getLogfileName() + ":" + entry.getHeader().getLogfileOffset() + ":"
-               + entry.getHeader().getExecuteTime() + "(" + format.format(date) + ")";
+        String position = entry.getHeader().getLogfileName() + ":" + entry.getHeader().getLogfileOffset() + ":"
+                          + entry.getHeader().getExecuteTime() + "(" + format.format(date) + ")";
+        if (StringUtils.isNotEmpty(entry.getHeader().getGtid())) {
+            position += " gtid(" + entry.getHeader().getGtid() + ")";
+        }
+        return position;
     }
 
     protected void printEntry(List<Entry> entrys) {
@@ -190,7 +197,7 @@ public class AbstractCanalClientTest {
                         new Object[] { entry.getHeader().getLogfileName(),
                                 String.valueOf(entry.getHeader().getLogfileOffset()),
                                 String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
-                                String.valueOf(delayTime) });
+                                entry.getHeader().getGtid(), String.valueOf(delayTime) });
                     logger.info(" BEGIN ----> Thread id: {}", begin.getThreadId());
                 } else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
                     TransactionEnd end = null;
@@ -206,7 +213,7 @@ public class AbstractCanalClientTest {
                         new Object[] { entry.getHeader().getLogfileName(),
                                 String.valueOf(entry.getHeader().getLogfileOffset()),
                                 String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
-                                String.valueOf(delayTime) });
+                                entry.getHeader().getGtid(), String.valueOf(delayTime) });
                 }
 
                 continue;
@@ -227,7 +234,7 @@ public class AbstractCanalClientTest {
                             String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
                             entry.getHeader().getTableName(), eventType,
                             String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
-                            String.valueOf(delayTime) });
+                            entry.getHeader().getGtid(), String.valueOf(delayTime) });
 
                 if (eventType == EventType.QUERY || rowChage.getIsDdl()) {
                     logger.info(" sql ----> " + rowChage.getSql() + SEP);

+ 5 - 4
meta/src/main/java/com/alibaba/otter/canal/meta/FileMixedMetaManager.java

@@ -109,10 +109,11 @@ public class FileMixedMetaManager extends MemoryMetaManager implements CanalMeta
                         // 定时将内存中的最新值刷到file中,多次变更只刷一次
                         if (logger.isInfoEnabled()) {
                             LogPosition cursor = (LogPosition) getCursor(clientIdentity);
-                            logger.info("clientId:{} cursor:[{},{},{}] address[{}]",
-                                new Object[] { clientIdentity.getClientId(), cursor.getPostion().getJournalName(),
-                                        cursor.getPostion().getPosition(), cursor.getPostion().getTimestamp(),
-                                        cursor.getIdentity().getSourceAddress().toString() });
+                            logger.info("clientId:{} cursor:[{},{},{},{},{}] address[{}]", new Object[] {
+                                    clientIdentity.getClientId(), cursor.getPostion().getJournalName(),
+                                    cursor.getPostion().getPosition(), cursor.getPostion().getTimestamp(),
+                                    cursor.getPostion().getServerId(), cursor.getPostion().getGtid(),
+                                    cursor.getIdentity().getSourceAddress().toString() });
                         }
                         flushDataToFile(clientIdentity.getDestination());
                         updateCursorTasks.remove(clientIdentity);

+ 1 - 12
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java

@@ -8,8 +8,6 @@ import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
-import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;
-import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.lang.math.RandomUtils;
@@ -21,6 +19,7 @@ import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
 import com.alibaba.otter.canal.common.alarm.CanalAlarmHandler;
 import com.alibaba.otter.canal.filter.CanalEventFilter;
 import com.alibaba.otter.canal.parse.CanalEventParser;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.exception.TableIdNotFoundException;
 import com.alibaba.otter.canal.parse.inbound.EventTransactionBuffer.TransactionFlushCallback;
@@ -90,7 +89,6 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
     protected Throwable                              exception                  = null;
 
     protected boolean                                isGTIDMode                 = false; // 是否是GTID模式
-    protected String                                 GTIDSetStr                 = null;
 
     protected abstract BinlogParser buildParser();
 
@@ -550,13 +548,4 @@ public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle
     public void setIsGTIDMode(boolean isGTIDMode) {
         this.isGTIDMode = isGTIDMode;
     }
-
-    public String getGTIDSetStr() {
-        return GTIDSetStr;
-    }
-
-    public void setGTIDSetStr(String GTIDSetStr) {
-        this.GTIDSetStr = GTIDSetStr;
-    }
-
 }

+ 13 - 17
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java

@@ -1,15 +1,13 @@
 package com.alibaba.otter.canal.parse.inbound.mysql;
 
-import java.io.IOException;
 import java.nio.charset.Charset;
 
-import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;
-import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvertGTID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.filter.CanalEventFilter;
 import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.inbound.AbstractEventParser;
 import com.alibaba.otter.canal.parse.inbound.BinlogParser;
@@ -37,20 +35,7 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
     protected boolean           useDruidDdlFilter       = true;
 
     protected BinlogParser buildParser() {
-        LogEventConvert convert;
-
-        if (isGTIDMode()) {
-            EntryPosition position;
-            try {
-                position = findStartPosition(null);
-            } catch (IOException e) {
-                throw new RuntimeException("findStartPosition failed.", e);
-            }
-            convert = new LogEventConvertGTID(MysqlGTIDSet.parse(position.getGtid()));
-        } else {
-            convert = new LogEventConvert();
-        }
-
+        LogEventConvert convert = new LogEventConvert();
         if (eventFilter != null && eventFilter instanceof AviaterRegexFilter) {
             convert.setNameFilter((AviaterRegexFilter) eventFilter);
         }
@@ -84,7 +69,18 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
      * @return
      */
     protected boolean processTableMeta(EntryPosition position) {
+        if (isGTIDMode()) {
+            if (binlogParser instanceof LogEventConvert) {
+                // 记录gtid
+                ((LogEventConvert) binlogParser).setGtidSet(MysqlGTIDSet.parse(position.getGtid()));
+            }
+        }
+
         if (tableMetaTSDB != null) {
+            if (position.getTimestamp() == null || position.getTimestamp() <= 0) {
+                throw new CanalParseException("use gtid and TableMeta TSDB should be config timestamp > 0");
+            }
+
             return tableMetaTSDB.rollback(position);
         }
 

+ 5 - 5
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlEventParser.java

@@ -10,10 +10,6 @@ import java.util.Map;
 import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicLong;
 
-import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
-import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;
-import com.alibaba.otter.canal.parse.inbound.BinlogParser;
-import com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvertGTID;
 import org.apache.commons.lang.StringUtils;
 import org.springframework.util.CollectionUtils;
 
@@ -354,8 +350,12 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
             if (logPosition != null) {
                 return logPosition.getPostion();
             }
-            return masterPosition;
+
+            if (StringUtils.isNotEmpty(masterPosition.getGtid())) {
+                return masterPosition;
+            }
         }
+
         EntryPosition startPosition = findStartPositionInternal(connection);
         if (needTransactionPosition.get()) {
             logger.warn("prepare to find last position : {}", startPosition.toString());

+ 30 - 4
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java

@@ -17,6 +17,7 @@ import org.slf4j.LoggerFactory;
 
 import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
 import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
+import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
 import com.alibaba.otter.canal.parse.exception.CanalParseException;
 import com.alibaba.otter.canal.parse.exception.TableIdNotFoundException;
 import com.alibaba.otter.canal.parse.inbound.BinlogParser;
@@ -40,6 +41,7 @@ import com.alibaba.otter.canal.protocol.position.EntryPosition;
 import com.google.protobuf.ByteString;
 import com.taobao.tddl.dbsync.binlog.LogEvent;
 import com.taobao.tddl.dbsync.binlog.event.DeleteRowsLogEvent;
+import com.taobao.tddl.dbsync.binlog.event.GtidLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.IntvarLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.LogHeader;
 import com.taobao.tddl.dbsync.binlog.event.QueryLogEvent;
@@ -56,7 +58,6 @@ import com.taobao.tddl.dbsync.binlog.event.UserVarLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.XidLogEvent;
 import com.taobao.tddl.dbsync.binlog.event.mariadb.AnnotateRowsEvent;
-import com.taobao.tddl.dbsync.binlog.event.GtidLogEvent;
 
 /**
  * 基于{@linkplain LogEvent}转化为Entry对象的处理
@@ -93,6 +94,17 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
     private boolean                     filterRows          = false;
     private boolean                     useDruidDdlFilter   = true;
 
+    // latest gtid
+    private GTIDSet                     gtidSet;
+
+    public LogEventConvert(GTIDSet gtidSet){
+        this.gtidSet = gtidSet;
+    }
+
+    public LogEventConvert(){
+
+    }
+
     @Override
     public Entry parse(LogEvent logEvent, boolean isSeek) throws CanalParseException {
         if (logEvent == null || logEvent instanceof UnknownLogEvent) {
@@ -148,10 +160,15 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
 
     private Entry parseGTIDLogEvent(GtidLogEvent logEvent) {
         LogHeader logHeader = logEvent.getHeader();
-        Header header = createHeader("", logHeader, "", "", EventType.GTID);
+        String value = logEvent.getSid().toString() + ":" + logEvent.getGno();
         Pair.Builder builder = Pair.newBuilder();
         builder.setKey("gtid");
-        builder.setValue(String.format("%s:%d", logEvent.getSid(), logEvent.getGno()));
+        builder.setValue(value);
+        if (gtidSet != null) {
+            gtidSet.update(value);
+        }
+
+        Header header = createHeader("", logHeader, "", "", EventType.GTID);
         return createEntry(header, EntryType.GTIDLOG, builder.build().toByteString());
     }
 
@@ -737,6 +754,11 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
             headerBuilder.setTableName(tableName);
         }
         headerBuilder.setEventLength(logHeader.getEventLen());
+        // enable gtid position
+        if (gtidSet != null) {
+            String gtid = gtidSet.toString();
+            headerBuilder.setGtid(gtid);
+        }
         return headerBuilder.build();
     }
 
@@ -785,7 +807,7 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
         return "LONGTEXT".equalsIgnoreCase(columnType) || "MEDIUMTEXT".equalsIgnoreCase(columnType)
                || "TEXT".equalsIgnoreCase(columnType) || "TINYTEXT".equalsIgnoreCase(columnType);
     }
-    
+
     private boolean isAliSQLHeartBeat(String schema, String table) {
         return "test".equalsIgnoreCase(schema) && "heartbeat".equalsIgnoreCase(table);
     }
@@ -857,4 +879,8 @@ public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogPar
         this.filterRows = filterRows;
     }
 
+    public void setGtidSet(GTIDSet gtidSet) {
+        this.gtidSet = gtidSet;
+    }
+
 }

+ 0 - 55
parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvertGTID.java

@@ -1,55 +0,0 @@
-package com.alibaba.otter.canal.parse.inbound.mysql.dbsync;
-
-import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
-import com.alibaba.otter.canal.parse.exception.CanalParseException;
-import com.alibaba.otter.canal.parse.inbound.BinlogParser;
-import com.alibaba.otter.canal.protocol.CanalEntry;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.taobao.tddl.dbsync.binlog.LogEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Created by hiwjd on 2018/5/2.
- * hiwjd0@gmail.com
- */
-public class LogEventConvertGTID extends LogEventConvert implements BinlogParser<LogEvent> {
-
-    public static final Logger logger = LoggerFactory.getLogger(LogEventConvertGTID.class);
-
-    // latest gtid
-    private GTIDSet gtidSet;
-
-    public LogEventConvertGTID(GTIDSet gtidSet) {
-        this.gtidSet = gtidSet;
-    }
-
-    @Override
-    public CanalEntry.Entry parse(LogEvent event, boolean isSeek) throws CanalParseException {
-        CanalEntry.Entry entry = super.parse(event, isSeek);
-        if (entry == null) {
-            return null;
-        }
-
-        if (entry.getEntryType() == CanalEntry.EntryType.GTIDLOG) {
-            try {
-                CanalEntry.Pair pair = CanalEntry.Pair.parseFrom(entry.getStoreValue());
-                gtidSet.update(pair.getValue());
-
-            } catch (InvalidProtocolBufferException e) {
-                logger.error("retrive gtid failed.", e);
-                throw new CanalParseException("retrive gtid failed.", e);
-            }
-
-            return null;
-        }
-
-        if (gtidSet != null) {
-            String gtid = gtidSet.toString();
-            CanalEntry.Header header = entry.getHeader().toBuilder().setGtid(gtid).build();
-            entry = entry.toBuilder().setHeader(header).build();
-        }
-
-        return entry;
-    }
-}

+ 2 - 2
parse/src/test/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlDumpTest.java

@@ -27,8 +27,7 @@ public class MysqlDumpTest {
     @Test
     public void testSimple() {
         final MysqlEventParser controller = new MysqlEventParser();
-        final EntryPosition startPosition = new EntryPosition("mysql-bin.000001", 104606L);
-
+        final EntryPosition startPosition = new EntryPosition("mysql-bin.000010", 154L);
         controller.setConnectionCharset(Charset.forName("UTF-8"));
         controller.setSlaveId(3344L);
         controller.setDetectingEnable(false);
@@ -39,6 +38,7 @@ public class MysqlDumpTest {
         controller.setTsdbSpringXml("classpath:tsdb/h2-tsdb.xml");
         controller.setEventFilter(new AviaterRegexFilter("test\\..*"));
         controller.setEventBlackFilter(new AviaterRegexFilter("canal_tsdb\\..*"));
+        controller.setIsGTIDMode(true);
         controller.setEventSink(new AbstractCanalEventSinkTest<List<Entry>>() {
 
             public boolean sink(List<Entry> entrys, InetSocketAddress remoteAddress, String destination)