From 72802a5972e3a5b3db298590f83eeb97a3f45fff Mon Sep 17 00:00:00 2001 From: Praveen raj Mani Date: Wed, 26 Apr 2023 11:27:40 +0530 Subject: [PATCH] Use 'minio/pkg/sync/errgroup' and 'minio/pkg/workers' (#17069) --- cmd/admin-handlers-users-race_test.go | 2 +- cmd/batch-handlers.go | 2 +- cmd/batch-rotate.go | 2 +- cmd/bucket-handlers.go | 2 +- cmd/bucket-lifecycle.go | 2 +- cmd/bucket-metadata-sys.go | 2 +- cmd/disk-cache.go | 2 +- cmd/erasure-common.go | 2 +- cmd/erasure-healing.go | 2 +- cmd/erasure-metadata-utils.go | 2 +- cmd/erasure-metadata.go | 2 +- cmd/erasure-multipart.go | 2 +- cmd/erasure-object.go | 2 +- cmd/erasure-server-pool.go | 2 +- cmd/erasure-sets.go | 2 +- cmd/erasure.go | 2 +- cmd/format-erasure.go | 2 +- cmd/notification.go | 2 +- cmd/peer-s3-client.go | 2 +- cmd/peer-s3-server.go | 2 +- go.mod | 4 +- go.sum | 11 +- internal/sync/errgroup/errgroup.go | 133 --------------------- internal/sync/errgroup/errgroup_test.go | 53 --------- internal/workers/workers.go | 63 ---------- internal/workers/workers_test.go | 148 ------------------------ 26 files changed, 28 insertions(+), 424 deletions(-) delete mode 100644 internal/sync/errgroup/errgroup.go delete mode 100644 internal/sync/errgroup/errgroup_test.go delete mode 100644 internal/workers/workers.go delete mode 100644 internal/workers/workers_test.go diff --git a/cmd/admin-handlers-users-race_test.go b/cmd/admin-handlers-users-race_test.go index a1d4b8b61..7708d46ca 100644 --- a/cmd/admin-handlers-users-race_test.go +++ b/cmd/admin-handlers-users-race_test.go @@ -32,7 +32,7 @@ import ( "github.com/minio/madmin-go/v2" minio "github.com/minio/minio-go/v7" - "github.com/minio/minio/internal/sync/errgroup" + "github.com/minio/pkg/sync/errgroup" ) func runAllIAMConcurrencyTests(suite *TestSuiteIAM, c *check) { diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go index f68e5e2c9..08eb2ef8c 100644 --- a/cmd/batch-handlers.go +++ b/cmd/batch-handlers.go @@ -48,11 +48,11 @@ import ( "github.com/minio/minio/internal/hash" xhttp "github.com/minio/minio/internal/http" "github.com/minio/minio/internal/logger" - "github.com/minio/minio/internal/workers" "github.com/minio/pkg/console" "github.com/minio/pkg/env" iampolicy "github.com/minio/pkg/iam/policy" "github.com/minio/pkg/wildcard" + "github.com/minio/pkg/workers" "gopkg.in/yaml.v2" ) diff --git a/cmd/batch-rotate.go b/cmd/batch-rotate.go index c1c2c25ea..dd364b3ad 100644 --- a/cmd/batch-rotate.go +++ b/cmd/batch-rotate.go @@ -38,9 +38,9 @@ import ( xhttp "github.com/minio/minio/internal/http" "github.com/minio/minio/internal/kms" "github.com/minio/minio/internal/logger" - "github.com/minio/minio/internal/workers" "github.com/minio/pkg/env" "github.com/minio/pkg/wildcard" + "github.com/minio/pkg/workers" ) // keyrotate: diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index d96593ae5..f35b9f040 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -51,9 +51,9 @@ import ( xhttp "github.com/minio/minio/internal/http" "github.com/minio/minio/internal/kms" "github.com/minio/minio/internal/logger" - "github.com/minio/minio/internal/sync/errgroup" "github.com/minio/pkg/bucket/policy" iampolicy "github.com/minio/pkg/iam/policy" + "github.com/minio/pkg/sync/errgroup" ) const ( diff --git a/cmd/bucket-lifecycle.go b/cmd/bucket-lifecycle.go index 21358a574..b20740b45 100644 --- a/cmd/bucket-lifecycle.go +++ b/cmd/bucket-lifecycle.go @@ -41,8 +41,8 @@ import ( xhttp "github.com/minio/minio/internal/http" "github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/s3select" - "github.com/minio/minio/internal/workers" "github.com/minio/pkg/env" + "github.com/minio/pkg/workers" ) const ( diff --git a/cmd/bucket-metadata-sys.go b/cmd/bucket-metadata-sys.go index f174e32dc..98c19d813 100644 --- a/cmd/bucket-metadata-sys.go +++ b/cmd/bucket-metadata-sys.go @@ -36,8 +36,8 @@ import ( "github.com/minio/minio/internal/event" "github.com/minio/minio/internal/kms" "github.com/minio/minio/internal/logger" - "github.com/minio/minio/internal/sync/errgroup" "github.com/minio/pkg/bucket/policy" + "github.com/minio/pkg/sync/errgroup" ) // BucketMetadataSys captures all bucket metadata for a given cluster. diff --git a/cmd/disk-cache.go b/cmd/disk-cache.go index 024784aa1..598d98d0f 100644 --- a/cmd/disk-cache.go +++ b/cmd/disk-cache.go @@ -33,8 +33,8 @@ import ( "github.com/minio/minio/internal/disk" "github.com/minio/minio/internal/hash" "github.com/minio/minio/internal/logger" - "github.com/minio/minio/internal/sync/errgroup" xnet "github.com/minio/pkg/net" + "github.com/minio/pkg/sync/errgroup" "github.com/minio/pkg/wildcard" ) diff --git a/cmd/erasure-common.go b/cmd/erasure-common.go index 16c82a27e..eba322800 100644 --- a/cmd/erasure-common.go +++ b/cmd/erasure-common.go @@ -23,7 +23,7 @@ import ( "sync" "github.com/minio/minio/internal/logger" - "github.com/minio/minio/internal/sync/errgroup" + "github.com/minio/pkg/sync/errgroup" ) func (er erasureObjects) getOnlineDisks() (newDisks []StorageAPI) { diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index c1e789df1..a79d7a804 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -28,7 +28,7 @@ import ( "github.com/minio/madmin-go/v2" "github.com/minio/minio/internal/logger" - "github.com/minio/minio/internal/sync/errgroup" + "github.com/minio/pkg/sync/errgroup" ) const reservedMetadataPrefixLowerDataShardFix = ReservedMetadataPrefixLower + "data-shard-fix" diff --git a/cmd/erasure-metadata-utils.go b/cmd/erasure-metadata-utils.go index e82432bc3..fae041e2b 100644 --- a/cmd/erasure-metadata-utils.go +++ b/cmd/erasure-metadata-utils.go @@ -25,7 +25,7 @@ import ( "io" "github.com/minio/minio/internal/logger" - "github.com/minio/minio/internal/sync/errgroup" + "github.com/minio/pkg/sync/errgroup" ) // figure out the most commonVersions across disk that satisfies diff --git a/cmd/erasure-metadata.go b/cmd/erasure-metadata.go index 67b3a92c8..2a13553e2 100644 --- a/cmd/erasure-metadata.go +++ b/cmd/erasure-metadata.go @@ -33,7 +33,7 @@ import ( "github.com/minio/minio/internal/hash/sha256" xhttp "github.com/minio/minio/internal/http" "github.com/minio/minio/internal/logger" - "github.com/minio/minio/internal/sync/errgroup" + "github.com/minio/pkg/sync/errgroup" "github.com/minio/sio" ) diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 6310550c0..9238c8302 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -37,8 +37,8 @@ import ( "github.com/minio/minio/internal/hash" xhttp "github.com/minio/minio/internal/http" "github.com/minio/minio/internal/logger" - "github.com/minio/minio/internal/sync/errgroup" "github.com/minio/pkg/mimedb" + "github.com/minio/pkg/sync/errgroup" ) func (er erasureObjects) getUploadIDDir(bucket, object, uploadID string) string { diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 094cae7e9..9399ec6e6 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -42,8 +42,8 @@ import ( xhttp "github.com/minio/minio/internal/http" xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" - "github.com/minio/minio/internal/sync/errgroup" "github.com/minio/pkg/mimedb" + "github.com/minio/pkg/sync/errgroup" "github.com/minio/pkg/wildcard" uatomic "go.uber.org/atomic" ) diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index f83bbf739..0dc3dcf7f 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -39,7 +39,7 @@ import ( "github.com/minio/minio-go/v7/pkg/tags" "github.com/minio/minio/internal/config/storageclass" "github.com/minio/minio/internal/logger" - "github.com/minio/minio/internal/sync/errgroup" + "github.com/minio/pkg/sync/errgroup" "github.com/minio/pkg/wildcard" ) diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index db9738cec..1deca76a7 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -39,8 +39,8 @@ import ( "github.com/minio/minio/internal/bpool" "github.com/minio/minio/internal/dsync" "github.com/minio/minio/internal/logger" - "github.com/minio/minio/internal/sync/errgroup" "github.com/minio/pkg/console" + "github.com/minio/pkg/sync/errgroup" ) // setsDsyncLockers is encapsulated type for Close() diff --git a/cmd/erasure.go b/cmd/erasure.go index 18b095ac9..c393b546f 100644 --- a/cmd/erasure.go +++ b/cmd/erasure.go @@ -32,7 +32,7 @@ import ( "github.com/minio/minio/internal/bpool" "github.com/minio/minio/internal/dsync" "github.com/minio/minio/internal/logger" - "github.com/minio/minio/internal/sync/errgroup" + "github.com/minio/pkg/sync/errgroup" ) // list all errors that can be ignore in a bucket operation. diff --git a/cmd/format-erasure.go b/cmd/format-erasure.go index 0702d244c..874737cb3 100644 --- a/cmd/format-erasure.go +++ b/cmd/format-erasure.go @@ -33,7 +33,7 @@ import ( "github.com/minio/minio/internal/config/storageclass" xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" - "github.com/minio/minio/internal/sync/errgroup" + "github.com/minio/pkg/sync/errgroup" ) const ( diff --git a/cmd/notification.go b/cmd/notification.go index 8b0ed46cf..48c2a0121 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -32,8 +32,8 @@ import ( "github.com/minio/madmin-go/v2" "github.com/minio/minio/internal/bucket/bandwidth" "github.com/minio/minio/internal/logger" - "github.com/minio/minio/internal/sync/errgroup" xnet "github.com/minio/pkg/net" + "github.com/minio/pkg/sync/errgroup" ) // This file contains peer related notifications. For sending notifications to diff --git a/cmd/peer-s3-client.go b/cmd/peer-s3-client.go index b3b9c81d9..0c5caea72 100644 --- a/cmd/peer-s3-client.go +++ b/cmd/peer-s3-client.go @@ -29,8 +29,8 @@ import ( xhttp "github.com/minio/minio/internal/http" "github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/rest" - "github.com/minio/minio/internal/sync/errgroup" xnet "github.com/minio/pkg/net" + "github.com/minio/pkg/sync/errgroup" ) var errPeerOffline = errors.New("peer is offline") diff --git a/cmd/peer-s3-server.go b/cmd/peer-s3-server.go index bc1ebc3d3..9155142a2 100644 --- a/cmd/peer-s3-server.go +++ b/cmd/peer-s3-server.go @@ -25,8 +25,8 @@ import ( "sort" "github.com/minio/minio/internal/logger" - "github.com/minio/minio/internal/sync/errgroup" "github.com/minio/mux" + "github.com/minio/pkg/sync/errgroup" ) const ( diff --git a/go.mod b/go.mod index 773d8b90b..4cb1652cf 100644 --- a/go.mod +++ b/go.mod @@ -50,7 +50,7 @@ require ( github.com/minio/madmin-go/v2 v2.0.20 github.com/minio/minio-go/v7 v7.0.52 github.com/minio/mux v1.9.0 - github.com/minio/pkg v1.6.6-0.20230330040824-5db111e5f63c + github.com/minio/pkg v1.7.0 github.com/minio/selfupdate v0.6.0 github.com/minio/sha256-simd v1.0.0 github.com/minio/simdjson-go v0.4.5 @@ -66,7 +66,7 @@ require ( github.com/philhofer/fwd v1.1.2 github.com/pierrec/lz4 v2.6.1+incompatible github.com/pkg/errors v0.9.1 - github.com/pkg/sftp v1.10.1 + github.com/pkg/sftp v1.13.1 github.com/prometheus/client_golang v1.14.0 github.com/prometheus/client_model v0.3.0 github.com/prometheus/common v0.42.0 diff --git a/go.sum b/go.sum index 1e31a8c2c..faddfc3d1 100644 --- a/go.sum +++ b/go.sum @@ -778,8 +778,6 @@ github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLT github.com/minio/kes-go v0.1.0 h1:h201DyOYP5sTqajkxFGxmXz/kPbT8HQNX1uh3Yx2PFc= github.com/minio/kes-go v0.1.0/go.mod h1:VorHLaIYis9/MxAHAtXN4d8PUMNKhIxTIlvFt0hBOEo= github.com/minio/madmin-go v1.6.6/go.mod h1:ATvkBOLiP3av4D++2v1UEHC/QzsGtgXD5kYvvRYzdKs= -github.com/minio/madmin-go/v2 v2.0.19 h1:XznxdMVCTyr0A88JrZFhdxWY8KLfJcrs0TTmFiE9cc8= -github.com/minio/madmin-go/v2 v2.0.19/go.mod h1:8bL1RMNkblIENFSgGYjeHrzUx9PxROb7OqfNuMU9ivE= github.com/minio/madmin-go/v2 v2.0.20 h1:EO2IIQsnaVM3ki/ONcW0Ry4UpDn5aa+/S1kLkHuDBs8= github.com/minio/madmin-go/v2 v2.0.20/go.mod h1:8bL1RMNkblIENFSgGYjeHrzUx9PxROb7OqfNuMU9ivE= github.com/minio/mc v0.0.0-20230411170328-83336dfab325 h1:da5I7G0Va6UnqoxzPHus7zmqo2pEQnltI09/XKNvHP4= @@ -793,8 +791,8 @@ github.com/minio/minio-go/v7 v7.0.52/go.mod h1:IbbodHyjUAguneyucUaahv+VMNs/EOTV9 github.com/minio/mux v1.9.0 h1:dWafQFyEfGhJvK6AwLOt83bIG5bxKxKJnKMCi0XAaoA= github.com/minio/mux v1.9.0/go.mod h1:1pAare17ZRL5GpmNL+9YmqHoWnLmMZF9C/ioUCfy0BQ= github.com/minio/pkg v1.5.4/go.mod h1:2MOaRFdmFKULD+uOLc3qHLGTQTuxCNPKNPfLBTxC8CA= -github.com/minio/pkg v1.6.6-0.20230330040824-5db111e5f63c h1:Ukw0+d0T/+9lserJfodr4HGIIb3hLkt8tlgMh2vUfZg= -github.com/minio/pkg v1.6.6-0.20230330040824-5db111e5f63c/go.mod h1:0iX1IuJGSCnMvIvrEJauk1GgQSX9JdU6Kh0P3EQRGkI= +github.com/minio/pkg v1.7.0 h1:tHPwt8eA4xd8mOUEGAripuHyePh8N3VavvZ5Lg4YMqk= +github.com/minio/pkg v1.7.0/go.mod h1:0iX1IuJGSCnMvIvrEJauk1GgQSX9JdU6Kh0P3EQRGkI= github.com/minio/selfupdate v0.6.0 h1:i76PgT0K5xO9+hjzKcacQtO7+MjJ4JKA8Ak8XQ9DDwU= github.com/minio/selfupdate v0.6.0/go.mod h1:bO02GTIPCMQFTEvE5h4DjYB58bCoZ35XLeBf0buTDdM= github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM= @@ -899,8 +897,9 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/profile v1.6.0/go.mod h1:qBsxPvzyUincmltOk6iyRVxHYg4adc0OFOv72ZdLa18= -github.com/pkg/sftp v1.10.1 h1:VasscCm72135zRysgrJDKsntdmPN+OuU3+nnHYA9wyc= github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI= +github.com/pkg/sftp v1.13.1 h1:I2qBYMChEhIjOgazfJmV3/mZM256btk6wkCDRmW7JYs= +github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg= github.com/pkg/xattr v0.4.9 h1:5883YPCtkSd8LFbs13nXplj9g9tlrwoJRjgpgMu1/fE= github.com/pkg/xattr v0.4.9/go.mod h1:di8WF84zAKk8jzR1UBTEWh9AUlIZZ7M/JNt8e9B6ktU= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -1144,6 +1143,7 @@ golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= +golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= @@ -1365,6 +1365,7 @@ golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210420072515-93ed5bcd2bfe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/sync/errgroup/errgroup.go b/internal/sync/errgroup/errgroup.go deleted file mode 100644 index 4004fbf58..000000000 --- a/internal/sync/errgroup/errgroup.go +++ /dev/null @@ -1,133 +0,0 @@ -// Copyright (c) 2015-2021 MinIO, Inc. -// -// This file is part of MinIO Object Storage stack -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package errgroup - -import ( - "context" - "sync" - "sync/atomic" -) - -// A Group is a collection of goroutines working on subtasks that are part of -// the same overall task. -// -// A zero Group can be used if errors should not be tracked. -type Group struct { - firstErr int64 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG - wg sync.WaitGroup - bucket chan struct{} - errs []error - cancel context.CancelFunc - ctxCancel <-chan struct{} // nil if no context. - ctxErr func() error -} - -// WithNErrs returns a new Group with length of errs slice upto nerrs, -// upon Wait() errors are returned collected from all tasks. -func WithNErrs(nerrs int) *Group { - return &Group{errs: make([]error, nerrs), firstErr: -1} -} - -// Wait blocks until all function calls from the Go method have returned, then -// returns the slice of errors from all function calls. -func (g *Group) Wait() []error { - g.wg.Wait() - if g.cancel != nil { - g.cancel() - } - return g.errs -} - -// WaitErr blocks until all function calls from the Go method have returned, then -// returns the first error returned. -func (g *Group) WaitErr() error { - g.wg.Wait() - if g.cancel != nil { - g.cancel() - } - if g.firstErr >= 0 && len(g.errs) > int(g.firstErr) { - // len(g.errs) > int(g.firstErr) is for then used uninitialized. - return g.errs[g.firstErr] - } - return nil -} - -// WithConcurrency allows to limit the concurrency of the group. -// This must be called before starting any async processes. -// There is no order to which functions are allowed to run. -// If n <= 0 no concurrency limits are enforced. -// g is modified and returned as well. -func (g *Group) WithConcurrency(n int) *Group { - if n <= 0 { - g.bucket = nil - return g - } - - // Fill bucket with tokens - g.bucket = make(chan struct{}, n) - for i := 0; i < n; i++ { - g.bucket <- struct{}{} - } - return g -} - -// WithCancelOnError will return a context that is canceled -// as soon as an error occurs. -// The returned CancelFunc must always be called similar to context.WithCancel. -// If the supplied context is canceled any goroutines waiting for execution are also canceled. -func (g *Group) WithCancelOnError(ctx context.Context) (context.Context, context.CancelFunc) { - ctx, g.cancel = context.WithCancel(ctx) - g.ctxCancel = ctx.Done() - g.ctxErr = ctx.Err - return ctx, g.cancel -} - -// Go calls the given function in a new goroutine. -// -// The errors will be collected in errs slice and returned by Wait(). -func (g *Group) Go(f func() error, index int) { - g.wg.Add(1) - go func() { - defer g.wg.Done() - if g.bucket != nil { - // Wait for token - select { - case <-g.bucket: - defer func() { - // Put back token.. - g.bucket <- struct{}{} - }() - case <-g.ctxCancel: - if len(g.errs) > index { - atomic.CompareAndSwapInt64(&g.firstErr, -1, int64(index)) - g.errs[index] = g.ctxErr() - } - return - } - } - if err := f(); err != nil { - if len(g.errs) > index { - atomic.CompareAndSwapInt64(&g.firstErr, -1, int64(index)) - g.errs[index] = err - } - if g.cancel != nil { - g.cancel() - } - } - }() -} diff --git a/internal/sync/errgroup/errgroup_test.go b/internal/sync/errgroup/errgroup_test.go deleted file mode 100644 index 3de4bdd06..000000000 --- a/internal/sync/errgroup/errgroup_test.go +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright (c) 2015-2021 MinIO, Inc. -// -// This file is part of MinIO Object Storage stack -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package errgroup - -import ( - "fmt" - "reflect" - "testing" -) - -func TestGroupWithNErrs(t *testing.T) { - err1 := fmt.Errorf("errgroup_test: 1") - err2 := fmt.Errorf("errgroup_test: 2") - - cases := []struct { - errs []error - }{ - {errs: []error{nil}}, - {errs: []error{err1}}, - {errs: []error{err1, nil}}, - {errs: []error{err1, nil, err2}}, - } - - for j, tc := range cases { - t.Run(fmt.Sprintf("Test%d", j+1), func(t *testing.T) { - g := WithNErrs(len(tc.errs)) - for i, err := range tc.errs { - err := err - g.Go(func() error { return err }, i) - } - - gotErrs := g.Wait() - if !reflect.DeepEqual(gotErrs, tc.errs) { - t.Errorf("Expected %#v, got %#v", tc.errs, gotErrs) - } - }) - } -} diff --git a/internal/workers/workers.go b/internal/workers/workers.go deleted file mode 100644 index 4ec42b35b..000000000 --- a/internal/workers/workers.go +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright (c) 2022-2023 MinIO, Inc. -// -// This file is part of MinIO Object Storage stack -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package workers - -import ( - "errors" - "sync" -) - -// Workers provides a bounded semaphore with the ability to wait until all -// concurrent jobs finish. -type Workers struct { - wg sync.WaitGroup - queue chan struct{} -} - -// New creates a Workers object which allows up to n jobs to proceed -// concurrently. n must be > 0. -func New(n int) (*Workers, error) { - if n <= 0 { - return nil, errors.New("n must be > 0") - } - - queue := make(chan struct{}, n) - for i := 0; i < n; i++ { - queue <- struct{}{} - } - return &Workers{ - queue: queue, - }, nil -} - -// Take is how a job (goroutine) can Take its turn. -func (jt *Workers) Take() { - jt.wg.Add(1) - <-jt.queue -} - -// Give is how a job (goroutine) can give back its turn once done. -func (jt *Workers) Give() { - jt.queue <- struct{}{} - jt.wg.Done() -} - -// Wait waits for all ongoing concurrent jobs to complete -func (jt *Workers) Wait() { - jt.wg.Wait() -} diff --git a/internal/workers/workers_test.go b/internal/workers/workers_test.go deleted file mode 100644 index 29b15f4f0..000000000 --- a/internal/workers/workers_test.go +++ /dev/null @@ -1,148 +0,0 @@ -// Copyright (c) 2022-2023 MinIO, Inc. -// -// This file is part of MinIO Object Storage stack -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package workers - -import ( - "fmt" - "sync" - "testing" -) - -func TestWorkers(t *testing.T) { - tests := []struct { - n int - jobs int - mustFail bool - }{ - { - n: 0, - jobs: 5, - mustFail: true, - }, - { - n: -1, - jobs: 5, - mustFail: true, - }, - { - n: 1, - jobs: 5, - }, - { - n: 2, - jobs: 5, - }, - { - n: 5, - jobs: 10, - }, - { - n: 10, - jobs: 5, - }, - } - testFn := func(n, jobs int, mustFail bool) { - var mu sync.Mutex - var jobsDone int - // Create workers for n concurrent workers - jt, err := New(n) - if err == nil && mustFail { - t.Fatal("Expected test to return error") - } - if err != nil && mustFail { - return - } - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - for i := 0; i < jobs; i++ { - jt.Take() - go func() { // Launch a worker after acquiring a token - defer jt.Give() // Give token back once done - mu.Lock() - jobsDone++ - mu.Unlock() - }() - } - jt.Wait() // Wait for all workers to complete - if jobsDone != jobs { - t.Fatalf("Expected %d jobs to be done but only %d were done", jobs, jobsDone) - } - } - - for i, test := range tests { - t.Run(fmt.Sprintf("test-%d", i), func(t *testing.T) { - testFn(test.n, test.jobs, test.mustFail) - }) - } - - // Verify that workers can be reused after full drain - t.Run("test-workers-reuse", func(t *testing.T) { - var mu sync.Mutex - jt, _ := New(5) - for reuse := 0; reuse < 3; reuse++ { - var jobsDone int - for i := 0; i < 10; i++ { - jt.Take() - go func() { - defer jt.Give() - mu.Lock() - jobsDone++ - mu.Unlock() - }() - } - jt.Wait() - if jobsDone != 10 { - t.Fatalf("Expected %d jobs to be complete but only %d were", 10, jobsDone) - } - } - }) -} - -func benchmarkWorkers(b *testing.B, n, jobs int) { - b.ReportAllocs() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - var mu sync.Mutex - var jobsDone int - jt, _ := New(n) - for i := 0; i < jobs; i++ { - jt.Take() - go func() { - defer jt.Give() - mu.Lock() - jobsDone++ - mu.Unlock() - }() - } - jt.Wait() - if jobsDone != jobs { - b.Fail() - } - } - }) -} - -func BenchmarkWorkers_N5_J10(b *testing.B) { - benchmarkWorkers(b, 5, 10) -} - -func BenchmarkWorkers_N5_J100(b *testing.B) { - benchmarkWorkers(b, 5, 100) -}