Browse Source

Calibrating default buffer size; Prometheus metrics for calibration

DarthSim 6 năm trước cách đây
mục cha
commit
9807728f9c
2 tập tin đã thay đổi với 57 bổ sung28 xóa
  1. 32 27
      bufpool.go
  2. 25 1
      prometheus.go

+ 32 - 27
bufpool.go

@@ -13,32 +13,27 @@ func (p intSlice) Less(i, j int) bool { return p[i] < p[j] }
 func (p intSlice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
 
 type bufPool struct {
-	mutex   sync.Mutex
-	name    string
-	size    int
-	buffers []*bytes.Buffer
+	name        string
+	defaultSize int
+	maxSize     int
+	buffers     []*bytes.Buffer
 
 	calls   intSlice
 	callInd int
 
-	throughput int
+	mutex sync.Mutex
 }
 
-func newBufPool(name string, n int, size int) *bufPool {
+func newBufPool(name string, n int, defaultSize int) *bufPool {
 	pool := bufPool{
-		name:    name,
-		size:    size,
-		buffers: make([]*bytes.Buffer, n),
-		calls:   make(intSlice, 1024),
+		name:        name,
+		defaultSize: defaultSize,
+		buffers:     make([]*bytes.Buffer, n),
+		calls:       make(intSlice, 1024),
 	}
 
 	for i := range pool.buffers {
-		pool.buffers[i] = pool.new()
-	}
-
-	if pool.size > 0 {
-		// Get the real cap
-		pool.size = pool.buffers[0].Cap()
+		pool.buffers[i] = new(bytes.Buffer)
 	}
 
 	return &pool
@@ -49,8 +44,8 @@ func (p *bufPool) new() *bytes.Buffer {
 
 	buf = new(bytes.Buffer)
 
-	if p.size > 0 {
-		buf.Grow(p.size)
+	if p.defaultSize > 0 {
+		buf.Grow(p.defaultSize)
 	}
 
 	return buf
@@ -63,20 +58,28 @@ func (p *bufPool) calibrateAndClean() {
 	score := p.calls[pos]
 
 	p.callInd = 0
-	p.throughput = 64
+	p.maxSize = 64
 
 	for {
-		if p.throughput > score {
+		if p.maxSize > score {
 			break
 		}
-		p.throughput <<= 1
+		p.maxSize <<= 1
 	}
 
+	p.defaultSize = maxInt(p.defaultSize, p.calls[0])
+	p.maxSize = maxInt(p.defaultSize, p.maxSize)
+
 	for i, buf := range p.buffers {
-		if buf != nil && buf.Cap() > p.throughput {
+		if buf != nil && buf.Cap() > p.maxSize {
 			p.buffers[i] = nil
 		}
 	}
+
+	if prometheusEnabled {
+		setPrometheusBufferDefaultSize(p.name, p.defaultSize)
+		setPrometheusBufferMaxSize(p.name, p.maxSize)
+	}
 }
 
 func (p *bufPool) Get(size int) *bytes.Buffer {
@@ -113,13 +116,15 @@ func (p *bufPool) Get(size int) *bytes.Buffer {
 		p.buffers[maxInd] = nil
 	} else {
 		// We didn't find buffers at all
-		return p.new()
+		buf = new(bytes.Buffer)
 	}
 
 	buf.Reset()
 
-	if size > 0 && size > buf.Cap() {
-		buf.Grow(size)
+	growSize := maxInt(size, p.defaultSize)
+
+	if growSize > buf.Cap() {
+		buf.Grow(growSize)
 	}
 
 	return buf
@@ -138,7 +143,7 @@ func (p *bufPool) Put(buf *bytes.Buffer) {
 		}
 	}
 
-	if p.throughput > 0 && buf.Cap() > p.throughput {
+	if p.maxSize > 0 && buf.Cap() > p.maxSize {
 		return
 	}
 
@@ -147,7 +152,7 @@ func (p *bufPool) Put(buf *bytes.Buffer) {
 			p.buffers[i] = buf
 
 			if prometheusEnabled && buf.Cap() > 0 {
-				observeBufferSize(p.name, buf.Cap())
+				observePrometheusBufferSize(p.name, buf.Cap())
 			}
 
 			return

+ 25 - 1
prometheus.go

@@ -17,6 +17,8 @@ var (
 	prometheusDownloadDuration   prometheus.Histogram
 	prometheusProcessingDuration prometheus.Histogram
 	prometheusBufferSize         *prometheus.HistogramVec
+	prometheusBufferDefaultSize  *prometheus.GaugeVec
+	prometheusBufferMaxSize      *prometheus.GaugeVec
 )
 
 func initPrometheus() {
@@ -54,6 +56,16 @@ func initPrometheus() {
 		Help: "A histogram of the buffer size in megabytes.",
 	}, []string{"type"})
 
+	prometheusBufferDefaultSize = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "buffer_default_size_megabytes",
+		Help: "A gauge of the buffer default size in megabytes.",
+	}, []string{"type"})
+
+	prometheusBufferMaxSize = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "buffer_max_size_megabytes",
+		Help: "A gauge of the buffer max size in megabytes.",
+	}, []string{"type"})
+
 	prometheus.MustRegister(
 		prometheusRequestsTotal,
 		prometheusErrorsTotal,
@@ -61,6 +73,8 @@ func initPrometheus() {
 		prometheusDownloadDuration,
 		prometheusProcessingDuration,
 		prometheusBufferSize,
+		prometheusBufferDefaultSize,
+		prometheusBufferMaxSize,
 	)
 
 	prometheusEnabled = true
@@ -89,7 +103,17 @@ func incrementPrometheusErrorsTotal(t string) {
 	prometheusErrorsTotal.With(prometheus.Labels{"type": t}).Inc()
 }
 
-func observeBufferSize(t string, cap int) {
+func observePrometheusBufferSize(t string, cap int) {
 	size := float64(cap) / 1024.0 / 1024.0
 	prometheusBufferSize.With(prometheus.Labels{"type": t}).Observe(size)
 }
+
+func setPrometheusBufferDefaultSize(t string, cap int) {
+	size := float64(cap) / 1024.0 / 1024.0
+	prometheusBufferDefaultSize.With(prometheus.Labels{"type": t}).Set(size)
+}
+
+func setPrometheusBufferMaxSize(t string, cap int) {
+	size := float64(cap) / 1024.0 / 1024.0
+	prometheusBufferMaxSize.With(prometheus.Labels{"type": t}).Set(size)
+}