Fix heal bucket deadlock after replacing disks (#5661)

Fixes #5659
This commit is contained in:
Harshavardhana
2018-03-16 15:09:31 -07:00
committed by kannappanr
parent 3145462ad2
commit f23944aed7
5 changed files with 49 additions and 30 deletions

View File

@@ -1,5 +1,5 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
* Minio Cloud Storage, (C) 2016, 2017, 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -30,22 +30,50 @@ func (xl xlObjects) HealFormat(ctx context.Context, dryRun bool) (madmin.HealRes
return madmin.HealResultItem{}, errors.Trace(NotImplemented{})
}
// checks for bucket if it exists in writeQuorum number of disks, this call
// is only used by healBucket().
func checkBucketExistsInQuorum(storageDisks []StorageAPI, bucketName string) (err error) {
var wg = &sync.WaitGroup{}
errs := make([]error, len(storageDisks))
// Prepare object creation in a all disks
for index, disk := range storageDisks {
if disk == nil {
continue
}
wg.Add(1)
go func(index int, disk StorageAPI) {
defer wg.Done()
_, errs[index] = disk.StatVol(bucketName)
}(index, disk)
}
wg.Wait()
writeQuorum := len(storageDisks)/2 + 1
return reduceWriteQuorumErrs(errs, nil, writeQuorum)
}
// Heals a bucket if it doesn't exist on one of the disks, additionally
// also heals the missing entries for bucket metadata files
// `policy.json, notification.xml, listeners.json`.
func (xl xlObjects) HealBucket(ctx context.Context, bucket string, dryRun bool) (
results []madmin.HealResultItem, err error) {
if err = checkBucketExist(bucket, xl); err != nil {
return nil, err
storageDisks := xl.getDisks()
// Check if bucket doesn't exist in writeQuorum number of disks, if quorum
// number of disks returned that bucket does not exist we quickly return
// and do not proceed to heal.
if err = checkBucketExistsInQuorum(storageDisks, bucket); err != nil {
return results, err
}
// get write quorum for an object
writeQuorum := len(xl.getDisks())/2 + 1
writeQuorum := len(storageDisks)/2 + 1
// Heal bucket.
var result madmin.HealResultItem
result, err = healBucket(xl.getDisks(), bucket, writeQuorum, dryRun)
result, err = healBucket(storageDisks, bucket, writeQuorum, dryRun)
if err != nil {
return nil, err
}
@@ -84,17 +112,17 @@ func healBucket(storageDisks []StorageAPI, bucket string, writeQuorum int,
// Make a volume inside a go-routine.
go func(index int, disk StorageAPI) {
defer wg.Done()
if _, err := disk.StatVol(bucket); err != nil {
if errors.Cause(err) == errDiskNotFound {
if _, serr := disk.StatVol(bucket); serr != nil {
if errors.Cause(serr) == errDiskNotFound {
beforeState[index] = madmin.DriveStateOffline
afterState[index] = madmin.DriveStateOffline
dErrs[index] = err
dErrs[index] = serr
return
}
if errors.Cause(err) != errVolumeNotFound {
if errors.Cause(serr) != errVolumeNotFound {
beforeState[index] = madmin.DriveStateCorrupt
afterState[index] = madmin.DriveStateCorrupt
dErrs[index] = err
dErrs[index] = serr
return
}