Return listing when exceeding min disk errors (#19644)

When listing, with drives returning `errFileNotFound,` `errVolumeNotFound`, or `errUnformattedDisk,`, 
we could get below `minDisks` drives being left.

This would result in a quorum never being reachable for any object. Therefore, the listing 
would continue, but no results would ever be produced.

Include `fnf` in the mindisk check since it is incremented on these errors. This will stop 
listing when minDisks are left.

Allow `opts.minDisks` to not return errVolumeNotFound or errFileNotFound and return that. 
That will allow for good results even if disks return something else.

We switch `errUnformattedDisk` to a regular error. If we have enough of those, we should just fail.
This commit is contained in:
Klaus Post 2024-05-01 10:59:08 -07:00 committed by GitHub
parent 8c1bba681b
commit 0cde17ae5d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1010,8 +1010,7 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
// not a storage error. // not a storage error.
return nil return nil
} }
askDisks := len(disks) readers := make([]*metacacheReader, len(disks))
readers := make([]*metacacheReader, askDisks)
defer func() { defer func() {
for _, r := range readers { for _, r := range readers {
r.Close() r.Close()
@ -1093,16 +1092,14 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
case nil: case nil:
default: default:
switch err.Error() { switch err.Error() {
case errFileNotFound.Error(), case errFileNotFound.Error():
errVolumeNotFound.Error(),
errUnformattedDisk.Error():
atEOF++ atEOF++
fnf++ fnf++
// This is a special case, to handle bucket does continue
// not exist situations. case errVolumeNotFound.Error():
if errors.Is(err, errVolumeNotFound) { atEOF++
vnf++ fnf++
} vnf++
continue continue
} }
hasErr++ hasErr++
@ -1141,8 +1138,17 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
topEntries[i] = entry topEntries[i] = entry
} }
// Stop if we exceed number of bad disks // Since minDisks is set to quorum, we return if we have enough.
if hasErr > len(disks)-opts.minDisks && hasErr > 0 { if vnf > 0 && vnf >= len(readers)-opts.minDisks {
return errVolumeNotFound
}
// Since minDisks is set to quorum, we return if we have enough.
if fnf > 0 && fnf >= len(readers)-opts.minDisks {
return errFileNotFound
}
// Stop if we exceed number of bad disks.
if hasErr > 0 && hasErr+fnf > len(disks)-opts.minDisks {
if opts.finished != nil { if opts.finished != nil {
opts.finished(errs) opts.finished(errs)
} }
@ -1160,10 +1166,6 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
return errors.New(strings.Join(combinedErr, ", ")) return errors.New(strings.Join(combinedErr, ", "))
} }
if vnf == len(readers) {
return errVolumeNotFound
}
// Break if all at EOF or error. // Break if all at EOF or error.
if atEOF+hasErr == len(readers) { if atEOF+hasErr == len(readers) {
if hasErr > 0 && opts.finished != nil { if hasErr > 0 && opts.finished != nil {
@ -1172,10 +1174,6 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
break break
} }
if fnf == len(readers) {
return errFileNotFound
}
if agree == len(readers) { if agree == len(readers) {
// Everybody agreed // Everybody agreed
for _, r := range readers { for _, r := range readers {