mcy 5 роки тому
батько
коміт
317439670c
20 змінених файлів з 351 додано та 56 видалено
  1. 1 1
      .gitignore
  2. 0 6
      canal-admin/canal-admin-server/pom.xml
  3. 25 0
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/common/exception/ConfigurationException.java
  4. 31 0
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/common/exception/ParamValidationException.java
  5. 39 0
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/common/exception/PermissionDeniedException.java
  6. 43 0
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/common/exception/ResourceNotFoundException.java
  7. 47 0
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/common/exception/ServiceException.java
  8. 43 0
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/common/exception/SystemException.java
  9. 30 0
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/common/exception/VersionValidationException.java
  10. 23 0
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/controller/CanalConfigController.java
  11. 12 17
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/dao/CanalConfigDao.java
  12. 3 6
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/model/CanalConfig.java
  13. 0 7
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/model/Person.java
  14. 10 0
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/CanalConfigService.java
  15. 27 0
      canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/impl/CanalConfigServiceImpl.java
  16. 5 5
      canal-admin/pom.xml
  17. 3 0
      client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java
  18. 3 8
      client-adapter/launcher/src/main/resources/application.yml
  19. 3 3
      deployer/src/main/resources/canal.properties
  20. 3 3
      deployer/src/main/resources/example/instance.properties

+ 1 - 1
.gitignore

@@ -21,4 +21,4 @@ client-adapter/example/
 
 canal-admin/canal-admin-ui/dist
 canal-admin/canal-admin-ui/node
-canal-admin/canal-admin-ui/node-modules
+canal-admin/canal-admin-ui/node_modules

+ 0 - 6
canal-admin/canal-admin-server/pom.xml

@@ -21,19 +21,13 @@
             <artifactId>spring-boot-starter-jdbc</artifactId>
         </dependency>
 
-        <dependency>
-            <groupId>org.javalite</groupId>
-            <artifactId>activejdbc</artifactId>
-        </dependency>
         <dependency>
             <groupId>mysql</groupId>
             <artifactId>mysql-connector-java</artifactId>
         </dependency>
-
         <dependency>
             <groupId>commons-dbutils</groupId>
             <artifactId>commons-dbutils</artifactId>
-            <version>1.6</version>
         </dependency>
     </dependencies>
 

+ 25 - 0
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/common/exception/ConfigurationException.java

@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.otter.canal.admin.common.exception;
+
+public class ConfigurationException extends RuntimeException{
+
+    public ConfigurationException(String message) {
+        super(message);
+    }
+}

+ 31 - 0
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/common/exception/ParamValidationException.java

@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.otter.canal.admin.common.exception;
+
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.ResponseStatus;
+
+/**
+ * Parameter validation failure exception
+ */
+@ResponseStatus(value = HttpStatus.BAD_REQUEST)
+public class ParamValidationException extends SystemException {
+
+    public ParamValidationException(String message) {
+        super(message);
+    }
+}

+ 39 - 0
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/common/exception/PermissionDeniedException.java

@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.otter.canal.admin.common.exception;
+
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.ResponseStatus;
+
+/**
+ * Permission denied exception
+ */
+@ResponseStatus(value = HttpStatus.UNAUTHORIZED)
+public class PermissionDeniedException extends RuntimeException {
+
+    public PermissionDeniedException(String message) {
+        super(message);
+    }
+
+    public PermissionDeniedException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    protected PermissionDeniedException() {
+        super();
+    }
+}

+ 43 - 0
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/common/exception/ResourceNotFoundException.java

@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.otter.canal.admin.common.exception;
+
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.ResponseStatus;
+
+/**
+ * Resource not found exception
+ */
+@ResponseStatus(value = HttpStatus.NOT_FOUND)
+public class ResourceNotFoundException extends SystemException {
+
+    public ResourceNotFoundException() {
+    }
+
+    public ResourceNotFoundException(String message) {
+        super(message);
+    }
+
+    public ResourceNotFoundException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public ResourceNotFoundException(Throwable cause) {
+        super(cause);
+    }
+
+}

+ 47 - 0
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/common/exception/ServiceException.java

@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.otter.canal.admin.common.exception;
+
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.ResponseStatus;
+
+/**
+ * Service Logic Exception
+ */
+@ResponseStatus(value = HttpStatus.SERVICE_UNAVAILABLE)
+public class ServiceException extends RuntimeException {
+
+	public ServiceException() {
+	}
+
+	public ServiceException(String message) {
+		super(message);
+	}
+
+	public ServiceException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public ServiceException(Throwable cause) {
+		super(cause);
+	}
+
+	public ServiceException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+		super(message, cause, enableSuppression, writableStackTrace);
+	}
+
+}

+ 43 - 0
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/common/exception/SystemException.java

