Fix error returned by HealObject in some cases (#11906)

The background healing can return NoSuchUpload error, the reason is that
healing code can return errFileNotFound with three parameters. Simplify
the code by returning exact errUploadNotFound error in multipart code.

Also ensure that a typed error is always returned whatever the number of
parameters because it is better than showing internal error.
This commit is contained in:
Anis Elleuch
2021-03-26 19:17:23 +01:00
committed by GitHub
parent 91eb1fe2ef
commit 8d5456c15a
3 changed files with 115 additions and 92 deletions

View File

@@ -22,139 +22,153 @@ import (
"fmt"
"io"
"path"
"strings"
)
// Converts underlying storage error. Convenience function written to
// handle all cases where we have known types of errors returned by
// underlying storage layer.
func toObjectErr(err error, params ...string) error {
if len(params) > 1 {
if HasSuffix(params[1], globalDirSuffix) {
params[1] = strings.TrimSuffix(params[1], globalDirSuffix) + slashSeparator
}
}
switch err {
case errVolumeNotFound:
apiErr := BucketNotFound{}
if len(params) >= 1 {
err = BucketNotFound{Bucket: params[0]}
apiErr.Bucket = params[0]
}
return apiErr
case errVolumeNotEmpty:
apiErr := BucketNotEmpty{}
if len(params) >= 1 {
err = BucketNotEmpty{Bucket: params[0]}
apiErr.Bucket = params[0]
}
return apiErr
case errVolumeExists:
apiErr := BucketExists{}
if len(params) >= 1 {
err = BucketExists{Bucket: params[0]}
apiErr.Bucket = params[0]
}
return apiErr
case errDiskFull:
err = StorageFull{}
return StorageFull{}
case errTooManyOpenFiles:
err = SlowDown{}
return SlowDown{}
case errFileAccessDenied:
if len(params) >= 2 {
err = PrefixAccessDenied{
Bucket: params[0],
Object: params[1],
}
apiErr := PrefixAccessDenied{}
if len(params) >= 1 {
apiErr.Bucket = params[0]
}
if len(params) >= 2 {
apiErr.Object = decodeDirObject(params[1])
}
return apiErr
case errFileParentIsFile:
if len(params) >= 2 {
err = ParentIsObject{
Bucket: params[0],
Object: params[1],
}
apiErr := ParentIsObject{}
if len(params) >= 1 {
apiErr.Bucket = params[0]
}
if len(params) >= 2 {
apiErr.Object = decodeDirObject(params[1])
}
return apiErr
case errIsNotRegular:
if len(params) >= 2 {
err = ObjectExistsAsDirectory{
Bucket: params[0],
Object: params[1],
}
apiErr := ObjectExistsAsDirectory{}
if len(params) >= 1 {
apiErr.Bucket = params[0]
}
if len(params) >= 2 {
apiErr.Object = decodeDirObject(params[1])
}
return apiErr
case errFileVersionNotFound:
switch len(params) {
case 2:
err = VersionNotFound{
Bucket: params[0],
Object: params[1],
}
case 3:
err = VersionNotFound{
Bucket: params[0],
Object: params[1],
VersionID: params[2],
}
apiErr := VersionNotFound{}
if len(params) >= 1 {
apiErr.Bucket = params[0]
}
if len(params) >= 2 {
apiErr.Object = decodeDirObject(params[1])
}
if len(params) >= 3 {
apiErr.VersionID = params[2]
}
return apiErr
case errMethodNotAllowed:
switch len(params) {
case 2:
err = MethodNotAllowed{
Bucket: params[0],
Object: params[1],
}
apiErr := MethodNotAllowed{}
if len(params) >= 1 {
apiErr.Bucket = params[0]
}
if len(params) >= 2 {
apiErr.Object = decodeDirObject(params[1])
}
return apiErr
case errFileNotFound:
switch len(params) {
case 2:
err = ObjectNotFound{
Bucket: params[0],
Object: params[1],
}
case 3:
err = InvalidUploadID{
Bucket: params[0],
Object: params[1],
UploadID: params[2],
}
apiErr := ObjectNotFound{}
if len(params) >= 1 {
apiErr.Bucket = params[0]
}
if len(params) >= 2 {
apiErr.Object = decodeDirObject(params[1])
}
return apiErr
case errUploadIDNotFound:
apiErr := InvalidUploadID{}
if len(params) >= 1 {
apiErr.Bucket = params[0]
}
if len(params) >= 2 {
apiErr.Object = decodeDirObject(params[1])
}
if len(params) >= 3 {
apiErr.UploadID = params[2]
}
return apiErr
case errFileNameTooLong:
if len(params) >= 2 {
err = ObjectNameInvalid{
Bucket: params[0],
Object: params[1],
}
apiErr := ObjectNameInvalid{}
if len(params) >= 1 {
apiErr.Bucket = params[0]
}
if len(params) >= 2 {
apiErr.Object = decodeDirObject(params[1])
}
return apiErr
case errDataTooLarge:
if len(params) >= 2 {
err = ObjectTooLarge{
Bucket: params[0],
Object: params[1],
}
apiErr := ObjectTooLarge{}
if len(params) >= 1 {
apiErr.Bucket = params[0]
}
if len(params) >= 2 {
apiErr.Object = decodeDirObject(params[1])
}
return apiErr
case errDataTooSmall:
apiErr := ObjectTooSmall{}
if len(params) >= 1 {
apiErr.Bucket = params[0]
}
if len(params) >= 2 {
err = ObjectTooSmall{
Bucket: params[0],
Object: params[1],
}
apiErr.Object = decodeDirObject(params[1])
}
return apiErr
case errErasureReadQuorum:
if len(params) == 1 {
err = InsufficientReadQuorum{
Bucket: params[0],
}
} else if len(params) >= 2 {
err = InsufficientReadQuorum{
Bucket: params[0],
Object: params[1],
}
apiErr := InsufficientReadQuorum{}
if len(params) >= 1 {
apiErr.Bucket = params[0]
}
if len(params) >= 2 {
apiErr.Object = decodeDirObject(params[1])
}
return apiErr
case errErasureWriteQuorum:
if len(params) == 1 {
err = InsufficientWriteQuorum{
Bucket: params[0],
}
} else if len(params) >= 2 {
err = InsufficientWriteQuorum{
Bucket: params[0],
Object: params[1],
}
apiErr := InsufficientWriteQuorum{}
if len(params) >= 1 {
apiErr.Bucket = params[0]
}
if len(params) >= 2 {
apiErr.Object = decodeDirObject(params[1])
}
return apiErr
case io.ErrUnexpectedEOF, io.ErrShortWrite:
err = IncompleteBody{}
return IncompleteBody{}
case context.Canceled, context.DeadlineExceeded:
err = IncompleteBody{}
return IncompleteBody{}
}
return err
}