|
@@ -28,7 +28,6 @@ import com.alibaba.otter.canal.store.model.Event;
|
|
import com.alibaba.otter.canal.store.model.Events;
|
|
import com.alibaba.otter.canal.store.model.Events;
|
|
import com.google.common.base.Function;
|
|
import com.google.common.base.Function;
|
|
import com.google.common.collect.Lists;
|
|
import com.google.common.collect.Lists;
|
|
-import com.google.common.collect.MapMaker;
|
|
|
|
import com.google.common.collect.Maps;
|
|
import com.google.common.collect.Maps;
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -38,7 +37,8 @@ import com.google.common.collect.Maps;
|
|
* @author zebin.xuzb
|
|
* @author zebin.xuzb
|
|
* @version 1.0.0
|
|
* @version 1.0.0
|
|
*/
|
|
*/
|
|
-public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements CanalServer {
|
|
|
|
|
|
+public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements CanalServer, com.alibaba.otter.canal.server.CanalService
|
|
|
|
+{
|
|
|
|
|
|
private static final Logger logger = LoggerFactory.getLogger(CanalServerWithEmbedded.class);
|
|
private static final Logger logger = LoggerFactory.getLogger(CanalServerWithEmbedded.class);
|
|
private Map<String, CanalInstance> canalInstances;
|
|
private Map<String, CanalInstance> canalInstances;
|
|
@@ -116,6 +116,7 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
|
|
/**
|
|
/**
|
|
* 客户端订阅,重复订阅时会更新对应的filter信息
|
|
* 客户端订阅,重复订阅时会更新对应的filter信息
|
|
*/
|
|
*/
|
|
|
|
+ @Override
|
|
public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
|
|
public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
|
|
CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
|
|
CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
|
|
if (!canalInstance.getMetaManager().isStart()) {
|
|
if (!canalInstance.getMetaManager().isStart()) {
|
|
@@ -142,6 +143,7 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
|
|
/**
|
|
/**
|
|
* 取消订阅
|
|
* 取消订阅
|
|
*/
|
|
*/
|
|
|
|
+ @Override
|
|
public void unsubscribe(ClientIdentity clientIdentity) throws CanalServerException {
|
|
public void unsubscribe(ClientIdentity clientIdentity) throws CanalServerException {
|
|
CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
|
|
CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
|
|
canalInstance.getMetaManager().unsubscribe(clientIdentity); // 执行一下meta订阅
|
|
canalInstance.getMetaManager().unsubscribe(clientIdentity); // 执行一下meta订阅
|
|
@@ -164,6 +166,7 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
|
|
* 注意: meta获取和数据的获取需要保证顺序性,优先拿到meta的,一定也会是优先拿到数据,所以需要加同步. (不能出现先拿到meta,拿到第二批数据,这样就会导致数据顺序性出现问题)
|
|
* 注意: meta获取和数据的获取需要保证顺序性,优先拿到meta的,一定也会是优先拿到数据,所以需要加同步. (不能出现先拿到meta,拿到第二批数据,这样就会导致数据顺序性出现问题)
|
|
* </pre>
|
|
* </pre>
|
|
*/
|
|
*/
|
|
|
|
+ @Override
|
|
public Message get(ClientIdentity clientIdentity, int batchSize) throws CanalServerException {
|
|
public Message get(ClientIdentity clientIdentity, int batchSize) throws CanalServerException {
|
|
return get(clientIdentity, batchSize, null, null);
|
|
return get(clientIdentity, batchSize, null, null);
|
|
}
|
|
}
|
|
@@ -181,6 +184,7 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
|
|
* 注意: meta获取和数据的获取需要保证顺序性,优先拿到meta的,一定也会是优先拿到数据,所以需要加同步. (不能出现先拿到meta,拿到第二批数据,这样就会导致数据顺序性出现问题)
|
|
* 注意: meta获取和数据的获取需要保证顺序性,优先拿到meta的,一定也会是优先拿到数据,所以需要加同步. (不能出现先拿到meta,拿到第二批数据,这样就会导致数据顺序性出现问题)
|
|
* </pre>
|
|
* </pre>
|
|
*/
|
|
*/
|
|
|
|
+ @Override
|
|
public Message get(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit)
|
|
public Message get(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit)
|
|
throws CanalServerException {
|
|
throws CanalServerException {
|
|
checkStart(clientIdentity.getDestination());
|
|
checkStart(clientIdentity.getDestination());
|
|
@@ -215,8 +219,8 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
|
|
});
|
|
});
|
|
|
|
|
|
logger.info("get successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]",
|
|
logger.info("get successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]",
|
|
- new Object[] { clientIdentity.getClientId(), batchSize, entrys.size(), batchId,
|
|
|
|
- events.getPositionRange() });
|
|
|
|
|
|
+ clientIdentity.getClientId(), batchSize, entrys.size(), batchId,
|
|
|
|
+ events.getPositionRange());
|
|
// 直接提交ack
|
|
// 直接提交ack
|
|
ack(clientIdentity, batchId);
|
|
ack(clientIdentity, batchId);
|
|
return new Message(batchId, entrys);
|
|
return new Message(batchId, entrys);
|
|
@@ -232,6 +236,7 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
|
|
* 注意: meta获取和数据的获取需要保证顺序性,优先拿到meta的,一定也会是优先拿到数据,所以需要加同步. (不能出现先拿到meta,拿到第二批数据,这样就会导致数据顺序性出现问题)
|
|
* 注意: meta获取和数据的获取需要保证顺序性,优先拿到meta的,一定也会是优先拿到数据,所以需要加同步. (不能出现先拿到meta,拿到第二批数据,这样就会导致数据顺序性出现问题)
|
|
* </pre>
|
|
* </pre>
|
|
*/
|
|
*/
|
|
|
|
+ @Override
|
|
public Message getWithoutAck(ClientIdentity clientIdentity, int batchSize) throws CanalServerException {
|
|
public Message getWithoutAck(ClientIdentity clientIdentity, int batchSize) throws CanalServerException {
|
|
return getWithoutAck(clientIdentity, batchSize, null, null);
|
|
return getWithoutAck(clientIdentity, batchSize, null, null);
|
|
}
|
|
}
|
|
@@ -250,6 +255,7 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
|
|
* 注意: meta获取和数据的获取需要保证顺序性,优先拿到meta的,一定也会是优先拿到数据,所以需要加同步. (不能出现先拿到meta,拿到第二批数据,这样就会导致数据顺序性出现问题)
|
|
* 注意: meta获取和数据的获取需要保证顺序性,优先拿到meta的,一定也会是优先拿到数据,所以需要加同步. (不能出现先拿到meta,拿到第二批数据,这样就会导致数据顺序性出现问题)
|
|
* </pre>
|
|
* </pre>
|
|
*/
|
|
*/
|
|
|
|
+ @Override
|
|
public Message getWithoutAck(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit)
|
|
public Message getWithoutAck(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit)
|
|
throws CanalServerException {
|
|
throws CanalServerException {
|
|
checkStart(clientIdentity.getDestination());
|
|
checkStart(clientIdentity.getDestination());
|
|
@@ -287,8 +293,8 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
|
|
});
|
|
});
|
|
|
|
|
|
logger.info("getWithoutAck successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]",
|
|
logger.info("getWithoutAck successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]",
|
|
- new Object[] { clientIdentity.getClientId(), batchSize, entrys.size(), batchId,
|
|
|
|
- events.getPositionRange() });
|
|
|
|
|
|
+ clientIdentity.getClientId(), batchSize, entrys.size(), batchId,
|
|
|
|
+ events.getPositionRange());
|
|
return new Message(batchId, entrys);
|
|
return new Message(batchId, entrys);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -316,6 +322,7 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
|
|
* 注意:进行反馈时必须按照batchId的顺序进行ack(需有客户端保证)
|
|
* 注意:进行反馈时必须按照batchId的顺序进行ack(需有客户端保证)
|
|
* </pre>
|
|
* </pre>
|
|
*/
|
|
*/
|
|
|
|
+ @Override
|
|
public void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException {
|
|
public void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException {
|
|
checkStart(clientIdentity.getDestination());
|
|
checkStart(clientIdentity.getDestination());
|
|
checkSubscribe(clientIdentity);
|
|
checkSubscribe(clientIdentity);
|
|
@@ -350,7 +357,7 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
|
|
if (positionRanges.getAck() != null) {
|
|
if (positionRanges.getAck() != null) {
|
|
canalInstance.getMetaManager().updateCursor(clientIdentity, positionRanges.getAck());
|
|
canalInstance.getMetaManager().updateCursor(clientIdentity, positionRanges.getAck());
|
|
logger.info("ack successfully, clientId:{} batchId:{} position:{}",
|
|
logger.info("ack successfully, clientId:{} batchId:{} position:{}",
|
|
- new Object[] { clientIdentity.getClientId(), batchId, positionRanges });
|
|
|
|
|
|
+ clientIdentity.getClientId(), batchId, positionRanges);
|
|
}
|
|
}
|
|
|
|
|
|
// 可定时清理数据
|
|
// 可定时清理数据
|
|
@@ -361,6 +368,7 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
|
|
/**
|
|
/**
|
|
* 回滚到未进行 {@link #ack} 的地方,下次fetch的时候,可以从最后一个没有 {@link #ack} 的地方开始拿
|
|
* 回滚到未进行 {@link #ack} 的地方,下次fetch的时候,可以从最后一个没有 {@link #ack} 的地方开始拿
|
|
*/
|
|
*/
|
|
|
|
+ @Override
|
|
public void rollback(ClientIdentity clientIdentity) throws CanalServerException {
|
|
public void rollback(ClientIdentity clientIdentity) throws CanalServerException {
|
|
checkStart(clientIdentity.getDestination());
|
|
checkStart(clientIdentity.getDestination());
|
|
CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
|
|
CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
|
|
@@ -382,6 +390,7 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
|
|
/**
|
|
/**
|
|
* 回滚到未进行 {@link #ack} 的地方,下次fetch的时候,可以从最后一个没有 {@link #ack} 的地方开始拿
|
|
* 回滚到未进行 {@link #ack} 的地方,下次fetch的时候,可以从最后一个没有 {@link #ack} 的地方开始拿
|
|
*/
|
|
*/
|
|
|
|
+ @Override
|
|
public void rollback(ClientIdentity clientIdentity, Long batchId) throws CanalServerException {
|
|
public void rollback(ClientIdentity clientIdentity, Long batchId) throws CanalServerException {
|
|
checkStart(clientIdentity.getDestination());
|
|
checkStart(clientIdentity.getDestination());
|
|
CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
|
|
CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
|
|
@@ -407,7 +416,7 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
|
|
canalInstance.getEventStore().rollback();// rollback
|
|
canalInstance.getEventStore().rollback();// rollback
|
|
// eventStore中的状态信息
|
|
// eventStore中的状态信息
|
|
logger.info("rollback successfully, clientId:{} batchId:{} position:{}",
|
|
logger.info("rollback successfully, clientId:{} batchId:{} position:{}",
|
|
- new Object[] { clientIdentity.getClientId(), batchId, positionRanges });
|
|
|
|
|
|
+ clientIdentity.getClientId(), batchId, positionRanges);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|