@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.otter.canal.admin.common.exception;
+
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.ResponseStatus;
+
+/**
+ * System Exception
+ */
+@ResponseStatus(value = HttpStatus.INTERNAL_SERVER_ERROR)
+public class SystemException extends RuntimeException {
+
+    public SystemException() {
+        super();
+    }
+
+    public SystemException(String message) {
+        super(message);
+    }
+
+    public SystemException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public SystemException(Throwable cause) {
+        super(cause);
+    }
+}

+ 30 - 0
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/common/exception/VersionValidationException.java

@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.otter.canal.admin.common.exception;
+
+
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.ResponseStatus;
+
+@ResponseStatus(value = HttpStatus.BAD_REQUEST)
+public class VersionValidationException extends SystemException{
+
+    public VersionValidationException(String message) {
+        super(message);
+    }
+}

+ 23 - 0
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/controller/CanalConfigController.java

@@ -0,0 +1,23 @@
+package com.alibaba.otter.canal.admin.controller;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+
+import com.alibaba.otter.canal.admin.model.CanalConfig;
+import com.alibaba.otter.canal.admin.service.CanalConfigService;
+
+@RestController
+@RequestMapping("/api/{env}/config")
+public class CanalConfigController {
+
+    @Autowired
+    CanalConfigService canalConfigService;
+
+    @RequestMapping(value = "/canal", method = RequestMethod.GET)
+    public CanalConfig canalConfig(@PathVariable String env) {
+        return canalConfigService.getCanalConfig();
+    }
+}

+ 12 - 17
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/dao/CanalConfigDao.java

@@ -6,35 +6,30 @@ import org.apache.commons.dbutils.handlers.BeanHandler;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Repository;
 
