diff --git a/cmd/storage-rpc-client.go b/cmd/storage-rpc-client.go index acb2b1376..ebe62e35d 100644 --- a/cmd/storage-rpc-client.go +++ b/cmd/storage-rpc-client.go @@ -23,7 +23,6 @@ import ( "net/rpc" "net/url" "path" - "sync/atomic" "github.com/minio/minio/pkg/disk" ) @@ -136,15 +135,6 @@ func (n *networkStorage) String() string { return n.rpcClient.ServerAddr() + ":" + n.rpcClient.ServiceEndpoint() } -// Network IO error count is kept at 256 with some simple -// math. Before we reject the disk completely. The combination -// of retry logic and total error count roughly comes around -// 2.5secs ( 2 * 5 * time.Millisecond * 256) which is when we -// basically take the disk offline completely. This is considered -// sufficient time tradeoff to avoid large delays in-terms of -// incoming i/o. -const maxAllowedNetworkIOError = 256 // maximum allowed network IOError. - // Init - attempts a login to reconnect. func (n *networkStorage) Init() error { err := n.rpcClient.Login() @@ -160,18 +150,6 @@ func (n *networkStorage) Close() (err error) { // DiskInfo - fetch disk information for a remote disk. func (n *networkStorage) DiskInfo() (info disk.Info, err error) { - defer func() { - if err == errDiskNotFound { - atomic.AddInt32(&n.networkIOErrCount, 1) - } - }() - - // Take remote disk offline if the total network errors. - // are more than maximum allowable IO error limit. - if n.networkIOErrCount > maxAllowedNetworkIOError { - return disk.Info{}, errFaultyRemoteDisk - } - args := AuthRPCArgs{} if err = n.rpcClient.Call("Storage.DiskInfoHandler", &args, &info); err != nil { return disk.Info{}, toStorageErr(err) @@ -181,18 +159,6 @@ func (n *networkStorage) DiskInfo() (info disk.Info, err error) { // MakeVol - create a volume on a remote disk. func (n *networkStorage) MakeVol(volume string) (err error) { - defer func() { - if err == errDiskNotFound { - atomic.AddInt32(&n.networkIOErrCount, 1) - } - }() - - // Take remote disk offline if the total network errors. - // are more than maximum allowable IO error limit. - if n.networkIOErrCount > maxAllowedNetworkIOError { - return errFaultyRemoteDisk - } - reply := AuthRPCReply{} args := GenericVolArgs{Vol: volume} if err := n.rpcClient.Call("Storage.MakeVolHandler", &args, &reply); err != nil { @@ -203,18 +169,6 @@ func (n *networkStorage) MakeVol(volume string) (err error) { // ListVols - List all volumes on a remote disk. func (n *networkStorage) ListVols() (vols []VolInfo, err error) { - defer func() { - if err == errDiskNotFound { - atomic.AddInt32(&n.networkIOErrCount, 1) - } - }() - - // Take remote disk offline if the total network errors. - // are more than maximum allowable IO error limit. - if n.networkIOErrCount > maxAllowedNetworkIOError { - return nil, errFaultyRemoteDisk - } - ListVols := ListVolsReply{} err = n.rpcClient.Call("Storage.ListVolsHandler", &AuthRPCArgs{}, &ListVols) if err != nil { @@ -225,18 +179,6 @@ func (n *networkStorage) ListVols() (vols []VolInfo, err error) { // StatVol - get volume info over the network. func (n *networkStorage) StatVol(volume string) (volInfo VolInfo, err error) { - defer func() { - if err == errDiskNotFound { - atomic.AddInt32(&n.networkIOErrCount, 1) - } - }() - - // Take remote disk offline if the total network errors. - // are more than maximum allowable IO error limit. - if n.networkIOErrCount > maxAllowedNetworkIOError { - return VolInfo{}, errFaultyRemoteDisk - } - args := GenericVolArgs{Vol: volume} if err = n.rpcClient.Call("Storage.StatVolHandler", &args, &volInfo); err != nil { return VolInfo{}, toStorageErr(err) @@ -246,18 +188,6 @@ func (n *networkStorage) StatVol(volume string) (volInfo VolInfo, err error) { // DeleteVol - Deletes a volume over the network. func (n *networkStorage) DeleteVol(volume string) (err error) { - defer func() { - if err == errDiskNotFound { - atomic.AddInt32(&n.networkIOErrCount, 1) - } - }() - - // Take remote disk offline if the total network errors. - // are more than maximum allowable IO error limit. - if n.networkIOErrCount > maxAllowedNetworkIOError { - return errFaultyRemoteDisk - } - reply := AuthRPCReply{} args := GenericVolArgs{Vol: volume} if err := n.rpcClient.Call("Storage.DeleteVolHandler", &args, &reply); err != nil { @@ -269,17 +199,6 @@ func (n *networkStorage) DeleteVol(volume string) (err error) { // File operations. func (n *networkStorage) PrepareFile(volume, path string, length int64) (err error) { - defer func() { - if err == errDiskNotFound { - atomic.AddInt32(&n.networkIOErrCount, 1) - } - }() - // Take remote disk offline if the total network errors. - // are more than maximum allowable IO error limit. - if n.networkIOErrCount > maxAllowedNetworkIOError { - return errFaultyRemoteDisk - } - reply := AuthRPCReply{} if err = n.rpcClient.Call("Storage.PrepareFileHandler", &PrepareFileArgs{ Vol: volume, @@ -293,18 +212,6 @@ func (n *networkStorage) PrepareFile(volume, path string, length int64) (err err // AppendFile - append file writes buffer to a remote network path. func (n *networkStorage) AppendFile(volume, path string, buffer []byte) (err error) { - defer func() { - if err == errDiskNotFound { - atomic.AddInt32(&n.networkIOErrCount, 1) - } - }() - - // Take remote disk offline if the total network errors. - // are more than maximum allowable IO error limit. - if n.networkIOErrCount > maxAllowedNetworkIOError { - return errFaultyRemoteDisk - } - reply := AuthRPCReply{} if err = n.rpcClient.Call("Storage.AppendFileHandler", &AppendFileArgs{ Vol: volume, @@ -318,18 +225,6 @@ func (n *networkStorage) AppendFile(volume, path string, buffer []byte) (err err // StatFile - get latest Stat information for a file at path. func (n *networkStorage) StatFile(volume, path string) (fileInfo FileInfo, err error) { - defer func() { - if err == errDiskNotFound { - atomic.AddInt32(&n.networkIOErrCount, 1) - } - }() - - // Take remote disk offline if the total network errors. - // are more than maximum allowable IO error limit. - if n.networkIOErrCount > maxAllowedNetworkIOError { - return FileInfo{}, errFaultyRemoteDisk - } - if err = n.rpcClient.Call("Storage.StatFileHandler", &StatFileArgs{ Vol: volume, Path: path, @@ -344,18 +239,6 @@ func (n *networkStorage) StatFile(volume, path string) (fileInfo FileInfo, err e // This API is meant to be used on files which have small memory footprint, do // not use this on large files as it would cause server to crash. func (n *networkStorage) ReadAll(volume, path string) (buf []byte, err error) { - defer func() { - if err == errDiskNotFound { - atomic.AddInt32(&n.networkIOErrCount, 1) - } - }() - - // Take remote disk offline if the total network errors. - // are more than maximum allowable IO error limit. - if n.networkIOErrCount > maxAllowedNetworkIOError { - return nil, errFaultyRemoteDisk - } - if err = n.rpcClient.Call("Storage.ReadAllHandler", &ReadAllArgs{ Vol: volume, Path: path, @@ -367,12 +250,6 @@ func (n *networkStorage) ReadAll(volume, path string) (buf []byte, err error) { // ReadFile - reads a file at remote path and fills the buffer. func (n *networkStorage) ReadFile(volume string, path string, offset int64, buffer []byte) (m int64, err error) { - defer func() { - if err == errDiskNotFound { - atomic.AddInt32(&n.networkIOErrCount, 1) - } - }() - defer func() { if r := recover(); r != nil { // Recover any panic from allocation, and return error. @@ -380,12 +257,6 @@ func (n *networkStorage) ReadFile(volume string, path string, offset int64, buff } }() // Do not crash the server. - // Take remote disk offline if the total network errors. - // are more than maximum allowable IO error limit. - if n.networkIOErrCount > maxAllowedNetworkIOError { - return 0, errFaultyRemoteDisk - } - var result []byte err = n.rpcClient.Call("Storage.ReadFileHandler", &ReadFileArgs{ Vol: volume, @@ -403,18 +274,6 @@ func (n *networkStorage) ReadFile(volume string, path string, offset int64, buff // ListDir - list all entries at prefix. func (n *networkStorage) ListDir(volume, path string) (entries []string, err error) { - defer func() { - if err == errDiskNotFound { - atomic.AddInt32(&n.networkIOErrCount, 1) - } - }() - - // Take remote disk offline if the total network errors. - // are more than maximum allowable IO error limit. - if n.networkIOErrCount > maxAllowedNetworkIOError { - return nil, errFaultyRemoteDisk - } - if err = n.rpcClient.Call("Storage.ListDirHandler", &ListDirArgs{ Vol: volume, Path: path, @@ -427,18 +286,6 @@ func (n *networkStorage) ListDir(volume, path string) (entries []string, err err // DeleteFile - Delete a file at path. func (n *networkStorage) DeleteFile(volume, path string) (err error) { - defer func() { - if err == errDiskNotFound { - atomic.AddInt32(&n.networkIOErrCount, 1) - } - }() - - // Take remote disk offline if the total network errors. - // are more than maximum allowable IO error limit. - if n.networkIOErrCount > maxAllowedNetworkIOError { - return errFaultyRemoteDisk - } - reply := AuthRPCReply{} if err = n.rpcClient.Call("Storage.DeleteFileHandler", &DeleteFileArgs{ Vol: volume, @@ -451,18 +298,6 @@ func (n *networkStorage) DeleteFile(volume, path string) (err error) { // RenameFile - rename a remote file from source to destination. func (n *networkStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err error) { - defer func() { - if err == errDiskNotFound { - atomic.AddInt32(&n.networkIOErrCount, 1) - } - }() - - // Take remote disk offline if the total network errors. - // are more than maximum allowable IO error limit. - if n.networkIOErrCount > maxAllowedNetworkIOError { - return errFaultyRemoteDisk - } - reply := AuthRPCReply{} if err = n.rpcClient.Call("Storage.RenameFileHandler", &RenameFileArgs{ SrcVol: srcVolume,