Explorar el Código

Merge pull request #833 from lcybo/master

store增加bufferSize,metrics server端口可配置
agapple hace 6 años
padre
commit
72213bb80d

+ 1 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalConstants.java

@@ -15,6 +15,7 @@ public class CanalConstants {
     public static final String CANAL_ID                          = ROOT + "." + "id";
     public static final String CANAL_IP                          = ROOT + "." + "ip";
     public static final String CANAL_PORT                        = ROOT + "." + "port";
+    public static final String CANAL_METRICS_PULL_PORT           = ROOT + "." + "metrics.pull.port";
     public static final String CANAL_ZKSERVERS                   = ROOT + "." + "zkServers";
     public static final String CANAL_WITHOUT_NETTY               = ROOT + "." + "withoutNetty";
 

+ 8 - 0
deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java

@@ -98,6 +98,14 @@ public class CanalController {
         port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
         embededCanalServer = CanalServerWithEmbedded.instance();
         embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator
+        try {
+            int metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT));
+            embededCanalServer.setMetricsPort(metricsPort);
+        } catch (NumberFormatException e) {
+            logger.info("No valid metrics server port found, use default 11112.");
+            embededCanalServer.setMetricsPort(11112);
+        }
+
         String canalWithoutNetty = getProperty(properties, CanalConstants.CANAL_WITHOUT_NETTY);
         if (canalWithoutNetty == null || "false".equals(canalWithoutNetty)) {
             canalServer = CanalServerWithNetty.instance();

+ 1 - 0
deployer/src/main/resources/canal.properties

@@ -4,6 +4,7 @@
 canal.id= 1
 canal.ip=
 canal.port=11111
+canal.metrics.pull.port=11112
 canal.zkServers=
 # flush data to zk
 canal.zookeeper.flush.period = 1000

+ 1 - 1
instance/spring/src/test/resources/canal.properties

@@ -51,4 +51,4 @@ canal.instance.global.mode = spring
 canal.instance.global.lazy = false
 #canal.instance.global.manager.address = 127.0.0.1:1099
 canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
-#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
+#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

+ 9 - 3
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/PrometheusService.java

@@ -22,6 +22,7 @@ public class PrometheusService implements CanalMetricsService {
     private static final Logger          logger          = LoggerFactory.getLogger(PrometheusService.class);
     private final CanalInstanceExports   instanceExports;
     private volatile boolean             running         = false;
+    private int                          port;
     private HTTPServer                   server;
     private final ClientInstanceProfiler clientProfiler;
 
@@ -41,9 +42,9 @@ public class PrometheusService implements CanalMetricsService {
     @Override
     public void initialize() {
         try {
-            //TODO 1.Configurable port
-            //TODO 2.Https
-            server = new HTTPServer(11112);
+            logger.info("Start prometheus HTTPServer on port {}.", port);
+            //TODO 2.Https?
+            server = new HTTPServer(port);
         } catch (IOException e) {
             logger.warn("Unable to start prometheus HTTPServer.", e);
             return;
@@ -112,4 +113,9 @@ public class PrometheusService implements CanalMetricsService {
         logger.info("Unregister metrics for destination {}.", instance.getDestination());
     }
 
+    @Override
+    public void setServerPort(int port) {
+        this.port = port;
+    }
+
 }

+ 0 - 1
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/ParserCollector.java

@@ -21,7 +21,6 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST;
 import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST_LABELS_LIST;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 /**
  * @author Chuanyi Li

+ 4 - 2
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/StoreCollector.java

@@ -40,7 +40,7 @@ public class StoreCollector extends Collector implements InstanceRegistry {
     private static final String                             PRODUCE_MEM_HELP = "Produced mem bytes of canal instance";
     private static final String                             CONSUME_MEM_HELP = "Consumed mem bytes of canal instance";
     private final ConcurrentMap<String, StoreMetricsHolder> instances        = new ConcurrentHashMap<String, StoreMetricsHolder>();
-    private final List<String>                              storeLabelsList  = Arrays.asList(DEST, "batchMode");
+    private final List<String>                              storeLabelsList  = Arrays.asList(DEST, "batchMode", "size");
 
     private StoreCollector() {}
 
@@ -100,7 +100,8 @@ public class StoreCollector extends Collector implements InstanceRegistry {
         holder.putSeq = memStore.getPutSequence();
         holder.ackSeq = memStore.getAckSequence();
         holder.destLabelValues = Collections.singletonList(destination);
-        holder.storeLabelValues = Arrays.asList(destination, memStore.getBatchMode().name());
+        holder.size = memStore.getBufferSize();
+        holder.storeLabelValues = Arrays.asList(destination, holder.batchMode.name(), Integer.toString(holder.size));
         Preconditions.checkNotNull(holder.batchMode);
         Preconditions.checkNotNull(holder.putSeq);
         Preconditions.checkNotNull(holder.ackSeq);
@@ -128,6 +129,7 @@ public class StoreCollector extends Collector implements InstanceRegistry {
         private BatchMode    batchMode;
         private AtomicLong   putMemSize;
         private AtomicLong   ackMemSize;
+        private int          size;
         private List<String> destLabelValues;
         private List<String> storeLabelValues;
     }

+ 6 - 0
server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java

@@ -44,6 +44,7 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
     private Map<String, CanalInstance> canalInstances;
     // private Map<ClientIdentity, Position> lastRollbackPostions;
     private CanalInstanceGenerator     canalInstanceGenerator;
+    private int                        metricsPort;
     private CanalMetricsService        metrics          = NopCanalMetricsService.NOP;
 
     private static class SingletonHolder {
@@ -64,6 +65,7 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
             super.start();
             // 如果存在provider,则启动metrics service
             loadCanalMetrics();
+            metrics.setServerPort(metricsPort);
             metrics.initialize();
             canalInstances = MigrateMap.makeComputingMap(new Function<String, CanalInstance>() {
 
@@ -529,4 +531,8 @@ public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements C
         this.canalInstanceGenerator = canalInstanceGenerator;
     }
 
+    public void setMetricsPort(int metricsPort) {
+        this.metricsPort = metricsPort;
+    }
+
 }

+ 5 - 0
server/src/main/java/com/alibaba/otter/canal/spi/CanalMetricsService.java

@@ -39,4 +39,9 @@ public interface CanalMetricsService {
      */
     void unregister(CanalInstance instance);
 
+    /**
+     * @param port server port for pull
+     */
+    void setServerPort(int port);
+
 }

+ 5 - 0
server/src/main/java/com/alibaba/otter/canal/spi/NopCanalMetricsService.java

@@ -35,4 +35,9 @@ public class NopCanalMetricsService implements CanalMetricsService {
     public void unregister(CanalInstance instance) {
 
     }
+
+    @Override
+    public void setServerPort(int port) {
+
+    }
 }

+ 8 - 5
store/src/main/java/com/alibaba/otter/canal/store/memory/MemoryEventStoreWithBuffer.java

@@ -545,6 +545,9 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
     }
 
     // ================ setter / getter ==================
+    public int getBufferSize() {
+        return this.bufferSize;
+    }
 
     public void setBufferSize(int bufferSize) {
         this.bufferSize = bufferSize;
@@ -563,22 +566,22 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
     }
 
     public AtomicLong getPutSequence() {
-        return putSequence;
+        return this.putSequence;
     }
 
     public AtomicLong getAckSequence() {
-        return ackSequence;
+        return this.ackSequence;
     }
 
     public AtomicLong getPutMemSize() {
-        return putMemSize;
+        return this.putMemSize;
     }
 
     public AtomicLong getAckMemSize() {
-        return ackMemSize;
+        return this.ackMemSize;
     }
 
     public BatchMode getBatchMode() {
-        return batchMode;
+        return this.batchMode;
     }
 }