remove errCh usage from HealObjects() simplify it (#14414)

errCh is not needed instead, rely on errs slice to
capture and return errors instead.

most probably fixes #14247
This commit is contained in:
Harshavardhana 2022-02-25 12:20:41 -08:00 committed by GitHub
parent e3f24a29fa
commit e43cc316ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1730,14 +1730,13 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
// HealObjectFn closure function heals the object. // HealObjectFn closure function heals the object.
type HealObjectFn func(bucket, object, versionID string) error type HealObjectFn func(bucket, object, versionID string) error
func listAndHeal(ctx context.Context, bucket, prefix string, set *erasureObjects, healEntry func(metaCacheEntry) error, errCh chan<- error) { func listAndHeal(ctx context.Context, bucket, prefix string, set *erasureObjects, healEntry func(metaCacheEntry) error) error {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
disks, _ := set.getOnlineDisksWithHealing() disks, _ := set.getOnlineDisksWithHealing()
if len(disks) == 0 { if len(disks) == 0 {
errCh <- errors.New("listAndHeal: No non-healing disks found") return errors.New("listAndHeal: No non-healing disks found")
return
} }
// How to resolve partial results. // How to resolve partial results.
@ -1763,8 +1762,8 @@ func listAndHeal(ctx context.Context, bucket, prefix string, set *erasureObjects
reportNotFound: false, reportNotFound: false,
agreed: func(entry metaCacheEntry) { agreed: func(entry metaCacheEntry) {
if err := healEntry(entry); err != nil { if err := healEntry(entry); err != nil {
errCh <- err logger.LogIf(ctx, err)
return cancel()
} }
}, },
partial: func(entries metaCacheEntries, nAgreed int, errs []error) { partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
@ -1776,7 +1775,8 @@ func listAndHeal(ctx context.Context, bucket, prefix string, set *erasureObjects
} }
if err := healEntry(*entry); err != nil { if err := healEntry(*entry); err != nil {
errCh <- err logger.LogIf(ctx, err)
cancel()
return return
} }
}, },
@ -1784,13 +1784,13 @@ func listAndHeal(ctx context.Context, bucket, prefix string, set *erasureObjects
} }
if err := listPathRaw(ctx, lopts); err != nil { if err := listPathRaw(ctx, lopts); err != nil {
errCh <- fmt.Errorf("listPathRaw returned %w: opts(%#v)", err, lopts) return fmt.Errorf("listPathRaw returned %w: opts(%#v)", err, lopts)
return
} }
return nil
} }
func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, healObjectFn HealObjectFn) error { func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, healObjectFn HealObjectFn) error {
errCh := make(chan error)
healEntry := func(entry metaCacheEntry) error { healEntry := func(entry metaCacheEntry) error {
if entry.isDir() { if entry.isDir() {
return nil return nil
@ -1829,34 +1829,33 @@ func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix str
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
go func() { var poolErrs [][]error
defer close(errCh) for idx, erasureSet := range z.serverPools {
if z.IsSuspended(idx) {
continue
}
errs := make([]error, len(erasureSet.sets))
var wg sync.WaitGroup var wg sync.WaitGroup
for idx, erasureSet := range z.serverPools { for idx, set := range erasureSet.sets {
if z.IsSuspended(idx) { wg.Add(1)
continue go func(idx int, set *erasureObjects) {
} defer wg.Done()
for _, set := range erasureSet.sets {
wg.Add(1)
go func(set *erasureObjects) {
defer wg.Done()
listAndHeal(ctx, bucket, prefix, set, healEntry, errCh) errs[idx] = listAndHeal(ctx, bucket, prefix, set, healEntry)
}(set) }(idx, set)
}
} }
wg.Wait() wg.Wait()
}() poolErrs = append(poolErrs, errs)
var err error }
for e := range errCh { for _, errs := range poolErrs {
// Save first non-nil error. for _, err := range errs {
if e != nil && err != nil { if err == nil {
err = e continue
cancel() }
return err
} }
} }
return err return nil
} }
func (z *erasureServerPools) HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error) { func (z *erasureServerPools) HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error) {