Show SlowDown error message if backend is busy (#7521)

or if there are too many open file descriptors.
This commit is contained in:
poornas 2019-05-02 07:09:57 -07:00 committed by kannappanr
parent 64998fc4ab
commit cf2a436bc8
8 changed files with 55 additions and 3 deletions

View File

@ -1485,7 +1485,6 @@ func toAPIErrorCode(ctx context.Context, err error) (apiErr APIErrorCode) {
if err == nil {
return ErrNone
}
// Verify if the underlying error is signature mismatch.
switch err {
case errInvalidArgument:
@ -1535,6 +1534,8 @@ func toAPIErrorCode(ctx context.Context, err error) (apiErr APIErrorCode) {
apiErr = ErrKMSAuthFailure
case errOperationTimedOut, context.Canceled, context.DeadlineExceeded:
apiErr = ErrOperationTimedOut
case errNetworkConnReset:
apiErr = ErrSlowDown
}
// Compression errors

View File

@ -206,6 +206,9 @@ func osErrToFSFileErr(err error) error {
if isSysErrPathNotFound(err) {
return errFileNotFound
}
if isSysErrTooManyFiles(err) {
return errTooManyOpenFiles
}
return err
}

View File

@ -57,6 +57,8 @@ func toObjectErr(err error, params ...string) error {
}
case errDiskFull:
err = StorageFull{}
case errTooManyOpenFiles:
err = SlowDown{}
case errFileAccessDenied:
if len(params) >= 2 {
err = PrefixAccessDenied{
@ -137,6 +139,13 @@ func (e StorageFull) Error() string {
return "Storage reached its minimum free disk threshold."
}
// SlowDown too many file descriptors open or backend busy .
type SlowDown struct{}
func (e SlowDown) Error() string {
return "Please reduce your request rate"
}
// InsufficientReadQuorum storage cannot satisfy quorum for read operation.
type InsufficientReadQuorum struct{}

View File

@ -136,3 +136,12 @@ func isSysErrCrossDevice(err error) bool {
e, ok := err.(*os.LinkError)
return ok && e.Err == syscall.EXDEV
}
// Check if given error corresponds to too many open files
func isSysErrTooManyFiles(err error) bool {
if err == syscall.ENFILE || err == syscall.EMFILE {
return true
}
pathErr, ok := err.(*os.PathError)
return ok && (pathErr.Err == syscall.ENFILE || pathErr.Err == syscall.EMFILE)
}

View File

@ -726,6 +726,8 @@ func (s *posix) ReadAll(volume, path string) (buf []byte, err error) {
return nil, errVolumeNotFound
} else if isSysErrIO(err) {
return nil, errFaultyDisk
} else if isSysErrTooManyFiles(err) {
return nil, errTooManyOpenFiles
}
return nil, err
}
@ -829,6 +831,8 @@ func (s *posix) ReadFile(volume, path string, offset int64, buffer []byte, verif
return 0, errFileAccessDenied
case isSysErrIO(err):
return 0, errFaultyDisk
case isSysErrTooManyFiles(err):
return 0, errTooManyOpenFiles
default:
return 0, err
}
@ -939,6 +943,8 @@ func (s *posix) openFile(volume, path string, mode int) (f *os.File, err error)
return nil, errFileAccessDenied
case isSysErrIO(err):
return nil, errFaultyDisk
case isSysErrTooManyFiles(err):
return nil, errTooManyOpenFiles
default:
return nil, err
}
@ -1001,6 +1007,8 @@ func (s *posix) ReadFileStream(volume, path string, offset, length int64) (io.Re
return nil, errFileAccessDenied
case isSysErrIO(err):
return nil, errFaultyDisk
case isSysErrTooManyFiles(err):
return nil, errTooManyOpenFiles
default:
return nil, err
}

View File

@ -48,6 +48,9 @@ var errDiskAccessDenied = errors.New("disk access denied")
// errFileNotFound - cannot find the file.
var errFileNotFound = errors.New("file not found")
// errTooManyOpenFiles - too many open files.
var errTooManyOpenFiles = errors.New("too many open files")
// errFileNameTooLong - given file name is too long than supported length.
var errFileNameTooLong = errors.New("file name too long")

View File

@ -44,6 +44,9 @@ func isNetworkError(err error) bool {
if err.Error() == errConnectionStale.Error() {
return true
}
if strings.Contains(err.Error(), "connection reset by peer") {
return true
}
if uerr, isURLError := err.(*url.Error); isURLError {
if uerr.Timeout() {
return true
@ -56,6 +59,19 @@ func isNetworkError(err error) bool {
return isNetOpError
}
// Attempt to approximate network error with a
// typed network error, otherwise default to
// errDiskNotFound
func toNetworkError(err error) error {
if err == nil {
return err
}
if strings.Contains(err.Error(), "connection reset by peer") {
return errNetworkConnReset
}
return errDiskNotFound
}
// Converts rpc.ServerError to underlying error. This function is
// written so that the storageAPI errors are consistent across network
// disks as well.
@ -65,7 +81,7 @@ func toStorageErr(err error) error {
}
if isNetworkError(err) {
return errDiskNotFound
return toNetworkError(err)
}
switch err.Error() {
@ -234,7 +250,7 @@ func (client *storageRESTClient) CreateFile(volume, path string, length int64, r
values.Set(storageRESTVolume, volume)
values.Set(storageRESTFilePath, path)
values.Set(storageRESTLength, strconv.Itoa(int(length)))
respBody, err := client.call(storageRESTMethodCreateFile, values, r, length)
respBody, err := client.call(storageRESTMethodCreateFile, values, ioutil.NopCloser(r), length)
defer http.DrainBody(respBody)
return err
}

View File

@ -85,3 +85,6 @@ var errNoSuchPolicy = errors.New("Specified canned policy does not exist")
// error returned when access is denied.
var errAccessDenied = errors.New("Do not have enough permissions to access this resource")
// errNetworkConnReset - connection reset by peer
var errNetworkConnReset = errors.New("connection reset by peer")