-import javax.annotation.PostConstruct;
 import java.sql.SQLException;
 import java.util.Date;
 
 @Repository
 public class CanalConfigDao {
+
     @Autowired
     private QueryRunner runner;
 
-    @PostConstruct
-    public void init() {
+    public CanalConfig findById(Long id) {
         try {
-            CanalConfig canalConfig = findById(1L);
-            canalConfig = canalConfig;
-            canalConfig.setContent(canalConfig.getContent()+"   xxx");
-            updateContent(canalConfig);
+            String sql = "select id,name,content,modified_time as modifiedTime" + " from canal_config where id=?";
+            return runner.query(sql, new BeanHandler<>(CanalConfig.class), id);
         } catch (SQLException e) {
-            e.printStackTrace();
+            throw new RuntimeException(e.getMessage(), e);
         }
     }
 
-    public CanalConfig findById(Long id) throws SQLException {
-        String sql = "select id,name,content,modified_time as modifiedTime" +
-                " from canal_config where id=?";
-        return runner.query(sql, new BeanHandler<>(CanalConfig.class), id);
-    }
-
-    public void updateContent(CanalConfig canalConfig) throws SQLException {
-        String sql = "update canal_config set content=?, modified_time=? where id=?";
-        runner.update(sql, canalConfig.getContent(), new Date(), canalConfig.getId());
+    public int updateContent(CanalConfig canalConfig){
+        try {
+            String sql = "update canal_config set content=?, modified_time=? where id=?";
+            return runner.update(sql, canalConfig.getContent(), new Date(), canalConfig.getId());
+        } catch (SQLException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
     }
 }

+ 3 - 6
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/model/CanalConfig.java

@@ -1,16 +1,13 @@
 package com.alibaba.otter.canal.admin.model;
 
-import org.javalite.activejdbc.Model;
-import org.javalite.activejdbc.annotations.Table;
-
-import java.sql.Timestamp;
 import java.util.Date;
 
 public class CanalConfig {
-    private Long id;
+
+    private Long   id;
     private String name;
     private String content;
-    private Date modifiedTime;
+    private Date   modifiedTime;
 
     public Long getId() {
         return id;

+ 0 - 7
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/model/Person.java

@@ -1,7 +0,0 @@
-package com.alibaba.otter.canal.admin.model;
-
-import org.javalite.activejdbc.Model;
-
-public class Person extends Model {
-
-}

+ 10 - 0
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/CanalConfigService.java

@@ -0,0 +1,10 @@
+package com.alibaba.otter.canal.admin.service;
+
+import com.alibaba.otter.canal.admin.model.CanalConfig;
+
+public interface CanalConfigService {
+
+    CanalConfig getCanalConfig();
+
+    CanalConfig getAdapterConfig();
+}

+ 27 - 0
canal-admin/canal-admin-server/src/main/java/com/alibaba/otter/canal/admin/service/impl/CanalConfigServiceImpl.java

@@ -0,0 +1,27 @@
+package com.alibaba.otter.canal.admin.service.impl;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import com.alibaba.otter.canal.admin.dao.CanalConfigDao;
+import com.alibaba.otter.canal.admin.model.CanalConfig;
+import com.alibaba.otter.canal.admin.service.CanalConfigService;
+
+@Service
+public class CanalConfigServiceImpl implements CanalConfigService {
+
+    @Autowired
+    CanalConfigDao canalConfigDao;
+
+    public CanalConfig getCanalConfig() {
+        return canalConfigDao.findById(1L);
+    }
+
+    public CanalConfig getAdapterConfig() {
+        return canalConfigDao.findById(2L);
+    }
+
+    public void updateContent(CanalConfig canalConfig) {
+        canalConfigDao.findById(canalConfig.getId());
+    }
+}

+ 5 - 5
canal-admin/pom.xml

@@ -33,16 +33,16 @@
                 <scope>import</scope>
             </dependency>
 
-            <dependency>
-                <groupId>org.javalite</groupId>
-                <artifactId>activejdbc</artifactId>
-                <version>2.3</version>
-            </dependency>
             <dependency>
                 <groupId>mysql</groupId>
                 <artifactId>mysql-connector-java</artifactId>
                 <version>5.1.40</version>
             </dependency>
+            <dependency>
+                <groupId>commons-dbutils</groupId>
+                <artifactId>commons-dbutils</artifactId>
+                <version>1.6</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 

+ 3 - 0
client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/AbstractCanalAdapterWorker.java

@@ -6,6 +6,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import com.alibaba.fastjson.JSON;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -151,6 +152,8 @@ public abstract class AbstractCanalAdapterWorker {
                     if (flatMessage) {
                         // batch write
                         writeOut((List<FlatMessage>) messages);
+                        //FIXME xxx
+                        messages.forEach((message -> System.out.println(JSON.toJSONString(message))));
                     } else {
                         for (final Object message : messages) {
                             writeOut((Message) message);

+ 3 - 8
client-adapter/launcher/src/main/resources/application.yml

@@ -7,10 +7,10 @@ spring:
     default-property-inclusion: non_null
 
 canal.conf:
-  mode: tcp # kafka rocketMQ
-  canalServerHost: 127.0.0.1:11111
+  mode: kafka # kafka rocketMQ
+#  canalServerHost: 127.0.0.1:11111
 #  zookeeperHosts: slave1:2181
-#  mqServers: 127.0.0.1:9092 #or rocketmq nameservers
+  mqServers: 127.0.0.1:9092 #or rocketmq
 #  flatMessage: true
   batchSize: 500
   syncBatchSize: 1000
@@ -18,11 +18,6 @@ canal.conf:
   timeout:
   accessKey:
   secretKey:
-# enableMessageTrace:
-# accessChannel:
-# customizedTraceTopic:
-# namespace:
-
 #  srcDataSources:
 #    defaultDS:
 #      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true

+ 3 - 3
deployer/src/main/resources/canal.properties

@@ -13,7 +13,7 @@ canal.zkServers =
 canal.zookeeper.flush.period = 1000
 canal.withoutNetty = false
 # tcp, kafka, RocketMQ
-canal.serverMode = tcp
+canal.serverMode = kafka
 # flush meta cursor/parse position to file
 canal.file.data.dir = ${canal.conf.dir}
 canal.file.flush.period = 1000
@@ -67,7 +67,7 @@ canal.instance.parser.parallel = true
 canal.instance.parser.parallelBufferSize = 256
 
 # table meta tsdb info
-canal.instance.tsdb.enable = true
+canal.instance.tsdb.enable = false
 canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
 canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
 canal.instance.tsdb.dbUsername = canal
@@ -104,7 +104,7 @@ canal.instance.global.spring.xml = classpath:spring/file-instance.xml
 ##################################################
 ######### 		     MQ 		     #############
 ##################################################
-canal.mq.servers = 127.0.0.1:6667
+canal.mq.servers = 127.0.0.1:9092
 canal.mq.retries = 0
 canal.mq.batchSize = 16384
 canal.mq.maxRequestSize = 1048576

+ 3 - 3
deployer/src/main/resources/example/instance.properties

@@ -18,7 +18,7 @@ canal.instance.rds.secretkey=
 canal.instance.rds.instanceId=
 
 # table meta tsdb info
-canal.instance.tsdb.enable=true
+canal.instance.tsdb.enable=false
 #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
 #canal.instance.tsdb.dbUsername=canal
 #canal.instance.tsdb.dbPassword=canal
@@ -30,8 +30,8 @@ canal.instance.tsdb.enable=true
 #canal.instance.standby.gtid=
 
 # username/password
-canal.instance.dbUsername=canal
-canal.instance.dbPassword=canal
+canal.instance.dbUsername=root
+canal.instance.dbPassword=121212
 canal.instance.connectionCharset = UTF-8
 # enable druid Decrypt database password
 canal.instance.enableDruid=false