Browse Source

Merge pull request #123 from marianrh/master

Support limiting the number of parallel processing goroutines
Grigory Dryapak 5 years ago
parent
commit
339f5d48a4
2 changed files with 52 additions and 0 deletions
  1. utils.go (+13 −0)
  2. utils_test.go (+39 −0)

+ 13 - 0
utils.go

@@ -5,8 +5,17 @@ import (
 	"math"
 	"runtime"
 	"sync"
+	"sync/atomic"
 )
 
+// maxProcs caps the number of goroutines that parallel() may start.
+// A value <= 0 means "no limit" (parallel() only honors it when > 0).
+// It is accessed exclusively through sync/atomic, so readers and
+// writers are safe to run concurrently.
+var maxProcs int64
+
+// SetMaxProcs limits the number of concurrent processing goroutines to the given value.
+// A value <= 0 clears the limit.
+// The store is atomic, so it is safe to call while parallel() work is in flight.
+func SetMaxProcs(value int) {
+	atomic.StoreInt64(&maxProcs, int64(value))
+}
+
 // parallel processes the data in separate goroutines.
 func parallel(start, stop int, fn func(<-chan int)) {
 	count := stop - start
@@ -15,6 +24,10 @@ func parallel(start, stop int, fn func(<-chan int)) {
 	}
 
 	procs := runtime.GOMAXPROCS(0)
+	limit := int(atomic.LoadInt64(&maxProcs))
+	if procs > limit && limit > 0 {
+		procs = limit
+	}
 	if procs > count {
 		procs = count
 	}

+ 39 - 0
utils_test.go

@@ -4,6 +4,7 @@ import (
 	"image"
 	"math"
 	"runtime"
+	"sync/atomic"
 	"testing"
 )
 
@@ -49,6 +50,44 @@ func testParallelN(n, procs int) bool {
 	return true
 }
 
+// TestParallelMaxProcs checks that parallel() still visits every index
+// when a goroutine cap is in effect, across a grid of item counts
+// (including the empty case n=0) and cap values both below and above
+// a typical GOMAXPROCS.
+func TestParallelMaxProcs(t *testing.T) {
+	for _, n := range []int{0, 1, 10, 100, 1000} {
+		for _, p := range []int{1, 2, 4, 8, 16, 100} {
+			if !testParallelMaxProcsN(n, p) {
+				t.Fatalf("test [parallel max procs %d %d] failed", n, p)
+			}
+		}
+	}
+}
+
+// testParallelMaxProcsN runs parallel() over n items with the goroutine
+// limit set to procs, and reports whether every index was visited.
+func testParallelMaxProcsN(n, procs int) bool {
+	data := make([]bool, n)
+	SetMaxProcs(procs)
+	parallel(0, n, func(is <-chan int) {
+		for i := range is {
+			data[i] = true
+		}
+	})
+	// Restore the unlimited default so later tests are unaffected.
+	// NOTE(review): this is not deferred, so it won't run if parallel
+	// panics — consider `defer SetMaxProcs(0)` right after the set.
+	SetMaxProcs(0)
+	for i := 0; i < n; i++ {
+		if !data[i] {
+			return false
+		}
+	}
+	return true
+}
+
+// TestSetMaxProcs verifies that SetMaxProcs stores its argument verbatim
+// (negative, zero, and positive values alike) into the package-level
+// maxProcs counter, reading it back with the matching atomic load.
+func TestSetMaxProcs(t *testing.T) {
+	for _, p := range []int{-1, 0, 10} {
+		SetMaxProcs(p)
+		if int(atomic.LoadInt64(&maxProcs)) != p {
+			t.Fatalf("test [set max procs %d] failed", p)
+		}
+	}
+
+	// Leave the limit cleared for any tests that run afterwards.
+	SetMaxProcs(0)
+}
+
 func TestClamp(t *testing.T) {
 	testCases := []struct {
 		f float64