fix: healing buckets during pool expansion (#11224)

fixes #11209
This commit is contained in:
Harshavardhana 2021-01-05 13:24:22 -08:00 committed by GitHub
parent ad511b0eb8
commit 4ed45ce543
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 6 additions and 5 deletions

View File

@ -45,12 +45,12 @@ func (er erasureObjects) HealBucket(ctx context.Context, bucket string, opts mad
writeQuorum := getWriteQuorum(len(storageDisks)) writeQuorum := getWriteQuorum(len(storageDisks))
// Heal bucket. // Heal bucket.
return healBucket(ctx, storageDisks, storageEndpoints, bucket, writeQuorum, opts.DryRun) return healBucket(ctx, storageDisks, storageEndpoints, bucket, writeQuorum, opts)
} }
// Heal bucket - create buckets on disks where it does not exist. // Heal bucket - create buckets on disks where it does not exist.
func healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints []string, bucket string, writeQuorum int, func healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints []string, bucket string, writeQuorum int,
dryRun bool) (res madmin.HealResultItem, err error) { opts madmin.HealOpts) (res madmin.HealResultItem, err error) {
// Initialize sync waitgroup. // Initialize sync waitgroup.
g := errgroup.WithNErrs(len(storageDisks)) g := errgroup.WithNErrs(len(storageDisks))
@ -84,7 +84,7 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints
afterState[index] = madmin.DriveStateMissing afterState[index] = madmin.DriveStateMissing
// mutate only if not a dry-run // mutate only if not a dry-run
if dryRun { if opts.DryRun {
return nil return nil
} }
@ -99,7 +99,7 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints
errs := g.Wait() errs := g.Wait()
reducedErr := reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, writeQuorum-1) reducedErr := reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, writeQuorum-1)
if reducedErr == errVolumeNotFound { if errors.Is(reducedErr, errVolumeNotFound) && !opts.Recreate {
return res, nil return res, nil
} }

View File

@ -336,7 +336,7 @@ func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) {
} }
} }
for _, bucket := range buckets { for _, bucket := range buckets {
if _, err = newObject.HealBucket(ctx, bucket.Name, madmin.HealOpts{}); err != nil { if _, err = newObject.HealBucket(ctx, bucket.Name, madmin.HealOpts{Recreate: true}); err != nil {
return fmt.Errorf("Unable to list buckets to heal: %w", err) return fmt.Errorf("Unable to list buckets to heal: %w", err)
} }
} }

View File

@ -42,6 +42,7 @@ type HealOpts struct {
Recursive bool `json:"recursive"` Recursive bool `json:"recursive"`
DryRun bool `json:"dryRun"` DryRun bool `json:"dryRun"`
Remove bool `json:"remove"` Remove bool `json:"remove"`
Recreate bool `json:"recreate"` // only used when bucket needs to be healed
ScanMode HealScanMode `json:"scanMode"` ScanMode HealScanMode `json:"scanMode"`
} }