object: handle Error responses and handle errDiskFull. (#1331)

This commit is contained in:
Harshavardhana 2016-04-19 02:42:10 -07:00 committed by Anand Babu (AB) Periasamy
parent 6bc17a3aea
commit e0f8fed011
11 changed files with 77 additions and 202 deletions

View File

@ -83,7 +83,7 @@ const (
ErrMalformedPOSTRequest ErrMalformedPOSTRequest
ErrSignatureVersionNotSupported ErrSignatureVersionNotSupported
ErrBucketNotEmpty ErrBucketNotEmpty
ErrRootPathFull ErrStorageFull
ErrObjectExistsAsPrefix ErrObjectExistsAsPrefix
ErrAllAccessDisabled ErrAllAccessDisabled
ErrMalformedPolicy ErrMalformedPolicy
@ -295,9 +295,9 @@ var errorCodeResponse = map[APIErrorCode]APIError{
Description: "The bucket you tried to delete is not empty.", Description: "The bucket you tried to delete is not empty.",
HTTPStatusCode: http.StatusConflict, HTTPStatusCode: http.StatusConflict,
}, },
ErrRootPathFull: { ErrStorageFull: {
Code: "RootPathFull", Code: "StorageFull",
Description: "Root path has reached its minimum free disk threshold. Please delete few objects to proceed.", Description: "Storage backend has reached its minimum free disk threshold. Please delete few objects to proceed.",
HTTPStatusCode: http.StatusInternalServerError, HTTPStatusCode: http.StatusInternalServerError,
}, },
ErrObjectExistsAsPrefix: { ErrObjectExistsAsPrefix: {

View File

@ -562,8 +562,8 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
if err != nil { if err != nil {
errorIf(err.Trace(), "PutObject failed.", nil) errorIf(err.Trace(), "PutObject failed.", nil)
switch err.ToGoError().(type) { switch err.ToGoError().(type) {
case RootPathFull: case StorageFull:
writeErrorResponse(w, r, ErrRootPathFull, r.URL.Path) writeErrorResponse(w, r, ErrStorageFull, r.URL.Path)
case BucketNotFound: case BucketNotFound:
writeErrorResponse(w, r, ErrNoSuchBucket, r.URL.Path) writeErrorResponse(w, r, ErrNoSuchBucket, r.URL.Path)
case BucketNameInvalid: case BucketNameInvalid:

2
fs.go
View File

@ -111,7 +111,7 @@ func checkDiskFree(diskPath string, minFreeDisk int64) (err error) {
// space used for journalling, inodes etc. // space used for journalling, inodes etc.
availableDiskSpace := (float64(di.Free) / (float64(di.Total) - (0.05 * float64(di.Total)))) * 100 availableDiskSpace := (float64(di.Free) / (float64(di.Total) - (0.05 * float64(di.Total)))) * 100
if int64(availableDiskSpace) <= minFreeDisk { if int64(availableDiskSpace) <= minFreeDisk {
return errDiskPathFull return errDiskFull
} }
// Success. // Success.

View File

@ -29,6 +29,16 @@ const (
b = "bytes=" b = "bytes="
) )
// InvalidRange - invalid range
type InvalidRange struct {
Start int64
Length int64
}
func (e InvalidRange) Error() string {
return fmt.Sprintf("Invalid range start:%d length:%d", e.Start, e.Length)
}
// HttpRange specifies the byte range to be sent to the client. // HttpRange specifies the byte range to be sent to the client.
type httpRange struct { type httpRange struct {
start, length, size int64 start, length, size int64

View File

@ -54,6 +54,8 @@ func splitNetPath(networkPath string) (netAddr, netPath string) {
// disks as well. // disks as well.
func toStorageErr(err error) error { func toStorageErr(err error) error {
switch err.Error() { switch err.Error() {
case errDiskFull.Error():
return errDiskFull
case errVolumeNotFound.Error(): case errVolumeNotFound.Error():
return errVolumeNotFound return errVolumeNotFound
case errVolumeExists.Error(): case errVolumeExists.Error():

View File

@ -250,6 +250,9 @@ func (o objectAPI) NewMultipartUpload(bucket, object string) (string, *probe.Err
if e == errVolumeNotFound { if e == errVolumeNotFound {
e = o.storage.MakeVol(minioMetaVolume) e = o.storage.MakeVol(minioMetaVolume)
if e != nil { if e != nil {
if e == errDiskFull {
return "", probe.NewError(StorageFull{})
}
return "", probe.NewError(e) return "", probe.NewError(e)
} }
} }
@ -272,6 +275,9 @@ func (o objectAPI) NewMultipartUpload(bucket, object string) (string, *probe.Err
return "", probe.NewError(e) return "", probe.NewError(e)
} }
} else { } else {
if e == errDiskFull {
return "", probe.NewError(StorageFull{})
}
return "", probe.NewError(e) return "", probe.NewError(e)
} }
return uploadID, nil return uploadID, nil
@ -320,8 +326,10 @@ func (o objectAPI) PutObjectPart(bucket, object, uploadID string, partID int, si
} else if e == errIsNotRegular { } else if e == errIsNotRegular {
return "", probe.NewError(ObjectExistsAsPrefix{ return "", probe.NewError(ObjectExistsAsPrefix{
Bucket: bucket, Bucket: bucket,
Prefix: object, Object: object,
}) })
} else if e == errDiskFull {
return "", probe.NewError(StorageFull{})
} }
return "", probe.NewError(e) return "", probe.NewError(e)
} }
@ -336,6 +344,9 @@ func (o objectAPI) PutObjectPart(bucket, object, uploadID string, partID int, si
if size > 0 { if size > 0 {
if _, e = io.CopyN(multiWriter, data, size); e != nil { if _, e = io.CopyN(multiWriter, data, size); e != nil {
safeCloseAndRemove(fileWriter) safeCloseAndRemove(fileWriter)
if e == io.ErrUnexpectedEOF {
return "", probe.NewError(IncompleteBody{})
}
return "", probe.NewError(e) return "", probe.NewError(e)
} }
} else { } else {
@ -468,8 +479,10 @@ func (o objectAPI) CompleteMultipartUpload(bucket string, object string, uploadI
} else if e == errIsNotRegular { } else if e == errIsNotRegular {
return "", probe.NewError(ObjectExistsAsPrefix{ return "", probe.NewError(ObjectExistsAsPrefix{
Bucket: bucket, Bucket: bucket,
Prefix: object, Object: object,
}) })
} else if e == errDiskFull {
return "", probe.NewError(StorageFull{})
} }
return "", probe.NewError(e) return "", probe.NewError(e)
} }

View File

@ -48,6 +48,8 @@ func (o objectAPI) MakeBucket(bucket string) *probe.Error {
if e := o.storage.MakeVol(bucket); e != nil { if e := o.storage.MakeVol(bucket); e != nil {
if e == errVolumeExists { if e == errVolumeExists {
return probe.NewError(BucketExists{Bucket: bucket}) return probe.NewError(BucketExists{Bucket: bucket})
} else if e == errDiskFull {
return probe.NewError(StorageFull{})
} }
return probe.NewError(e) return probe.NewError(e)
} }
@ -209,8 +211,10 @@ func (o objectAPI) PutObject(bucket string, object string, size int64, data io.R
} else if e == errIsNotRegular { } else if e == errIsNotRegular {
return "", probe.NewError(ObjectExistsAsPrefix{ return "", probe.NewError(ObjectExistsAsPrefix{
Bucket: bucket, Bucket: bucket,
Prefix: object, Object: object,
}) })
} else if e == errDiskFull {
return "", probe.NewError(StorageFull{})
} }
return "", probe.NewError(e) return "", probe.NewError(e)
} }
@ -225,6 +229,9 @@ func (o objectAPI) PutObject(bucket string, object string, size int64, data io.R
if size > 0 { if size > 0 {
if _, e = io.CopyN(multiWriter, data, size); e != nil { if _, e = io.CopyN(multiWriter, data, size); e != nil {
safeCloseAndRemove(fileWriter) safeCloseAndRemove(fileWriter)
if e == io.ErrUnexpectedEOF {
return "", probe.NewError(IncompleteBody{})
}
return "", probe.NewError(e) return "", probe.NewError(e)
} }
} else { } else {

View File

@ -18,112 +18,54 @@ package main
import "fmt" import "fmt"
// InvalidArgument invalid argument // StorageFull storage ran out of space
type InvalidArgument struct{} type StorageFull struct{}
func (e InvalidArgument) Error() string { func (e StorageFull) Error() string {
return "Invalid argument" return "Storage reached its minimum free disk threshold."
} }
// UnsupportedFilesystem unsupported filesystem type // GenericError - generic object layer error.
type UnsupportedFilesystem struct { type GenericError struct {
Type string Bucket string
} Object string
func (e UnsupportedFilesystem) Error() string {
return "Unsupported filesystem: " + e.Type
}
// RootPathFull root path out of space
type RootPathFull struct {
Path string
}
func (e RootPathFull) Error() string {
return "Root path " + e.Path + " reached its minimum free disk threshold."
} }
// BucketNotFound bucket does not exist // BucketNotFound bucket does not exist
type BucketNotFound struct { type BucketNotFound GenericError
Bucket string
}
func (e BucketNotFound) Error() string { func (e BucketNotFound) Error() string {
return "Bucket not found: " + e.Bucket return "Bucket not found: " + e.Bucket
} }
// BucketNotEmpty bucket is not empty // BucketNotEmpty bucket is not empty
type BucketNotEmpty struct { type BucketNotEmpty GenericError
Bucket string
}
func (e BucketNotEmpty) Error() string { func (e BucketNotEmpty) Error() string {
return "Bucket not empty: " + e.Bucket return "Bucket not empty: " + e.Bucket
} }
// ObjectNotFound object does not exist // ObjectNotFound object does not exist
type ObjectNotFound struct { type ObjectNotFound GenericError
Bucket string
Object string
}
func (e ObjectNotFound) Error() string { func (e ObjectNotFound) Error() string {
return "Object not found: " + e.Bucket + "#" + e.Object return "Object not found: " + e.Bucket + "#" + e.Object
} }
// ObjectExistsAsPrefix object already exists with a requested prefix. // ObjectExistsAsPrefix object already exists with a requested prefix.
type ObjectExistsAsPrefix struct { type ObjectExistsAsPrefix GenericError
Bucket string
Prefix string
}
func (e ObjectExistsAsPrefix) Error() string { func (e ObjectExistsAsPrefix) Error() string {
return "Object exists on : " + e.Bucket + " as prefix " + e.Prefix return "Object exists on : " + e.Bucket + " as prefix " + e.Object
}
// ObjectCorrupted object found to be corrupted
type ObjectCorrupted struct {
Object string
}
func (e ObjectCorrupted) Error() string {
return "Object found corrupted: " + e.Object
} }
// BucketExists bucket exists // BucketExists bucket exists
type BucketExists struct { type BucketExists GenericError
Bucket string
}
func (e BucketExists) Error() string { func (e BucketExists) Error() string {
return "Bucket exists: " + e.Bucket return "Bucket exists: " + e.Bucket
} }
// CorruptedBackend backend found to be corrupted
type CorruptedBackend struct {
Backend string
}
func (e CorruptedBackend) Error() string {
return "Corrupted backend: " + e.Backend
}
// NotImplemented function not implemented
type NotImplemented struct {
Function string
}
func (e NotImplemented) Error() string {
return "Not implemented: " + e.Function
}
// InvalidDisksArgument invalid number of disks per node
type InvalidDisksArgument struct{}
func (e InvalidDisksArgument) Error() string {
return "Invalid number of disks per node"
}
// BadDigest - Content-MD5 you specified did not match what we received. // BadDigest - Content-MD5 you specified did not match what we received.
type BadDigest struct { type BadDigest struct {
ExpectedMD5 string ExpectedMD5 string
@ -143,8 +85,7 @@ func (e UnsupportedDelimiter) Error() string {
return fmt.Sprintf("delimiter '%s' is not supported. Only '/' is supported", e.Delimiter) return fmt.Sprintf("delimiter '%s' is not supported. Only '/' is supported", e.Delimiter)
} }
// InvalidUploadIDKeyCombination - invalid upload id and key marker // InvalidUploadIDKeyCombination - invalid upload id and key marker combination.
// combination.
type InvalidUploadIDKeyCombination struct { type InvalidUploadIDKeyCombination struct {
UploadIDMarker, KeyMarker string UploadIDMarker, KeyMarker string
} }
@ -162,132 +103,41 @@ func (e InvalidMarkerPrefixCombination) Error() string {
return fmt.Sprintf("Invalid combination of marker '%s' and prefix '%s'", e.Marker, e.Prefix) return fmt.Sprintf("Invalid combination of marker '%s' and prefix '%s'", e.Marker, e.Prefix)
} }
// InternalError - generic internal error
type InternalError struct{}
// BackendError - generic disk backend error
type BackendError struct {
Path string
}
// BackendCorrupted - path has corrupted data
type BackendCorrupted BackendError
// APINotImplemented - generic API not implemented error
type APINotImplemented struct {
API string
}
// GenericBucketError - generic bucket error
type GenericBucketError struct {
Bucket string
}
// BucketPolicyNotFound - no bucket policy found. // BucketPolicyNotFound - no bucket policy found.
type BucketPolicyNotFound GenericBucketError type BucketPolicyNotFound GenericError
func (e BucketPolicyNotFound) Error() string { func (e BucketPolicyNotFound) Error() string {
return "No bucket policy found for bucket: " + e.Bucket return "No bucket policy found for bucket: " + e.Bucket
} }
// GenericObjectError - generic object error
type GenericObjectError struct {
Bucket string
Object string
}
// ImplementationError - generic implementation error
type ImplementationError struct {
Bucket string
Object string
Err error
}
/// Bucket related errors /// Bucket related errors
// BucketNameInvalid - bucketname provided is invalid // BucketNameInvalid - bucketname provided is invalid
type BucketNameInvalid GenericBucketError type BucketNameInvalid GenericError
/// Object related errors
// ObjectNameInvalid - object name provided is invalid
type ObjectNameInvalid GenericObjectError
// Return string an error formatted as the given text
func (e ImplementationError) Error() string {
error := ""
if e.Bucket != "" {
error = error + "Bucket: " + e.Bucket + " "
}
if e.Object != "" {
error = error + "Object: " + e.Object + " "
}
error = error + "Error: " + e.Err.Error()
return error
}
// EmbedError - wrapper function for error object
func EmbedError(bucket, object string, err error) ImplementationError {
return ImplementationError{
Bucket: bucket,
Object: object,
Err: err,
}
}
// Return string an error formatted as the given text
func (e InternalError) Error() string {
return "Internal error occured"
}
// Return string an error formatted as the given text
func (e APINotImplemented) Error() string {
return "Api not implemented: " + e.API
}
// Return string an error formatted as the given text // Return string an error formatted as the given text
func (e BucketNameInvalid) Error() string { func (e BucketNameInvalid) Error() string {
return "Bucket name invalid: " + e.Bucket return "Bucket name invalid: " + e.Bucket
} }
/// Object related errors
// ObjectNameInvalid - object name provided is invalid
type ObjectNameInvalid GenericError
// Return string an error formatted as the given text // Return string an error formatted as the given text
func (e ObjectNameInvalid) Error() string { func (e ObjectNameInvalid) Error() string {
return "Object name invalid: " + e.Bucket + "#" + e.Object return "Object name invalid: " + e.Bucket + "#" + e.Object
} }
// IncompleteBody You did not provide the number of bytes specified by the Content-Length HTTP header // IncompleteBody You did not provide the number of bytes specified by the Content-Length HTTP header
type IncompleteBody GenericObjectError type IncompleteBody GenericError
// Return string an error formatted as the given text // Return string an error formatted as the given text
func (e IncompleteBody) Error() string { func (e IncompleteBody) Error() string {
return e.Bucket + "#" + e.Object + "has incomplete body" return e.Bucket + "#" + e.Object + "has incomplete body"
} }
// Return string an error formatted as the given text
func (e BackendCorrupted) Error() string {
return "Backend corrupted: " + e.Path
}
// OperationNotPermitted - operation not permitted
type OperationNotPermitted struct {
Op string
Reason string
}
func (e OperationNotPermitted) Error() string {
return "Operation " + e.Op + " not permitted for reason: " + e.Reason
}
// InvalidRange - invalid range
type InvalidRange struct {
Start int64
Length int64
}
func (e InvalidRange) Error() string {
return fmt.Sprintf("Invalid range start:%d length:%d", e.Start, e.Length)
}
/// Multipart related errors /// Multipart related errors
// MalformedUploadID malformed upload id. // MalformedUploadID malformed upload id.
@ -323,10 +173,3 @@ type InvalidPartOrder struct {
func (e InvalidPartOrder) Error() string { func (e InvalidPartOrder) Error() string {
return "Invalid part order sent for " + e.UploadID return "Invalid part order sent for " + e.UploadID
} }
// MalformedXML invalid xml format
type MalformedXML struct{}
func (e MalformedXML) Error() string {
return "Malformed XML"
}

View File

@ -460,8 +460,8 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
md5Sum, err := api.ObjectAPI.PutObject(bucket, object, size, readCloser, metadata) md5Sum, err := api.ObjectAPI.PutObject(bucket, object, size, readCloser, metadata)
if err != nil { if err != nil {
switch err.ToGoError().(type) { switch err.ToGoError().(type) {
case RootPathFull: case StorageFull:
writeErrorResponse(w, r, ErrRootPathFull, r.URL.Path) writeErrorResponse(w, r, ErrStorageFull, r.URL.Path)
case BucketNotFound: case BucketNotFound:
writeErrorResponse(w, r, ErrNoSuchBucket, r.URL.Path) writeErrorResponse(w, r, ErrNoSuchBucket, r.URL.Path)
case BucketNameInvalid: case BucketNameInvalid:
@ -684,8 +684,8 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
return return
} }
switch e.(type) { switch e.(type) {
case RootPathFull: case StorageFull:
writeErrorResponse(w, r, ErrRootPathFull, r.URL.Path) writeErrorResponse(w, r, ErrStorageFull, r.URL.Path)
case BucketNotFound: case BucketNotFound:
writeErrorResponse(w, r, ErrNoSuchBucket, r.URL.Path) writeErrorResponse(w, r, ErrNoSuchBucket, r.URL.Path)
case BucketNameInvalid: case BucketNameInvalid:
@ -738,8 +738,8 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r
if err != nil { if err != nil {
errorIf(err.Trace(), "NewMultipartUpload failed.", nil) errorIf(err.Trace(), "NewMultipartUpload failed.", nil)
switch err.ToGoError().(type) { switch err.ToGoError().(type) {
case RootPathFull: case StorageFull:
writeErrorResponse(w, r, ErrRootPathFull, r.URL.Path) writeErrorResponse(w, r, ErrStorageFull, r.URL.Path)
case BucketNameInvalid: case BucketNameInvalid:
writeErrorResponse(w, r, ErrInvalidBucketName, r.URL.Path) writeErrorResponse(w, r, ErrInvalidBucketName, r.URL.Path)
case BucketNotFound: case BucketNotFound:
@ -856,8 +856,8 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
return return
} }
switch e.(type) { switch e.(type) {
case RootPathFull: case StorageFull:
writeErrorResponse(w, r, ErrRootPathFull, r.URL.Path) writeErrorResponse(w, r, ErrStorageFull, r.URL.Path)
case InvalidUploadID: case InvalidUploadID:
writeErrorResponse(w, r, ErrNoSuchUpload, r.URL.Path) writeErrorResponse(w, r, ErrNoSuchUpload, r.URL.Path)
case BadDigest: case BadDigest:

View File

@ -18,8 +18,8 @@ package main
import "errors" import "errors"
// errDiskPathFull - cannot create volume or files when disk is full. // errDiskFull - cannot create volume or files when disk is full.
var errDiskPathFull = errors.New("Disk path full.") var errDiskFull = errors.New("Disk path full.")
// errFileNotFound - cannot find the file. // errFileNotFound - cannot find the file.
var errFileNotFound = errors.New("File not found.") var errFileNotFound = errors.New("File not found.")

View File

@ -413,8 +413,8 @@ func writeWebErrorResponse(w http.ResponseWriter, err error) {
// Convert error type to api error code. // Convert error type to api error code.
var apiErrCode APIErrorCode var apiErrCode APIErrorCode
switch err.(type) { switch err.(type) {
case RootPathFull: case StorageFull:
apiErrCode = ErrRootPathFull apiErrCode = ErrStorageFull
case BucketNotFound: case BucketNotFound:
apiErrCode = ErrNoSuchBucket apiErrCode = ErrNoSuchBucket
case BucketNameInvalid: case BucketNameInvalid: