|
@@ -43,14 +43,12 @@ import com.alibaba.otter.canal.server.netty.NettyUtils;
|
|
|
public class SessionHandler extends SimpleChannelHandler {
|
|
|
|
|
|
private static final Logger logger = LoggerFactory.getLogger(SessionHandler.class);
|
|
|
- private CanalServerWithEmbedded embededServer;
|
|
|
+ private CanalServerWithEmbedded embeddedServer;
|
|
|
|
|
|
- public SessionHandler(){
|
|
|
+ public SessionHandler(){ }
|
|
|
|
|
|
- }
|
|
|
-
|
|
|
- public SessionHandler(CanalServerWithEmbedded embededServer){
|
|
|
- this.embededServer = embededServer;
|
|
|
+ public SessionHandler(CanalServerWithEmbedded embeddedServer){
|
|
|
+ this.embeddedServer = embeddedServer;
|
|
|
}
|
|
|
|
|
|
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
|
|
@@ -66,10 +64,10 @@ public class SessionHandler extends SimpleChannelHandler {
|
|
|
clientIdentity = new ClientIdentity(sub.getDestination(), Short.valueOf(sub.getClientId()),
|
|
|
sub.getFilter());
|
|
|
MDC.put("destination", clientIdentity.getDestination());
|
|
|
- embededServer.subscribe(clientIdentity);
|
|
|
+ embeddedServer.subscribe(clientIdentity);
|
|
|
|
|
|
// 尝试启动,如果已经启动,忽略
|
|
|
- if (!embededServer.isStart(clientIdentity.getDestination())) {
|
|
|
+ if (!embeddedServer.isStart(clientIdentity.getDestination())) {
|
|
|
ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination());
|
|
|
if (!runningMonitor.isStart()) {
|
|
|
runningMonitor.start();
|
|
@@ -90,7 +88,7 @@ public class SessionHandler extends SimpleChannelHandler {
|
|
|
clientIdentity = new ClientIdentity(unsub.getDestination(), Short.valueOf(unsub.getClientId()),
|
|
|
unsub.getFilter());
|
|
|
MDC.put("destination", clientIdentity.getDestination());
|
|
|
- embededServer.unsubscribe(clientIdentity);
|
|
|
+ embeddedServer.unsubscribe(clientIdentity);
|
|
|
stopCanalInstanceIfNecessary(clientIdentity);// 尝试关闭
|
|
|
NettyUtils.ack(ctx.getChannel(), null);
|
|
|
} else {
|
|
@@ -108,17 +106,17 @@ public class SessionHandler extends SimpleChannelHandler {
|
|
|
|
|
|
// if (get.getAutoAck()) {
|
|
|
// if (get.getTimeout() == -1) {//是否是初始值
|
|
|
- // message = embededServer.get(clientIdentity, get.getFetchSize());
|
|
|
+ // message = embeddedServer.get(clientIdentity, get.getFetchSize());
|
|
|
// } else {
|
|
|
// TimeUnit unit = convertTimeUnit(get.getUnit());
|
|
|
- // message = embededServer.get(clientIdentity, get.getFetchSize(), get.getTimeout(), unit);
|
|
|
+ // message = embeddedServer.get(clientIdentity, get.getFetchSize(), get.getTimeout(), unit);
|
|
|
// }
|
|
|
// } else {
|
|
|
if (get.getTimeout() == -1) {//是否是初始值
|
|
|
- message = embededServer.getWithoutAck(clientIdentity, get.getFetchSize());
|
|
|
+ message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize());
|
|
|
} else {
|
|
|
TimeUnit unit = convertTimeUnit(get.getUnit());
|
|
|
- message = embededServer.getWithoutAck(clientIdentity, get.getFetchSize(), get.getTimeout(),
|
|
|
+ message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize(), get.getTimeout(),
|
|
|
unit);
|
|
|
}
|
|
|
// }
|
|
@@ -153,7 +151,7 @@ public class SessionHandler extends SimpleChannelHandler {
|
|
|
// donothing
|
|
|
} else {
|
|
|
clientIdentity = new ClientIdentity(ack.getDestination(), Short.valueOf(ack.getClientId()));
|
|
|
- embededServer.ack(clientIdentity, ack.getBatchId());
|
|
|
+ embeddedServer.ack(clientIdentity, ack.getBatchId());
|
|
|
}
|
|
|
} else {
|
|
|
NettyUtils.error(401,
|
|
@@ -169,9 +167,9 @@ public class SessionHandler extends SimpleChannelHandler {
|
|
|
clientIdentity = new ClientIdentity(rollback.getDestination(),
|
|
|
Short.valueOf(rollback.getClientId()));
|
|
|
if (rollback.getBatchId() == 0L) {
|
|
|
- embededServer.rollback(clientIdentity);// 回滚所有批次
|
|
|
+ embeddedServer.rollback(clientIdentity);// 回滚所有批次
|
|
|
} else {
|
|
|
- embededServer.rollback(clientIdentity, rollback.getBatchId()); // 只回滚单个批次
|
|
|
+ embeddedServer.rollback(clientIdentity, rollback.getBatchId()); // 只回滚单个批次
|
|
|
}
|
|
|
} else {
|
|
|
NettyUtils.error(401,
|
|
@@ -212,7 +210,7 @@ public class SessionHandler extends SimpleChannelHandler {
|
|
|
}
|
|
|
|
|
|
private void stopCanalInstanceIfNecessary(ClientIdentity clientIdentity) {
|
|
|
- List<ClientIdentity> clientIdentitys = embededServer.listAllSubscribe(clientIdentity.getDestination());
|
|
|
+ List<ClientIdentity> clientIdentitys = embeddedServer.listAllSubscribe(clientIdentity.getDestination());
|
|
|
if (clientIdentitys != null && clientIdentitys.size() == 1 && clientIdentitys.contains(clientIdentity)) {
|
|
|
ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination());
|
|
|
if (runningMonitor.isStart()) {
|
|
@@ -242,8 +240,8 @@ public class SessionHandler extends SimpleChannelHandler {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void setEmbededServer(CanalServerWithEmbedded embededServer) {
|
|
|
- this.embededServer = embededServer;
|
|
|
+ public void setEmbeddedServer(CanalServerWithEmbedded embeddedServer) {
|
|
|
+ this.embeddedServer = embeddedServer;
|
|
|
}
|
|
|
|
|
|
}
|