fix: fallback listing on drives that are unformatted, disconnected (#13249)

This commit is contained in:
Harshavardhana 2021-09-23 17:24:24 -07:00 committed by GitHub
parent a1271d984f
commit 769f0b1e24
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 25 additions and 12 deletions

View File

@ -209,6 +209,10 @@ func (z *erasureServerPools) listPath(ctx context.Context, o *listPathOptions) (
} }
}() }()
o.ID = "" o.ID = ""
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Resuming listing from drives failed %w, proceeding to do raw listing", err))
}
} }
// Do listing in-place. // Do listing in-place.

View File

@ -365,7 +365,6 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
} }
const retryDelay = 500 * time.Millisecond const retryDelay = 500 * time.Millisecond
// Load first part metadata...
// All operations are performed without locks, so we must be careful and allow for failures. // All operations are performed without locks, so we must be careful and allow for failures.
// Read metadata associated with the object from a disk. // Read metadata associated with the object from a disk.
if retries > 0 { if retries > 0 {
@ -373,6 +372,9 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
if disk == nil { if disk == nil {
continue continue
} }
if !disk.IsOnline() {
continue
}
_, err := disk.ReadVersion(ctx, minioMetaBucket, _, err := disk.ReadVersion(ctx, minioMetaBucket,
o.objectPath(0), "", false) o.objectPath(0), "", false)
if err != nil { if err != nil {
@ -384,6 +386,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
} }
} }
// Load first part metadata...
// Read metadata associated with the object from all disks. // Read metadata associated with the object from all disks.
fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(0), ObjectOptions{}, true) fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(0), ObjectOptions{}, true)
if err != nil { if err != nil {
@ -441,6 +444,9 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
if disk == nil { if disk == nil {
continue continue
} }
if !disk.IsOnline() {
continue
}
_, err := disk.ReadVersion(ctx, minioMetaBucket, _, err := disk.ReadVersion(ctx, minioMetaBucket,
o.objectPath(partN), "", false) o.objectPath(partN), "", false)
if err != nil { if err != nil {
@ -452,7 +458,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
} }
} }
// Load first part metadata... // Load partN metadata...
fi, metaArr, onlineDisks, err = er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(partN), ObjectOptions{}, true) fi, metaArr, onlineDisks, err = er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(partN), ObjectOptions{}, true)
if err != nil { if err != nil {
time.Sleep(retryDelay) time.Sleep(retryDelay)
@ -785,11 +791,13 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
defer cancel() defer cancel()
fallback := func(err error) bool { fallback := func(err error) bool {
if err == nil { switch err.(type) {
return false case StorageErr:
// all supported disk errors
// attempt a fallback.
return true
} }
return err.Error() == errUnformattedDisk.Error() || return false
err.Error() == errVolumeNotFound.Error()
} }
askDisks := len(disks) askDisks := len(disks)
readers := make([]*metacacheReader, askDisks) readers := make([]*metacacheReader, askDisks)
@ -844,6 +852,8 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
if werr != io.EOF && werr != nil && if werr != io.EOF && werr != nil &&
werr.Error() != errFileNotFound.Error() && werr.Error() != errFileNotFound.Error() &&
werr.Error() != errVolumeNotFound.Error() && werr.Error() != errVolumeNotFound.Error() &&
werr.Error() != errDiskNotFound.Error() &&
werr.Error() != errUnformattedDisk.Error() &&
!errors.Is(werr, context.Canceled) { !errors.Is(werr, context.Canceled) {
logger.LogIf(ctx, werr) logger.LogIf(ctx, werr)
} }
@ -874,12 +884,11 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
continue continue
case nil: case nil:
default: default:
if err.Error() == errFileNotFound.Error() { switch err.Error() {
atEOF++ case errFileNotFound.Error(),
fnf++ errVolumeNotFound.Error(),
continue errUnformattedDisk.Error(),
} errDiskNotFound.Error():
if err.Error() == errVolumeNotFound.Error() {
atEOF++ atEOF++
fnf++ fnf++
continue continue