1
0
七锋 6 жил өмнө
parent
commit
11d4761bcb

+ 0 - 1
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java

@@ -15,7 +15,6 @@ public class CanalConstants {
     public static final String CANAL_ID                          = ROOT + "." + "id";
     public static final String CANAL_IP                          = ROOT + "." + "ip";
     public static final String CANAL_PORT                        = ROOT + "." + "port";
-    public static final String CANAL_METRICS_PULL_PORT           = ROOT + "." + "metrics.pull.port";
     public static final String CANAL_ZKSERVERS                   = ROOT + "." + "zkServers";
     public static final String CANAL_WITHOUT_NETTY               = ROOT + "." + "withoutNetty";
 

+ 0 - 8
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java

@@ -98,14 +98,6 @@ public class CanalController {
         port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
         embededCanalServer = CanalServerWithEmbedded.instance();
         embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator
-        try {
-            int metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT));
-            embededCanalServer.setMetricsPort(metricsPort);
-        } catch (NumberFormatException e) {
-            logger.info("No valid metrics server port found, use default 11112.");
-            embededCanalServer.setMetricsPort(11112);
-        }
-
         String canalWithoutNetty = getProperty(properties, CanalConstants.CANAL_WITHOUT_NETTY);
         if (canalWithoutNetty == null || "false".equals(canalWithoutNetty)) {
             canalServer = CanalServerWithNetty.instance();

+ 0 - 1
deployer/src/main/resources/canal.properties

@@ -4,7 +4,6 @@
 canal.id= 1
 canal.ip=
 canal.port=11111
-canal.metrics.pull.port=11112
 canal.zkServers=
 # flush data to zk
 canal.zookeeper.flush.period = 1000

+ 1 - 34
docker/Dockerfile

@@ -1,32 +1,7 @@
-FROM centos:centos6.7
+FROM canal/osbase:v1
 
 MAINTAINER agapple (jianghang115@gmail.com)
 
-# install system
-RUN \
-    /bin/cp -f /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && \
-    echo 'root:Hello1234' | chpasswd && \
-    groupadd -r admin && useradd -g admin admin && \
-    yum install -y man && \
-    yum install -y dstat && \
-    yum install -y unzip && \
-    yum install -y nc && \
-    yum install -y openssh-server && \
-    yum install -y tar && \
-    yum install -y which && \
-    yum install -y wget && \
-    yum install -y perl && \
-    yum install -y file && \
-    ssh-keygen -q -N "" -t dsa -f /etc/ssh/ssh_host_dsa_key && \
-    ssh-keygen -q -N "" -t rsa -f /etc/ssh/ssh_host_rsa_key && \
-    sed -ri 's/session    required     pam_loginuid.so/#session    required     pam_loginuid.so/g' /etc/pam.d/sshd && \
-    sed -i -e 's/^#Port 22$/Port 2222/' /etc/ssh/sshd_config && \
-    mkdir -p /root/.ssh && chown root.root /root && chmod 700 /root/.ssh && \
-    yum install -y cronie && \
-    sed -i '/session required pam_loginuid.so/d' /etc/pam.d/crond && \
-    yum clean all && \
-    true
-
 # install canal
 COPY image/ /tmp/docker/
 COPY canal.deployer-*.tar.gz /home/admin/
@@ -39,14 +14,6 @@ RUN \
     cp -R /tmp/docker/admin/* /home/admin/  && \
     /bin/cp -f alidata/bin/lark-wait /usr/bin/lark-wait && \
 
-    touch /var/lib/rpm/* && \ 
-    yum -y install /tmp/jdk-8-linux-x64.rpm && \
-    /bin/rm -f /tmp/jdk-8-linux-x64.rpm && \
-
-    echo "export JAVA_HOME=/usr/java/latest" >> /etc/profile && \
-    echo "export PATH=\$JAVA_HOME/bin:\$PATH" >> /etc/profile && \
-    /bin/mv /home/admin/bin/clean_log /etc/cron.d && \
-
     mkdir -p /home/admin/canal-server && \
     tar -xzvf /home/admin/canal.deployer-*.tar.gz -C /home/admin/canal-server && \
     /bin/rm -f /home/admin/canal.deployer-*.tar.gz && \

+ 41 - 0
docker/base/Dockerfile

@@ -0,0 +1,41 @@
+FROM centos:centos6.10
+
+MAINTAINER agapple (jianghang115@gmail.com)
+
+env DOWNLOAD_LINK="http://download.oracle.com/otn-pub/java/jdk/8u181-b13/96a7b8442fe848ef90c96a2fad6ed6d1/jdk-8u181-linux-x64.rpm"
+# install system
+RUN \
+    /bin/cp -f /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && \
+    echo 'root:Hello1234' | chpasswd && \
+    groupadd -r admin && useradd -g admin admin && \
+    yum install -y man && \
+    yum install -y dstat && \
+    yum install -y unzip && \
+    yum install -y nc && \
+    yum install -y openssh-server && \
+    yum install -y tar && \
+    yum install -y which && \
+    yum install -y wget && \
+    yum install -y perl && \
+    yum install -y file && \
+    ssh-keygen -q -N "" -t dsa -f /etc/ssh/ssh_host_dsa_key && \
+    ssh-keygen -q -N "" -t rsa -f /etc/ssh/ssh_host_rsa_key && \
+    sed -ri 's/session    required     pam_loginuid.so/#session    required     pam_loginuid.so/g' /etc/pam.d/sshd && \
+    sed -i -e 's/^#Port 22$/Port 2222/' /etc/ssh/sshd_config && \
+    mkdir -p /root/.ssh && chown root.root /root && chmod 700 /root/.ssh && \
+    yum install -y cronie && \
+    sed -i '/session required pam_loginuid.so/d' /etc/pam.d/crond && \
+    true
+
+RUN \
+    touch /var/lib/rpm/* && \ 
+    wget --no-cookies --no-check-certificate --header "Cookie: gpw_e24=xxx; oraclelicense=accept-securebackup-cookie" "$DOWNLOAD_LINK" -O /tmp/jdk-8-linux-x64.rpm && \
+    yum -y install /tmp/jdk-8-linux-x64.rpm && \
+    /bin/rm -f /tmp/jdk-8-linux-x64.rpm && \
+
+    echo "export JAVA_HOME=/usr/java/latest" >> /etc/profile && \
+    echo "export PATH=\$JAVA_HOME/bin:\$PATH" >> /etc/profile && \
+    yum clean all && \
+    true
+
+CMD ["/bin/bash"]

+ 10 - 12
docker/build.sh

@@ -14,17 +14,15 @@ case "`uname`" in
 esac
 BASE=${bin_abs_path}
 
-if [ ! -f $BASE/jdk*.rpm ] ; then
-    DOWNLOAD_LINK="http://download.oracle.com/otn-pub/java/jdk/8u181-b13/96a7b8442fe848ef90c96a2fad6ed6d1/jdk-8u181-linux-x64.tar.gz"
-    wget --no-cookies --no-check-certificate --header "Cookie: gpw_e24=xxx; oraclelicense=accept-securebackup-cookie" "$DOWNLOAD_LINK" -O $BASE/jdk-8-linux-x64.rpm
-fi
-
-cd $BASE/../ && mvn clean package -Dmaven.test.skip -Denv=release && cd $current_path ;
-
-if [ "$1" == "kafka" ] ; then
-	cp $BASE/../target/canal-kafka-*.tar.gz $BASE/
-	docker build --no-cache -t canal/canal-server $BASE/
+if [ "$1" == "base" ] ; then
+    docker build --no-cache -t canal/osbase $BASE/base
 else 
-	cp $BASE/../target/canal.deployer-*.tar.gz $BASE/
-	docker build --no-cache -t canal/canal-server $BASE/
+    cd $BASE/../ && mvn clean package -Dmaven.test.skip -Denv=release && cd $current_path ;
+    if [ "$1" == "kafka" ] ; then
+	   cp $BASE/../target/canal-kafka-*.tar.gz $BASE/
+	   docker build --no-cache -t canal/canal-server $BASE/
+    else 
+	   cp $BASE/../target/canal.deployer-*.tar.gz $BASE/
+	   docker build --no-cache -t canal/canal-server $BASE/
+    fi
 fi

+ 3 - 1
docker/image/admin/app.sh

@@ -86,7 +86,9 @@ function start_canal() {
         echo "multi destination:$destination is not support"
         exit 1;
     else
-        mv /home/admin/canal-server/conf/example /home/admin/canal-server/conf/$destination
+        if [ "$destination" != "" ] && [ "$destination" != "example" ] ; then
+            mv /home/admin/canal-server/conf/example /home/admin/canal-server/conf/$destination
+        fi 
     fi
     su admin -c 'cd /home/admin/canal-server/bin/ && sh restart.sh 1>>/tmp/start.log 2>&1'
     sleep 5

+ 7 - 2
docker/image/admin/health.sh

@@ -1,6 +1,11 @@
 #!/bin/sh
-CHECK_URL="http://127.0.0.1:8080/metrics"
-CHECK_POINT="success"
+metrics_port=`perl -le 'print $ENV{"canal.metrics.pull.port"}'`
+if [ "$metrics_port" == "" ]; then
+	metrics_port="11112"
+fi
+
+CHECK_URL="http://127.0.0.1:$metrics_port/metrics"
+CHECK_POINT="canal"
 CHECK_COUNT=`curl -s --connect-timeout 7 --max-time 7 $CHECK_URL | grep -c $CHECK_POINT`
 if [ $CHECK_COUNT -eq 0 ]; then
     echo "[FAILED]"

+ 1 - 1
instance/spring/src/test/resources/canal.properties

@@ -51,4 +51,4 @@ canal.instance.global.mode = spring
 canal.instance.global.lazy = false
 #canal.instance.global.manager.address = 127.0.0.1:1099
 canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
-#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
+#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

+ 3 - 9
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/PrometheusService.java

@@ -22,7 +22,6 @@ public class PrometheusService implements CanalMetricsService {
     private static final Logger          logger          = LoggerFactory.getLogger(PrometheusService.class);
     private final CanalInstanceExports   instanceExports;
     private volatile boolean             running         = false;
-    private int                          port;
     private HTTPServer                   server;
     private final ClientInstanceProfiler clientProfiler;
 
@@ -42,9 +41,9 @@ public class PrometheusService implements CanalMetricsService {
     @Override
     public void initialize() {
         try {
-            logger.info("Start prometheus HTTPServer on port {}.", port);
-            //TODO 2.Https?
-            server = new HTTPServer(port);
+            //TODO 1.Configurable port
+            //TODO 2.Https
+            server = new HTTPServer(11112);
         } catch (IOException e) {
             logger.warn("Unable to start prometheus HTTPServer.", e);
             return;
@@ -113,9 +112,4 @@ public class PrometheusService implements CanalMetricsService {
         logger.info("Unregister metrics for destination {}.", instance.getDestination());
     }
 
-    @Override
-    public void setServerPort(int port) {
-        this.port = port;
-    }
-
 }

+ 1 - 0
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/ParserCollector.java

@@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST;
 import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST_LABELS_LIST;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 /**
  * @author Chuanyi Li

+ 2 - 4
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/StoreCollector.java

@@ -40,7 +40,7 @@ public class StoreCollector extends Collector implements InstanceRegistry {
     private static final String                             PRODUCE_MEM_HELP = "Produced mem bytes of canal instance";
     private static final String                             CONSUME_MEM_HELP = "Consumed mem bytes of canal instance";
     private final ConcurrentMap<String, StoreMetricsHolder> instances        = new ConcurrentHashMap<String, StoreMetricsHolder>();
-    private final List<String>                              storeLabelsList  = Arrays.asList(DEST, "batchMode", "size");
+    private final List<String>                              storeLabelsList  = Arrays.asList(DEST, "batchMode");
 
     private StoreCollector() {}
 
@@ -100,8 +100,7 @@ public class StoreCollector extends Collector implements InstanceRegistry {
         holder.putSeq = memStore.getPutSequence();
         holder.ackSeq = memStore.getAckSequence();
         holder.destLabelValues = Collections.singletonList(destination);
-        holder.size = memStore.getBufferSize();
-        holder.storeLabelValues = Arrays.asList(destination, holder.batchMode.name(), Integer.toString(holder.size));
+        holder.storeLabelValues = Arrays.asList(destination, memStore.getBatchMode().name());
         Preconditions.checkNotNull(holder.batchMode);
         Preconditions.checkNotNull(holder.putSeq);
         Preconditions.checkNotNull(holder.ackSeq);
@@ -129,7 +128,6 @@ public class StoreCollector extends Collector implements InstanceRegistry {
         private BatchMode    batchMode;
         private AtomicLong   putMemSize;
         private AtomicLong   ackMemSize;
-        private int          size;
         private List<String> destLabelValues;
         private List<String> storeLabelValues;
     }

+ 0 - 6
server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java

@@ -44,7 +44,6 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
     private Map<String, CanalInstance> canalInstances;
     // private Map<ClientIdentity, Position> lastRollbackPostions;
     private CanalInstanceGenerator     canalInstanceGenerator;
-    private int                        metricsPort;
     private CanalMetricsService        metrics          = NopCanalMetricsService.NOP;
 
     private static class SingletonHolder {
@@ -65,7 +64,6 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
             super.start();
             // 如果存在provider,则启动metrics service
             loadCanalMetrics();
-            metrics.setServerPort(metricsPort);
             metrics.initialize();
             canalInstances = MigrateMap.makeComputingMap(new Function<String, CanalInstance>() {
 
@@ -531,8 +529,4 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
         this.canalInstanceGenerator = canalInstanceGenerator;
     }
 
-    public void setMetricsPort(int metricsPort) {
-        this.metricsPort = metricsPort;
-    }
-
 }

+ 0 - 5
server/src/main/java/com/alibaba/otter/canal/spi/CanalMetricsService.java

@@ -39,9 +39,4 @@ public interface CanalMetricsService {
      */
     void unregister(CanalInstance instance);
 
-    /**
-     * @param port server port for pull
-     */
-    void setServerPort(int port);
-
 }

