diff --git a/cmd/disk-cache.go b/cmd/disk-cache.go index a9fe5b851..f44ef7375 100644 --- a/cmd/disk-cache.go +++ b/cmd/disk-cache.go @@ -32,14 +32,10 @@ import ( "github.com/djherbis/atime" "github.com/minio/minio/cmd/logger" - "github.com/minio/minio/pkg/wildcard" - "github.com/minio/minio/pkg/hash" + "github.com/minio/minio/pkg/wildcard" ) -// list of all errors that can be ignored in tree walk operation in disk cache -var cacheTreeWalkIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied, errVolumeNotFound, errFileNotFound) - const ( // disk cache needs to have cacheSizeMultiplier * object size space free for a cache entry to be created. cacheSizeMultiplier = 100 @@ -291,40 +287,32 @@ func (c cacheObjects) GetObjectInfo(ctx context.Context, bucket, object string) // Returns function "listDir" of the type listDirFunc. // isLeaf - is used by listDir function to check if an entry is a leaf or non-leaf entry. // disks - list of fsObjects -func listDirCacheFactory(isLeaf isLeafFunc, treeWalkIgnoredErrs []error, disks []*cacheFSObjects) listDirFunc { - listCacheDirs := func(bucket, prefixDir, prefixEntry string) (dirs []string, err error) { +func listDirCacheFactory(isLeaf isLeafFunc, disks []*cacheFSObjects) listDirFunc { + listCacheDirs := func(bucket, prefixDir, prefixEntry string) (dirs []string) { var entries []string for _, disk := range disks { // ignore disk-caches that might be missing/offline if disk == nil { continue } - fs := disk.FSObjects - entries, err = readDir(pathJoin(fs.fsPath, bucket, prefixDir)) - // For any reason disk was deleted or goes offline, continue - // and list from other disks if possible. + fs := disk.FSObjects + var err error + entries, err = readDir(pathJoin(fs.fsPath, bucket, prefixDir)) if err != nil { - if IsErrIgnored(err, treeWalkIgnoredErrs...) { - continue - } - return nil, err + continue } // Filter entries that have the prefix prefixEntry. entries = filterMatchingPrefix(entries, prefixEntry) dirs = append(dirs, entries...) } - return dirs, nil + return dirs } // listDir - lists all the entries at a given prefix and given entry in the prefix. - listDir := func(bucket, prefixDir, prefixEntry string) (mergedEntries []string, delayIsLeaf bool, err error) { - var cacheEntries []string - cacheEntries, err = listCacheDirs(bucket, prefixDir, prefixEntry) - if err != nil { - return nil, false, err - } + listDir := func(bucket, prefixDir, prefixEntry string) (mergedEntries []string, delayIsLeaf bool) { + cacheEntries := listCacheDirs(bucket, prefixDir, prefixEntry) for _, entry := range cacheEntries { // Find elements in entries which are not in mergedEntries idx := sort.SearchStrings(mergedEntries, entry) @@ -335,7 +323,7 @@ func listDirCacheFactory(isLeaf isLeafFunc, treeWalkIgnoredErrs []error, disks [ mergedEntries = append(mergedEntries, entry) sort.Strings(mergedEntries) } - return mergedEntries, false, nil + return mergedEntries, false } return listDir } @@ -371,7 +359,7 @@ func (c cacheObjects) listCacheObjects(ctx context.Context, bucket, prefix, mark return fs.isObjectDir(bucket, object) } - listDir := listDirCacheFactory(isLeaf, cacheTreeWalkIgnoredErrs, c.cache.cfs) + listDir := listDirCacheFactory(isLeaf, c.cache.cfs) walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, isLeafDir, endWalkCh) } diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index bf5e9ed00..7f5592070 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -968,13 +968,15 @@ func (fs *FSObjects) DeleteObject(ctx context.Context, bucket, object string) er // is a leaf or non-leaf entry. func (fs *FSObjects) listDirFactory(isLeaf isLeafFunc) listDirFunc { // listDir - lists all the entries at a given prefix and given entry in the prefix. - listDir := func(bucket, prefixDir, prefixEntry string) (entries []string, delayIsLeaf bool, err error) { + listDir := func(bucket, prefixDir, prefixEntry string) (entries []string, delayIsLeaf bool) { + var err error entries, err = readDir(pathJoin(fs.fsPath, bucket, prefixDir)) if err != nil { - return nil, false, err + logger.LogIf(context.Background(), err) + return } entries, delayIsLeaf = filterListEntries(bucket, prefixDir, entries, prefixEntry, isLeaf) - return entries, delayIsLeaf, nil + return entries, delayIsLeaf } // Return list factory instance. diff --git a/cmd/posix-errors.go b/cmd/posix-errors.go index e1e93c680..0bfb1a5b5 100644 --- a/cmd/posix-errors.go +++ b/cmd/posix-errors.go @@ -24,22 +24,46 @@ import ( // Function not implemented error func isSysErrNoSys(err error) bool { - return err == syscall.ENOSYS + if pathErr, ok := err.(*os.PathError); ok { + switch pathErr.Err { + case syscall.ENOSYS: + return true + } + } + return false } // Not supported error func isSysErrOpNotSupported(err error) bool { - return err == syscall.EOPNOTSUPP + if pathErr, ok := err.(*os.PathError); ok { + switch pathErr.Err { + case syscall.EOPNOTSUPP: + return true + } + } + return false } // No space left on device error func isSysErrNoSpace(err error) bool { - return err == syscall.ENOSPC + if pathErr, ok := err.(*os.PathError); ok { + switch pathErr.Err { + case syscall.ENOSPC: + return true + } + } + return false } // Input/output error func isSysErrIO(err error) bool { - return err == syscall.EIO + if pathErr, ok := err.(*os.PathError); ok { + switch pathErr.Err { + case syscall.EIO: + return true + } + } + return false } // Check if the given error corresponds to EISDIR (is a directory). diff --git a/cmd/posix.go b/cmd/posix.go index a6e429799..18556b19b 100644 --- a/cmd/posix.go +++ b/cmd/posix.go @@ -327,15 +327,18 @@ func (s *posix) checkDiskFound() (err error) { if !s.IsOnline() { return errDiskNotFound } - _, err = os.Stat((s.diskPath)) + _, err = os.Stat(s.diskPath) if err != nil { if os.IsNotExist(err) { return errDiskNotFound } else if isSysErrTooLong(err) { return errFileNameTooLong + } else if isSysErrIO(err) { + return errFaultyDisk } + return err } - return err + return nil } // diskUsage returns du information for the posix path, in a continuous routine. @@ -421,7 +424,7 @@ func (s *posix) diskUsage(doneCh chan struct{}) { // Make a volume entry. func (s *posix) MakeVol(volume string) (err error) { defer func() { - if err == syscall.EIO { + if err == errFaultyDisk { atomic.AddInt32(&s.ioErrCount, 1) } }() @@ -451,6 +454,8 @@ func (s *posix) MakeVol(volume string) (err error) { } if os.IsPermission(err) { return errDiskAccessDenied + } else if isSysErrIO(err) { + return errFaultyDisk } return err } @@ -462,7 +467,7 @@ func (s *posix) MakeVol(volume string) (err error) { // ListVols - list volumes. func (s *posix) ListVols() (volsInfo []VolInfo, err error) { defer func() { - if err == syscall.EIO { + if err == errFaultyDisk { atomic.AddInt32(&s.ioErrCount, 1) } }() @@ -475,8 +480,11 @@ func (s *posix) ListVols() (volsInfo []VolInfo, err error) { return nil, err } - volsInfo, err = listVols((s.diskPath)) + volsInfo, err = listVols(s.diskPath) if err != nil { + if isSysErrIO(err) { + return nil, errFaultyDisk + } return nil, err } for i, vol := range volsInfo { @@ -505,11 +513,13 @@ func listVols(dirPath string) ([]VolInfo, error) { continue } var fi os.FileInfo - fi, err = os.Stat((pathJoin(dirPath, entry))) + fi, err = os.Stat(pathJoin(dirPath, entry)) if err != nil { // If the file does not exist, skip the entry. if os.IsNotExist(err) { continue + } else if isSysErrIO(err) { + return nil, errFaultyDisk } return nil, err } @@ -526,7 +536,7 @@ func listVols(dirPath string) ([]VolInfo, error) { // StatVol - get volume info. func (s *posix) StatVol(volume string) (volInfo VolInfo, err error) { defer func() { - if err == syscall.EIO { + if err == errFaultyDisk { atomic.AddInt32(&s.ioErrCount, 1) } }() @@ -550,6 +560,8 @@ func (s *posix) StatVol(volume string) (volInfo VolInfo, err error) { if err != nil { if os.IsNotExist(err) { return VolInfo{}, errVolumeNotFound + } else if isSysErrIO(err) { + return VolInfo{}, errFaultyDisk } return VolInfo{}, err } @@ -565,7 +577,7 @@ func (s *posix) StatVol(volume string) (volInfo VolInfo, err error) { // DeleteVol - delete a volume. func (s *posix) DeleteVol(volume string) (err error) { defer func() { - if err == syscall.EIO { + if err == errFaultyDisk { atomic.AddInt32(&s.ioErrCount, 1) } }() @@ -591,8 +603,9 @@ func (s *posix) DeleteVol(volume string) (err error) { return errVolumeNotEmpty } else if os.IsPermission(err) { return errDiskAccessDenied + } else if isSysErrIO(err) { + return errFaultyDisk } - return err } return nil @@ -602,7 +615,7 @@ func (s *posix) DeleteVol(volume string) (err error) { // If an entry is a directory it will be returned with a trailing "/". func (s *posix) ListDir(volume, dirPath string, count int) (entries []string, err error) { defer func() { - if err == syscall.EIO { + if err == errFaultyDisk { atomic.AddInt32(&s.ioErrCount, 1) } }() @@ -625,6 +638,8 @@ func (s *posix) ListDir(volume, dirPath string, count int) (entries []string, er if err != nil { if os.IsNotExist(err) { return nil, errVolumeNotFound + } else if isSysErrIO(err) { + return nil, errFaultyDisk } return nil, err } @@ -644,7 +659,7 @@ func (s *posix) ListDir(volume, dirPath string, count int) (entries []string, er // not use this on large files as it would cause server to crash. func (s *posix) ReadAll(volume, path string) (buf []byte, err error) { defer func() { - if err == syscall.EIO { + if err == errFaultyDisk { atomic.AddInt32(&s.ioErrCount, 1) } }() @@ -666,6 +681,8 @@ func (s *posix) ReadAll(volume, path string) (buf []byte, err error) { if err != nil { if os.IsNotExist(err) { return nil, errVolumeNotFound + } else if isSysErrIO(err) { + return nil, errFaultyDisk } return nil, err } @@ -694,6 +711,8 @@ func (s *posix) ReadAll(volume, path string) (buf []byte, err error) { } } return nil, pathErr + } else if isSysErrIO(err) { + return nil, errFaultyDisk } return nil, err } @@ -715,7 +734,7 @@ func (s *posix) ReadAll(volume, path string) (buf []byte, err error) { // semantics are same as io.ReadFull. func (s *posix) ReadFile(volume, path string, offset int64, buffer []byte, verifier *BitrotVerifier) (n int64, err error) { defer func() { - if err == syscall.EIO { + if err == errFaultyDisk { atomic.AddInt32(&s.ioErrCount, 1) } }() @@ -737,6 +756,8 @@ func (s *posix) ReadFile(volume, path string, offset int64, buffer []byte, verif if err != nil { if os.IsNotExist(err) { return 0, errVolumeNotFound + } else if isSysErrIO(err) { + return 0, errFaultyDisk } return 0, err } @@ -756,6 +777,8 @@ func (s *posix) ReadFile(volume, path string, offset int64, buffer []byte, verif return 0, errFileAccessDenied } else if isSysErrNotDir(err) { return 0, errFileAccessDenied + } else if isSysErrIO(err) { + return 0, errFaultyDisk } return 0, err } @@ -807,7 +830,7 @@ func (s *posix) ReadFile(volume, path string, offset int64, buffer []byte, verif func (s *posix) createFile(volume, path string) (f *os.File, err error) { defer func() { - if err == syscall.EIO { + if err == errFaultyDisk { atomic.AddInt32(&s.ioErrCount, 1) } }() @@ -829,6 +852,8 @@ func (s *posix) createFile(volume, path string) (f *os.File, err error) { if err != nil { if os.IsNotExist(err) { return nil, errVolumeNotFound + } else if isSysErrIO(err) { + return nil, errFaultyDisk } return nil, err } @@ -859,6 +884,8 @@ func (s *posix) createFile(volume, path string) (f *os.File, err error) { return nil, errFileAccessDenied } else if os.IsPermission(err) { return nil, errFileAccessDenied + } else if isSysErrIO(err) { + return nil, errFaultyDisk } return nil, err } @@ -875,7 +902,7 @@ func (s *posix) PrepareFile(volume, path string, fileSize int64) (err error) { } defer func() { - if err == syscall.EIO { + if err == errFaultyDisk { atomic.AddInt32(&s.ioErrCount, 1) } }() @@ -886,6 +913,9 @@ func (s *posix) PrepareFile(volume, path string, fileSize int64) (err error) { // Validate if disk is indeed free. if err = checkDiskFree(s.diskPath, fileSize); err != nil { + if isSysErrIO(err) { + return errFaultyDisk + } return err } @@ -907,7 +937,7 @@ func (s *posix) PrepareFile(volume, path string, fileSize int64) (err error) { case isSysErrNoSpace(e): err = errDiskFull case isSysErrIO(e): - err = e + err = errFaultyDisk default: // For errors: EBADF, EINTR, EINVAL, ENODEV, EPERM, ESPIPE and ETXTBSY // Appending was failed anyway, returns unexpected error @@ -922,7 +952,7 @@ func (s *posix) PrepareFile(volume, path string, fileSize int64) (err error) { // path this call explicitly creates it. func (s *posix) AppendFile(volume, path string, buf []byte) (err error) { defer func() { - if err == syscall.EIO { + if err == errFaultyDisk { atomic.AddInt32(&s.ioErrCount, 1) } }() @@ -944,7 +974,7 @@ func (s *posix) AppendFile(volume, path string, buf []byte) (err error) { // StatFile - get file info. func (s *posix) StatFile(volume, path string) (file FileInfo, err error) { defer func() { - if err == syscall.EIO { + if err == errFaultyDisk { atomic.AddInt32(&s.ioErrCount, 1) } }() @@ -979,10 +1009,10 @@ func (s *posix) StatFile(volume, path string) (file FileInfo, err error) { // File is really not found. if os.IsNotExist(err) { return FileInfo{}, errFileNotFound - } - - // File path cannot be verified since one of the parents is a file. - if isSysErrNotDir(err) { + } else if isSysErrIO(err) { + return FileInfo{}, errFaultyDisk + } else if isSysErrNotDir(err) { + // File path cannot be verified since one of the parents is a file. return FileInfo{}, errFileNotFound } @@ -1023,6 +1053,8 @@ func deleteFile(basePath, deletePath string) error { return errFileNotFound } else if os.IsPermission(err) { return errFileAccessDenied + } else if isSysErrIO(err) { + return errFaultyDisk } return err } @@ -1041,7 +1073,7 @@ func deleteFile(basePath, deletePath string) error { // DeleteFile - delete a file at path. func (s *posix) DeleteFile(volume, path string) (err error) { defer func() { - if err == syscall.EIO { + if err == errFaultyDisk { atomic.AddInt32(&s.ioErrCount, 1) } }() @@ -1063,6 +1095,8 @@ func (s *posix) DeleteFile(volume, path string) (err error) { if err != nil { if os.IsNotExist(err) { return errVolumeNotFound + } else if isSysErrIO(err) { + return errFaultyDisk } return err } @@ -1081,7 +1115,7 @@ func (s *posix) DeleteFile(volume, path string) (err error) { // RenameFile - rename source path to destination path atomically. func (s *posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err error) { defer func() { - if err == syscall.EIO { + if err == errFaultyDisk { atomic.AddInt32(&s.ioErrCount, 1) } }() @@ -1107,6 +1141,8 @@ func (s *posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err e if err != nil { if os.IsNotExist(err) { return errVolumeNotFound + } else if isSysErrIO(err) { + return errFaultyDisk } return err } @@ -1114,6 +1150,8 @@ func (s *posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err e if err != nil { if os.IsNotExist(err) { return errVolumeNotFound + } else if isSysErrIO(err) { + return errFaultyDisk } } @@ -1136,7 +1174,9 @@ func (s *posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err e // we still need to allow overwriting an empty directory since it represents // an object empty directory. _, err = os.Stat(dstFilePath) - + if isSysErrIO(err) { + return errFaultyDisk + } if err == nil && !isDirEmpty(dstFilePath) { return errFileAccessDenied } @@ -1146,6 +1186,9 @@ func (s *posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err e } if err = renameAll(srcFilePath, dstFilePath); err != nil { + if isSysErrIO(err) { + return errFaultyDisk + } return err } diff --git a/cmd/tree-walk.go b/cmd/tree-walk.go index 98228390e..97ce51d1e 100644 --- a/cmd/tree-walk.go +++ b/cmd/tree-walk.go @@ -86,7 +86,7 @@ func filterMatchingPrefix(entries []string, prefixEntry string) []string { } // "listDir" function of type listDirFunc returned by listDirFactory() - explained below. -type listDirFunc func(bucket, prefixDir, prefixEntry string) (entries []string, delayIsLeaf bool, err error) +type listDirFunc func(bucket, prefixDir, prefixEntry string) (entries []string, delayIsLeaf bool) // A function isLeaf of type isLeafFunc is used to detect if an entry is a leaf entry. There are four scenarios // where isLeaf should behave differently: @@ -141,16 +141,8 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker markerBase = markerSplit[1] } } - entries, delayIsLeaf, err := listDir(bucket, prefixDir, entryPrefixMatch) - if err != nil { - select { - case <-endWalkCh: - return errWalkAbort - case resultCh <- treeWalkResult{err: err}: - return err - } - } + entries, delayIsLeaf := listDir(bucket, prefixDir, entryPrefixMatch) // When isleaf check is delayed, make sure that it is set correctly here. if delayIsLeaf && isLeaf == nil { return errInvalidArgument diff --git a/cmd/tree-walk_test.go b/cmd/tree-walk_test.go index 8340e853e..fe671701f 100644 --- a/cmd/tree-walk_test.go +++ b/cmd/tree-walk_test.go @@ -196,7 +196,7 @@ func TestTreeWalk(t *testing.T) { return len(entries) == 0 } - listDir := listDirFactory(context.Background(), isLeaf, xlTreeWalkIgnoredErrs, disk) + listDir := listDirFactory(context.Background(), isLeaf, disk) // Simple test for prefix based walk. testTreeWalkPrefix(t, listDir, isLeaf, isLeafDir) // Simple test when marker is set. @@ -240,7 +240,7 @@ func TestTreeWalkTimeout(t *testing.T) { return len(entries) == 0 } - listDir := listDirFactory(context.Background(), isLeaf, xlTreeWalkIgnoredErrs, disk) + listDir := listDirFactory(context.Background(), isLeaf, disk) // TreeWalk pool with 2 seconds timeout for tree-walk go routines. pool := newTreeWalkPool(2 * time.Second) @@ -315,7 +315,7 @@ func TestListDir(t *testing.T) { // create listDir function. listDir := listDirFactory(context.Background(), func(volume, prefix string) bool { return !hasSuffix(prefix, slashSeparator) - }, xlTreeWalkIgnoredErrs, disk1, disk2) + }, disk1, disk2) // Create file1 in fsDir1 and file2 in fsDir2. disks := []StorageAPI{disk1, disk2} @@ -327,10 +327,7 @@ func TestListDir(t *testing.T) { } // Should list "file1" from fsDir1. - entries, _, err := listDir(volume, "", "") - if err != nil { - t.Error(err) - } + entries, _ := listDir(volume, "", "") if len(entries) != 2 { t.Fatal("Expected the number of entries to be 2") } @@ -348,10 +345,7 @@ func TestListDir(t *testing.T) { } // Should list "file2" from fsDir2. - entries, _, err = listDir(volume, "", "") - if err != nil { - t.Error(err) - } + entries, _ = listDir(volume, "", "") if len(entries) != 1 { t.Fatal("Expected the number of entries to be 1") } @@ -362,13 +356,6 @@ func TestListDir(t *testing.T) { if err != nil { t.Error(err) } - // None of the disks are available, should get - // errDiskNotFound. Since errDiskNotFound is an ignored error, - // we should get nil. - _, _, err = listDir(volume, "", "") - if err != nil { - t.Errorf("expected nil error but found %v.", err) - } } // TestRecursiveWalk - tests if treeWalk returns entries correctly with and @@ -400,7 +387,7 @@ func TestRecursiveTreeWalk(t *testing.T) { } // Create listDir function. - listDir := listDirFactory(context.Background(), isLeaf, xlTreeWalkIgnoredErrs, disk1) + listDir := listDirFactory(context.Background(), isLeaf, disk1) // Create the namespace. var files = []string{ @@ -515,7 +502,7 @@ func TestSortedness(t *testing.T) { } // Create listDir function. - listDir := listDirFactory(context.Background(), isLeaf, xlTreeWalkIgnoredErrs, disk1) + listDir := listDirFactory(context.Background(), isLeaf, disk1) // Create the namespace. var files = []string{ @@ -598,7 +585,7 @@ func TestTreeWalkIsEnd(t *testing.T) { } // Create listDir function. - listDir := listDirFactory(context.Background(), isLeaf, xlTreeWalkIgnoredErrs, disk1) + listDir := listDirFactory(context.Background(), isLeaf, disk1) // Create the namespace. var files = []string{ diff --git a/cmd/utils_test.go b/cmd/utils_test.go index 071c4e1a6..b1caa2bdc 100644 --- a/cmd/utils_test.go +++ b/cmd/utils_test.go @@ -445,6 +445,7 @@ func TestCeilFrac(t *testing.T) { // Test if isErrIgnored works correctly. func TestIsErrIgnored(t *testing.T) { var errIgnored = fmt.Errorf("ignored error") + ignoredErrs := append(baseIgnoredErrs, errIgnored) var testCases = []struct { err error ignored bool @@ -457,9 +458,13 @@ func TestIsErrIgnored(t *testing.T) { err: errIgnored, ignored: true, }, + { + err: errFaultyDisk, + ignored: true, + }, } for i, testCase := range testCases { - if ok := IsErrIgnored(testCase.err, errIgnored); ok != testCase.ignored { + if ok := IsErrIgnored(testCase.err, ignoredErrs...); ok != testCase.ignored { t.Errorf("Test: %d, Expected %t, got %t", i+1, testCase.ignored, ok) } } diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index cb6f453a8..8ee0b6ed6 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -622,8 +622,8 @@ func (s *xlSets) CopyObject(ctx context.Context, srcBucket, srcObject, destBucke // Returns function "listDir" of the type listDirFunc. // isLeaf - is used by listDir function to check if an entry is a leaf or non-leaf entry. // disks - used for doing disk.ListDir(). Sets passes set of disks. -func listDirSetsFactory(ctx context.Context, isLeaf isLeafFunc, isLeafDir isLeafDirFunc, treeWalkIgnoredErrs []error, sets ...[]StorageAPI) listDirFunc { - listDirInternal := func(bucket, prefixDir, prefixEntry string, disks []StorageAPI) (mergedEntries []string, err error) { +func listDirSetsFactory(ctx context.Context, isLeaf isLeafFunc, isLeafDir isLeafDirFunc, sets ...[]StorageAPI) listDirFunc { + listDirInternal := func(bucket, prefixDir, prefixEntry string, disks []StorageAPI) (mergedEntries []string) { for _, disk := range disks { if disk == nil { continue @@ -631,15 +631,10 @@ func listDirSetsFactory(ctx context.Context, isLeaf isLeafFunc, isLeafDir isLeaf var entries []string var newEntries []string + var err error entries, err = disk.ListDir(bucket, prefixDir, -1) if err != nil { - // For any reason disk was deleted or goes offline, continue - // and list from other disks if possible. - if IsErrIgnored(err, treeWalkIgnoredErrs...) { - continue - } - logger.LogIf(ctx, err) - return nil, err + continue } // Find elements in entries which are not in mergedEntries @@ -658,18 +653,13 @@ func listDirSetsFactory(ctx context.Context, isLeaf isLeafFunc, isLeafDir isLeaf sort.Strings(mergedEntries) } } - return mergedEntries, nil + return mergedEntries } // listDir - lists all the entries at a given prefix and given entry in the prefix. - listDir := func(bucket, prefixDir, prefixEntry string) (mergedEntries []string, delayIsLeaf bool, err error) { + listDir := func(bucket, prefixDir, prefixEntry string) (mergedEntries []string, delayIsLeaf bool) { for _, disks := range sets { - var entries []string - entries, err = listDirInternal(bucket, prefixDir, prefixEntry, disks) - if err != nil { - return nil, false, err - } - + entries := listDirInternal(bucket, prefixDir, prefixEntry, disks) var newEntries []string // Find elements in entries which are not in mergedEntries for _, entry := range entries { @@ -688,7 +678,7 @@ func listDirSetsFactory(ctx context.Context, isLeaf isLeafFunc, isLeafDir isLeaf } } mergedEntries, delayIsLeaf = filterListEntries(bucket, prefixDir, mergedEntries, prefixEntry, isLeaf) - return mergedEntries, delayIsLeaf, nil + return mergedEntries, delayIsLeaf } return listDir } @@ -731,7 +721,7 @@ func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimi setDisks = append(setDisks, set.getLoadBalancedDisks()) } - listDir := listDirSetsFactory(ctx, isLeaf, isLeafDir, xlTreeWalkIgnoredErrs, setDisks...) + listDir := listDirSetsFactory(ctx, isLeaf, isLeafDir, setDisks...) walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, isLeafDir, endWalkCh) } @@ -1266,13 +1256,15 @@ func (s *xlSets) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) { // isLeaf - is used by listDir function to check if an entry is a leaf or non-leaf entry. // disks - used for doing disk.ListDir(). Sets passes set of disks. func listDirSetsHealFactory(isLeaf isLeafFunc, sets ...[]StorageAPI) listDirFunc { - listDirInternal := func(bucket, prefixDir, prefixEntry string, disks []StorageAPI) (mergedEntries []string, err error) { + listDirInternal := func(bucket, prefixDir, prefixEntry string, disks []StorageAPI) (mergedEntries []string) { for _, disk := range disks { if disk == nil { continue } + var entries []string var newEntries []string + var err error entries, err = disk.ListDir(bucket, prefixDir, -1) if err != nil { continue @@ -1305,18 +1297,14 @@ func listDirSetsHealFactory(isLeaf isLeafFunc, sets ...[]StorageAPI) listDirFunc sort.Strings(mergedEntries) } } - return mergedEntries, nil + return mergedEntries } // listDir - lists all the entries at a given prefix and given entry in the prefix. - listDir := func(bucket, prefixDir, prefixEntry string) (mergedEntries []string, delayIsLeaf bool, err error) { + listDir := func(bucket, prefixDir, prefixEntry string) (mergedEntries []string, delayIsLeaf bool) { for _, disks := range sets { - var entries []string - entries, err = listDirInternal(bucket, prefixDir, prefixEntry, disks) - if err != nil { - return nil, false, err - } + entries := listDirInternal(bucket, prefixDir, prefixEntry, disks) var newEntries []string // Find elements in entries which are not in mergedEntries @@ -1335,7 +1323,7 @@ func listDirSetsHealFactory(isLeaf isLeafFunc, sets ...[]StorageAPI) listDirFunc sort.Strings(mergedEntries) } } - return mergedEntries, false, nil + return mergedEntries, false } return listDir } diff --git a/cmd/xl-v1-common.go b/cmd/xl-v1-common.go index c7bfb818e..b1f72ef8a 100644 --- a/cmd/xl-v1-common.go +++ b/cmd/xl-v1-common.go @@ -51,6 +51,8 @@ func (xl xlObjects) parentDirIsObject(ctx context.Context, bucket, parent string return isParentDirObject(parent) } +var xlTreeWalkIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied, errVolumeNotFound, errFileNotFound) + // isObject - returns `true` if the prefix is an object i.e if // `xl.json` exists at the leaf, false otherwise. func (xl xlObjects) isObject(bucket, prefix string) (ok bool) { diff --git a/cmd/xl-v1-list-objects.go b/cmd/xl-v1-list-objects.go index 696093ccb..d8386f9b8 100644 --- a/cmd/xl-v1-list-objects.go +++ b/cmd/xl-v1-list-objects.go @@ -24,23 +24,19 @@ import ( // Returns function "listDir" of the type listDirFunc. // isLeaf - is used by listDir function to check if an entry is a leaf or non-leaf entry. // disks - used for doing disk.ListDir() -func listDirFactory(ctx context.Context, isLeaf isLeafFunc, treeWalkIgnoredErrs []error, disks ...StorageAPI) listDirFunc { +func listDirFactory(ctx context.Context, isLeaf isLeafFunc, disks ...StorageAPI) listDirFunc { // Returns sorted merged entries from all the disks. - listDir := func(bucket, prefixDir, prefixEntry string) (mergedEntries []string, delayIsLeaf bool, err error) { + listDir := func(bucket, prefixDir, prefixEntry string) (mergedEntries []string, delayIsLeaf bool) { for _, disk := range disks { if disk == nil { continue } var entries []string var newEntries []string + var err error entries, err = disk.ListDir(bucket, prefixDir, -1) if err != nil { - // For any reason disk was deleted or goes offline, continue - // and list from other disks if possible. - if IsErrIgnored(err, treeWalkIgnoredErrs...) { - continue - } - return nil, false, err + continue } // Find elements in entries which are not in mergedEntries @@ -60,7 +56,7 @@ func listDirFactory(ctx context.Context, isLeaf isLeafFunc, treeWalkIgnoredErrs } } mergedEntries, delayIsLeaf = filterListEntries(bucket, prefixDir, mergedEntries, prefixEntry, isLeaf) - return mergedEntries, delayIsLeaf, nil + return mergedEntries, delayIsLeaf } return listDir } @@ -79,7 +75,7 @@ func (xl xlObjects) listObjects(ctx context.Context, bucket, prefix, marker, del endWalkCh = make(chan struct{}) isLeaf := xl.isObject isLeafDir := xl.isObjectDir - listDir := listDirFactory(ctx, isLeaf, xlTreeWalkIgnoredErrs, xl.getLoadBalancedDisks()...) + listDir := listDirFactory(ctx, isLeaf, xl.getLoadBalancedDisks()...) walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, isLeafDir, endWalkCh) } diff --git a/cmd/xl-v1.go b/cmd/xl-v1.go index 25dd283d4..450ca613e 100644 --- a/cmd/xl-v1.go +++ b/cmd/xl-v1.go @@ -49,9 +49,6 @@ type xlObjects struct { listPool *treeWalkPool } -// list of all errors that can be ignored in tree walk operation in XL -var xlTreeWalkIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied, errVolumeNotFound, errFileNotFound) - // Shutdown function for object storage interface. func (xl xlObjects) Shutdown(ctx context.Context) error { // Add any object layer shutdown activities here.