Prechádzať zdrojové kódy

Add size label for store metrics.

Chuanyi Li 7 rokov pred
rodič
commit
a3bf1d59e6

+ 1 - 0
deployer/src/main/resources/canal.properties

@@ -4,6 +4,7 @@
 canal.id= 1
 canal.ip=
 canal.port=11111
+canal.metrics.pull.port=11112
 canal.zkServers=
 # flush data to zk
 canal.zookeeper.flush.period = 1000

+ 0 - 1
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/ParserCollector.java

@@ -21,7 +21,6 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST;
 import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST_LABELS_LIST;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 /**
  * @author Chuanyi Li

+ 4 - 2
prometheus/src/main/java/com/alibaba/otter/canal/prometheus/impl/StoreCollector.java

@@ -40,7 +40,7 @@ public class StoreCollector extends Collector implements InstanceRegistry {
     private static final String                             PRODUCE_MEM_HELP = "Produced mem bytes of canal instance";
     private static final String                             CONSUME_MEM_HELP = "Consumed mem bytes of canal instance";
     private final ConcurrentMap<String, StoreMetricsHolder> instances        = new ConcurrentHashMap<String, StoreMetricsHolder>();
-    private final List<String>                              storeLabelsList  = Arrays.asList(DEST, "batchMode");
+    private final List<String>                              storeLabelsList  = Arrays.asList(DEST, "batchMode", "size");
 
     private StoreCollector() {}
 
@@ -100,7 +100,8 @@ public class StoreCollector extends Collector implements InstanceRegistry {
         holder.putSeq = memStore.getPutSequence();
         holder.ackSeq = memStore.getAckSequence();
         holder.destLabelValues = Collections.singletonList(destination);
-        holder.storeLabelValues = Arrays.asList(destination, memStore.getBatchMode().name());
+        holder.size = memStore.getBufferSize();
+        holder.storeLabelValues = Arrays.asList(destination, holder.batchMode.name(), Integer.toString(holder.size));
         Preconditions.checkNotNull(holder.batchMode);
         Preconditions.checkNotNull(holder.putSeq);
         Preconditions.checkNotNull(holder.ackSeq);
@@ -128,6 +129,7 @@ public class StoreCollector extends Collector implements InstanceRegistry {
         private BatchMode    batchMode;
         private AtomicLong   putMemSize;
         private AtomicLong   ackMemSize;
+        private int          size;
         private List<String> destLabelValues;
         private List<String> storeLabelValues;
     }

+ 8 - 5
store/src/main/java/com/alibaba/otter/canal/store/memory/MemoryEventStoreWithBuffer.java

@@ -545,6 +545,9 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
     }
 
     // ================ setter / getter ==================
+    public int getBufferSize() {
+        return this.bufferSize;
+    }
 
     public void setBufferSize(int bufferSize) {
         this.bufferSize = bufferSize;
@@ -563,22 +566,22 @@ public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge imple
     }
 
     public AtomicLong getPutSequence() {
-        return putSequence;
+        return this.putSequence;
     }
 
     public AtomicLong getAckSequence() {
-        return ackSequence;
+        return this.ackSequence;
     }
 
     public AtomicLong getPutMemSize() {
-        return putMemSize;
+        return this.putMemSize;
     }
 
     public AtomicLong getAckMemSize() {
-        return ackMemSize;
+        return this.ackMemSize;
     }
 
     public BatchMode getBatchMode() {
-        return batchMode;
+        return this.batchMode;
     }
 }