xl: Avoid possible race during bulk Multi Delete (#7644)

errs was passed to many goroutines but they are all allowed
to update errs if any error happens during deletion, which
can cause a data race.

This commit will avoid issuing bulk delete operations in parallel
to avoid the warning race.
This commit is contained in:
Anis Elleuch 2019-05-14 22:43:22 +01:00 committed by kannappanr
parent b3f22eac56
commit 9b4a81ee60

View File

@ -803,30 +803,22 @@ func (xl xlObjects) doDeleteObjects(ctx context.Context, bucket string, objects
} }
} }
// Initialize sync waitgroup.
var wg = &sync.WaitGroup{}
// Initialize list of errors. // Initialize list of errors.
var opErrs = make([]error, len(disks)) var opErrs = make([]error, len(disks))
var delObjErrs = make([][]error, len(disks)) var delObjErrs = make([][]error, len(disks))
// Remove objects in bulk for each disk
for index, disk := range disks { for index, disk := range disks {
if disk == nil { if disk == nil {
opErrs[index] = errDiskNotFound opErrs[index] = errDiskNotFound
continue continue
} }
wg.Add(1) delObjErrs[index], opErrs[index] = cleanupObjectsBulk(ctx, disk, minioMetaTmpBucket, tmpObjs, errs)
go func(index int, disk StorageAPI) { if opErrs[index] == errVolumeNotFound {
defer wg.Done() opErrs[index] = nil
delObjErrs[index], opErrs[index] = cleanupObjectsBulk(ctx, disk, minioMetaTmpBucket, tmpObjs, errs) }
if opErrs[index] == errVolumeNotFound {
opErrs[index] = nil
}
}(index, disk)
} }
// Wait for all routines to finish.
wg.Wait()
// Return errors if any during deletion // Return errors if any during deletion
if err := reduceWriteQuorumErrs(ctx, opErrs, objectOpIgnoredErrs, len(xl.getDisks())/2+1); err != nil { if err := reduceWriteQuorumErrs(ctx, opErrs, objectOpIgnoredErrs, len(xl.getDisks())/2+1); err != nil {
return nil, err return nil, err