Use errgroups instead of sync.WaitGroup as needed (#8354)

This commit is contained in:
Harshavardhana 2019-10-14 09:44:51 -07:00 committed by GitHub
parent c33bae057f
commit 68a519a468
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 512 additions and 601 deletions

View File

@ -71,8 +71,8 @@ func initFederatorBackend(objLayer ObjectLayer) {
// Add buckets that are not registered with the DNS
g := errgroup.WithNErrs(len(b))
for index := range b {
index := index
bucketSet.Add(b[index].Name)
index := index
g.Go(func() error {
r, gerr := globalDNSConfig.Get(b[index].Name)
if gerr != nil {
@ -99,7 +99,6 @@ func initFederatorBackend(objLayer ObjectLayer) {
// Remove buckets that are in DNS for this server, but aren't local
for index := range dnsBuckets {
index := index
g.Go(func() error {
// This is a local bucket that exists, so we can continue
if bucketSet.Contains(dnsBuckets[index].Key) {

View File

@ -16,6 +16,7 @@ import (
"github.com/minio/minio/cmd/config/cache"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/color"
"github.com/minio/minio/pkg/sync/errgroup"
"github.com/minio/minio/pkg/wildcard"
)
@ -450,36 +451,32 @@ func checkAtimeSupport(dir string) (err error) {
func (c *cacheObjects) migrateCacheFromV1toV2(ctx context.Context) {
logStartupMessage(color.Blue("Cache migration initiated ...."))
var wg sync.WaitGroup
errs := make([]error, len(c.cache))
for i, dc := range c.cache {
g := errgroup.WithNErrs(len(c.cache))
for index, dc := range c.cache {
if dc == nil {
continue
}
wg.Add(1)
index := index
g.Go(func() error {
// start migration from V1 to V2
go func(ctx context.Context, dc *diskCache, errs []error, idx int) {
defer wg.Done()
if err := migrateOldCache(ctx, dc); err != nil {
errs[idx] = err
logger.LogIf(ctx, err)
return
return migrateOldCache(ctx, c.cache[index])
}, index)
}
// start purge routine after migration completes.
go dc.purge()
}(ctx, dc, errs, i)
}
wg.Wait()
errCnt := 0
for _, err := range errs {
for index, err := range g.Wait() {
if err != nil {
errCnt++
logger.LogIf(ctx, err)
continue
}
go c.cache[index].purge()
}
if errCnt > 0 {
return
}
// update migration status
c.migMutex.Lock()
defer c.migMutex.Unlock()

View File

@ -23,12 +23,12 @@ import (
"fmt"
"io/ioutil"
"reflect"
"sync"
"encoding/hex"
humanize "github.com/dustin/go-humanize"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/sync/errgroup"
sha256 "github.com/minio/sha256-simd"
)
@ -315,40 +315,30 @@ func quorumUnformattedDisks(errs []error) bool {
// loadFormatXLAll - load all format config from all input disks in parallel.
func loadFormatXLAll(storageDisks []StorageAPI) ([]*formatXLV3, []error) {
// Initialize sync waitgroup.
var wg sync.WaitGroup
// Initialize list of errors.
var sErrs = make([]error, len(storageDisks))
g := errgroup.WithNErrs(len(storageDisks))
// Initialize format configs.
var formats = make([]*formatXLV3, len(storageDisks))
// Load format from each disk in parallel
for index, disk := range storageDisks {
if disk == nil {
sErrs[index] = errDiskNotFound
continue
for index := range storageDisks {
index := index
g.Go(func() error {
if storageDisks[index] == nil {
return errDiskNotFound
}
wg.Add(1)
// Launch go-routine per disk.
go func(index int, disk StorageAPI) {
defer wg.Done()
format, lErr := loadFormatXL(disk)
if lErr != nil {
sErrs[index] = lErr
return
format, err := loadFormatXL(storageDisks[index])
if err != nil {
return err
}
formats[index] = format
}(index, disk)
return nil
}, index)
}
// Wait for all go-routines to finish.
wg.Wait()
// Return all formats and nil
return formats, sErrs
// Return all formats and errors if any.
return formats, g.Wait()
}
func saveFormatXL(disk StorageAPI, format interface{}) error {
@ -643,28 +633,22 @@ func formatXLV3Check(reference *formatXLV3, format *formatXLV3) error {
// saveFormatXLAll - populates `format.json` on disks in its order.
func saveFormatXLAll(ctx context.Context, storageDisks []StorageAPI, formats []*formatXLV3) error {
var errs = make([]error, len(storageDisks))
var wg sync.WaitGroup
g := errgroup.WithNErrs(len(storageDisks))
// Write `format.json` to all disks.
for index, disk := range storageDisks {
if formats[index] == nil || disk == nil {
errs[index] = errDiskNotFound
continue
for index := range storageDisks {
index := index
g.Go(func() error {
if formats[index] == nil || storageDisks[index] == nil {
return errDiskNotFound
}
wg.Add(1)
go func(index int, disk StorageAPI, format *formatXLV3) {
defer wg.Done()
errs[index] = saveFormatXL(disk, format)
}(index, disk, formats[index])
return saveFormatXL(storageDisks[index], formats[index])
}, index)
}
// Wait for the routines to finish.
wg.Wait()
writeQuorum := len(storageDisks)/2 + 1
return reduceWriteQuorumErrs(ctx, errs, nil, writeQuorum)
// Wait for the routines to finish.
return reduceWriteQuorumErrs(ctx, g.Wait(), nil, writeQuorum)
}
// relinquishes the underlying connection for all storage disks.
@ -682,17 +666,19 @@ func closeStorageDisks(storageDisks []StorageAPI) {
func initStorageDisksWithErrors(endpoints EndpointList) ([]StorageAPI, []error) {
// Bootstrap disks.
storageDisks := make([]StorageAPI, len(endpoints))
errs := make([]error, len(endpoints))
var wg sync.WaitGroup
for index, endpoint := range endpoints {
wg.Add(1)
go func(index int, endpoint Endpoint) {
defer wg.Done()
storageDisks[index], errs[index] = newStorageAPI(endpoint)
}(index, endpoint)
g := errgroup.WithNErrs(len(endpoints))
for index := range endpoints {
index := index
g.Go(func() error {
storageDisk, err := newStorageAPI(endpoints[index])
if err != nil {
return err
}
wg.Wait()
return storageDisks, errs
storageDisks[index] = storageDisk
return nil
}, index)
}
return storageDisks, g.Wait()
}
// formatXLV3ThisEmpty - find out if '.This' field is empty
@ -793,31 +779,24 @@ func initFormatXLMetaVolume(storageDisks []StorageAPI, formats []*formatXLV3) er
// This happens for the first time, but keep this here since this
// is the only place where it can be made expensive optimizing all
// other calls. Create minio meta volume, if it doesn't exist yet.
var wg sync.WaitGroup
// Initialize errs to collect errors inside go-routine.
var errs = make([]error, len(storageDisks))
g := errgroup.WithNErrs(len(storageDisks))
// Initialize all disks in parallel.
for index, disk := range storageDisks {
if formats[index] == nil || disk == nil {
for index := range storageDisks {
index := index
g.Go(func() error {
if formats[index] == nil || storageDisks[index] == nil {
// Ignore create meta volume on disks which are not found.
continue
return nil
}
wg.Add(1)
go func(index int, disk StorageAPI) {
// Indicate this wait group is done.
defer wg.Done()
errs[index] = makeFormatXLMetaVolumes(disk)
}(index, disk)
return makeFormatXLMetaVolumes(storageDisks[index])
}, index)
}
// Wait for all cleanup to finish.
wg.Wait()
// Return upon first error.
for _, err := range errs {
for _, err := range g.Wait() {
if err == nil {
continue
}

View File

@ -38,6 +38,7 @@ import (
"github.com/minio/minio/pkg/madmin"
xnet "github.com/minio/minio/pkg/net"
"github.com/minio/minio/pkg/policy"
"github.com/minio/minio/pkg/sync/errgroup"
)
// NotificationSys - notification system.
@ -72,24 +73,6 @@ type NotificationPeerErr struct {
Err error // Error returned by the remote peer for an rpc call
}
// DeleteBucket - calls DeleteBucket RPC call on all peers.
func (sys *NotificationSys) DeleteBucket(ctx context.Context, bucketName string) {
go func() {
var wg sync.WaitGroup
for _, client := range sys.peerClients {
wg.Add(1)
go func(client *peerRESTClient) {
defer wg.Done()
if err := client.DeleteBucket(bucketName); err != nil {
logger.GetReqInfo(ctx).AppendTags("remotePeer", client.host.Name)
logger.LogIf(ctx, err)
}
}(client)
}
wg.Wait()
}()
}
// A NotificationGroup is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
@ -438,43 +421,44 @@ func (sys *NotificationSys) SignalService(sig serviceSignal) []NotificationPeerE
// ServerInfo - calls ServerInfo RPC call on all peers.
func (sys *NotificationSys) ServerInfo(ctx context.Context) []ServerInfo {
serverInfo := make([]ServerInfo, len(sys.peerClients))
var wg sync.WaitGroup
g := errgroup.WithNErrs(len(sys.peerClients))
for index, client := range sys.peerClients {
if client == nil {
continue
}
wg.Add(1)
go func(idx int, client *peerRESTClient) {
defer wg.Done()
index := index
g.Go(func() error {
// Try to fetch serverInfo remotely in three attempts.
for i := 0; i < 3; i++ {
info, err := client.ServerInfo()
if err == nil {
serverInfo[idx] = ServerInfo{
Addr: client.host.String(),
Data: &info,
serverInfo[index] = ServerInfo{
Addr: sys.peerClients[index].host.String(),
}
return
}
serverInfo[idx] = ServerInfo{
Addr: client.host.String(),
Data: &info,
Error: err.Error(),
info, err := sys.peerClients[index].ServerInfo()
if err != nil {
serverInfo[index].Error = err.Error()
}
serverInfo[index].Data = &info
// Last iteration log the error.
if i == 2 {
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String())
ctx := logger.SetReqInfo(ctx, reqInfo)
logger.LogIf(ctx, err)
return err
}
// Wait for one second and no need wait after last attempt.
if i < 2 {
time.Sleep(1 * time.Second)
}
}
}(index, client)
return nil
}, index)
}
for index, err := range g.Wait() {
if err != nil {
addr := sys.peerClients[index].host.String()
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", addr)
ctx := logger.SetReqInfo(ctx, reqInfo)
logger.LogIf(ctx, err)
}
}
wg.Wait()
return serverInfo
}
@ -482,166 +466,163 @@ func (sys *NotificationSys) ServerInfo(ctx context.Context) []ServerInfo {
func (sys *NotificationSys) GetLocks(ctx context.Context) []*PeerLocks {
locksResp := make([]*PeerLocks, len(sys.peerClients))
var wg sync.WaitGroup
g := errgroup.WithNErrs(len(sys.peerClients))
for index, client := range sys.peerClients {
if client == nil {
continue
}
wg.Add(1)
go func(idx int, client *peerRESTClient) {
defer wg.Done()
index := index
g.Go(func() error {
// Try to fetch serverInfo remotely in three attempts.
for i := 0; i < 3; i++ {
serverLocksResp, err := client.GetLocks()
serverLocksResp, err := sys.peerClients[index].GetLocks()
if err == nil {
locksResp[idx] = &PeerLocks{
Addr: client.host.String(),
locksResp[index] = &PeerLocks{
Addr: sys.peerClients[index].host.String(),
Locks: serverLocksResp,
}
return
return nil
}
// Last iteration log the error.
if i == 2 {
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String())
ctx := logger.SetReqInfo(ctx, reqInfo)
logger.LogOnceIf(ctx, err, client.host.String())
return err
}
// Wait for one second and no need wait after last attempt.
if i < 2 {
time.Sleep(1 * time.Second)
}
}
}(index, client)
return nil
}, index)
}
for index, err := range g.Wait() {
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress",
sys.peerClients[index].host.String())
ctx := logger.SetReqInfo(ctx, reqInfo)
logger.LogOnceIf(ctx, err, sys.peerClients[index].host.String())
}
wg.Wait()
return locksResp
}
// SetBucketPolicy - calls SetBucketPolicy RPC call on all peers.
func (sys *NotificationSys) SetBucketPolicy(ctx context.Context, bucketName string, bucketPolicy *policy.Policy) {
go func() {
var wg sync.WaitGroup
for _, client := range sys.peerClients {
ng := WithNPeers(len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
wg.Add(1)
go func(client *peerRESTClient) {
defer wg.Done()
if err := client.SetBucketPolicy(bucketName, bucketPolicy); err != nil {
logger.GetReqInfo(ctx).AppendTags("remotePeer", client.host.Name)
logger.LogIf(ctx, err)
client := client
ng.Go(ctx, func() error {
return client.SetBucketPolicy(bucketName, bucketPolicy)
}, idx, *client.host)
}
}(client)
ng.Wait()
}()
}
wg.Wait()
// DeleteBucket - calls DeleteBucket RPC call on all peers.
func (sys *NotificationSys) DeleteBucket(ctx context.Context, bucketName string) {
go func() {
ng := WithNPeers(len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
client := client
ng.Go(ctx, func() error {
return client.DeleteBucket(bucketName)
}, idx, *client.host)
}
ng.Wait()
}()
}
// RemoveBucketPolicy - calls RemoveBucketPolicy RPC call on all peers.
func (sys *NotificationSys) RemoveBucketPolicy(ctx context.Context, bucketName string) {
go func() {
var wg sync.WaitGroup
for _, client := range sys.peerClients {
ng := WithNPeers(len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
wg.Add(1)
go func(client *peerRESTClient) {
defer wg.Done()
if err := client.RemoveBucketPolicy(bucketName); err != nil {
logger.GetReqInfo(ctx).AppendTags("remotePeer", client.host.Name)
logger.LogIf(ctx, err)
client := client
ng.Go(ctx, func() error {
return client.RemoveBucketPolicy(bucketName)
}, idx, *client.host)
}
}(client)
}
wg.Wait()
ng.Wait()
}()
}
// SetBucketLifecycle - calls SetBucketLifecycle on all peers.
func (sys *NotificationSys) SetBucketLifecycle(ctx context.Context, bucketName string, bucketLifecycle *lifecycle.Lifecycle) {
func (sys *NotificationSys) SetBucketLifecycle(ctx context.Context, bucketName string,
bucketLifecycle *lifecycle.Lifecycle) {
go func() {
var wg sync.WaitGroup
for _, client := range sys.peerClients {
ng := WithNPeers(len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
wg.Add(1)
go func(client *peerRESTClient) {
defer wg.Done()
if err := client.SetBucketLifecycle(bucketName, bucketLifecycle); err != nil {
logger.GetReqInfo(ctx).AppendTags("remotePeer", client.host.Name)
logger.LogIf(ctx, err)
client := client
ng.Go(ctx, func() error {
return client.SetBucketLifecycle(bucketName, bucketLifecycle)
}, idx, *client.host)
}
}(client)
}
wg.Wait()
ng.Wait()
}()
}
// RemoveBucketLifecycle - calls RemoveLifecycle on all peers.
func (sys *NotificationSys) RemoveBucketLifecycle(ctx context.Context, bucketName string) {
go func() {
var wg sync.WaitGroup
for _, client := range sys.peerClients {
ng := WithNPeers(len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
wg.Add(1)
go func(client *peerRESTClient) {
defer wg.Done()
if err := client.RemoveBucketLifecycle(bucketName); err != nil {
logger.GetReqInfo(ctx).AppendTags("remotePeer", client.host.Name)
logger.LogIf(ctx, err)
client := client
ng.Go(ctx, func() error {
return client.RemoveBucketLifecycle(bucketName)
}, idx, *client.host)
}
}(client)
}
wg.Wait()
ng.Wait()
}()
}
// PutBucketNotification - calls PutBucketNotification RPC call on all peers.
func (sys *NotificationSys) PutBucketNotification(ctx context.Context, bucketName string, rulesMap event.RulesMap) {
go func() {
var wg sync.WaitGroup
for _, client := range sys.peerClients {
ng := WithNPeers(len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
wg.Add(1)
go func(client *peerRESTClient, rulesMap event.RulesMap) {
defer wg.Done()
if err := client.PutBucketNotification(bucketName, rulesMap); err != nil {
logger.GetReqInfo(ctx).AppendTags("remotePeer", client.host.Name)
logger.LogIf(ctx, err)
client := client
ng.Go(ctx, func() error {
return client.PutBucketNotification(bucketName, rulesMap)
}, idx, *client.host)
}
}(client, rulesMap.Clone())
}
wg.Wait()
ng.Wait()
}()
}
// ListenBucketNotification - calls ListenBucketNotification RPC call on all peers.
func (sys *NotificationSys) ListenBucketNotification(ctx context.Context, bucketName string, eventNames []event.Name, pattern string,
targetID event.TargetID, localPeer xnet.Host) {
func (sys *NotificationSys) ListenBucketNotification(ctx context.Context, bucketName string,
eventNames []event.Name, pattern string, targetID event.TargetID, localPeer xnet.Host) {
go func() {
var wg sync.WaitGroup
for _, client := range sys.peerClients {
ng := WithNPeers(len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
wg.Add(1)
go func(client *peerRESTClient) {
defer wg.Done()
if err := client.ListenBucketNotification(bucketName, eventNames, pattern, targetID, localPeer); err != nil {
logger.GetReqInfo(ctx).AppendTags("remotePeer", client.host.Name)
logger.LogIf(ctx, err)
client := client
ng.Go(ctx, func() error {
return client.ListenBucketNotification(bucketName, eventNames, pattern, targetID, localPeer)
}, idx, *client.host)
}
}(client)
}
wg.Wait()
ng.Wait()
}()
}
@ -981,78 +962,90 @@ func (sys *NotificationSys) CollectNetPerfInfo(size int64) map[string][]ServerNe
// DrivePerfInfo - Drive speed (read and write) information
func (sys *NotificationSys) DrivePerfInfo(size int64) []madmin.ServerDrivesPerfInfo {
reply := make([]madmin.ServerDrivesPerfInfo, len(sys.peerClients))
var wg sync.WaitGroup
for i, client := range sys.peerClients {
g := errgroup.WithNErrs(len(sys.peerClients))
for index, client := range sys.peerClients {
if client == nil {
continue
}
wg.Add(1)
go func(client *peerRESTClient, idx int) {
defer wg.Done()
di, err := client.DrivePerfInfo(size)
index := index
g.Go(func() error {
var err error
reply[index], err = sys.peerClients[index].DrivePerfInfo(size)
return err
}, index)
}
for index, err := range g.Wait() {
if err != nil {
reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", client.host.String())
addr := sys.peerClients[index].host.String()
reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr)
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
di.Addr = client.host.String()
di.Error = err.Error()
reply[index].Addr = addr
reply[index].Error = err.Error()
}
reply[idx] = di
}(client, i)
}
wg.Wait()
return reply
}
// MemUsageInfo - Mem utilization information
func (sys *NotificationSys) MemUsageInfo() []ServerMemUsageInfo {
reply := make([]ServerMemUsageInfo, len(sys.peerClients))
var wg sync.WaitGroup
for i, client := range sys.peerClients {
g := errgroup.WithNErrs(len(sys.peerClients))
for index, client := range sys.peerClients {
if client == nil {
continue
}
wg.Add(1)
go func(client *peerRESTClient, idx int) {
defer wg.Done()
memi, err := client.MemUsageInfo()
index := index
g.Go(func() error {
var err error
reply[index], err = sys.peerClients[index].MemUsageInfo()
return err
}, index)
}
for index, err := range g.Wait() {
if err != nil {
reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", client.host.String())
addr := sys.peerClients[index].host.String()
reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr)
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
memi.Addr = client.host.String()
memi.Error = err.Error()
reply[index].Addr = addr
reply[index].Error = err.Error()
}
reply[idx] = memi
}(client, i)
}
wg.Wait()
return reply
}
// CPULoadInfo - CPU utilization information
func (sys *NotificationSys) CPULoadInfo() []ServerCPULoadInfo {
reply := make([]ServerCPULoadInfo, len(sys.peerClients))
var wg sync.WaitGroup
for i, client := range sys.peerClients {
g := errgroup.WithNErrs(len(sys.peerClients))
for index, client := range sys.peerClients {
if client == nil {
continue
}
wg.Add(1)
go func(client *peerRESTClient, idx int) {
defer wg.Done()
cpui, err := client.CPULoadInfo()
index := index
g.Go(func() error {
var err error
reply[index], err = sys.peerClients[index].CPULoadInfo()
return err
}, index)
}
for index, err := range g.Wait() {
if err != nil {
reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", client.host.String())
addr := sys.peerClients[index].host.String()
reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr)
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
cpui.Addr = client.host.String()
cpui.Error = err.Error()
reply[index].Addr = addr
reply[index].Error = err.Error()
}
reply[idx] = cpui
}(client, i)
}
wg.Wait()
return reply
}

View File

@ -129,8 +129,8 @@ func setupTestReadDirGeneric(t *testing.T) (testResults []result) {
// Test to read non-empty directory with symlinks.
func setupTestReadDirSymlink(t *testing.T) (testResults []result) {
if runtime.GOOS != "Windows" {
t.Log("symlinks not available on windows")
if runtime.GOOS == globalWindowsOSName {
t.Skip("symlinks not available on windows")
return nil
}
dir := mustSetupDir(t)

View File

@ -306,19 +306,21 @@ func newXLSets(endpoints EndpointList, format *formatXLV3, setCount int, drivesP
// StorageInfo - combines output of StorageInfo across all erasure coded object sets.
func (s *xlSets) StorageInfo(ctx context.Context) StorageInfo {
var storageInfo StorageInfo
var wg sync.WaitGroup
storageInfos := make([]StorageInfo, len(s.sets))
storageInfo.Backend.Type = BackendErasure
for index, set := range s.sets {
wg.Add(1)
go func(id int, set *xlObjects) {
defer wg.Done()
storageInfos[id] = set.StorageInfo(ctx)
}(index, set)
g := errgroup.WithNErrs(len(s.sets))
for index := range s.sets {
index := index
g.Go(func() error {
storageInfos[index] = s.sets[index].StorageInfo(ctx)
return nil
}, index)
}
// Wait for the go routines.
wg.Wait()
g.Wait()
for _, lstorageInfo := range storageInfos {
storageInfo.Used += lstorageInfo.Used
@ -458,11 +460,12 @@ func undoMakeBucketSets(bucket string, sets []*xlObjects, errs []error) {
// Undo previous make bucket entry on all underlying sets.
for index := range sets {
index := index
if errs[index] == nil {
g.Go(func() error {
if errs[index] == nil {
return sets[index].DeleteBucket(context.Background(), bucket)
}, index)
}
return nil
}, index)
}
// Wait for all delete bucket to finish.
@ -618,11 +621,12 @@ func undoDeleteBucketSets(bucket string, sets []*xlObjects, errs []error) {
// Undo previous delete bucket on all underlying sets.
for index := range sets {
index := index
if errs[index] == nil {
g.Go(func() error {
if errs[index] == nil {
return sets[index].MakeBucketWithLocation(context.Background(), bucket, "")
}, index)
}
return nil
}, index)
}
g.Wait()
@ -742,19 +746,24 @@ func (s *xlSets) CopyObject(ctx context.Context, srcBucket, srcObject, destBucke
func listDirSetsFactory(ctx context.Context, sets ...*xlObjects) ListDirFunc {
listDirInternal := func(bucket, prefixDir, prefixEntry string, disks []StorageAPI) (mergedEntries []string) {
var diskEntries = make([][]string, len(disks))
var wg sync.WaitGroup
g := errgroup.WithNErrs(len(disks))
for index, disk := range disks {
if disk == nil {
continue
}
wg.Add(1)
go func(index int, disk StorageAPI) {
defer wg.Done()
diskEntries[index], _ = disk.ListDir(bucket, prefixDir, -1, xlMetaJSONFile)
}(index, disk)
index := index
g.Go(func() error {
var err error
diskEntries[index], err = disks[index].ListDir(bucket, prefixDir, -1, xlMetaJSONFile)
return err
}, index)
}
wg.Wait()
for _, err := range g.Wait() {
if err != nil {
logger.LogIf(ctx, err)
}
}
// Find elements in entries which are not in mergedEntries
for _, entries := range diskEntries {
@ -1405,21 +1414,21 @@ func isTestSetup(infos []DiskInfo, errs []error) bool {
func getAllDiskInfos(storageDisks []StorageAPI) ([]DiskInfo, []error) {
infos := make([]DiskInfo, len(storageDisks))
errs := make([]error, len(storageDisks))
var wg sync.WaitGroup
for i := range storageDisks {
if storageDisks[i] == nil {
errs[i] = errDiskNotFound
continue
g := errgroup.WithNErrs(len(storageDisks))
for index := range storageDisks {
index := index
g.Go(func() error {
var err error
if storageDisks[index] != nil {
infos[index], err = storageDisks[index].DiskInfo()
} else {
// Disk not found.
err = errDiskNotFound
}
wg.Add(1)
go func(i int) {
defer wg.Done()
infos[i], errs[i] = storageDisks[i].DiskInfo()
}(i)
return err
}, index)
}
wg.Wait()
return infos, errs
return infos, g.Wait()
}
// Mark root disks as down so as not to heal them.

View File

@ -19,12 +19,12 @@ package cmd
import (
"context"
"sort"
"sync"
"github.com/minio/minio-go/v6/pkg/s3utils"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/lifecycle"
"github.com/minio/minio/pkg/policy"
"github.com/minio/minio/pkg/sync/errgroup"
)
// list all errors that can be ignore in a bucket operation.
@ -42,83 +42,71 @@ func (xl xlObjects) MakeBucketWithLocation(ctx context.Context, bucket, location
return BucketNameInvalid{Bucket: bucket}
}
// Initialize sync waitgroup.
var wg sync.WaitGroup
storageDisks := xl.getDisks()
// Initialize list of errors.
var dErrs = make([]error, len(xl.getDisks()))
g := errgroup.WithNErrs(len(storageDisks))
// Make a volume entry on all underlying storage disks.
for index, disk := range xl.getDisks() {
if disk == nil {
dErrs[index] = errDiskNotFound
continue
}
wg.Add(1)
// Make a volume inside a go-routine.
go func(index int, disk StorageAPI) {
defer wg.Done()
err := disk.MakeVol(bucket)
if err != nil {
for index := range storageDisks {
index := index
g.Go(func() error {
if storageDisks[index] != nil {
if err := storageDisks[index].MakeVol(bucket); err != nil {
if err != errVolumeExists {
logger.LogIf(ctx, err)
}
dErrs[index] = err
return err
}
}(index, disk)
return nil
}
return errDiskNotFound
}, index)
}
// Wait for all make vol to finish.
wg.Wait()
writeQuorum := len(xl.getDisks())/2 + 1
err := reduceWriteQuorumErrs(ctx, dErrs, bucketOpIgnoredErrs, writeQuorum)
writeQuorum := len(storageDisks)/2 + 1
err := reduceWriteQuorumErrs(ctx, g.Wait(), bucketOpIgnoredErrs, writeQuorum)
if err == errXLWriteQuorum {
// Purge successfully created buckets if we don't have writeQuorum.
undoMakeBucket(xl.getDisks(), bucket)
undoMakeBucket(storageDisks, bucket)
}
return toObjectErr(err, bucket)
}
func (xl xlObjects) undoDeleteBucket(bucket string) {
// Initialize sync waitgroup.
var wg sync.WaitGroup
func undoDeleteBucket(storageDisks []StorageAPI, bucket string) {
g := errgroup.WithNErrs(len(storageDisks))
// Undo previous make bucket entry on all underlying storage disks.
for index, disk := range xl.getDisks() {
if disk == nil {
for index := range storageDisks {
if storageDisks[index] == nil {
continue
}
wg.Add(1)
// Delete a bucket inside a go-routine.
go func(index int, disk StorageAPI) {
defer wg.Done()
_ = disk.MakeVol(bucket)
}(index, disk)
index := index
g.Go(func() error {
_ = storageDisks[index].MakeVol(bucket)
return nil
}, index)
}
// Wait for all make vol to finish.
wg.Wait()
g.Wait()
}
// undo make bucket operation upon quorum failure.
func undoMakeBucket(storageDisks []StorageAPI, bucket string) {
// Initialize sync waitgroup.
var wg sync.WaitGroup
g := errgroup.WithNErrs(len(storageDisks))
// Undo previous make bucket entry on all underlying storage disks.
for index, disk := range storageDisks {
if disk == nil {
for index := range storageDisks {
if storageDisks[index] == nil {
continue
}
wg.Add(1)
// Delete a bucket inside a go-routine.
go func(index int, disk StorageAPI) {
defer wg.Done()
_ = disk.DeleteVol(bucket)
}(index, disk)
index := index
g.Go(func() error {
_ = storageDisks[index].DeleteVol(bucket)
return nil
}, index)
}
// Wait for all make vol to finish.
wg.Wait()
g.Wait()
}
// getBucketInfo - returns the BucketInfo from one of the load balanced disks.
@ -245,42 +233,34 @@ func (xl xlObjects) DeleteBucket(ctx context.Context, bucket string) error {
defer bucketLock.Unlock()
// Collect if all disks report volume not found.
var wg sync.WaitGroup
var dErrs = make([]error, len(xl.getDisks()))
// Remove a volume entry on all underlying storage disks.
storageDisks := xl.getDisks()
for index, disk := range storageDisks {
if disk == nil {
dErrs[index] = errDiskNotFound
continue
}
wg.Add(1)
// Delete volume inside a go-routine.
go func(index int, disk StorageAPI) {
defer wg.Done()
// Attempt to delete bucket.
err := disk.DeleteVol(bucket)
if err != nil {
dErrs[index] = err
return
}
// Cleanup all the previously incomplete multiparts.
err = cleanupDir(ctx, disk, minioMetaMultipartBucket, bucket)
if err != nil && err != errVolumeNotFound {
dErrs[index] = err
g := errgroup.WithNErrs(len(storageDisks))
for index := range storageDisks {
index := index
g.Go(func() error {
if storageDisks[index] != nil {
if err := storageDisks[index].DeleteVol(bucket); err != nil {
return err
}
}(index, disk)
err := cleanupDir(ctx, storageDisks[index], minioMetaMultipartBucket, bucket)
if err != nil && err != errVolumeNotFound {
return err
}
return nil
}
return errDiskNotFound
}, index)
}
// Wait for all the delete vols to finish.
wg.Wait()
dErrs := g.Wait()
writeQuorum := len(xl.getDisks())/2 + 1
writeQuorum := len(storageDisks)/2 + 1
err := reduceWriteQuorumErrs(ctx, dErrs, bucketOpIgnoredErrs, writeQuorum)
if err == errXLWriteQuorum {
xl.undoDeleteBucket(bucket)
undoDeleteBucket(storageDisks, bucket)
}
if err != nil {
return toObjectErr(err, bucket)

View File

@ -19,7 +19,8 @@ package cmd
import (
"context"
"path"
"sync"
"github.com/minio/minio/pkg/sync/errgroup"
)
// getLoadBalancedDisks - fetches load balanced (sufficiently randomized) disk slice.
@ -53,35 +54,33 @@ func (xl xlObjects) parentDirIsObject(ctx context.Context, bucket, parent string
// isObject - returns `true` if the prefix is an object i.e if
// `xl.json` exists at the leaf, false otherwise.
func (xl xlObjects) isObject(bucket, prefix string) (ok bool) {
var errs = make([]error, len(xl.getDisks()))
var wg sync.WaitGroup
for index, disk := range xl.getDisks() {
storageDisks := xl.getDisks()
g := errgroup.WithNErrs(len(storageDisks))
for index, disk := range storageDisks {
if disk == nil {
continue
}
wg.Add(1)
go func(index int, disk StorageAPI) {
defer wg.Done()
index := index
g.Go(func() error {
// Check if 'prefix' is an object on this 'disk', else continue the check the next disk
fi, err := disk.StatFile(bucket, path.Join(prefix, xlMetaJSONFile))
fi, err := storageDisks[index].StatFile(bucket, pathJoin(prefix, xlMetaJSONFile))
if err != nil {
errs[index] = err
return
return err
}
if fi.Size == 0 {
errs[index] = errCorruptedFormat
return
return errCorruptedFormat
}
}(index, disk)
return nil
}, index)
}
wg.Wait()
// NOTE: Observe we are not trying to read `xl.json` and figure out the actual
// quorum intentionally, but rely on the default case scenario. Actual quorum
// verification will happen by top layer by using getObjectInfo() and will be
// ignored if necessary.
readQuorum := len(xl.getDisks()) / 2
readQuorum := len(storageDisks) / 2
return reduceReadQuorumErrs(context.Background(), errs, objectOpIgnoredErrs, readQuorum) == nil
return reduceReadQuorumErrs(context.Background(), g.Wait(), objectOpIgnoredErrs, readQuorum) == nil
}

View File

@ -20,11 +20,11 @@ import (
"context"
"fmt"
"io"
"sync"
"time"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/madmin"
"github.com/minio/minio/pkg/sync/errgroup"
)
func (xl xlObjects) ReloadFormat(ctx context.Context, dryRun bool) error {
@ -57,40 +57,31 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, bucket string, w
dryRun bool) (res madmin.HealResultItem, err error) {
// Initialize sync waitgroup.
var wg sync.WaitGroup
// Initialize list of errors.
var dErrs = make([]error, len(storageDisks))
g := errgroup.WithNErrs(len(storageDisks))
// Disk states slices
beforeState := make([]string, len(storageDisks))
afterState := make([]string, len(storageDisks))
// Make a volume entry on all underlying storage disks.
for index, disk := range storageDisks {
if disk == nil {
dErrs[index] = errDiskNotFound
for index := range storageDisks {
index := index
g.Go(func() error {
if storageDisks[index] == nil {
beforeState[index] = madmin.DriveStateOffline
afterState[index] = madmin.DriveStateOffline
continue
return errDiskNotFound
}
wg.Add(1)
// Make a volume inside a go-routine.
go func(index int, disk StorageAPI) {
defer wg.Done()
if _, serr := disk.StatVol(bucket); serr != nil {
if _, serr := storageDisks[index].StatVol(bucket); serr != nil {
if serr == errDiskNotFound {
beforeState[index] = madmin.DriveStateOffline
afterState[index] = madmin.DriveStateOffline
dErrs[index] = serr
return
return serr
}
if serr != errVolumeNotFound {
beforeState[index] = madmin.DriveStateCorrupt
afterState[index] = madmin.DriveStateCorrupt
dErrs[index] = serr
return
return serr
}
beforeState[index] = madmin.DriveStateMissing
@ -98,23 +89,22 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, bucket string, w
// mutate only if not a dry-run
if dryRun {
return
return nil
}
makeErr := disk.MakeVol(bucket)
dErrs[index] = makeErr
makeErr := storageDisks[index].MakeVol(bucket)
if makeErr == nil {
afterState[index] = madmin.DriveStateOk
}
return
return makeErr
}
beforeState[index] = madmin.DriveStateOk
afterState[index] = madmin.DriveStateOk
}(index, disk)
return nil
}, index)
}
// Wait for all make vol to finish.
wg.Wait()
errs := g.Wait()
// Initialize heal result info
res = madmin.HealResultItem{
@ -122,13 +112,13 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, bucket string, w
Bucket: bucket,
DiskCount: len(storageDisks),
}
for i, before := range beforeState {
for i := range beforeState {
if storageDisks[i] != nil {
drive := storageDisks[i].String()
res.Before.Drives = append(res.Before.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: drive,
State: before,
State: beforeState[i],
})
res.After.Drives = append(res.After.Drives, madmin.HealDriveInfo{
UUID: "",
@ -138,7 +128,7 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, bucket string, w
}
}
reducedErr := reduceWriteQuorumErrs(ctx, dErrs, bucketOpIgnoredErrs, writeQuorum)
reducedErr := reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, writeQuorum)
if reducedErr == errXLWriteQuorum {
// Purge successfully created buckets if we don't have writeQuorum.
undoMakeBucket(storageDisks, bucket)
@ -597,29 +587,25 @@ func defaultHealResult(latestXLMeta xlMetaV1, storageDisks []StorageAPI, errs []
// Stat all directories.
func statAllDirs(ctx context.Context, storageDisks []StorageAPI, bucket, prefix string) []error {
var errs = make([]error, len(storageDisks))
var wg sync.WaitGroup
g := errgroup.WithNErrs(len(storageDisks))
for index, disk := range storageDisks {
if disk == nil {
continue
}
wg.Add(1)
go func(index int, disk StorageAPI) {
defer wg.Done()
entries, err := disk.ListDir(bucket, prefix, 1, "")
index := index
g.Go(func() error {
entries, err := storageDisks[index].ListDir(bucket, prefix, 1, "")
if err != nil {
errs[index] = err
return
return err
}
if len(entries) > 0 {
errs[index] = errVolumeNotEmpty
return
return errVolumeNotEmpty
}
}(index, disk)
return nil
}, index)
}
wg.Wait()
return errs
return g.Wait()
}
// ObjectDir is considered dangling/corrupted if any only

View File

@ -24,11 +24,11 @@ import (
"net/http"
"path"
"sort"
"sync"
"time"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/sync/errgroup"
"github.com/minio/sha256-simd"
)
@ -452,31 +452,23 @@ func renameXLMetadata(ctx context.Context, disks []StorageAPI, srcBucket, srcEnt
// writeUniqueXLMetadata - writes unique `xl.json` content for each disk in order.
func writeUniqueXLMetadata(ctx context.Context, disks []StorageAPI, bucket, prefix string, xlMetas []xlMetaV1, quorum int) ([]StorageAPI, error) {
var wg sync.WaitGroup
var mErrs = make([]error, len(disks))
g := errgroup.WithNErrs(len(disks))
// Start writing `xl.json` to all disks in parallel.
for index, disk := range disks {
if disk == nil {
mErrs[index] = errDiskNotFound
continue
for index := range disks {
index := index
g.Go(func() error {
if disks[index] == nil {
return errDiskNotFound
}
wg.Add(1)
// Pick one xlMeta for a disk at index.
xlMetas[index].Erasure.Index = index + 1
// Write `xl.json` in a routine.
go func(index int, disk StorageAPI, xlMeta xlMetaV1) {
defer wg.Done()
// Write unique `xl.json` for a disk at index.
mErrs[index] = writeXLMetadata(ctx, disk, bucket, prefix, xlMeta)
}(index, disk, xlMetas[index])
return writeXLMetadata(ctx, disks[index], bucket, prefix, xlMetas[index])
}, index)
}
// Wait for all the routines.
wg.Wait()
mErrs := g.Wait()
err := reduceWriteQuorumErrs(ctx, mErrs, objectOpIgnoredErrs, quorum)
return evalDisks(disks, mErrs), err

View File

@ -24,12 +24,12 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/mimedb"
"github.com/minio/minio/pkg/sync/errgroup"
)
func (xl xlObjects) getUploadIDDir(bucket, object, uploadID string) string {
@ -57,21 +57,23 @@ func (xl xlObjects) checkUploadIDExists(ctx context.Context, bucket, object, upl
// Removes part given by partName belonging to a mulitpart upload from minioMetaBucket
func (xl xlObjects) removeObjectPart(bucket, object, uploadID, partName string) {
curpartPath := path.Join(bucket, object, uploadID, partName)
var wg sync.WaitGroup
for i, disk := range xl.getDisks() {
storageDisks := xl.getDisks()
g := errgroup.WithNErrs(len(storageDisks))
for index, disk := range storageDisks {
if disk == nil {
continue
}
wg.Add(1)
go func(index int, disk StorageAPI) {
defer wg.Done()
index := index
g.Go(func() error {
// Ignoring failure to remove parts that weren't present in CompleteMultipartUpload
// requests. xl.json is the authoritative source of truth on which parts constitute
// the object. The presence of parts that don't belong in the object doesn't affect correctness.
_ = disk.DeleteFile(minioMetaMultipartBucket, curpartPath)
}(i, disk)
_ = storageDisks[index].DeleteFile(minioMetaMultipartBucket, curpartPath)
return nil
}, index)
}
wg.Wait()
g.Wait()
}
// statPart - returns fileInfo structure for a successful stat on part file.
@ -104,31 +106,29 @@ func (xl xlObjects) statPart(ctx context.Context, bucket, object, uploadID, part
// commitXLMetadata - commit `xl.json` from source prefix to destination prefix in the given slice of disks.
func commitXLMetadata(ctx context.Context, disks []StorageAPI, srcBucket, srcPrefix, dstBucket, dstPrefix string, quorum int) ([]StorageAPI, error) {
var wg sync.WaitGroup
var mErrs = make([]error, len(disks))
srcJSONFile := path.Join(srcPrefix, xlMetaJSONFile)
dstJSONFile := path.Join(dstPrefix, xlMetaJSONFile)
g := errgroup.WithNErrs(len(disks))
// Rename `xl.json` to all disks in parallel.
for index, disk := range disks {
if disk == nil {
mErrs[index] = errDiskNotFound
continue
for index := range disks {
index := index
g.Go(func() error {
if disks[index] == nil {
return errDiskNotFound
}
wg.Add(1)
// Rename `xl.json` in a routine.
go func(index int, disk StorageAPI) {
defer wg.Done()
// Delete any dangling directories.
defer disk.DeleteFile(srcBucket, srcPrefix)
defer disks[index].DeleteFile(srcBucket, srcPrefix)
// Renames `xl.json` from source prefix to destination prefix.
mErrs[index] = disk.RenameFile(srcBucket, srcJSONFile, dstBucket, dstJSONFile)
}(index, disk)
return disks[index].RenameFile(srcBucket, srcJSONFile, dstBucket, dstJSONFile)
}, index)
}
// Wait for all the routines.
wg.Wait()
mErrs := g.Wait()
err := reduceWriteQuorumErrs(ctx, mErrs, objectOpIgnoredErrs, quorum)
return evalDisks(disks, mErrs), err

View File

@ -22,11 +22,11 @@ import (
"io"
"net/http"
"path"
"sync"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/mimedb"
"github.com/minio/minio/pkg/sync/errgroup"
)
// list all errors which can be ignored in object operations.
@ -34,25 +34,26 @@ var objectOpIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied)
// putObjectDir hints the bottom layer to create a new directory.
func (xl xlObjects) putObjectDir(ctx context.Context, bucket, object string, writeQuorum int) error {
var wg sync.WaitGroup
storageDisks := xl.getDisks()
g := errgroup.WithNErrs(len(storageDisks))
errs := make([]error, len(xl.getDisks()))
// Prepare object creation in all disks
for index, disk := range xl.getDisks() {
if disk == nil {
for index := range storageDisks {
if storageDisks[index] == nil {
continue
}
wg.Add(1)
go func(index int, disk StorageAPI) {
defer wg.Done()
if err := disk.MakeVol(pathJoin(bucket, object)); err != nil && err != errVolumeExists {
errs[index] = err
index := index
g.Go(func() error {
err := storageDisks[index].MakeVol(pathJoin(bucket, object))
if err != nil && err != errVolumeExists {
return err
}
}(index, disk)
return nil
}, index)
}
wg.Wait()
return reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
return reduceWriteQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, writeQuorum)
}
/// Object Operations
@ -335,36 +336,34 @@ func (xl xlObjects) getObject(ctx context.Context, bucket, object string, startO
}
// getObjectInfoDir - This getObjectInfo is specific to object directory lookup.
func (xl xlObjects) getObjectInfoDir(ctx context.Context, bucket, object string) (oi ObjectInfo, err error) {
var wg sync.WaitGroup
func (xl xlObjects) getObjectInfoDir(ctx context.Context, bucket, object string) (ObjectInfo, error) {
storageDisks := xl.getDisks()
g := errgroup.WithNErrs(len(storageDisks))
errs := make([]error, len(xl.getDisks()))
// Prepare object creation in a all disks
for index, disk := range xl.getDisks() {
for index, disk := range storageDisks {
if disk == nil {
continue
}
wg.Add(1)
go func(index int, disk StorageAPI) {
defer wg.Done()
index := index
g.Go(func() error {
// Check if 'prefix' is an object on this 'disk'.
entries, err := disk.ListDir(bucket, object, 1, "")
entries, err := storageDisks[index].ListDir(bucket, object, 1, "")
if err != nil {
errs[index] = err
return
return err
}
if len(entries) > 0 {
// Not a directory if not empty.
errs[index] = errFileNotFound
return
return errFileNotFound
}
}(index, disk)
return nil
}, index)
}
wg.Wait()
readQuorum := len(xl.getDisks()) / 2
return dirObjectInfo(bucket, object, 0, map[string]string{}), reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum)
readQuorum := len(storageDisks) / 2
err := reduceReadQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, readQuorum)
return dirObjectInfo(bucket, object, 0, map[string]string{}), err
}
// GetObjectInfo - reads object metadata and replies back ObjectInfo.
@ -424,7 +423,6 @@ func (xl xlObjects) getObjectInfo(ctx context.Context, bucket, object string) (o
}
func undoRename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isDir bool, errs []error) {
var wg sync.WaitGroup
// Undo rename object on disks where RenameFile succeeded.
// If srcEntry/dstEntry are objects then add a trailing slash to copy
@ -433,56 +431,51 @@ func undoRename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry str
srcEntry = retainSlash(srcEntry)
dstEntry = retainSlash(dstEntry)
}
g := errgroup.WithNErrs(len(disks))
for index, disk := range disks {
if disk == nil {
continue
}
// Undo rename object in parallel.
wg.Add(1)
go func(index int, disk StorageAPI) {
defer wg.Done()
if errs[index] != nil {
return
index := index
g.Go(func() error {
if errs[index] == nil {
_ = disks[index].RenameFile(dstBucket, dstEntry, srcBucket, srcEntry)
}
_ = disk.RenameFile(dstBucket, dstEntry, srcBucket, srcEntry)
}(index, disk)
return nil
}, index)
}
wg.Wait()
g.Wait()
}
// rename - common function that renamePart and renameObject use to rename
// the respective underlying storage layer representations.
func rename(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isDir bool, writeQuorum int, ignoredErr []error) ([]StorageAPI, error) {
// Initialize sync waitgroup.
var wg sync.WaitGroup
// Initialize list of errors.
var errs = make([]error, len(disks))
if isDir {
dstEntry = retainSlash(dstEntry)
srcEntry = retainSlash(srcEntry)
}
g := errgroup.WithNErrs(len(disks))
// Rename file on all underlying storage disks.
for index, disk := range disks {
if disk == nil {
errs[index] = errDiskNotFound
continue
for index := range disks {
index := index
g.Go(func() error {
if disks[index] == nil {
return errDiskNotFound
}
wg.Add(1)
go func(index int, disk StorageAPI) {
defer wg.Done()
if err := disk.RenameFile(srcBucket, srcEntry, dstBucket, dstEntry); err != nil {
if err := disks[index].RenameFile(srcBucket, srcEntry, dstBucket, dstEntry); err != nil {
if !IsErrIgnored(err, ignoredErr...) {
errs[index] = err
return err
}
}
}(index, disk)
return nil
}, index)
}
// Wait for all renames to finish.
wg.Wait()
errs := g.Wait()
// We can safely allow RenameFile errors up to len(xl.getDisks()) - writeQuorum
// otherwise return failure. Cleanup successful renames.
@ -744,39 +737,31 @@ func (xl xlObjects) deleteObject(ctx context.Context, bucket, object string, wri
}
}
// Initialize sync waitgroup.
var wg sync.WaitGroup
g := errgroup.WithNErrs(len(disks))
// Initialize list of errors.
var dErrs = make([]error, len(disks))
for index, disk := range disks {
if disk == nil {
dErrs[index] = errDiskNotFound
continue
for index := range disks {
index := index
g.Go(func() error {
if disks[index] == nil {
return errDiskNotFound
}
wg.Add(1)
go func(index int, disk StorageAPI, isDir bool) {
defer wg.Done()
var e error
var err error
if isDir {
// DeleteFile() simply tries to remove a directory
// and will succeed only if that directory is empty.
e = disk.DeleteFile(minioMetaTmpBucket, tmpObj)
err = disks[index].DeleteFile(minioMetaTmpBucket, tmpObj)
} else {
e = cleanupDir(ctx, disk, minioMetaTmpBucket, tmpObj)
err = cleanupDir(ctx, disks[index], minioMetaTmpBucket, tmpObj)
}
if e != nil && e != errVolumeNotFound {
dErrs[index] = e
if err != nil && err != errVolumeNotFound {
return err
}
}(index, disk, isDir)
return nil
}, index)
}
// Wait for all routines to finish.
wg.Wait()
// return errors if any during deletion
return reduceWriteQuorumErrs(ctx, dErrs, objectOpIgnoredErrs, writeQuorum)
return reduceWriteQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, writeQuorum)
}
// deleteObject - wrapper for delete object, deletes an object from

View File

@ -21,10 +21,10 @@ import (
"errors"
"hash/crc32"
"path"
"sync"
jsoniter "github.com/json-iterator/go"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/sync/errgroup"
)
// Returns number of errors that occurred the most (incl. nil) and the
@ -180,28 +180,23 @@ func readXLMeta(ctx context.Context, disk StorageAPI, bucket string, object stri
// Reads all `xl.json` metadata as a xlMetaV1 slice.
// Returns error slice indicating the failed metadata reads.
func readAllXLMetadata(ctx context.Context, disks []StorageAPI, bucket, object string) ([]xlMetaV1, []error) {
errs := make([]error, len(disks))
metadataArray := make([]xlMetaV1, len(disks))
var wg sync.WaitGroup
// Read `xl.json` parallelly across disks.
for index, disk := range disks {
if disk == nil {
errs[index] = errDiskNotFound
continue
}
wg.Add(1)
// Read `xl.json` in routine.
go func(index int, disk StorageAPI) {
defer wg.Done()
metadataArray[index], errs[index] = readXLMeta(ctx, disk, bucket, object)
}(index, disk)
}
// Wait for all the routines to finish.
wg.Wait()
g := errgroup.WithNErrs(len(disks))
// Read `xl.json` parallelly across disks.
for index := range disks {
index := index
g.Go(func() (err error) {
if disks[index] == nil {
return errDiskNotFound
}
metadataArray[index], err = readXLMeta(ctx, disks[index], bucket, object)
return err
}, index)
}
// Return all the metadata.
return metadataArray, errs
return metadataArray, g.Wait()
}
// Return shuffled partsMetadata depending on distribution.

View File

@ -19,10 +19,10 @@ package cmd
import (
"context"
"sort"
"sync"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/bpool"
"github.com/minio/minio/pkg/sync/errgroup"
)
// XL constants.
@ -71,34 +71,31 @@ func (d byDiskTotal) Less(i, j int) bool {
// getDisksInfo - fetch disks info across all other storage API.
func getDisksInfo(disks []StorageAPI) (disksInfo []DiskInfo, onlineDisks int, offlineDisks int) {
disksInfo = make([]DiskInfo, len(disks))
errs := make([]error, len(disks))
var wg sync.WaitGroup
for i, storageDisk := range disks {
if storageDisk == nil {
g := errgroup.WithNErrs(len(disks))
for index := range disks {
index := index
g.Go(func() error {
if disks[index] == nil {
// Storage disk is empty, perhaps ignored disk or not available.
errs[i] = errDiskNotFound
continue
return errDiskNotFound
}
wg.Add(1)
go func(id int, sDisk StorageAPI) {
defer wg.Done()
info, err := sDisk.DiskInfo()
info, err := disks[index].DiskInfo()
if err != nil {
reqInfo := (&logger.ReqInfo{}).AppendTags("disk", sDisk.String())
if IsErr(err, baseErrs...) {
return err
}
reqInfo := (&logger.ReqInfo{}).AppendTags("disk", disks[index].String())
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
if IsErr(err, baseErrs...) {
errs[id] = err
return
}
disksInfo[index] = info
return nil
}, index)
}
disksInfo[id] = info
}(i, storageDisk)
}
// Wait for the routines.
wg.Wait()
for _, err := range errs {
// Wait for the routines.
for _, err := range g.Wait() {
if err != nil {
offlineDisks++
continue