Return "SlowDown" to S3 clients for network related errors (#7610)

Consider errors returned by httpClient.Do() as network errors. This is because
the http clients returns different types of errors and it is hard to catch
all the error types.
This commit is contained in:
Krishna Srinivas 2019-05-29 10:21:47 -07:00 committed by kannappanr
parent cb7f9ba286
commit 74e2fe0879
7 changed files with 23 additions and 64 deletions

View File

@ -879,16 +879,6 @@ var errorCodes = errorCodeMap{
Description: "Object name already exists as a directory.", Description: "Object name already exists as a directory.",
HTTPStatusCode: http.StatusConflict, HTTPStatusCode: http.StatusConflict,
}, },
ErrReadQuorum: {
Code: "XMinioReadQuorum",
Description: "Multiple disk failures, unable to reconstruct data.",
HTTPStatusCode: http.StatusServiceUnavailable,
},
ErrWriteQuorum: {
Code: "XMinioWriteQuorum",
Description: "Multiple disks failures, unable to write data.",
HTTPStatusCode: http.StatusServiceUnavailable,
},
ErrInvalidObjectName: { ErrInvalidObjectName: {
Code: "XMinioInvalidObjectName", Code: "XMinioInvalidObjectName",
Description: "Object name contains unsupported characters.", Description: "Object name contains unsupported characters.",
@ -1534,7 +1524,7 @@ func toAPIErrorCode(ctx context.Context, err error) (apiErr APIErrorCode) {
apiErr = ErrKMSAuthFailure apiErr = ErrKMSAuthFailure
case errOperationTimedOut, context.Canceled, context.DeadlineExceeded: case errOperationTimedOut, context.Canceled, context.DeadlineExceeded:
apiErr = ErrOperationTimedOut apiErr = ErrOperationTimedOut
case errNetworkConnReset: case errDiskNotFound:
apiErr = ErrSlowDown apiErr = ErrSlowDown
} }
@ -1593,9 +1583,9 @@ func toAPIErrorCode(ctx context.Context, err error) (apiErr APIErrorCode) {
case InvalidPart: case InvalidPart:
apiErr = ErrInvalidPart apiErr = ErrInvalidPart
case InsufficientWriteQuorum: case InsufficientWriteQuorum:
apiErr = ErrWriteQuorum apiErr = ErrSlowDown
case InsufficientReadQuorum: case InsufficientReadQuorum:
apiErr = ErrReadQuorum apiErr = ErrSlowDown
case UnsupportedDelimiter: case UnsupportedDelimiter:
apiErr = ErrNotImplemented apiErr = ErrNotImplemented
case InvalidMarkerPrefixCombination: case InvalidMarkerPrefixCombination:

View File

@ -39,8 +39,8 @@ var toAPIErrorTests = []struct {
{err: ObjectNameInvalid{}, errCode: ErrInvalidObjectName}, {err: ObjectNameInvalid{}, errCode: ErrInvalidObjectName},
{err: InvalidUploadID{}, errCode: ErrNoSuchUpload}, {err: InvalidUploadID{}, errCode: ErrNoSuchUpload},
{err: InvalidPart{}, errCode: ErrInvalidPart}, {err: InvalidPart{}, errCode: ErrInvalidPart},
{err: InsufficientReadQuorum{}, errCode: ErrReadQuorum}, {err: InsufficientReadQuorum{}, errCode: ErrSlowDown},
{err: InsufficientWriteQuorum{}, errCode: ErrWriteQuorum}, {err: InsufficientWriteQuorum{}, errCode: ErrSlowDown},
{err: UnsupportedDelimiter{}, errCode: ErrNotImplemented}, {err: UnsupportedDelimiter{}, errCode: ErrNotImplemented},
{err: InvalidMarkerPrefixCombination{}, errCode: ErrNotImplemented}, {err: InvalidMarkerPrefixCombination{}, errCode: ErrNotImplemented},
{err: InvalidUploadIDKeyCombination{}, errCode: ErrNotImplemented}, {err: InvalidUploadIDKeyCombination{}, errCode: ErrNotImplemented},

View File

@ -27,22 +27,6 @@ import (
// underlying storage layer. // underlying storage layer.
func toObjectErr(err error, params ...string) error { func toObjectErr(err error, params ...string) error {
switch err { switch err {
case errDiskNotFound:
switch len(params) {
case 1:
err = BucketNotFound{Bucket: params[0]}
case 2:
err = ObjectNotFound{
Bucket: params[0],
Object: params[1],
}
case 3:
err = InvalidUploadID{
Bucket: params[0],
Object: params[1],
UploadID: params[2],
}
}
case errVolumeNotFound: case errVolumeNotFound:
if len(params) >= 1 { if len(params) >= 1 {
err = BucketNotFound{Bucket: params[0]} err = BucketNotFound{Bucket: params[0]}

View File

@ -255,8 +255,8 @@ func testPutObjectPartDiskNotFound(obj ObjectLayer, instanceType string, disks [
} }
// As all disks at not available, bucket not found. // As all disks at not available, bucket not found.
expectedErr2 := BucketNotFound{Bucket: testCase.bucketName} expectedErr2 := errDiskNotFound
if err.Error() != expectedErr2.Error() { if err != errDiskNotFound {
t.Fatalf("Test %s: expected error %s, got %s instead.", instanceType, expectedErr2, err) t.Fatalf("Test %s: expected error %s, got %s instead.", instanceType, expectedErr2, err)
} }
} }

View File

@ -34,6 +34,17 @@ import (
// DefaultRESTTimeout - default RPC timeout is one minute. // DefaultRESTTimeout - default RPC timeout is one minute.
const DefaultRESTTimeout = 1 * time.Minute const DefaultRESTTimeout = 1 * time.Minute
// NetworkError - error type in case of errors related to http/transport
// for ex. connection refused, connection reset, dns resolution failure etc.
// All errors returned by storage-rest-server (ex errFileNotFound, errDiskNotFound) are not considered to be network errors.
type NetworkError struct {
Err error
}
func (n *NetworkError) Error() string {
return n.Err.Error()
}
// Client - http based RPC client. // Client - http based RPC client.
type Client struct { type Client struct {
httpClient *http.Client httpClient *http.Client
@ -46,7 +57,7 @@ type Client struct {
func (c *Client) Call(method string, values url.Values, body io.Reader, length int64) (reply io.ReadCloser, err error) { func (c *Client) Call(method string, values url.Values, body io.Reader, length int64) (reply io.ReadCloser, err error) {
req, err := http.NewRequest(http.MethodPost, c.url.String()+"/"+method+"?"+values.Encode(), body) req, err := http.NewRequest(http.MethodPost, c.url.String()+"/"+method+"?"+values.Encode(), body)
if err != nil { if err != nil {
return nil, err return nil, &NetworkError{err}
} }
req.Header.Set("Authorization", "Bearer "+c.newAuthToken()) req.Header.Set("Authorization", "Bearer "+c.newAuthToken())
@ -56,7 +67,7 @@ func (c *Client) Call(method string, values url.Values, body io.Reader, length i
} }
resp, err := c.httpClient.Do(req) resp, err := c.httpClient.Do(req)
if err != nil { if err != nil {
return nil, err return nil, &NetworkError{err}
} }
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {

View File

@ -21,7 +21,6 @@ import (
"crypto/tls" "crypto/tls"
"io" "io"
"io/ioutil" "io/ioutil"
"net"
"net/url" "net/url"
"path" "path"
"strconv" "strconv"
@ -44,32 +43,10 @@ func isNetworkError(err error) bool {
if err.Error() == errConnectionStale.Error() { if err.Error() == errConnectionStale.Error() {
return true return true
} }
if strings.Contains(err.Error(), "connection reset by peer") { if _, ok := err.(*rest.NetworkError); ok {
return true return true
} }
if uerr, isURLError := err.(*url.Error); isURLError { return false
if uerr.Timeout() {
return true
}
err = uerr.Err
}
_, isNetOpError := err.(*net.OpError)
return isNetOpError
}
// Attempt to approximate network error with a
// typed network error, otherwise default to
// errDiskNotFound
func toNetworkError(err error) error {
if err == nil {
return err
}
if strings.Contains(err.Error(), "connection reset by peer") {
return errNetworkConnReset
}
return errDiskNotFound
} }
// Converts rpc.ServerError to underlying error. This function is // Converts rpc.ServerError to underlying error. This function is
@ -81,7 +58,7 @@ func toStorageErr(err error) error {
} }
if isNetworkError(err) { if isNetworkError(err) {
return toNetworkError(err) return errDiskNotFound
} }
switch err.Error() { switch err.Error() {

View File

@ -85,6 +85,3 @@ var errNoSuchPolicy = errors.New("Specified canned policy does not exist")
// error returned when access is denied. // error returned when access is denied.
var errAccessDenied = errors.New("Do not have enough permissions to access this resource") var errAccessDenied = errors.New("Do not have enough permissions to access this resource")
// errNetworkConnReset - connection reset by peer
var errNetworkConnReset = errors.New("connection reset by peer")