+ 0 - 5
server/src/main/java/com/alibaba/otter/canal/spi/NopCanalMetricsService.java

@@ -35,9 +35,4 @@ public class NopCanalMetricsService implements CanalMetricsService {
     public void unregister(CanalInstance instance) {
 
     }
-
-    @Override
-    public void setServerPort(int port) {
-
-    }
 }

+ 5 - 8
store/src/main/java/com/alibaba/otter/canal/store/memory/MemoryEventStoreWithBuffer.java

@@ -545,9 +545,6 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
     }
 
     // ================ setter / getter ==================
-    public int getBufferSize() {
-        return this.bufferSize;
-    }
 
     public void setBufferSize(int bufferSize) {
         this.bufferSize = bufferSize;
@@ -566,22 +563,22 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
     }
 
     public AtomicLong getPutSequence() {
-        return this.putSequence;
+        return putSequence;
     }
 
     public AtomicLong getAckSequence() {
-        return this.ackSequence;
+        return ackSequence;
     }
 
     public AtomicLong getPutMemSize() {
-        return this.putMemSize;
+        return putMemSize;
     }
 
     public AtomicLong getAckMemSize() {
-        return this.ackMemSize;
+        return ackMemSize;
     }
 
     public BatchMode getBatchMode() {
-        return this.batchMode;
+        return batchMode;
     }
 }