posix: Do not take disk offline on I/O errors (#8836)

Choosing maxAllowedIOError is arbitrary and
prone to errors, when drives might be perfectly
capable of taking I/O with only few locations
return I/O error. This is a hindrance of sort
where backend filesystems like ZFS can automatically
fix and handle these scenarios.

The added problem with current approach that we
take the drive offline, making it virtually impossible
to bring it online without restart the server which
is not desirable on a busy cluster. Remove this state
such that let the backend return error appropriately
to caller and let the caller decide what to do with
the error.
This commit is contained in:
Harshavardhana 2020-01-17 13:34:43 -08:00 committed by GitHub
parent 005ebbb9b2
commit fc5213258e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 40 additions and 411 deletions

View File

@ -50,8 +50,7 @@ import (
const (
diskMinFreeSpace = 900 * humanize.MiByte // Min 900MiB free space.
diskMinTotalSpace = diskMinFreeSpace // Min 900MiB total space.
maxAllowedIOError = 5
readBlockSize = 4 * humanize.MiByte // Default read block size 4MiB.
readBlockSize = 4 * humanize.MiByte // Default read block size 4MiB.
// On regular files bigger than this;
readAheadSize = 16 << 20
@ -83,11 +82,10 @@ func isValidVolname(volname string) bool {
// posix - implements StorageAPI interface.
type posix struct {
// Disk usage metrics
totalUsed uint64 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
ioErrCount int32 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
totalUsed uint64 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
activeIOCount int32
maxActiveIOCount int32
activeIOCount int32
diskPath string
pool sync.Pool
@ -219,8 +217,12 @@ func newPosix(path string) (*posix, error) {
return &b
},
},
stopUsageCh: make(chan struct{}),
diskMount: mountinfo.IsLikelyMountPoint(path),
stopUsageCh: make(chan struct{}),
diskMount: mountinfo.IsLikelyMountPoint(path),
// Allow disk usage crawler to run upto 10 concurrent
// I/O ops, if and when activeIOCount reaches this
// value disk usage routine suspends the crawler
// and waits until activeIOCount reaches below this threshold.
maxActiveIOCount: 10,
}
@ -392,7 +394,7 @@ func (s *posix) CrawlAndGetDataUsage(endCh <-chan struct{}) (DataUsageInfo, erro
return nil
}
if strings.HasSuffix(prefix, "/xl.json") {
if strings.HasSuffix(prefix, SlashSeparator+xlMetaJSONFile) {
xlMetaBuf, err := ioutil.ReadFile(origPath)
if err != nil {
return nil
@ -434,16 +436,6 @@ type DiskInfo struct {
// DiskInfo provides current information about disk space usage,
// total free inodes and underlying filesystem.
func (s *posix) DiskInfo() (info DiskInfo, err error) {
defer func() {
if s != nil && err == errFaultyDisk {
atomic.AddInt32(&s.ioErrCount, 1)
}
}()
if atomic.LoadInt32(&s.ioErrCount) > maxAllowedIOError {
return info, errFaultyDisk
}
atomic.AddInt32(&s.activeIOCount, 1)
defer func() {
atomic.AddInt32(&s.activeIOCount, -1)
@ -557,16 +549,6 @@ func (s *posix) MakeVolBulk(volumes ...string) (err error) {
// Make a volume entry.
func (s *posix) MakeVol(volume string) (err error) {
defer func() {
if err == errFaultyDisk {
atomic.AddInt32(&s.ioErrCount, 1)
}
}()
if atomic.LoadInt32(&s.ioErrCount) > maxAllowedIOError {
return errFaultyDisk
}
if !isValidVolname(volume) {
return errInvalidArgument
}
@ -601,16 +583,6 @@ func (s *posix) MakeVol(volume string) (err error) {
// ListVols - list volumes.
func (s *posix) ListVols() (volsInfo []VolInfo, err error) {
defer func() {
if err == errFaultyDisk {
atomic.AddInt32(&s.ioErrCount, 1)
}
}()
if atomic.LoadInt32(&s.ioErrCount) > maxAllowedIOError {
return nil, errFaultyDisk
}
atomic.AddInt32(&s.activeIOCount, 1)
defer func() {
atomic.AddInt32(&s.activeIOCount, -1)
@ -671,16 +643,6 @@ func listVols(dirPath string) ([]VolInfo, error) {
// StatVol - get volume info.
func (s *posix) StatVol(volume string) (volInfo VolInfo, err error) {
defer func() {
if err == errFaultyDisk {
atomic.AddInt32(&s.ioErrCount, 1)
}
}()
if atomic.LoadInt32(&s.ioErrCount) > maxAllowedIOError {
return VolInfo{}, errFaultyDisk
}
atomic.AddInt32(&s.activeIOCount, 1)
defer func() {
atomic.AddInt32(&s.activeIOCount, -1)
@ -713,16 +675,6 @@ func (s *posix) StatVol(volume string) (volInfo VolInfo, err error) {
// DeleteVol - delete a volume.
func (s *posix) DeleteVol(volume string) (err error) {
defer func() {
if err == errFaultyDisk {
atomic.AddInt32(&s.ioErrCount, 1)
}
}()
if atomic.LoadInt32(&s.ioErrCount) > maxAllowedIOError {
return errFaultyDisk
}
atomic.AddInt32(&s.activeIOCount, 1)
defer func() {
atomic.AddInt32(&s.activeIOCount, -1)
@ -756,16 +708,6 @@ func (s *posix) DeleteVol(volume string) (err error) {
func (s *posix) Walk(volume, dirPath, marker string, recursive bool, leafFile string,
readMetadataFn readMetadataFunc, endWalkCh chan struct{}) (ch chan FileInfo, err error) {
defer func() {
if err == errFaultyDisk {
atomic.AddInt32(&s.ioErrCount, 1)
}
}()
if atomic.LoadInt32(&s.ioErrCount) > maxAllowedIOError {
return nil, errFaultyDisk
}
atomic.AddInt32(&s.activeIOCount, 1)
defer func() {
atomic.AddInt32(&s.activeIOCount, -1)
@ -834,16 +776,6 @@ func (s *posix) Walk(volume, dirPath, marker string, recursive bool, leafFile st
// ListDir - return all the entries at the given directory path.
// If an entry is a directory it will be returned with a trailing SlashSeparator.
func (s *posix) ListDir(volume, dirPath string, count int, leafFile string) (entries []string, err error) {
defer func() {
if err == errFaultyDisk {
atomic.AddInt32(&s.ioErrCount, 1)
}
}()
if atomic.LoadInt32(&s.ioErrCount) > maxAllowedIOError {
return nil, errFaultyDisk
}
atomic.AddInt32(&s.activeIOCount, 1)
defer func() {
atomic.AddInt32(&s.activeIOCount, -1)
@ -891,16 +823,6 @@ func (s *posix) ListDir(volume, dirPath string, count int, leafFile string) (ent
// This API is meant to be used on files which have small memory footprint, do
// not use this on large files as it would cause server to crash.
func (s *posix) ReadAll(volume, path string) (buf []byte, err error) {
defer func() {
if err == errFaultyDisk {
atomic.AddInt32(&s.ioErrCount, 1)
}
}()
if atomic.LoadInt32(&s.ioErrCount) > maxAllowedIOError {
return nil, errFaultyDisk
}
atomic.AddInt32(&s.activeIOCount, 1)
defer func() {
atomic.AddInt32(&s.activeIOCount, -1)
@ -963,22 +885,10 @@ func (s *posix) ReadAll(volume, path string) (buf []byte, err error) {
// Additionally ReadFile also starts reading from an offset. ReadFile
// semantics are same as io.ReadFull.
func (s *posix) ReadFile(volume, path string, offset int64, buffer []byte, verifier *BitrotVerifier) (int64, error) {
var n int
var err error
defer func() {
if err == errFaultyDisk {
atomic.AddInt32(&s.ioErrCount, 1)
}
}()
if offset < 0 {
return 0, errInvalidArgument
}
if atomic.LoadInt32(&s.ioErrCount) > maxAllowedIOError {
return 0, errFaultyDisk
}
atomic.AddInt32(&s.activeIOCount, 1)
defer func() {
atomic.AddInt32(&s.activeIOCount, -1)
@ -988,6 +898,9 @@ func (s *posix) ReadFile(volume, path string, offset int64, buffer []byte, verif
if err != nil {
return 0, err
}
var n int
// Stat a volume entry.
_, err = os.Stat((volumeDir))
if err != nil {
@ -1071,16 +984,6 @@ func (s *posix) ReadFile(volume, path string, offset int64, buffer []byte, verif
}
func (s *posix) openFile(volume, path string, mode int) (f *os.File, err error) {
defer func() {
if err == errFaultyDisk {
atomic.AddInt32(&s.ioErrCount, 1)
}
}()
if atomic.LoadInt32(&s.ioErrCount) > maxAllowedIOError {
return nil, errFaultyDisk
}
volumeDir, err := s.getVolDir(volume)
if err != nil {
return nil, err
@ -1137,21 +1040,10 @@ func (s *posix) openFile(volume, path string, mode int) (f *os.File, err error)
// ReadFileStream - Returns the read stream of the file.
func (s *posix) ReadFileStream(volume, path string, offset, length int64) (io.ReadCloser, error) {
var err error
defer func() {
if err == errFaultyDisk {
atomic.AddInt32(&s.ioErrCount, 1)
}
}()
if offset < 0 {
return nil, errInvalidArgument
}
if atomic.LoadInt32(&s.ioErrCount) > maxAllowedIOError {
return nil, errFaultyDisk
}
atomic.AddInt32(&s.activeIOCount, 1)
defer func() {
atomic.AddInt32(&s.activeIOCount, -1)
@ -1230,15 +1122,6 @@ func (s *posix) CreateFile(volume, path string, fileSize int64, r io.Reader) (er
if fileSize < -1 {
return errInvalidArgument
}
defer func() {
if err == errFaultyDisk {
atomic.AddInt32(&s.ioErrCount, 1)
}
}()
if atomic.LoadInt32(&s.ioErrCount) > maxAllowedIOError {
return errFaultyDisk
}
atomic.AddInt32(&s.activeIOCount, 1)
defer func() {
@ -1334,16 +1217,6 @@ func (s *posix) CreateFile(volume, path string, fileSize int64, r io.Reader) (er
}
func (s *posix) WriteAll(volume, path string, reader io.Reader) (err error) {
defer func() {
if err == errFaultyDisk {
atomic.AddInt32(&s.ioErrCount, 1)
}
}()
if atomic.LoadInt32(&s.ioErrCount) > maxAllowedIOError {
return errFaultyDisk
}
atomic.AddInt32(&s.activeIOCount, 1)
defer func() {
atomic.AddInt32(&s.activeIOCount, -1)
@ -1368,16 +1241,6 @@ func (s *posix) WriteAll(volume, path string, reader io.Reader) (err error) {
// AppendFile - append a byte array at path, if file doesn't exist at
// path this call explicitly creates it.
func (s *posix) AppendFile(volume, path string, buf []byte) (err error) {
defer func() {
if err == errFaultyDisk {
atomic.AddInt32(&s.ioErrCount, 1)
}
}()
if atomic.LoadInt32(&s.ioErrCount) > maxAllowedIOError {
return errFaultyDisk
}
atomic.AddInt32(&s.activeIOCount, 1)
defer func() {
atomic.AddInt32(&s.activeIOCount, -1)
@ -1400,16 +1263,6 @@ func (s *posix) AppendFile(volume, path string, buf []byte) (err error) {
// StatFile - get file info.
func (s *posix) StatFile(volume, path string) (file FileInfo, err error) {
defer func() {
if err == errFaultyDisk {
atomic.AddInt32(&s.ioErrCount, 1)
}
}()
if atomic.LoadInt32(&s.ioErrCount) > maxAllowedIOError {
return FileInfo{}, errFaultyDisk
}
atomic.AddInt32(&s.activeIOCount, 1)
defer func() {
atomic.AddInt32(&s.activeIOCount, -1)
@ -1501,16 +1354,6 @@ func deleteFile(basePath, deletePath string) error {
// DeleteFile - delete a file at path.
func (s *posix) DeleteFile(volume, path string) (err error) {
defer func() {
if err == errFaultyDisk {
atomic.AddInt32(&s.ioErrCount, 1)
}
}()
if atomic.LoadInt32(&s.ioErrCount) > maxAllowedIOError {
return errFaultyDisk
}
atomic.AddInt32(&s.activeIOCount, 1)
defer func() {
atomic.AddInt32(&s.activeIOCount, -1)
@ -1555,16 +1398,6 @@ func (s *posix) DeleteFileBulk(volume string, paths []string) (errs []error, err
// RenameFile - rename source path to destination path atomically.
func (s *posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err error) {
defer func() {
if err == errFaultyDisk {
atomic.AddInt32(&s.ioErrCount, 1)
}
}()
if atomic.LoadInt32(&s.ioErrCount) > maxAllowedIOError {
return errFaultyDisk
}
atomic.AddInt32(&s.activeIOCount, 1)
defer func() {
atomic.AddInt32(&s.activeIOCount, -1)
@ -1652,16 +1485,6 @@ func (s *posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err e
}
func (s *posix) VerifyFile(volume, path string, fileSize int64, algo BitrotAlgorithm, sum []byte, shardSize int64) (err error) {
defer func() {
if err == errFaultyDisk {
atomic.AddInt32(&s.ioErrCount, 1)
}
}()
if atomic.LoadInt32(&s.ioErrCount) > maxAllowedIOError {
return errFaultyDisk
}
atomic.AddInt32(&s.activeIOCount, 1)
defer func() {
atomic.AddInt32(&s.activeIOCount, -1)

View File

@ -310,18 +310,6 @@ func TestPosixReadAll(t *testing.T) {
}
}
}
// TestPosixing for faulty disk.
// Setting ioErrCount > maxAllowedIOError.
if p, ok := posixStorage.(*posixDiskIDCheck); ok {
// setting the io error count from as specified in the test case.
p.storage.ioErrCount = int32(6)
} else {
t.Errorf("Expected the StorageAPI to be of type *posix")
}
_, err = posixStorage.ReadAll("abcd", "efg")
if err != errFaultyDisk {
t.Errorf("Expected err \"%s\", got err \"%s\"", errFaultyDisk, err)
}
}
// TestPosixNewPosix all the cases handled in posix storage layer initialization.
@ -390,50 +378,35 @@ func TestPosixMakeVol(t *testing.T) {
testCases := []struct {
volName string
ioErrCount int
expectedErr error
}{
// TestPosix case - 1.
// A valid case, volume creation is expected to succeed.
{
volName: "success-vol",
ioErrCount: 0,
expectedErr: nil,
},
// TestPosix case - 2.
// Case where a file exists by the name of the volume to be created.
{
volName: "vol-as-file",
ioErrCount: 0,
expectedErr: errVolumeExists,
},
// TestPosix case - 3.
{
volName: "existing-vol",
ioErrCount: 0,
expectedErr: errVolumeExists,
},
// TestPosix case - 4.
// IO error > maxAllowedIOError, should fail with errFaultyDisk.
{
volName: "vol",
ioErrCount: 6,
expectedErr: errFaultyDisk,
},
// TestPosix case - 5.
// TestPosix case with invalid volume name.
{
volName: "ab",
ioErrCount: 0,
expectedErr: errInvalidArgument,
},
}
for i, testCase := range testCases {
if p, ok := posixStorage.(*posixDiskIDCheck); ok {
// setting the io error count from as specified in the test case.
p.storage.ioErrCount = int32(testCase.ioErrCount)
} else {
if _, ok := posixStorage.(*posixDiskIDCheck); !ok {
t.Errorf("Expected the StorageAPI to be of type *posix")
}
if err := posixStorage.MakeVol(testCase.volName); err != testCase.expectedErr {
@ -503,51 +476,36 @@ func TestPosixDeleteVol(t *testing.T) {
testCases := []struct {
volName string
ioErrCount int
expectedErr error
}{
// TestPosix case - 1.
// A valida case. Empty vol, should be possible to delete.
{
volName: "success-vol",
ioErrCount: 0,
expectedErr: nil,
},
// TestPosix case - 2.
// volume is non-existent.
{
volName: "nonexistent-vol",
ioErrCount: 0,
expectedErr: errVolumeNotFound,
},
// TestPosix case - 3.
// It shouldn't be possible to delete an non-empty volume, validating the same.
{
volName: "nonempty-vol",
ioErrCount: 0,
expectedErr: errVolumeNotEmpty,
},
// TestPosix case - 4.
// IO error > maxAllowedIOError, should fail with errFaultyDisk.
{
volName: "my-disk",
ioErrCount: 6,
expectedErr: errFaultyDisk,
},
// TestPosix case - 5.
// Invalid volume name.
{
volName: "ab",
ioErrCount: 0,
expectedErr: errVolumeNotFound,
},
}
for i, testCase := range testCases {
if posixDiskCheck, ok := posixStorage.(*posixDiskIDCheck); ok {
// setting the io error count from as specified in the test case.
posixDiskCheck.storage.ioErrCount = int32(testCase.ioErrCount)
} else {
if _, ok := posixStorage.(*posixDiskIDCheck); !ok {
t.Errorf("Expected the StorageAPI to be of type *posixDiskIDCheck")
}
if err = posixStorage.DeleteVol(testCase.volName); err != testCase.expectedErr {
@ -625,42 +583,28 @@ func TestPosixStatVol(t *testing.T) {
testCases := []struct {
volName string
ioErrCount int
expectedErr error
}{
// TestPosix case - 1.
{
volName: "success-vol",
ioErrCount: 0,
expectedErr: nil,
},
// TestPosix case - 2.
{
volName: "nonexistent-vol",
ioErrCount: 0,
expectedErr: errVolumeNotFound,
},
// TestPosix case - 3.
{
volName: "success-vol",
ioErrCount: 6,
expectedErr: errFaultyDisk,
},
// TestPosix case - 4.
{
volName: "ab",
ioErrCount: 0,
expectedErr: errVolumeNotFound,
},
}
for i, testCase := range testCases {
var volInfo VolInfo
// setting ioErrCnt from the test case.
if p, ok := posixStorage.(*posixDiskIDCheck); ok {
// setting the io error count from as specified in the test case.
p.storage.ioErrCount = int32(testCase.ioErrCount)
} else {
if _, ok := posixStorage.(*posixDiskIDCheck); !ok {
t.Errorf("Expected the StorageAPI to be of type *posix")
}
volInfo, err = posixStorage.StatVol(testCase.volName)
@ -729,27 +673,9 @@ func TestPosixListVols(t *testing.T) {
t.Errorf("expected: success-vol to be created")
}
// setting ioErrCnt to be > maxAllowedIOError.
// should fail with errFaultyDisk.
if p, ok := posixStorage.(*posixDiskIDCheck); ok {
// setting the io error count from as specified in the test case.
p.storage.ioErrCount = int32(6)
} else {
t.Errorf("Expected the StorageAPI to be of type *posix")
}
if _, err = posixStorage.ListVols(); err != errFaultyDisk {
t.Errorf("Expected to fail with \"%s\", but instead failed with \"%s\"", errFaultyDisk, err)
}
// removing the path and simulating disk failure
os.RemoveAll(path)
// Resetting the IO error.
// should fail with errDiskNotFound.
if p, ok := posixStorage.(*posixDiskIDCheck); ok {
// setting the io error count from as specified in the test case.
p.storage.ioErrCount = int32(0)
} else {
t.Errorf("Expected the StorageAPI to be of type *posix")
}
if _, err = posixStorage.ListVols(); err != errDiskNotFound {
t.Errorf("Expected to fail with \"%s\", but instead failed with \"%s\"", errDiskNotFound, err)
}
@ -783,9 +709,8 @@ func TestPosixPosixListDir(t *testing.T) {
}
testCases := []struct {
srcVol string
srcPath string
ioErrCnt int
srcVol string
srcPath string
// expected result.
expectedListDir []string
expectedErr error
@ -795,7 +720,6 @@ func TestPosixPosixListDir(t *testing.T) {
{
srcVol: "success-vol",
srcPath: "abc",
ioErrCnt: 0,
expectedListDir: []string{"def/", "xyz/"},
expectedErr: nil,
},
@ -804,7 +728,6 @@ func TestPosixPosixListDir(t *testing.T) {
{
srcVol: "success-vol",
srcPath: "abc/def",
ioErrCnt: 0,
expectedListDir: []string{"ghi/"},
expectedErr: nil,
},
@ -813,7 +736,6 @@ func TestPosixPosixListDir(t *testing.T) {
{
srcVol: "success-vol",
srcPath: "abc/def/ghi",
ioErrCnt: 0,
expectedListDir: []string{"success-file"},
expectedErr: nil,
},
@ -821,7 +743,6 @@ func TestPosixPosixListDir(t *testing.T) {
{
srcVol: "success-vol",
srcPath: "abcdef",
ioErrCnt: 0,
expectedErr: errFileNotFound,
},
// TestPosix case - 3.
@ -829,34 +750,20 @@ func TestPosixPosixListDir(t *testing.T) {
{
srcVol: "ab",
srcPath: "success-file",
ioErrCnt: 0,
expectedErr: errVolumeNotFound,
},
// TestPosix case - 4.
// TestPosix case with io error count > max limit.
{
srcVol: "success-vol",
srcPath: "success-file",
ioErrCnt: 6,
expectedErr: errFaultyDisk,
},
// TestPosix case - 5.
// TestPosix case with non existent volume.
{
srcVol: "non-existent-vol",
srcPath: "success-file",
ioErrCnt: 0,
expectedErr: errVolumeNotFound,
},
}
for i, testCase := range testCases {
var dirList []string
// setting ioErrCnt from the test case.
if p, ok := posixStorage.(*posixDiskIDCheck); ok {
// setting the io error count from as specified in the test case.
p.storage.ioErrCount = int32(testCase.ioErrCnt)
} else {
if _, ok := posixStorage.(*posixDiskIDCheck); !ok {
t.Errorf("Expected the StorageAPI to be of type *posix")
}
dirList, err = posixStorage.ListDir(testCase.srcVol, testCase.srcPath, -1, "")
@ -943,7 +850,6 @@ func TestPosixDeleteFile(t *testing.T) {
testCases := []struct {
srcVol string
srcPath string
ioErrCnt int
expectedErr error
}{
// TestPosix case - 1.
@ -951,7 +857,6 @@ func TestPosixDeleteFile(t *testing.T) {
{
srcVol: "success-vol",
srcPath: "success-file",
ioErrCnt: 0,
expectedErr: nil,
},
// TestPosix case - 2.
@ -959,58 +864,41 @@ func TestPosixDeleteFile(t *testing.T) {
{
srcVol: "success-vol",
srcPath: "success-file",
ioErrCnt: 0,
expectedErr: errFileNotFound,
},
// TestPosix case - 3.
// TestPosix case with io error count > max limit.
{
srcVol: "success-vol",
srcPath: "success-file",
ioErrCnt: 6,
expectedErr: errFaultyDisk,
},
// TestPosix case - 4.
// TestPosix case with segment of the volume name > 255.
{
srcVol: "my",
srcPath: "success-file",
ioErrCnt: 0,
expectedErr: errVolumeNotFound,
},
// TestPosix case - 5.
// TestPosix case - 4.
// TestPosix case with non-existent volume.
{
srcVol: "non-existent-vol",
srcPath: "success-file",
ioErrCnt: 0,
expectedErr: errVolumeNotFound,
},
// TestPosix case - 6.
// TestPosix case - 5.
// TestPosix case with src path segment > 255.
{
srcVol: "success-vol",
srcPath: "my-obj-del-0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001",
ioErrCnt: 0,
expectedErr: errFileNameTooLong,
},
// TestPosix case - 7.
// TestPosix case - 6.
// TestPosix case with undeletable parent directory.
// File can delete, dir cannot delete because no-permissions doesn't have write perms.
{
srcVol: "no-permissions",
srcPath: "dir/file",
ioErrCnt: 0,
expectedErr: nil,
},
}
for i, testCase := range testCases {
// setting ioErrCnt from the test case.
if p, ok := posixStorage.(*posixDiskIDCheck); ok {
// setting the io error count from as specified in the test case.
p.storage.ioErrCount = int32(testCase.ioErrCnt)
} else {
if _, ok := posixStorage.(*posixDiskIDCheck); !ok {
t.Errorf("Expected the StorageAPI to be of type *posix")
}
if err = posixStorage.DeleteFile(testCase.srcVol, testCase.srcPath); err != testCase.expectedErr {
@ -1242,22 +1130,6 @@ func TestPosixReadFile(t *testing.T) {
t.Errorf("expected: %s, got: %s", errFileAccessDenied, err)
}
}
// TestPosixing for faulty disk.
// setting ioErrCnt to 6.
// should fail with errFaultyDisk.
if posixType, ok := posixStorage.(*posixDiskIDCheck); ok {
// setting the io error count from as specified in the test case.
posixType.storage.ioErrCount = int32(6)
// Common read buffer.
var buf = make([]byte, 10)
_, err = posixType.ReadFile("abc", "yes", 0, buf, nil)
if err != errFaultyDisk {
t.Fatalf("Expected \"Faulty Disk\", got: \"%s\"", err)
}
} else {
t.Fatalf("Expected the StorageAPI to be of type *posixDiskIDCheck")
}
}
var posixReadFileWithVerifyTests = []struct {
@ -1433,21 +1305,6 @@ func TestPosixAppendFile(t *testing.T) {
if err != errVolumeNotFound {
t.Fatalf("expected: \"Invalid argument error\", got: \"%s\"", err)
}
// TestPosix case with IO error count > max limit.
// setting ioErrCnt to 6.
// should fail with errFaultyDisk.
if p, ok := posixStorage.(*posixDiskIDCheck); ok {
// setting the io error count from as specified in the test case.
p.storage.ioErrCount = int32(6)
err = p.AppendFile("abc", "yes", []byte("hello, world"))
if err != errFaultyDisk {
t.Fatalf("Expected \"Faulty Disk\", got: \"%s\"", err)
}
} else {
t.Fatalf("Expected the StorageAPI to be of type *posix")
}
}
// TestPosix posix.RenameFile()
@ -1494,7 +1351,6 @@ func TestPosixRenameFile(t *testing.T) {
destVol string
srcPath string
destPath string
ioErrCnt int
expectedErr error
}{
// TestPosix case - 1.
@ -1503,7 +1359,6 @@ func TestPosixRenameFile(t *testing.T) {
destVol: "dest-vol",
srcPath: "file1",
destPath: "file-one",
ioErrCnt: 0,
expectedErr: nil,
},
// TestPosix case - 2.
@ -1512,7 +1367,6 @@ func TestPosixRenameFile(t *testing.T) {
destVol: "dest-vol",
srcPath: "path/",
destPath: "new-path/",
ioErrCnt: 0,
expectedErr: nil,
},
// TestPosix case - 3.
@ -1522,7 +1376,6 @@ func TestPosixRenameFile(t *testing.T) {
destVol: "dest-vol",
srcPath: "file2",
destPath: "file-one",
ioErrCnt: 0,
expectedErr: nil,
},
// TestPosix case - 4.
@ -1533,7 +1386,6 @@ func TestPosixRenameFile(t *testing.T) {
destVol: "dest-vol",
srcPath: "file3",
destPath: "file-two",
ioErrCnt: 1,
expectedErr: nil,
},
// TestPosix case - 5.
@ -1544,7 +1396,6 @@ func TestPosixRenameFile(t *testing.T) {
destVol: "dest-vol",
srcPath: "file4",
destPath: "file-three",
ioErrCnt: 5,
expectedErr: nil,
},
// TestPosix case - 6.
@ -1554,7 +1405,6 @@ func TestPosixRenameFile(t *testing.T) {
destVol: "dest-vol",
srcPath: "non-existent-file",
destPath: "file-three",
ioErrCnt: 0,
expectedErr: errFileNotFound,
},
// TestPosix case - 7.
@ -1564,7 +1414,6 @@ func TestPosixRenameFile(t *testing.T) {
destVol: "dest-vol",
srcPath: "path/",
destPath: "file-one",
ioErrCnt: 0,
expectedErr: errFileAccessDenied,
},
// TestPosix case - 8.
@ -1574,20 +1423,9 @@ func TestPosixRenameFile(t *testing.T) {
destVol: "dest-vol",
srcPath: "path/",
destPath: "new-path/",
ioErrCnt: 0,
expectedErr: errFileAccessDenied,
},
// TestPosix case - 9.
// TestPosix case with io error count is greater than maxAllowedIOError.
{
srcVol: "src-vol",
destVol: "dest-vol",
srcPath: "path/",
destPath: "new-path/",
ioErrCnt: 6,
expectedErr: errFaultyDisk,
},
// TestPosix case - 10.
// TestPosix case with source being a file and destination being a directory.
// Either both have to be files or directories.
// Expecting to fail with `errFileAccessDenied`.
@ -1596,10 +1434,9 @@ func TestPosixRenameFile(t *testing.T) {
destVol: "dest-vol",
srcPath: "file4",
destPath: "new-path/",
ioErrCnt: 0,
expectedErr: errFileAccessDenied,
},
// TestPosix case - 11.
// TestPosix case - 10.
// TestPosix case with non-existent source volume.
// Expecting to fail with `errVolumeNotFound`.
{
@ -1607,10 +1444,9 @@ func TestPosixRenameFile(t *testing.T) {
destVol: "dest-vol",
srcPath: "file4",
destPath: "new-path/",
ioErrCnt: 0,
expectedErr: errVolumeNotFound,
},
// TestPosix case - 12.
// TestPosix case - 11.
// TestPosix case with non-existent destination volume.
// Expecting to fail with `errVolumeNotFound`.
{
@ -1618,10 +1454,9 @@ func TestPosixRenameFile(t *testing.T) {
destVol: "dest-vol-non-existent",
srcPath: "file4",
destPath: "new-path/",
ioErrCnt: 0,
expectedErr: errVolumeNotFound,
},
// TestPosix case - 13.
// TestPosix case - 12.
// TestPosix case with invalid src volume name. Length should be atleast 3.
// Expecting to fail with `errInvalidArgument`.
{
@ -1629,7 +1464,16 @@ func TestPosixRenameFile(t *testing.T) {
destVol: "dest-vol-non-existent",
srcPath: "file4",
destPath: "new-path/",
ioErrCnt: 0,
expectedErr: errVolumeNotFound,
},
// TestPosix case - 13.
// TestPosix case with invalid destination volume name. Length should be atleast 3.
// Expecting to fail with `errInvalidArgument`.
{
srcVol: "abcd",
destVol: "ef",
srcPath: "file4",
destPath: "new-path/",
expectedErr: errVolumeNotFound,
},
// TestPosix case - 14.
@ -1640,21 +1484,9 @@ func TestPosixRenameFile(t *testing.T) {
destVol: "ef",
srcPath: "file4",
destPath: "new-path/",
ioErrCnt: 0,
expectedErr: errVolumeNotFound,
},
// TestPosix case - 15.
// TestPosix case with invalid destination volume name. Length should be atleast 3.
// Expecting to fail with `errInvalidArgument`.
{
srcVol: "abcd",
destVol: "ef",
srcPath: "file4",
destPath: "new-path/",
ioErrCnt: 0,
expectedErr: errVolumeNotFound,
},
// TestPosix case - 16.
// TestPosix case with the parent of the destination being a file.
// expected to fail with `errFileAccessDenied`.
{
@ -1662,10 +1494,9 @@ func TestPosixRenameFile(t *testing.T) {
destVol: "dest-vol",
srcPath: "file5",
destPath: "file-one/parent-is-file",
ioErrCnt: 0,
expectedErr: errFileAccessDenied,
},
// TestPosix case - 17.
// TestPosix case - 16.
// TestPosix case with segment of source file name more than 255.
// expected not to fail.
{
@ -1673,10 +1504,9 @@ func TestPosixRenameFile(t *testing.T) {
destVol: "dest-vol",
srcPath: "path/to/my/object0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001",
destPath: "file-six",
ioErrCnt: 0,
expectedErr: errFileNameTooLong,
},
// TestPosix case - 18.
// TestPosix case - 17.
// TestPosix case with segment of destination file name more than 255.
// expected not to fail.
{
@ -1684,17 +1514,12 @@ func TestPosixRenameFile(t *testing.T) {
destVol: "dest-vol",
srcPath: "file6",
destPath: "path/to/my/object0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001",
ioErrCnt: 0,
expectedErr: errFileNameTooLong,
},
}
for i, testCase := range testCases {
// setting ioErrCnt from the test case.
if p, ok := posixStorage.(*posixDiskIDCheck); ok {
// setting the io error count from as specified in the test case.
p.storage.ioErrCount = int32(testCase.ioErrCnt)
} else {
if _, ok := posixStorage.(*posixDiskIDCheck); !ok {
t.Fatalf("Expected the StorageAPI to be of type *posix")
}
@ -1729,7 +1554,6 @@ func TestPosixStatFile(t *testing.T) {
testCases := []struct {
srcVol string
srcPath string
ioErrCnt int
expectedErr error
}{
// TestPosix case - 1.
@ -1737,7 +1561,6 @@ func TestPosixStatFile(t *testing.T) {
{
srcVol: "success-vol",
srcPath: "success-file",
ioErrCnt: 0,
expectedErr: nil,
},
// TestPosix case - 2.
@ -1745,7 +1568,6 @@ func TestPosixStatFile(t *testing.T) {
{
srcVol: "success-vol",
srcPath: "path/to/success-file",
ioErrCnt: 0,
expectedErr: nil,
},
// TestPosix case - 3.
@ -1753,7 +1575,6 @@ func TestPosixStatFile(t *testing.T) {
{
srcVol: "success-vol",
srcPath: "nonexistent-file",
ioErrCnt: 0,
expectedErr: errFileNotFound,
},
// TestPosix case - 4.
@ -1761,7 +1582,6 @@ func TestPosixStatFile(t *testing.T) {
{
srcVol: "success-vol",
srcPath: "path/2/success-file",
ioErrCnt: 0,
expectedErr: errFileNotFound,
},
// TestPosix case - 5.
@ -1769,33 +1589,19 @@ func TestPosixStatFile(t *testing.T) {
{
srcVol: "success-vol",
srcPath: "path",
ioErrCnt: 0,
expectedErr: errFileNotFound,
},
// TestPosix case - 6.
// TestPosix case with io error count > max limit.
{
srcVol: "success-vol",
srcPath: "success-file",
ioErrCnt: 6,
expectedErr: errFaultyDisk,
},
// TestPosix case - 7.
// TestPosix case with non existent volume.
{
srcVol: "non-existent-vol",
srcPath: "success-file",
ioErrCnt: 0,
expectedErr: errVolumeNotFound,
},
}
for i, testCase := range testCases {
// setting ioErrCnt from the test case.
if p, ok := posixStorage.(*posixDiskIDCheck); ok {
// setting the io error count from as specified in the test case.
p.storage.ioErrCount = int32(testCase.ioErrCnt)
} else {
if _, ok := posixStorage.(*posixDiskIDCheck); !ok {
t.Errorf("Expected the StorageAPI to be of type *posix")
}
if _, err := posixStorage.StatFile(testCase.srcVol, testCase.srcPath); err != testCase.expectedErr {