Use 'minio/pkg/sync/errgroup' and 'minio/pkg/workers' (#17069)

This commit is contained in:
Praveen raj Mani 2023-04-26 11:27:40 +05:30 committed by GitHub
parent b1f3935c5b
commit 72802a5972
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 28 additions and 424 deletions

View File

@ -32,7 +32,7 @@ import (
"github.com/minio/madmin-go/v2"
minio "github.com/minio/minio-go/v7"
"github.com/minio/minio/internal/sync/errgroup"
"github.com/minio/pkg/sync/errgroup"
)
func runAllIAMConcurrencyTests(suite *TestSuiteIAM, c *check) {

View File

@ -48,11 +48,11 @@ import (
"github.com/minio/minio/internal/hash"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/workers"
"github.com/minio/pkg/console"
"github.com/minio/pkg/env"
iampolicy "github.com/minio/pkg/iam/policy"
"github.com/minio/pkg/wildcard"
"github.com/minio/pkg/workers"
"gopkg.in/yaml.v2"
)

View File

@ -38,9 +38,9 @@ import (
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/kms"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/workers"
"github.com/minio/pkg/env"
"github.com/minio/pkg/wildcard"
"github.com/minio/pkg/workers"
)
// keyrotate:

View File

@ -51,9 +51,9 @@ import (
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/kms"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/sync/errgroup"
"github.com/minio/pkg/bucket/policy"
iampolicy "github.com/minio/pkg/iam/policy"
"github.com/minio/pkg/sync/errgroup"
)
const (

View File

@ -41,8 +41,8 @@ import (
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/s3select"
"github.com/minio/minio/internal/workers"
"github.com/minio/pkg/env"
"github.com/minio/pkg/workers"
)
const (

View File

@ -36,8 +36,8 @@ import (
"github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/kms"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/sync/errgroup"
"github.com/minio/pkg/bucket/policy"
"github.com/minio/pkg/sync/errgroup"
)
// BucketMetadataSys captures all bucket metadata for a given cluster.

View File

@ -33,8 +33,8 @@ import (
"github.com/minio/minio/internal/disk"
"github.com/minio/minio/internal/hash"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/sync/errgroup"
xnet "github.com/minio/pkg/net"
"github.com/minio/pkg/sync/errgroup"
"github.com/minio/pkg/wildcard"
)

View File

@ -23,7 +23,7 @@ import (
"sync"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/sync/errgroup"
"github.com/minio/pkg/sync/errgroup"
)
func (er erasureObjects) getOnlineDisks() (newDisks []StorageAPI) {

View File

@ -28,7 +28,7 @@ import (
"github.com/minio/madmin-go/v2"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/sync/errgroup"
"github.com/minio/pkg/sync/errgroup"
)
const reservedMetadataPrefixLowerDataShardFix = ReservedMetadataPrefixLower + "data-shard-fix"

View File

@ -25,7 +25,7 @@ import (
"io"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/sync/errgroup"
"github.com/minio/pkg/sync/errgroup"
)
// figure out the most commonVersions across disk that satisfies

View File

@ -33,7 +33,7 @@ import (
"github.com/minio/minio/internal/hash/sha256"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/sync/errgroup"
"github.com/minio/pkg/sync/errgroup"
"github.com/minio/sio"
)

View File

@ -37,8 +37,8 @@ import (
"github.com/minio/minio/internal/hash"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/sync/errgroup"
"github.com/minio/pkg/mimedb"
"github.com/minio/pkg/sync/errgroup"
)
func (er erasureObjects) getUploadIDDir(bucket, object, uploadID string) string {

View File

@ -42,8 +42,8 @@ import (
xhttp "github.com/minio/minio/internal/http"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/sync/errgroup"
"github.com/minio/pkg/mimedb"
"github.com/minio/pkg/sync/errgroup"
"github.com/minio/pkg/wildcard"
uatomic "go.uber.org/atomic"
)

View File

@ -39,7 +39,7 @@ import (
"github.com/minio/minio-go/v7/pkg/tags"
"github.com/minio/minio/internal/config/storageclass"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/sync/errgroup"
"github.com/minio/pkg/sync/errgroup"
"github.com/minio/pkg/wildcard"
)

View File

@ -39,8 +39,8 @@ import (
"github.com/minio/minio/internal/bpool"
"github.com/minio/minio/internal/dsync"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/sync/errgroup"
"github.com/minio/pkg/console"
"github.com/minio/pkg/sync/errgroup"
)
// setsDsyncLockers is encapsulated type for Close()

View File

@ -32,7 +32,7 @@ import (
"github.com/minio/minio/internal/bpool"
"github.com/minio/minio/internal/dsync"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/sync/errgroup"
"github.com/minio/pkg/sync/errgroup"
)
// list all errors that can be ignore in a bucket operation.

View File

@ -33,7 +33,7 @@ import (
"github.com/minio/minio/internal/config/storageclass"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/sync/errgroup"
"github.com/minio/pkg/sync/errgroup"
)
const (

View File

@ -32,8 +32,8 @@ import (
"github.com/minio/madmin-go/v2"
"github.com/minio/minio/internal/bucket/bandwidth"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/sync/errgroup"
xnet "github.com/minio/pkg/net"
"github.com/minio/pkg/sync/errgroup"
)
// This file contains peer related notifications. For sending notifications to

View File

@ -29,8 +29,8 @@ import (
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/rest"
"github.com/minio/minio/internal/sync/errgroup"
xnet "github.com/minio/pkg/net"
"github.com/minio/pkg/sync/errgroup"
)
var errPeerOffline = errors.New("peer is offline")

View File

@ -25,8 +25,8 @@ import (
"sort"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/sync/errgroup"
"github.com/minio/mux"
"github.com/minio/pkg/sync/errgroup"
)
const (

4
go.mod
View File

@ -50,7 +50,7 @@ require (
github.com/minio/madmin-go/v2 v2.0.20
github.com/minio/minio-go/v7 v7.0.52
github.com/minio/mux v1.9.0
github.com/minio/pkg v1.6.6-0.20230330040824-5db111e5f63c
github.com/minio/pkg v1.7.0
github.com/minio/selfupdate v0.6.0
github.com/minio/sha256-simd v1.0.0
github.com/minio/simdjson-go v0.4.5
@ -66,7 +66,7 @@ require (
github.com/philhofer/fwd v1.1.2
github.com/pierrec/lz4 v2.6.1+incompatible
github.com/pkg/errors v0.9.1
github.com/pkg/sftp v1.10.1
github.com/pkg/sftp v1.13.1
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/client_model v0.3.0
github.com/prometheus/common v0.42.0

11
go.sum
View File

@ -778,8 +778,6 @@ github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLT
github.com/minio/kes-go v0.1.0 h1:h201DyOYP5sTqajkxFGxmXz/kPbT8HQNX1uh3Yx2PFc=
github.com/minio/kes-go v0.1.0/go.mod h1:VorHLaIYis9/MxAHAtXN4d8PUMNKhIxTIlvFt0hBOEo=
github.com/minio/madmin-go v1.6.6/go.mod h1:ATvkBOLiP3av4D++2v1UEHC/QzsGtgXD5kYvvRYzdKs=
github.com/minio/madmin-go/v2 v2.0.19 h1:XznxdMVCTyr0A88JrZFhdxWY8KLfJcrs0TTmFiE9cc8=
github.com/minio/madmin-go/v2 v2.0.19/go.mod h1:8bL1RMNkblIENFSgGYjeHrzUx9PxROb7OqfNuMU9ivE=
github.com/minio/madmin-go/v2 v2.0.20 h1:EO2IIQsnaVM3ki/ONcW0Ry4UpDn5aa+/S1kLkHuDBs8=
github.com/minio/madmin-go/v2 v2.0.20/go.mod h1:8bL1RMNkblIENFSgGYjeHrzUx9PxROb7OqfNuMU9ivE=
github.com/minio/mc v0.0.0-20230411170328-83336dfab325 h1:da5I7G0Va6UnqoxzPHus7zmqo2pEQnltI09/XKNvHP4=
@ -793,8 +791,8 @@ github.com/minio/minio-go/v7 v7.0.52/go.mod h1:IbbodHyjUAguneyucUaahv+VMNs/EOTV9
github.com/minio/mux v1.9.0 h1:dWafQFyEfGhJvK6AwLOt83bIG5bxKxKJnKMCi0XAaoA=
github.com/minio/mux v1.9.0/go.mod h1:1pAare17ZRL5GpmNL+9YmqHoWnLmMZF9C/ioUCfy0BQ=
github.com/minio/pkg v1.5.4/go.mod h1:2MOaRFdmFKULD+uOLc3qHLGTQTuxCNPKNPfLBTxC8CA=
github.com/minio/pkg v1.6.6-0.20230330040824-5db111e5f63c h1:Ukw0+d0T/+9lserJfodr4HGIIb3hLkt8tlgMh2vUfZg=
github.com/minio/pkg v1.6.6-0.20230330040824-5db111e5f63c/go.mod h1:0iX1IuJGSCnMvIvrEJauk1GgQSX9JdU6Kh0P3EQRGkI=
github.com/minio/pkg v1.7.0 h1:tHPwt8eA4xd8mOUEGAripuHyePh8N3VavvZ5Lg4YMqk=
github.com/minio/pkg v1.7.0/go.mod h1:0iX1IuJGSCnMvIvrEJauk1GgQSX9JdU6Kh0P3EQRGkI=
github.com/minio/selfupdate v0.6.0 h1:i76PgT0K5xO9+hjzKcacQtO7+MjJ4JKA8Ak8XQ9DDwU=
github.com/minio/selfupdate v0.6.0/go.mod h1:bO02GTIPCMQFTEvE5h4DjYB58bCoZ35XLeBf0buTDdM=
github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM=
@ -899,8 +897,9 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/profile v1.6.0/go.mod h1:qBsxPvzyUincmltOk6iyRVxHYg4adc0OFOv72ZdLa18=
github.com/pkg/sftp v1.10.1 h1:VasscCm72135zRysgrJDKsntdmPN+OuU3+nnHYA9wyc=
github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI=
github.com/pkg/sftp v1.13.1 h1:I2qBYMChEhIjOgazfJmV3/mZM256btk6wkCDRmW7JYs=
github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg=
github.com/pkg/xattr v0.4.9 h1:5883YPCtkSd8LFbs13nXplj9g9tlrwoJRjgpgMu1/fE=
github.com/pkg/xattr v0.4.9/go.mod h1:di8WF84zAKk8jzR1UBTEWh9AUlIZZ7M/JNt8e9B6ktU=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
@ -1144,6 +1143,7 @@ golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
@ -1365,6 +1365,7 @@ golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210420072515-93ed5bcd2bfe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

View File

@ -1,133 +0,0 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package errgroup
import (
"context"
"sync"
"sync/atomic"
)
// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group can be used if errors should not be tracked.
type Group struct {
firstErr int64 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
wg sync.WaitGroup
bucket chan struct{}
errs []error
cancel context.CancelFunc
ctxCancel <-chan struct{} // nil if no context.
ctxErr func() error
}
// WithNErrs returns a new Group with length of errs slice upto nerrs,
// upon Wait() errors are returned collected from all tasks.
func WithNErrs(nerrs int) *Group {
return &Group{errs: make([]error, nerrs), firstErr: -1}
}
// Wait blocks until all function calls from the Go method have returned, then
// returns the slice of errors from all function calls.
func (g *Group) Wait() []error {
g.wg.Wait()
if g.cancel != nil {
g.cancel()
}
return g.errs
}
// WaitErr blocks until all function calls from the Go method have returned, then
// returns the first error returned.
func (g *Group) WaitErr() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel()
}
if g.firstErr >= 0 && len(g.errs) > int(g.firstErr) {
// len(g.errs) > int(g.firstErr) is for then used uninitialized.
return g.errs[g.firstErr]
}
return nil
}
// WithConcurrency allows to limit the concurrency of the group.
// This must be called before starting any async processes.
// There is no order to which functions are allowed to run.
// If n <= 0 no concurrency limits are enforced.
// g is modified and returned as well.
func (g *Group) WithConcurrency(n int) *Group {
if n <= 0 {
g.bucket = nil
return g
}
// Fill bucket with tokens
g.bucket = make(chan struct{}, n)
for i := 0; i < n; i++ {
g.bucket <- struct{}{}
}
return g
}
// WithCancelOnError will return a context that is canceled
// as soon as an error occurs.
// The returned CancelFunc must always be called similar to context.WithCancel.
// If the supplied context is canceled any goroutines waiting for execution are also canceled.
func (g *Group) WithCancelOnError(ctx context.Context) (context.Context, context.CancelFunc) {
ctx, g.cancel = context.WithCancel(ctx)
g.ctxCancel = ctx.Done()
g.ctxErr = ctx.Err
return ctx, g.cancel
}
// Go calls the given function in a new goroutine.
//
// The errors will be collected in errs slice and returned by Wait().
func (g *Group) Go(f func() error, index int) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
if g.bucket != nil {
// Wait for token
select {
case <-g.bucket:
defer func() {
// Put back token..
g.bucket <- struct{}{}
}()
case <-g.ctxCancel:
if len(g.errs) > index {
atomic.CompareAndSwapInt64(&g.firstErr, -1, int64(index))
g.errs[index] = g.ctxErr()
}
return
}
}
if err := f(); err != nil {
if len(g.errs) > index {
atomic.CompareAndSwapInt64(&g.firstErr, -1, int64(index))
g.errs[index] = err
}
if g.cancel != nil {
g.cancel()
}
}
}()
}

View File

@ -1,53 +0,0 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package errgroup
import (
"fmt"
"reflect"
"testing"
)
func TestGroupWithNErrs(t *testing.T) {
err1 := fmt.Errorf("errgroup_test: 1")
err2 := fmt.Errorf("errgroup_test: 2")
cases := []struct {
errs []error
}{
{errs: []error{nil}},
{errs: []error{err1}},
{errs: []error{err1, nil}},
{errs: []error{err1, nil, err2}},
}
for j, tc := range cases {
t.Run(fmt.Sprintf("Test%d", j+1), func(t *testing.T) {
g := WithNErrs(len(tc.errs))
for i, err := range tc.errs {
err := err
g.Go(func() error { return err }, i)
}
gotErrs := g.Wait()
if !reflect.DeepEqual(gotErrs, tc.errs) {
t.Errorf("Expected %#v, got %#v", tc.errs, gotErrs)
}
})
}
}

View File

@ -1,63 +0,0 @@
// Copyright (c) 2022-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package workers
import (
"errors"
"sync"
)
// Workers provides a bounded semaphore with the ability to wait until all
// concurrent jobs finish.
type Workers struct {
wg sync.WaitGroup
queue chan struct{}
}
// New creates a Workers object which allows up to n jobs to proceed
// concurrently. n must be > 0.
func New(n int) (*Workers, error) {
if n <= 0 {
return nil, errors.New("n must be > 0")
}
queue := make(chan struct{}, n)
for i := 0; i < n; i++ {
queue <- struct{}{}
}
return &Workers{
queue: queue,
}, nil
}
// Take is how a job (goroutine) can Take its turn.
func (jt *Workers) Take() {
jt.wg.Add(1)
<-jt.queue
}
// Give is how a job (goroutine) can give back its turn once done.
func (jt *Workers) Give() {
jt.queue <- struct{}{}
jt.wg.Done()
}
// Wait waits for all ongoing concurrent jobs to complete
func (jt *Workers) Wait() {
jt.wg.Wait()
}

View File

@ -1,148 +0,0 @@
// Copyright (c) 2022-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package workers
import (
"fmt"
"sync"
"testing"
)
func TestWorkers(t *testing.T) {
tests := []struct {
n int
jobs int
mustFail bool
}{
{
n: 0,
jobs: 5,
mustFail: true,
},
{
n: -1,
jobs: 5,
mustFail: true,
},
{
n: 1,
jobs: 5,
},
{
n: 2,
jobs: 5,
},
{
n: 5,
jobs: 10,
},
{
n: 10,
jobs: 5,
},
}
testFn := func(n, jobs int, mustFail bool) {
var mu sync.Mutex
var jobsDone int
// Create workers for n concurrent workers
jt, err := New(n)
if err == nil && mustFail {
t.Fatal("Expected test to return error")
}
if err != nil && mustFail {
return
}
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
for i := 0; i < jobs; i++ {
jt.Take()
go func() { // Launch a worker after acquiring a token
defer jt.Give() // Give token back once done
mu.Lock()
jobsDone++
mu.Unlock()
}()
}
jt.Wait() // Wait for all workers to complete
if jobsDone != jobs {
t.Fatalf("Expected %d jobs to be done but only %d were done", jobs, jobsDone)
}
}
for i, test := range tests {
t.Run(fmt.Sprintf("test-%d", i), func(t *testing.T) {
testFn(test.n, test.jobs, test.mustFail)
})
}
// Verify that workers can be reused after full drain
t.Run("test-workers-reuse", func(t *testing.T) {
var mu sync.Mutex
jt, _ := New(5)
for reuse := 0; reuse < 3; reuse++ {
var jobsDone int
for i := 0; i < 10; i++ {
jt.Take()
go func() {
defer jt.Give()
mu.Lock()
jobsDone++
mu.Unlock()
}()
}
jt.Wait()
if jobsDone != 10 {
t.Fatalf("Expected %d jobs to be complete but only %d were", 10, jobsDone)
}
}
})
}
func benchmarkWorkers(b *testing.B, n, jobs int) {
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
var mu sync.Mutex
var jobsDone int
jt, _ := New(n)
for i := 0; i < jobs; i++ {
jt.Take()
go func() {
defer jt.Give()
mu.Lock()
jobsDone++
mu.Unlock()
}()
}
jt.Wait()
if jobsDone != jobs {
b.Fail()
}
}
})
}
func BenchmarkWorkers_N5_J10(b *testing.B) {
benchmarkWorkers(b, 5, 10)
}
func BenchmarkWorkers_N5_J100(b *testing.B) {
benchmarkWorkers(b, 5, 100)
}