Fix listing fallback re-using disks (#14576)

When more than 2 disks are unavailable for listing, the same disk will be used for fallback.

This makes quorum calculations incorrect since the same disk will have multiple entries.

This PR keeps track of which fallback disks have been handed out and only every returns a disk once.
This commit is contained in:
Klaus Post 2022-03-18 11:35:27 -07:00 committed by GitHub
parent 43eb5a001c
commit 61eb9d4e29
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -789,14 +789,30 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
fallback := func(err error) bool { // Keep track of fallback disks
var fdMu sync.Mutex
fds := opts.fallbackDisks
fallback := func(err error) StorageAPI {
switch err.(type) { switch err.(type) {
case StorageErr: case StorageErr:
// all supported disk errors // Attempt to grab a fallback disk
// attempt a fallback. fdMu.Lock()
return true defer fdMu.Unlock()
if len(fds) == 0 {
return nil
}
fdsCopy := fds
for _, fd := range fdsCopy {
// Grab a fallback disk
fds = fds[1:]
if fd != nil {
return fd
}
}
} }
return false // Either no more disks for fallback or
// not a storage error.
return nil
} }
askDisks := len(disks) askDisks := len(disks)
readers := make([]*metacacheReader, askDisks) readers := make([]*metacacheReader, askDisks)
@ -825,25 +841,20 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
} }
// fallback only when set. // fallback only when set.
if len(opts.fallbackDisks) > 0 && fallback(werr) { for fd := fallback(werr); fd != nil; {
// This fallback is only set when // This fallback is only set when
// askDisks is less than total // askDisks is less than total
// number of disks per set. // number of disks per set.
for _, fd := range opts.fallbackDisks { werr = fd.WalkDir(ctx, WalkDirOptions{
if fd == nil { Bucket: opts.bucket,
continue BaseDir: opts.path,
} Recursive: opts.recursive,
werr = fd.WalkDir(ctx, WalkDirOptions{ ReportNotFound: opts.reportNotFound,
Bucket: opts.bucket, FilterPrefix: opts.filterPrefix,
BaseDir: opts.path, ForwardTo: opts.forwardTo,
Recursive: opts.recursive, }, w)
ReportNotFound: opts.reportNotFound, if werr == nil {
FilterPrefix: opts.filterPrefix, break
ForwardTo: opts.forwardTo,
}, w)
if werr == nil {
break
}
} }
} }
w.CloseWithError(werr) w.CloseWithError(werr)