From 68a519a4684cce019962c11cdc942ee4889484c2 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 14 Oct 2019 09:44:51 -0700 Subject: [PATCH] Use errgroups instead of sync.WaitGroup as needed (#8354) --- cmd/bucket-handlers.go | 3 +- cmd/disk-cache.go | 31 ++-- cmd/format-xl.go | 119 ++++++-------- cmd/notification.go | 329 ++++++++++++++++++------------------- cmd/posix-list-dir_test.go | 4 +- cmd/xl-sets.go | 83 +++++----- cmd/xl-v1-bucket.go | 132 +++++++-------- cmd/xl-v1-common.go | 33 ++-- cmd/xl-v1-healing.go | 76 ++++----- cmd/xl-v1-metadata.go | 34 ++-- cmd/xl-v1-multipart.go | 50 +++--- cmd/xl-v1-object.go | 145 ++++++++-------- cmd/xl-v1-utils.go | 31 ++-- cmd/xl-v1.go | 43 +++-- 14 files changed, 512 insertions(+), 601 deletions(-) diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index ac9e60425..da30a941f 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -71,8 +71,8 @@ func initFederatorBackend(objLayer ObjectLayer) { // Add buckets that are not registered with the DNS g := errgroup.WithNErrs(len(b)) for index := range b { - index := index bucketSet.Add(b[index].Name) + index := index g.Go(func() error { r, gerr := globalDNSConfig.Get(b[index].Name) if gerr != nil { @@ -99,7 +99,6 @@ func initFederatorBackend(objLayer ObjectLayer) { // Remove buckets that are in DNS for this server, but aren't local for index := range dnsBuckets { index := index - g.Go(func() error { // This is a local bucket that exists, so we can continue if bucketSet.Contains(dnsBuckets[index].Key) { diff --git a/cmd/disk-cache.go b/cmd/disk-cache.go index 4a59169c5..ea1939f2f 100644 --- a/cmd/disk-cache.go +++ b/cmd/disk-cache.go @@ -16,6 +16,7 @@ import ( "github.com/minio/minio/cmd/config/cache" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/color" + "github.com/minio/minio/pkg/sync/errgroup" "github.com/minio/minio/pkg/wildcard" ) @@ -450,36 +451,32 @@ func checkAtimeSupport(dir string) (err error) { func (c *cacheObjects) migrateCacheFromV1toV2(ctx context.Context) { logStartupMessage(color.Blue("Cache migration initiated ....")) - var wg sync.WaitGroup - errs := make([]error, len(c.cache)) - for i, dc := range c.cache { + g := errgroup.WithNErrs(len(c.cache)) + for index, dc := range c.cache { if dc == nil { continue } - wg.Add(1) - // start migration from V1 to V2 - go func(ctx context.Context, dc *diskCache, errs []error, idx int) { - defer wg.Done() - if err := migrateOldCache(ctx, dc); err != nil { - errs[idx] = err - logger.LogIf(ctx, err) - return - } - // start purge routine after migration completes. - go dc.purge() - }(ctx, dc, errs, i) + index := index + g.Go(func() error { + // start migration from V1 to V2 + return migrateOldCache(ctx, c.cache[index]) + }, index) } - wg.Wait() errCnt := 0 - for _, err := range errs { + for index, err := range g.Wait() { if err != nil { errCnt++ + logger.LogIf(ctx, err) + continue } + go c.cache[index].purge() } + if errCnt > 0 { return } + // update migration status c.migMutex.Lock() defer c.migMutex.Unlock() diff --git a/cmd/format-xl.go b/cmd/format-xl.go index 4080172e3..d14a8babc 100644 --- a/cmd/format-xl.go +++ b/cmd/format-xl.go @@ -23,12 +23,12 @@ import ( "fmt" "io/ioutil" "reflect" - "sync" "encoding/hex" humanize "github.com/dustin/go-humanize" "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/pkg/sync/errgroup" sha256 "github.com/minio/sha256-simd" ) @@ -315,40 +315,30 @@ func quorumUnformattedDisks(errs []error) bool { // loadFormatXLAll - load all format config from all input disks in parallel. func loadFormatXLAll(storageDisks []StorageAPI) ([]*formatXLV3, []error) { - // Initialize sync waitgroup. - var wg sync.WaitGroup - // Initialize list of errors. - var sErrs = make([]error, len(storageDisks)) + g := errgroup.WithNErrs(len(storageDisks)) // Initialize format configs. var formats = make([]*formatXLV3, len(storageDisks)) // Load format from each disk in parallel - for index, disk := range storageDisks { - if disk == nil { - sErrs[index] = errDiskNotFound - continue - } - wg.Add(1) - // Launch go-routine per disk. - go func(index int, disk StorageAPI) { - defer wg.Done() - - format, lErr := loadFormatXL(disk) - if lErr != nil { - sErrs[index] = lErr - return + for index := range storageDisks { + index := index + g.Go(func() error { + if storageDisks[index] == nil { + return errDiskNotFound + } + format, err := loadFormatXL(storageDisks[index]) + if err != nil { + return err } formats[index] = format - }(index, disk) + return nil + }, index) } - // Wait for all go-routines to finish. - wg.Wait() - - // Return all formats and nil - return formats, sErrs + // Return all formats and errors if any. + return formats, g.Wait() } func saveFormatXL(disk StorageAPI, format interface{}) error { @@ -643,28 +633,22 @@ func formatXLV3Check(reference *formatXLV3, format *formatXLV3) error { // saveFormatXLAll - populates `format.json` on disks in its order. func saveFormatXLAll(ctx context.Context, storageDisks []StorageAPI, formats []*formatXLV3) error { - var errs = make([]error, len(storageDisks)) - - var wg sync.WaitGroup + g := errgroup.WithNErrs(len(storageDisks)) // Write `format.json` to all disks. - for index, disk := range storageDisks { - if formats[index] == nil || disk == nil { - errs[index] = errDiskNotFound - continue - } - wg.Add(1) - go func(index int, disk StorageAPI, format *formatXLV3) { - defer wg.Done() - errs[index] = saveFormatXL(disk, format) - }(index, disk, formats[index]) + for index := range storageDisks { + index := index + g.Go(func() error { + if formats[index] == nil || storageDisks[index] == nil { + return errDiskNotFound + } + return saveFormatXL(storageDisks[index], formats[index]) + }, index) } - // Wait for the routines to finish. - wg.Wait() - writeQuorum := len(storageDisks)/2 + 1 - return reduceWriteQuorumErrs(ctx, errs, nil, writeQuorum) + // Wait for the routines to finish. + return reduceWriteQuorumErrs(ctx, g.Wait(), nil, writeQuorum) } // relinquishes the underlying connection for all storage disks. @@ -682,17 +666,19 @@ func closeStorageDisks(storageDisks []StorageAPI) { func initStorageDisksWithErrors(endpoints EndpointList) ([]StorageAPI, []error) { // Bootstrap disks. storageDisks := make([]StorageAPI, len(endpoints)) - errs := make([]error, len(endpoints)) - var wg sync.WaitGroup - for index, endpoint := range endpoints { - wg.Add(1) - go func(index int, endpoint Endpoint) { - defer wg.Done() - storageDisks[index], errs[index] = newStorageAPI(endpoint) - }(index, endpoint) + g := errgroup.WithNErrs(len(endpoints)) + for index := range endpoints { + index := index + g.Go(func() error { + storageDisk, err := newStorageAPI(endpoints[index]) + if err != nil { + return err + } + storageDisks[index] = storageDisk + return nil + }, index) } - wg.Wait() - return storageDisks, errs + return storageDisks, g.Wait() } // formatXLV3ThisEmpty - find out if '.This' field is empty @@ -793,31 +779,24 @@ func initFormatXLMetaVolume(storageDisks []StorageAPI, formats []*formatXLV3) er // This happens for the first time, but keep this here since this // is the only place where it can be made expensive optimizing all // other calls. Create minio meta volume, if it doesn't exist yet. - var wg sync.WaitGroup // Initialize errs to collect errors inside go-routine. - var errs = make([]error, len(storageDisks)) + g := errgroup.WithNErrs(len(storageDisks)) // Initialize all disks in parallel. - for index, disk := range storageDisks { - if formats[index] == nil || disk == nil { - // Ignore create meta volume on disks which are not found. - continue - } - wg.Add(1) - go func(index int, disk StorageAPI) { - // Indicate this wait group is done. - defer wg.Done() - - errs[index] = makeFormatXLMetaVolumes(disk) - }(index, disk) + for index := range storageDisks { + index := index + g.Go(func() error { + if formats[index] == nil || storageDisks[index] == nil { + // Ignore create meta volume on disks which are not found. + return nil + } + return makeFormatXLMetaVolumes(storageDisks[index]) + }, index) } - // Wait for all cleanup to finish. - wg.Wait() - // Return upon first error. - for _, err := range errs { + for _, err := range g.Wait() { if err == nil { continue } diff --git a/cmd/notification.go b/cmd/notification.go index e09521fc8..2aa756537 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -38,6 +38,7 @@ import ( "github.com/minio/minio/pkg/madmin" xnet "github.com/minio/minio/pkg/net" "github.com/minio/minio/pkg/policy" + "github.com/minio/minio/pkg/sync/errgroup" ) // NotificationSys - notification system. @@ -72,24 +73,6 @@ type NotificationPeerErr struct { Err error // Error returned by the remote peer for an rpc call } -// DeleteBucket - calls DeleteBucket RPC call on all peers. -func (sys *NotificationSys) DeleteBucket(ctx context.Context, bucketName string) { - go func() { - var wg sync.WaitGroup - for _, client := range sys.peerClients { - wg.Add(1) - go func(client *peerRESTClient) { - defer wg.Done() - if err := client.DeleteBucket(bucketName); err != nil { - logger.GetReqInfo(ctx).AppendTags("remotePeer", client.host.Name) - logger.LogIf(ctx, err) - } - }(client) - } - wg.Wait() - }() -} - // A NotificationGroup is a collection of goroutines working on subtasks that are part of // the same overall task. // @@ -438,43 +421,44 @@ func (sys *NotificationSys) SignalService(sig serviceSignal) []NotificationPeerE // ServerInfo - calls ServerInfo RPC call on all peers. func (sys *NotificationSys) ServerInfo(ctx context.Context) []ServerInfo { serverInfo := make([]ServerInfo, len(sys.peerClients)) - var wg sync.WaitGroup + + g := errgroup.WithNErrs(len(sys.peerClients)) for index, client := range sys.peerClients { if client == nil { continue } - wg.Add(1) - go func(idx int, client *peerRESTClient) { - defer wg.Done() + index := index + g.Go(func() error { // Try to fetch serverInfo remotely in three attempts. for i := 0; i < 3; i++ { - info, err := client.ServerInfo() - if err == nil { - serverInfo[idx] = ServerInfo{ - Addr: client.host.String(), - Data: &info, - } - return + serverInfo[index] = ServerInfo{ + Addr: sys.peerClients[index].host.String(), } - serverInfo[idx] = ServerInfo{ - Addr: client.host.String(), - Data: &info, - Error: err.Error(), + info, err := sys.peerClients[index].ServerInfo() + if err != nil { + serverInfo[index].Error = err.Error() } + serverInfo[index].Data = &info // Last iteration log the error. if i == 2 { - reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String()) - ctx := logger.SetReqInfo(ctx, reqInfo) - logger.LogIf(ctx, err) + return err } // Wait for one second and no need wait after last attempt. if i < 2 { time.Sleep(1 * time.Second) } } - }(index, client) + return nil + }, index) + } + for index, err := range g.Wait() { + if err != nil { + addr := sys.peerClients[index].host.String() + reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", addr) + ctx := logger.SetReqInfo(ctx, reqInfo) + logger.LogIf(ctx, err) + } } - wg.Wait() return serverInfo } @@ -482,166 +466,163 @@ func (sys *NotificationSys) ServerInfo(ctx context.Context) []ServerInfo { func (sys *NotificationSys) GetLocks(ctx context.Context) []*PeerLocks { locksResp := make([]*PeerLocks, len(sys.peerClients)) - var wg sync.WaitGroup + g := errgroup.WithNErrs(len(sys.peerClients)) for index, client := range sys.peerClients { if client == nil { continue } - wg.Add(1) - go func(idx int, client *peerRESTClient) { - defer wg.Done() + index := index + g.Go(func() error { // Try to fetch serverInfo remotely in three attempts. for i := 0; i < 3; i++ { - serverLocksResp, err := client.GetLocks() + serverLocksResp, err := sys.peerClients[index].GetLocks() if err == nil { - locksResp[idx] = &PeerLocks{ - Addr: client.host.String(), + locksResp[index] = &PeerLocks{ + Addr: sys.peerClients[index].host.String(), Locks: serverLocksResp, } - return + return nil } // Last iteration log the error. if i == 2 { - reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String()) - ctx := logger.SetReqInfo(ctx, reqInfo) - logger.LogOnceIf(ctx, err, client.host.String()) + return err } // Wait for one second and no need wait after last attempt. if i < 2 { time.Sleep(1 * time.Second) } } - }(index, client) + return nil + }, index) + } + for index, err := range g.Wait() { + reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", + sys.peerClients[index].host.String()) + ctx := logger.SetReqInfo(ctx, reqInfo) + logger.LogOnceIf(ctx, err, sys.peerClients[index].host.String()) } - wg.Wait() return locksResp } // SetBucketPolicy - calls SetBucketPolicy RPC call on all peers. func (sys *NotificationSys) SetBucketPolicy(ctx context.Context, bucketName string, bucketPolicy *policy.Policy) { go func() { - var wg sync.WaitGroup - for _, client := range sys.peerClients { + ng := WithNPeers(len(sys.peerClients)) + for idx, client := range sys.peerClients { if client == nil { continue } - wg.Add(1) - go func(client *peerRESTClient) { - defer wg.Done() - if err := client.SetBucketPolicy(bucketName, bucketPolicy); err != nil { - logger.GetReqInfo(ctx).AppendTags("remotePeer", client.host.Name) - logger.LogIf(ctx, err) - } - }(client) + client := client + ng.Go(ctx, func() error { + return client.SetBucketPolicy(bucketName, bucketPolicy) + }, idx, *client.host) } - wg.Wait() + ng.Wait() + }() +} + +// DeleteBucket - calls DeleteBucket RPC call on all peers. +func (sys *NotificationSys) DeleteBucket(ctx context.Context, bucketName string) { + go func() { + ng := WithNPeers(len(sys.peerClients)) + for idx, client := range sys.peerClients { + if client == nil { + continue + } + client := client + ng.Go(ctx, func() error { + return client.DeleteBucket(bucketName) + }, idx, *client.host) + } + ng.Wait() }() } // RemoveBucketPolicy - calls RemoveBucketPolicy RPC call on all peers. func (sys *NotificationSys) RemoveBucketPolicy(ctx context.Context, bucketName string) { go func() { - var wg sync.WaitGroup - for _, client := range sys.peerClients { + ng := WithNPeers(len(sys.peerClients)) + for idx, client := range sys.peerClients { if client == nil { continue } - wg.Add(1) - go func(client *peerRESTClient) { - defer wg.Done() - if err := client.RemoveBucketPolicy(bucketName); err != nil { - logger.GetReqInfo(ctx).AppendTags("remotePeer", client.host.Name) - logger.LogIf(ctx, err) - } - }(client) + client := client + ng.Go(ctx, func() error { + return client.RemoveBucketPolicy(bucketName) + }, idx, *client.host) } - wg.Wait() + ng.Wait() }() } // SetBucketLifecycle - calls SetBucketLifecycle on all peers. -func (sys *NotificationSys) SetBucketLifecycle(ctx context.Context, bucketName string, bucketLifecycle *lifecycle.Lifecycle) { +func (sys *NotificationSys) SetBucketLifecycle(ctx context.Context, bucketName string, + bucketLifecycle *lifecycle.Lifecycle) { go func() { - var wg sync.WaitGroup - for _, client := range sys.peerClients { + ng := WithNPeers(len(sys.peerClients)) + for idx, client := range sys.peerClients { if client == nil { continue } - wg.Add(1) - go func(client *peerRESTClient) { - defer wg.Done() - if err := client.SetBucketLifecycle(bucketName, bucketLifecycle); err != nil { - logger.GetReqInfo(ctx).AppendTags("remotePeer", client.host.Name) - logger.LogIf(ctx, err) - } - }(client) + client := client + ng.Go(ctx, func() error { + return client.SetBucketLifecycle(bucketName, bucketLifecycle) + }, idx, *client.host) } - wg.Wait() + ng.Wait() }() } // RemoveBucketLifecycle - calls RemoveLifecycle on all peers. func (sys *NotificationSys) RemoveBucketLifecycle(ctx context.Context, bucketName string) { go func() { - var wg sync.WaitGroup - for _, client := range sys.peerClients { + ng := WithNPeers(len(sys.peerClients)) + for idx, client := range sys.peerClients { if client == nil { continue } - wg.Add(1) - go func(client *peerRESTClient) { - defer wg.Done() - if err := client.RemoveBucketLifecycle(bucketName); err != nil { - logger.GetReqInfo(ctx).AppendTags("remotePeer", client.host.Name) - logger.LogIf(ctx, err) - } - }(client) + client := client + ng.Go(ctx, func() error { + return client.RemoveBucketLifecycle(bucketName) + }, idx, *client.host) } - wg.Wait() + ng.Wait() }() } // PutBucketNotification - calls PutBucketNotification RPC call on all peers. func (sys *NotificationSys) PutBucketNotification(ctx context.Context, bucketName string, rulesMap event.RulesMap) { go func() { - var wg sync.WaitGroup - for _, client := range sys.peerClients { + ng := WithNPeers(len(sys.peerClients)) + for idx, client := range sys.peerClients { if client == nil { continue } - wg.Add(1) - go func(client *peerRESTClient, rulesMap event.RulesMap) { - defer wg.Done() - if err := client.PutBucketNotification(bucketName, rulesMap); err != nil { - logger.GetReqInfo(ctx).AppendTags("remotePeer", client.host.Name) - logger.LogIf(ctx, err) - } - }(client, rulesMap.Clone()) + client := client + ng.Go(ctx, func() error { + return client.PutBucketNotification(bucketName, rulesMap) + }, idx, *client.host) } - wg.Wait() + ng.Wait() }() } // ListenBucketNotification - calls ListenBucketNotification RPC call on all peers. -func (sys *NotificationSys) ListenBucketNotification(ctx context.Context, bucketName string, eventNames []event.Name, pattern string, - targetID event.TargetID, localPeer xnet.Host) { +func (sys *NotificationSys) ListenBucketNotification(ctx context.Context, bucketName string, + eventNames []event.Name, pattern string, targetID event.TargetID, localPeer xnet.Host) { go func() { - var wg sync.WaitGroup - for _, client := range sys.peerClients { + ng := WithNPeers(len(sys.peerClients)) + for idx, client := range sys.peerClients { if client == nil { continue } - wg.Add(1) - go func(client *peerRESTClient) { - defer wg.Done() - if err := client.ListenBucketNotification(bucketName, eventNames, pattern, targetID, localPeer); err != nil { - logger.GetReqInfo(ctx).AppendTags("remotePeer", client.host.Name) - logger.LogIf(ctx, err) - } - }(client) + client := client + ng.Go(ctx, func() error { + return client.ListenBucketNotification(bucketName, eventNames, pattern, targetID, localPeer) + }, idx, *client.host) } - wg.Wait() + ng.Wait() }() } @@ -981,78 +962,90 @@ func (sys *NotificationSys) CollectNetPerfInfo(size int64) map[string][]ServerNe // DrivePerfInfo - Drive speed (read and write) information func (sys *NotificationSys) DrivePerfInfo(size int64) []madmin.ServerDrivesPerfInfo { reply := make([]madmin.ServerDrivesPerfInfo, len(sys.peerClients)) - var wg sync.WaitGroup - for i, client := range sys.peerClients { + + g := errgroup.WithNErrs(len(sys.peerClients)) + for index, client := range sys.peerClients { if client == nil { continue } - wg.Add(1) - go func(client *peerRESTClient, idx int) { - defer wg.Done() - di, err := client.DrivePerfInfo(size) - if err != nil { - reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", client.host.String()) - ctx := logger.SetReqInfo(context.Background(), reqInfo) - logger.LogIf(ctx, err) - di.Addr = client.host.String() - di.Error = err.Error() - } - reply[idx] = di - }(client, i) + index := index + g.Go(func() error { + var err error + reply[index], err = sys.peerClients[index].DrivePerfInfo(size) + return err + }, index) + } + + for index, err := range g.Wait() { + if err != nil { + addr := sys.peerClients[index].host.String() + reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr) + ctx := logger.SetReqInfo(context.Background(), reqInfo) + logger.LogIf(ctx, err) + reply[index].Addr = addr + reply[index].Error = err.Error() + } } - wg.Wait() return reply } // MemUsageInfo - Mem utilization information func (sys *NotificationSys) MemUsageInfo() []ServerMemUsageInfo { reply := make([]ServerMemUsageInfo, len(sys.peerClients)) - var wg sync.WaitGroup - for i, client := range sys.peerClients { + + g := errgroup.WithNErrs(len(sys.peerClients)) + for index, client := range sys.peerClients { if client == nil { continue } - wg.Add(1) - go func(client *peerRESTClient, idx int) { - defer wg.Done() - memi, err := client.MemUsageInfo() - if err != nil { - reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", client.host.String()) - ctx := logger.SetReqInfo(context.Background(), reqInfo) - logger.LogIf(ctx, err) - memi.Addr = client.host.String() - memi.Error = err.Error() - } - reply[idx] = memi - }(client, i) + index := index + g.Go(func() error { + var err error + reply[index], err = sys.peerClients[index].MemUsageInfo() + return err + }, index) + } + + for index, err := range g.Wait() { + if err != nil { + addr := sys.peerClients[index].host.String() + reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr) + ctx := logger.SetReqInfo(context.Background(), reqInfo) + logger.LogIf(ctx, err) + reply[index].Addr = addr + reply[index].Error = err.Error() + } } - wg.Wait() return reply } // CPULoadInfo - CPU utilization information func (sys *NotificationSys) CPULoadInfo() []ServerCPULoadInfo { reply := make([]ServerCPULoadInfo, len(sys.peerClients)) - var wg sync.WaitGroup - for i, client := range sys.peerClients { + + g := errgroup.WithNErrs(len(sys.peerClients)) + for index, client := range sys.peerClients { if client == nil { continue } - wg.Add(1) - go func(client *peerRESTClient, idx int) { - defer wg.Done() - cpui, err := client.CPULoadInfo() - if err != nil { - reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", client.host.String()) - ctx := logger.SetReqInfo(context.Background(), reqInfo) - logger.LogIf(ctx, err) - cpui.Addr = client.host.String() - cpui.Error = err.Error() - } - reply[idx] = cpui - }(client, i) + index := index + g.Go(func() error { + var err error + reply[index], err = sys.peerClients[index].CPULoadInfo() + return err + }, index) + } + + for index, err := range g.Wait() { + if err != nil { + addr := sys.peerClients[index].host.String() + reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr) + ctx := logger.SetReqInfo(context.Background(), reqInfo) + logger.LogIf(ctx, err) + reply[index].Addr = addr + reply[index].Error = err.Error() + } } - wg.Wait() return reply } diff --git a/cmd/posix-list-dir_test.go b/cmd/posix-list-dir_test.go index 93e2f664a..1ad0225cd 100644 --- a/cmd/posix-list-dir_test.go +++ b/cmd/posix-list-dir_test.go @@ -129,8 +129,8 @@ func setupTestReadDirGeneric(t *testing.T) (testResults []result) { // Test to read non-empty directory with symlinks. func setupTestReadDirSymlink(t *testing.T) (testResults []result) { - if runtime.GOOS != "Windows" { - t.Log("symlinks not available on windows") + if runtime.GOOS == globalWindowsOSName { + t.Skip("symlinks not available on windows") return nil } dir := mustSetupDir(t) diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index 49499e51b..8200baea2 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -306,19 +306,21 @@ func newXLSets(endpoints EndpointList, format *formatXLV3, setCount int, drivesP // StorageInfo - combines output of StorageInfo across all erasure coded object sets. func (s *xlSets) StorageInfo(ctx context.Context) StorageInfo { var storageInfo StorageInfo - var wg sync.WaitGroup storageInfos := make([]StorageInfo, len(s.sets)) storageInfo.Backend.Type = BackendErasure - for index, set := range s.sets { - wg.Add(1) - go func(id int, set *xlObjects) { - defer wg.Done() - storageInfos[id] = set.StorageInfo(ctx) - }(index, set) + + g := errgroup.WithNErrs(len(s.sets)) + for index := range s.sets { + index := index + g.Go(func() error { + storageInfos[index] = s.sets[index].StorageInfo(ctx) + return nil + }, index) } + // Wait for the go routines. - wg.Wait() + g.Wait() for _, lstorageInfo := range storageInfos { storageInfo.Used += lstorageInfo.Used @@ -458,11 +460,12 @@ func undoMakeBucketSets(bucket string, sets []*xlObjects, errs []error) { // Undo previous make bucket entry on all underlying sets. for index := range sets { index := index - if errs[index] == nil { - g.Go(func() error { + g.Go(func() error { + if errs[index] == nil { return sets[index].DeleteBucket(context.Background(), bucket) - }, index) - } + } + return nil + }, index) } // Wait for all delete bucket to finish. @@ -618,11 +621,12 @@ func undoDeleteBucketSets(bucket string, sets []*xlObjects, errs []error) { // Undo previous delete bucket on all underlying sets. for index := range sets { index := index - if errs[index] == nil { - g.Go(func() error { + g.Go(func() error { + if errs[index] == nil { return sets[index].MakeBucketWithLocation(context.Background(), bucket, "") - }, index) - } + } + return nil + }, index) } g.Wait() @@ -742,19 +746,24 @@ func (s *xlSets) CopyObject(ctx context.Context, srcBucket, srcObject, destBucke func listDirSetsFactory(ctx context.Context, sets ...*xlObjects) ListDirFunc { listDirInternal := func(bucket, prefixDir, prefixEntry string, disks []StorageAPI) (mergedEntries []string) { var diskEntries = make([][]string, len(disks)) - var wg sync.WaitGroup + g := errgroup.WithNErrs(len(disks)) for index, disk := range disks { if disk == nil { continue } - wg.Add(1) - go func(index int, disk StorageAPI) { - defer wg.Done() - diskEntries[index], _ = disk.ListDir(bucket, prefixDir, -1, xlMetaJSONFile) - }(index, disk) + index := index + g.Go(func() error { + var err error + diskEntries[index], err = disks[index].ListDir(bucket, prefixDir, -1, xlMetaJSONFile) + return err + }, index) } - wg.Wait() + for _, err := range g.Wait() { + if err != nil { + logger.LogIf(ctx, err) + } + } // Find elements in entries which are not in mergedEntries for _, entries := range diskEntries { @@ -1405,21 +1414,21 @@ func isTestSetup(infos []DiskInfo, errs []error) bool { func getAllDiskInfos(storageDisks []StorageAPI) ([]DiskInfo, []error) { infos := make([]DiskInfo, len(storageDisks)) - errs := make([]error, len(storageDisks)) - var wg sync.WaitGroup - for i := range storageDisks { - if storageDisks[i] == nil { - errs[i] = errDiskNotFound - continue - } - wg.Add(1) - go func(i int) { - defer wg.Done() - infos[i], errs[i] = storageDisks[i].DiskInfo() - }(i) + g := errgroup.WithNErrs(len(storageDisks)) + for index := range storageDisks { + index := index + g.Go(func() error { + var err error + if storageDisks[index] != nil { + infos[index], err = storageDisks[index].DiskInfo() + } else { + // Disk not found. + err = errDiskNotFound + } + return err + }, index) } - wg.Wait() - return infos, errs + return infos, g.Wait() } // Mark root disks as down so as not to heal them. diff --git a/cmd/xl-v1-bucket.go b/cmd/xl-v1-bucket.go index ebcc8f11a..839e4d94f 100644 --- a/cmd/xl-v1-bucket.go +++ b/cmd/xl-v1-bucket.go @@ -19,12 +19,12 @@ package cmd import ( "context" "sort" - "sync" "github.com/minio/minio-go/v6/pkg/s3utils" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/lifecycle" "github.com/minio/minio/pkg/policy" + "github.com/minio/minio/pkg/sync/errgroup" ) // list all errors that can be ignore in a bucket operation. @@ -42,83 +42,71 @@ func (xl xlObjects) MakeBucketWithLocation(ctx context.Context, bucket, location return BucketNameInvalid{Bucket: bucket} } - // Initialize sync waitgroup. - var wg sync.WaitGroup + storageDisks := xl.getDisks() - // Initialize list of errors. - var dErrs = make([]error, len(xl.getDisks())) + g := errgroup.WithNErrs(len(storageDisks)) // Make a volume entry on all underlying storage disks. - for index, disk := range xl.getDisks() { - if disk == nil { - dErrs[index] = errDiskNotFound - continue - } - wg.Add(1) - // Make a volume inside a go-routine. - go func(index int, disk StorageAPI) { - defer wg.Done() - err := disk.MakeVol(bucket) - if err != nil { - if err != errVolumeExists { - logger.LogIf(ctx, err) + for index := range storageDisks { + index := index + g.Go(func() error { + if storageDisks[index] != nil { + if err := storageDisks[index].MakeVol(bucket); err != nil { + if err != errVolumeExists { + logger.LogIf(ctx, err) + } + return err } - dErrs[index] = err + return nil } - }(index, disk) + return errDiskNotFound + }, index) } - // Wait for all make vol to finish. - wg.Wait() - - writeQuorum := len(xl.getDisks())/2 + 1 - err := reduceWriteQuorumErrs(ctx, dErrs, bucketOpIgnoredErrs, writeQuorum) + writeQuorum := len(storageDisks)/2 + 1 + err := reduceWriteQuorumErrs(ctx, g.Wait(), bucketOpIgnoredErrs, writeQuorum) if err == errXLWriteQuorum { // Purge successfully created buckets if we don't have writeQuorum. - undoMakeBucket(xl.getDisks(), bucket) + undoMakeBucket(storageDisks, bucket) } return toObjectErr(err, bucket) } -func (xl xlObjects) undoDeleteBucket(bucket string) { - // Initialize sync waitgroup. - var wg sync.WaitGroup +func undoDeleteBucket(storageDisks []StorageAPI, bucket string) { + g := errgroup.WithNErrs(len(storageDisks)) // Undo previous make bucket entry on all underlying storage disks. - for index, disk := range xl.getDisks() { - if disk == nil { + for index := range storageDisks { + if storageDisks[index] == nil { continue } - wg.Add(1) - // Delete a bucket inside a go-routine. - go func(index int, disk StorageAPI) { - defer wg.Done() - _ = disk.MakeVol(bucket) - }(index, disk) + index := index + g.Go(func() error { + _ = storageDisks[index].MakeVol(bucket) + return nil + }, index) } // Wait for all make vol to finish. - wg.Wait() + g.Wait() } // undo make bucket operation upon quorum failure. func undoMakeBucket(storageDisks []StorageAPI, bucket string) { - // Initialize sync waitgroup. - var wg sync.WaitGroup + g := errgroup.WithNErrs(len(storageDisks)) // Undo previous make bucket entry on all underlying storage disks. - for index, disk := range storageDisks { - if disk == nil { + for index := range storageDisks { + if storageDisks[index] == nil { continue } - wg.Add(1) - // Delete a bucket inside a go-routine. - go func(index int, disk StorageAPI) { - defer wg.Done() - _ = disk.DeleteVol(bucket) - }(index, disk) + index := index + g.Go(func() error { + _ = storageDisks[index].DeleteVol(bucket) + return nil + }, index) } // Wait for all make vol to finish. - wg.Wait() + g.Wait() } // getBucketInfo - returns the BucketInfo from one of the load balanced disks. @@ -245,42 +233,34 @@ func (xl xlObjects) DeleteBucket(ctx context.Context, bucket string) error { defer bucketLock.Unlock() // Collect if all disks report volume not found. - var wg sync.WaitGroup - var dErrs = make([]error, len(xl.getDisks())) - - // Remove a volume entry on all underlying storage disks. storageDisks := xl.getDisks() - for index, disk := range storageDisks { - if disk == nil { - dErrs[index] = errDiskNotFound - continue - } - wg.Add(1) - // Delete volume inside a go-routine. - go func(index int, disk StorageAPI) { - defer wg.Done() - // Attempt to delete bucket. - err := disk.DeleteVol(bucket) - if err != nil { - dErrs[index] = err - return - } - // Cleanup all the previously incomplete multiparts. - err = cleanupDir(ctx, disk, minioMetaMultipartBucket, bucket) - if err != nil && err != errVolumeNotFound { - dErrs[index] = err + g := errgroup.WithNErrs(len(storageDisks)) + + for index := range storageDisks { + index := index + g.Go(func() error { + if storageDisks[index] != nil { + if err := storageDisks[index].DeleteVol(bucket); err != nil { + return err + } + err := cleanupDir(ctx, storageDisks[index], minioMetaMultipartBucket, bucket) + if err != nil && err != errVolumeNotFound { + return err + } + return nil } - }(index, disk) + return errDiskNotFound + }, index) } // Wait for all the delete vols to finish. - wg.Wait() + dErrs := g.Wait() - writeQuorum := len(xl.getDisks())/2 + 1 + writeQuorum := len(storageDisks)/2 + 1 err := reduceWriteQuorumErrs(ctx, dErrs, bucketOpIgnoredErrs, writeQuorum) if err == errXLWriteQuorum { - xl.undoDeleteBucket(bucket) + undoDeleteBucket(storageDisks, bucket) } if err != nil { return toObjectErr(err, bucket) diff --git a/cmd/xl-v1-common.go b/cmd/xl-v1-common.go index 0df3ef764..0e5462993 100644 --- a/cmd/xl-v1-common.go +++ b/cmd/xl-v1-common.go @@ -19,7 +19,8 @@ package cmd import ( "context" "path" - "sync" + + "github.com/minio/minio/pkg/sync/errgroup" ) // getLoadBalancedDisks - fetches load balanced (sufficiently randomized) disk slice. @@ -53,35 +54,33 @@ func (xl xlObjects) parentDirIsObject(ctx context.Context, bucket, parent string // isObject - returns `true` if the prefix is an object i.e if // `xl.json` exists at the leaf, false otherwise. func (xl xlObjects) isObject(bucket, prefix string) (ok bool) { - var errs = make([]error, len(xl.getDisks())) - var wg sync.WaitGroup - for index, disk := range xl.getDisks() { + storageDisks := xl.getDisks() + + g := errgroup.WithNErrs(len(storageDisks)) + + for index, disk := range storageDisks { if disk == nil { continue } - wg.Add(1) - go func(index int, disk StorageAPI) { - defer wg.Done() + index := index + g.Go(func() error { // Check if 'prefix' is an object on this 'disk', else continue the check the next disk - fi, err := disk.StatFile(bucket, path.Join(prefix, xlMetaJSONFile)) + fi, err := storageDisks[index].StatFile(bucket, pathJoin(prefix, xlMetaJSONFile)) if err != nil { - errs[index] = err - return + return err } if fi.Size == 0 { - errs[index] = errCorruptedFormat - return + return errCorruptedFormat } - }(index, disk) + return nil + }, index) } - wg.Wait() - // NOTE: Observe we are not trying to read `xl.json` and figure out the actual // quorum intentionally, but rely on the default case scenario. Actual quorum // verification will happen by top layer by using getObjectInfo() and will be // ignored if necessary. - readQuorum := len(xl.getDisks()) / 2 + readQuorum := len(storageDisks) / 2 - return reduceReadQuorumErrs(context.Background(), errs, objectOpIgnoredErrs, readQuorum) == nil + return reduceReadQuorumErrs(context.Background(), g.Wait(), objectOpIgnoredErrs, readQuorum) == nil } diff --git a/cmd/xl-v1-healing.go b/cmd/xl-v1-healing.go index fe5c805dc..bc17e7ea4 100644 --- a/cmd/xl-v1-healing.go +++ b/cmd/xl-v1-healing.go @@ -20,11 +20,11 @@ import ( "context" "fmt" "io" - "sync" "time" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/madmin" + "github.com/minio/minio/pkg/sync/errgroup" ) func (xl xlObjects) ReloadFormat(ctx context.Context, dryRun bool) error { @@ -57,40 +57,31 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, bucket string, w dryRun bool) (res madmin.HealResultItem, err error) { // Initialize sync waitgroup. - var wg sync.WaitGroup - - // Initialize list of errors. - var dErrs = make([]error, len(storageDisks)) + g := errgroup.WithNErrs(len(storageDisks)) // Disk states slices beforeState := make([]string, len(storageDisks)) afterState := make([]string, len(storageDisks)) // Make a volume entry on all underlying storage disks. - for index, disk := range storageDisks { - if disk == nil { - dErrs[index] = errDiskNotFound - beforeState[index] = madmin.DriveStateOffline - afterState[index] = madmin.DriveStateOffline - continue - } - wg.Add(1) - - // Make a volume inside a go-routine. - go func(index int, disk StorageAPI) { - defer wg.Done() - if _, serr := disk.StatVol(bucket); serr != nil { + for index := range storageDisks { + index := index + g.Go(func() error { + if storageDisks[index] == nil { + beforeState[index] = madmin.DriveStateOffline + afterState[index] = madmin.DriveStateOffline + return errDiskNotFound + } + if _, serr := storageDisks[index].StatVol(bucket); serr != nil { if serr == errDiskNotFound { beforeState[index] = madmin.DriveStateOffline afterState[index] = madmin.DriveStateOffline - dErrs[index] = serr - return + return serr } if serr != errVolumeNotFound { beforeState[index] = madmin.DriveStateCorrupt afterState[index] = madmin.DriveStateCorrupt - dErrs[index] = serr - return + return serr } beforeState[index] = madmin.DriveStateMissing @@ -98,23 +89,22 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, bucket string, w // mutate only if not a dry-run if dryRun { - return + return nil } - makeErr := disk.MakeVol(bucket) - dErrs[index] = makeErr + makeErr := storageDisks[index].MakeVol(bucket) if makeErr == nil { afterState[index] = madmin.DriveStateOk } - return + return makeErr } beforeState[index] = madmin.DriveStateOk afterState[index] = madmin.DriveStateOk - }(index, disk) + return nil + }, index) } - // Wait for all make vol to finish. - wg.Wait() + errs := g.Wait() // Initialize heal result info res = madmin.HealResultItem{ @@ -122,13 +112,13 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, bucket string, w Bucket: bucket, DiskCount: len(storageDisks), } - for i, before := range beforeState { + for i := range beforeState { if storageDisks[i] != nil { drive := storageDisks[i].String() res.Before.Drives = append(res.Before.Drives, madmin.HealDriveInfo{ UUID: "", Endpoint: drive, - State: before, + State: beforeState[i], }) res.After.Drives = append(res.After.Drives, madmin.HealDriveInfo{ UUID: "", @@ -138,7 +128,7 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, bucket string, w } } - reducedErr := reduceWriteQuorumErrs(ctx, dErrs, bucketOpIgnoredErrs, writeQuorum) + reducedErr := reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, writeQuorum) if reducedErr == errXLWriteQuorum { // Purge successfully created buckets if we don't have writeQuorum. undoMakeBucket(storageDisks, bucket) @@ -597,29 +587,25 @@ func defaultHealResult(latestXLMeta xlMetaV1, storageDisks []StorageAPI, errs [] // Stat all directories. func statAllDirs(ctx context.Context, storageDisks []StorageAPI, bucket, prefix string) []error { - var errs = make([]error, len(storageDisks)) - var wg sync.WaitGroup + g := errgroup.WithNErrs(len(storageDisks)) for index, disk := range storageDisks { if disk == nil { continue } - wg.Add(1) - go func(index int, disk StorageAPI) { - defer wg.Done() - entries, err := disk.ListDir(bucket, prefix, 1, "") + index := index + g.Go(func() error { + entries, err := storageDisks[index].ListDir(bucket, prefix, 1, "") if err != nil { - errs[index] = err - return + return err } if len(entries) > 0 { - errs[index] = errVolumeNotEmpty - return + return errVolumeNotEmpty } - }(index, disk) + return nil + }, index) } - wg.Wait() - return errs + return g.Wait() } // ObjectDir is considered dangling/corrupted if any only diff --git a/cmd/xl-v1-metadata.go b/cmd/xl-v1-metadata.go index 1aff7da11..e3e1df28d 100644 --- a/cmd/xl-v1-metadata.go +++ b/cmd/xl-v1-metadata.go @@ -24,11 +24,11 @@ import ( "net/http" "path" "sort" - "sync" "time" xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/pkg/sync/errgroup" "github.com/minio/sha256-simd" ) @@ -452,31 +452,23 @@ func renameXLMetadata(ctx context.Context, disks []StorageAPI, srcBucket, srcEnt // writeUniqueXLMetadata - writes unique `xl.json` content for each disk in order. func writeUniqueXLMetadata(ctx context.Context, disks []StorageAPI, bucket, prefix string, xlMetas []xlMetaV1, quorum int) ([]StorageAPI, error) { - var wg sync.WaitGroup - var mErrs = make([]error, len(disks)) + g := errgroup.WithNErrs(len(disks)) // Start writing `xl.json` to all disks in parallel. - for index, disk := range disks { - if disk == nil { - mErrs[index] = errDiskNotFound - continue - } - wg.Add(1) - - // Pick one xlMeta for a disk at index. - xlMetas[index].Erasure.Index = index + 1 - - // Write `xl.json` in a routine. - go func(index int, disk StorageAPI, xlMeta xlMetaV1) { - defer wg.Done() - - // Write unique `xl.json` for a disk at index. - mErrs[index] = writeXLMetadata(ctx, disk, bucket, prefix, xlMeta) - }(index, disk, xlMetas[index]) + for index := range disks { + index := index + g.Go(func() error { + if disks[index] == nil { + return errDiskNotFound + } + // Pick one xlMeta for a disk at index. + xlMetas[index].Erasure.Index = index + 1 + return writeXLMetadata(ctx, disks[index], bucket, prefix, xlMetas[index]) + }, index) } // Wait for all the routines. - wg.Wait() + mErrs := g.Wait() err := reduceWriteQuorumErrs(ctx, mErrs, objectOpIgnoredErrs, quorum) return evalDisks(disks, mErrs), err diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index 1ad879761..c176b29b0 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -24,12 +24,12 @@ import ( "sort" "strconv" "strings" - "sync" "time" xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/mimedb" + "github.com/minio/minio/pkg/sync/errgroup" ) func (xl xlObjects) getUploadIDDir(bucket, object, uploadID string) string { @@ -57,21 +57,23 @@ func (xl xlObjects) checkUploadIDExists(ctx context.Context, bucket, object, upl // Removes part given by partName belonging to a mulitpart upload from minioMetaBucket func (xl xlObjects) removeObjectPart(bucket, object, uploadID, partName string) { curpartPath := path.Join(bucket, object, uploadID, partName) - var wg sync.WaitGroup - for i, disk := range xl.getDisks() { + storageDisks := xl.getDisks() + + g := errgroup.WithNErrs(len(storageDisks)) + for index, disk := range storageDisks { if disk == nil { continue } - wg.Add(1) - go func(index int, disk StorageAPI) { - defer wg.Done() + index := index + g.Go(func() error { // Ignoring failure to remove parts that weren't present in CompleteMultipartUpload // requests. xl.json is the authoritative source of truth on which parts constitute // the object. The presence of parts that don't belong in the object doesn't affect correctness. - _ = disk.DeleteFile(minioMetaMultipartBucket, curpartPath) - }(i, disk) + _ = storageDisks[index].DeleteFile(minioMetaMultipartBucket, curpartPath) + return nil + }, index) } - wg.Wait() + g.Wait() } // statPart - returns fileInfo structure for a successful stat on part file. @@ -104,31 +106,29 @@ func (xl xlObjects) statPart(ctx context.Context, bucket, object, uploadID, part // commitXLMetadata - commit `xl.json` from source prefix to destination prefix in the given slice of disks. func commitXLMetadata(ctx context.Context, disks []StorageAPI, srcBucket, srcPrefix, dstBucket, dstPrefix string, quorum int) ([]StorageAPI, error) { - var wg sync.WaitGroup - var mErrs = make([]error, len(disks)) - srcJSONFile := path.Join(srcPrefix, xlMetaJSONFile) dstJSONFile := path.Join(dstPrefix, xlMetaJSONFile) + g := errgroup.WithNErrs(len(disks)) + // Rename `xl.json` to all disks in parallel. - for index, disk := range disks { - if disk == nil { - mErrs[index] = errDiskNotFound - continue - } - wg.Add(1) - // Rename `xl.json` in a routine. - go func(index int, disk StorageAPI) { - defer wg.Done() + for index := range disks { + index := index + g.Go(func() error { + if disks[index] == nil { + return errDiskNotFound + } + // Delete any dangling directories. - defer disk.DeleteFile(srcBucket, srcPrefix) + defer disks[index].DeleteFile(srcBucket, srcPrefix) // Renames `xl.json` from source prefix to destination prefix. - mErrs[index] = disk.RenameFile(srcBucket, srcJSONFile, dstBucket, dstJSONFile) - }(index, disk) + return disks[index].RenameFile(srcBucket, srcJSONFile, dstBucket, dstJSONFile) + }, index) } + // Wait for all the routines. - wg.Wait() + mErrs := g.Wait() err := reduceWriteQuorumErrs(ctx, mErrs, objectOpIgnoredErrs, quorum) return evalDisks(disks, mErrs), err diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index 197b8e64a..8a08faf81 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -22,11 +22,11 @@ import ( "io" "net/http" "path" - "sync" xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/mimedb" + "github.com/minio/minio/pkg/sync/errgroup" ) // list all errors which can be ignored in object operations. @@ -34,25 +34,26 @@ var objectOpIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied) // putObjectDir hints the bottom layer to create a new directory. func (xl xlObjects) putObjectDir(ctx context.Context, bucket, object string, writeQuorum int) error { - var wg sync.WaitGroup + storageDisks := xl.getDisks() + + g := errgroup.WithNErrs(len(storageDisks)) - errs := make([]error, len(xl.getDisks())) // Prepare object creation in all disks - for index, disk := range xl.getDisks() { - if disk == nil { + for index := range storageDisks { + if storageDisks[index] == nil { continue } - wg.Add(1) - go func(index int, disk StorageAPI) { - defer wg.Done() - if err := disk.MakeVol(pathJoin(bucket, object)); err != nil && err != errVolumeExists { - errs[index] = err + index := index + g.Go(func() error { + err := storageDisks[index].MakeVol(pathJoin(bucket, object)) + if err != nil && err != errVolumeExists { + return err } - }(index, disk) + return nil + }, index) } - wg.Wait() - return reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum) + return reduceWriteQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, writeQuorum) } /// Object Operations @@ -335,36 +336,34 @@ func (xl xlObjects) getObject(ctx context.Context, bucket, object string, startO } // getObjectInfoDir - This getObjectInfo is specific to object directory lookup. -func (xl xlObjects) getObjectInfoDir(ctx context.Context, bucket, object string) (oi ObjectInfo, err error) { - var wg sync.WaitGroup +func (xl xlObjects) getObjectInfoDir(ctx context.Context, bucket, object string) (ObjectInfo, error) { + storageDisks := xl.getDisks() + + g := errgroup.WithNErrs(len(storageDisks)) - errs := make([]error, len(xl.getDisks())) // Prepare object creation in a all disks - for index, disk := range xl.getDisks() { + for index, disk := range storageDisks { if disk == nil { continue } - wg.Add(1) - go func(index int, disk StorageAPI) { - defer wg.Done() + index := index + g.Go(func() error { // Check if 'prefix' is an object on this 'disk'. - entries, err := disk.ListDir(bucket, object, 1, "") + entries, err := storageDisks[index].ListDir(bucket, object, 1, "") if err != nil { - errs[index] = err - return + return err } if len(entries) > 0 { // Not a directory if not empty. - errs[index] = errFileNotFound - return + return errFileNotFound } - }(index, disk) + return nil + }, index) } - wg.Wait() - - readQuorum := len(xl.getDisks()) / 2 - return dirObjectInfo(bucket, object, 0, map[string]string{}), reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum) + readQuorum := len(storageDisks) / 2 + err := reduceReadQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, readQuorum) + return dirObjectInfo(bucket, object, 0, map[string]string{}), err } // GetObjectInfo - reads object metadata and replies back ObjectInfo. @@ -424,7 +423,6 @@ func (xl xlObjects) getObjectInfo(ctx context.Context, bucket, object string) (o } func undoRename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isDir bool, errs []error) { - var wg sync.WaitGroup // Undo rename object on disks where RenameFile succeeded. // If srcEntry/dstEntry are objects then add a trailing slash to copy @@ -433,56 +431,51 @@ func undoRename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry str srcEntry = retainSlash(srcEntry) dstEntry = retainSlash(dstEntry) } + g := errgroup.WithNErrs(len(disks)) for index, disk := range disks { if disk == nil { continue } - // Undo rename object in parallel. - wg.Add(1) - go func(index int, disk StorageAPI) { - defer wg.Done() - if errs[index] != nil { - return + index := index + g.Go(func() error { + if errs[index] == nil { + _ = disks[index].RenameFile(dstBucket, dstEntry, srcBucket, srcEntry) } - _ = disk.RenameFile(dstBucket, dstEntry, srcBucket, srcEntry) - }(index, disk) + return nil + }, index) } - wg.Wait() + g.Wait() } // rename - common function that renamePart and renameObject use to rename // the respective underlying storage layer representations. func rename(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isDir bool, writeQuorum int, ignoredErr []error) ([]StorageAPI, error) { - // Initialize sync waitgroup. - var wg sync.WaitGroup - - // Initialize list of errors. - var errs = make([]error, len(disks)) if isDir { dstEntry = retainSlash(dstEntry) srcEntry = retainSlash(srcEntry) } + g := errgroup.WithNErrs(len(disks)) + // Rename file on all underlying storage disks. - for index, disk := range disks { - if disk == nil { - errs[index] = errDiskNotFound - continue - } - wg.Add(1) - go func(index int, disk StorageAPI) { - defer wg.Done() - if err := disk.RenameFile(srcBucket, srcEntry, dstBucket, dstEntry); err != nil { + for index := range disks { + index := index + g.Go(func() error { + if disks[index] == nil { + return errDiskNotFound + } + if err := disks[index].RenameFile(srcBucket, srcEntry, dstBucket, dstEntry); err != nil { if !IsErrIgnored(err, ignoredErr...) { - errs[index] = err + return err } } - }(index, disk) + return nil + }, index) } // Wait for all renames to finish. - wg.Wait() + errs := g.Wait() // We can safely allow RenameFile errors up to len(xl.getDisks()) - writeQuorum // otherwise return failure. Cleanup successful renames. @@ -744,39 +737,31 @@ func (xl xlObjects) deleteObject(ctx context.Context, bucket, object string, wri } } - // Initialize sync waitgroup. - var wg sync.WaitGroup + g := errgroup.WithNErrs(len(disks)) - // Initialize list of errors. - var dErrs = make([]error, len(disks)) - - for index, disk := range disks { - if disk == nil { - dErrs[index] = errDiskNotFound - continue - } - wg.Add(1) - go func(index int, disk StorageAPI, isDir bool) { - defer wg.Done() - var e error + for index := range disks { + index := index + g.Go(func() error { + if disks[index] == nil { + return errDiskNotFound + } + var err error if isDir { // DeleteFile() simply tries to remove a directory // and will succeed only if that directory is empty. - e = disk.DeleteFile(minioMetaTmpBucket, tmpObj) + err = disks[index].DeleteFile(minioMetaTmpBucket, tmpObj) } else { - e = cleanupDir(ctx, disk, minioMetaTmpBucket, tmpObj) + err = cleanupDir(ctx, disks[index], minioMetaTmpBucket, tmpObj) } - if e != nil && e != errVolumeNotFound { - dErrs[index] = e + if err != nil && err != errVolumeNotFound { + return err } - }(index, disk, isDir) + return nil + }, index) } - // Wait for all routines to finish. - wg.Wait() - // return errors if any during deletion - return reduceWriteQuorumErrs(ctx, dErrs, objectOpIgnoredErrs, writeQuorum) + return reduceWriteQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, writeQuorum) } // deleteObject - wrapper for delete object, deletes an object from diff --git a/cmd/xl-v1-utils.go b/cmd/xl-v1-utils.go index ca90c7dde..5e20a407d 100644 --- a/cmd/xl-v1-utils.go +++ b/cmd/xl-v1-utils.go @@ -21,10 +21,10 @@ import ( "errors" "hash/crc32" "path" - "sync" jsoniter "github.com/json-iterator/go" "github.com/minio/minio/cmd/logger" + "github.com/minio/minio/pkg/sync/errgroup" ) // Returns number of errors that occurred the most (incl. nil) and the @@ -180,28 +180,23 @@ func readXLMeta(ctx context.Context, disk StorageAPI, bucket string, object stri // Reads all `xl.json` metadata as a xlMetaV1 slice. // Returns error slice indicating the failed metadata reads. func readAllXLMetadata(ctx context.Context, disks []StorageAPI, bucket, object string) ([]xlMetaV1, []error) { - errs := make([]error, len(disks)) metadataArray := make([]xlMetaV1, len(disks)) - var wg sync.WaitGroup + + g := errgroup.WithNErrs(len(disks)) // Read `xl.json` parallelly across disks. - for index, disk := range disks { - if disk == nil { - errs[index] = errDiskNotFound - continue - } - wg.Add(1) - // Read `xl.json` in routine. - go func(index int, disk StorageAPI) { - defer wg.Done() - metadataArray[index], errs[index] = readXLMeta(ctx, disk, bucket, object) - }(index, disk) + for index := range disks { + index := index + g.Go(func() (err error) { + if disks[index] == nil { + return errDiskNotFound + } + metadataArray[index], err = readXLMeta(ctx, disks[index], bucket, object) + return err + }, index) } - // Wait for all the routines to finish. - wg.Wait() - // Return all the metadata. - return metadataArray, errs + return metadataArray, g.Wait() } // Return shuffled partsMetadata depending on distribution. diff --git a/cmd/xl-v1.go b/cmd/xl-v1.go index 8b746b2a1..f0991d9d2 100644 --- a/cmd/xl-v1.go +++ b/cmd/xl-v1.go @@ -19,10 +19,10 @@ package cmd import ( "context" "sort" - "sync" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/bpool" + "github.com/minio/minio/pkg/sync/errgroup" ) // XL constants. @@ -71,34 +71,31 @@ func (d byDiskTotal) Less(i, j int) bool { // getDisksInfo - fetch disks info across all other storage API. func getDisksInfo(disks []StorageAPI) (disksInfo []DiskInfo, onlineDisks int, offlineDisks int) { disksInfo = make([]DiskInfo, len(disks)) - errs := make([]error, len(disks)) - var wg sync.WaitGroup - for i, storageDisk := range disks { - if storageDisk == nil { - // Storage disk is empty, perhaps ignored disk or not available. - errs[i] = errDiskNotFound - continue - } - wg.Add(1) - go func(id int, sDisk StorageAPI) { - defer wg.Done() - info, err := sDisk.DiskInfo() + + g := errgroup.WithNErrs(len(disks)) + for index := range disks { + index := index + g.Go(func() error { + if disks[index] == nil { + // Storage disk is empty, perhaps ignored disk or not available. + return errDiskNotFound + } + info, err := disks[index].DiskInfo() if err != nil { - reqInfo := (&logger.ReqInfo{}).AppendTags("disk", sDisk.String()) + if IsErr(err, baseErrs...) { + return err + } + reqInfo := (&logger.ReqInfo{}).AppendTags("disk", disks[index].String()) ctx := logger.SetReqInfo(context.Background(), reqInfo) logger.LogIf(ctx, err) - if IsErr(err, baseErrs...) { - errs[id] = err - return - } } - disksInfo[id] = info - }(i, storageDisk) + disksInfo[index] = info + return nil + }, index) } - // Wait for the routines. - wg.Wait() - for _, err := range errs { + // Wait for the routines. + for _, err := range g.Wait() { if err != nil { offlineDisks++ continue