mirror of
https://github.com/minio/minio.git
synced 2025-01-23 04:33:15 -05:00
Make sure to handle FaultyDisks in listing ops (#6204)
Continuing from PR 157ed65c352e40c71fe6ab91738321d95bd19b34 Our posix.go implementation did not handle I/O errors properly on the disks, this led to situations where top-level callers such as ListObjects might return early without even verifying all the available disks. This commit tries to address this in Kubernetes, drbd/nbd based persistent volumes which can disconnect under load and result in the situations with disks return I/O errors. This commit also simplifies listing operation, listing never returns any error. We can avoid this since we pretty much ignore most of the errors anyways. When objects are accessed directly we return proper errors.
This commit is contained in:
parent
644c2ce326
commit
ad86454580
@ -32,14 +32,10 @@ import (
|
||||
"github.com/djherbis/atime"
|
||||
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
"github.com/minio/minio/pkg/wildcard"
|
||||
|
||||
"github.com/minio/minio/pkg/hash"
|
||||
"github.com/minio/minio/pkg/wildcard"
|
||||
)
|
||||
|
||||
// list of all errors that can be ignored in tree walk operation in disk cache
|
||||
var cacheTreeWalkIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied, errVolumeNotFound, errFileNotFound)
|
||||
|
||||
const (
|
||||
// disk cache needs to have cacheSizeMultiplier * object size space free for a cache entry to be created.
|
||||
cacheSizeMultiplier = 100
|
||||
@ -291,40 +287,32 @@ func (c cacheObjects) GetObjectInfo(ctx context.Context, bucket, object string)
|
||||
// Returns function "listDir" of the type listDirFunc.
|
||||
// isLeaf - is used by listDir function to check if an entry is a leaf or non-leaf entry.
|
||||
// disks - list of fsObjects
|
||||
func listDirCacheFactory(isLeaf isLeafFunc, treeWalkIgnoredErrs []error, disks []*cacheFSObjects) listDirFunc {
|
||||
listCacheDirs := func(bucket, prefixDir, prefixEntry string) (dirs []string, err error) {
|
||||
func listDirCacheFactory(isLeaf isLeafFunc, disks []*cacheFSObjects) listDirFunc {
|
||||
listCacheDirs := func(bucket, prefixDir, prefixEntry string) (dirs []string) {
|
||||
var entries []string
|
||||
for _, disk := range disks {
|
||||
// ignore disk-caches that might be missing/offline
|
||||
if disk == nil {
|
||||
continue
|
||||
}
|
||||
fs := disk.FSObjects
|
||||
entries, err = readDir(pathJoin(fs.fsPath, bucket, prefixDir))
|
||||
|
||||
// For any reason disk was deleted or goes offline, continue
|
||||
// and list from other disks if possible.
|
||||
fs := disk.FSObjects
|
||||
var err error
|
||||
entries, err = readDir(pathJoin(fs.fsPath, bucket, prefixDir))
|
||||
if err != nil {
|
||||
if IsErrIgnored(err, treeWalkIgnoredErrs...) {
|
||||
continue
|
||||
}
|
||||
return nil, err
|
||||
continue
|
||||
}
|
||||
|
||||
// Filter entries that have the prefix prefixEntry.
|
||||
entries = filterMatchingPrefix(entries, prefixEntry)
|
||||
dirs = append(dirs, entries...)
|
||||
}
|
||||
return dirs, nil
|
||||
return dirs
|
||||
}
|
||||
|
||||
// listDir - lists all the entries at a given prefix and given entry in the prefix.
|
||||
listDir := func(bucket, prefixDir, prefixEntry string) (mergedEntries []string, delayIsLeaf bool, err error) {
|
||||
var cacheEntries []string
|
||||
cacheEntries, err = listCacheDirs(bucket, prefixDir, prefixEntry)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
listDir := func(bucket, prefixDir, prefixEntry string) (mergedEntries []string, delayIsLeaf bool) {
|
||||
cacheEntries := listCacheDirs(bucket, prefixDir, prefixEntry)
|
||||
for _, entry := range cacheEntries {
|
||||
// Find elements in entries which are not in mergedEntries
|
||||
idx := sort.SearchStrings(mergedEntries, entry)
|
||||
@ -335,7 +323,7 @@ func listDirCacheFactory(isLeaf isLeafFunc, treeWalkIgnoredErrs []error, disks [
|
||||
mergedEntries = append(mergedEntries, entry)
|
||||
sort.Strings(mergedEntries)
|
||||
}
|
||||
return mergedEntries, false, nil
|
||||
return mergedEntries, false
|
||||
}
|
||||
return listDir
|
||||
}
|
||||
@ -371,7 +359,7 @@ func (c cacheObjects) listCacheObjects(ctx context.Context, bucket, prefix, mark
|
||||
return fs.isObjectDir(bucket, object)
|
||||
}
|
||||
|
||||
listDir := listDirCacheFactory(isLeaf, cacheTreeWalkIgnoredErrs, c.cache.cfs)
|
||||
listDir := listDirCacheFactory(isLeaf, c.cache.cfs)
|
||||
walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, isLeafDir, endWalkCh)
|
||||
}
|
||||
|
||||
|
@ -968,13 +968,15 @@ func (fs *FSObjects) DeleteObject(ctx context.Context, bucket, object string) er
|
||||
// is a leaf or non-leaf entry.
|
||||
func (fs *FSObjects) listDirFactory(isLeaf isLeafFunc) listDirFunc {
|
||||
// listDir - lists all the entries at a given prefix and given entry in the prefix.
|
||||
listDir := func(bucket, prefixDir, prefixEntry string) (entries []string, delayIsLeaf bool, err error) {
|
||||
listDir := func(bucket, prefixDir, prefixEntry string) (entries []string, delayIsLeaf bool) {
|
||||
var err error
|
||||
entries, err = readDir(pathJoin(fs.fsPath, bucket, prefixDir))
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
logger.LogIf(context.Background(), err)
|
||||
return
|
||||
}
|
||||
entries, delayIsLeaf = filterListEntries(bucket, prefixDir, entries, prefixEntry, isLeaf)
|
||||
return entries, delayIsLeaf, nil
|
||||
return entries, delayIsLeaf
|
||||
}
|
||||
|
||||
// Return list factory instance.
|
||||
|
@ -24,22 +24,46 @@ import (
|
||||
|
||||
// Function not implemented error
|
||||
func isSysErrNoSys(err error) bool {
|
||||
return err == syscall.ENOSYS
|
||||
if pathErr, ok := err.(*os.PathError); ok {
|
||||
switch pathErr.Err {
|
||||
case syscall.ENOSYS:
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Not supported error
|
||||
func isSysErrOpNotSupported(err error) bool {
|
||||
return err == syscall.EOPNOTSUPP
|
||||
if pathErr, ok := err.(*os.PathError); ok {
|
||||
switch pathErr.Err {
|
||||
case syscall.EOPNOTSUPP:
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// No space left on device error
|
||||
func isSysErrNoSpace(err error) bool {
|
||||
return err == syscall.ENOSPC
|
||||
if pathErr, ok := err.(*os.PathError); ok {
|
||||
switch pathErr.Err {
|
||||
case syscall.ENOSPC:
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Input/output error
|
||||
func isSysErrIO(err error) bool {
|
||||
return err == syscall.EIO
|
||||
if pathErr, ok := err.(*os.PathError); ok {
|
||||
switch pathErr.Err {
|
||||
case syscall.EIO:
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Check if the given error corresponds to EISDIR (is a directory).
|
||||
|
91
cmd/posix.go
91
cmd/posix.go
@ -327,15 +327,18 @@ func (s *posix) checkDiskFound() (err error) {
|
||||
if !s.IsOnline() {
|
||||
return errDiskNotFound
|
||||
}
|
||||
_, err = os.Stat((s.diskPath))
|
||||
_, err = os.Stat(s.diskPath)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return errDiskNotFound
|
||||
} else if isSysErrTooLong(err) {
|
||||
return errFileNameTooLong
|
||||
} else if isSysErrIO(err) {
|
||||
return errFaultyDisk
|
||||
}
|
||||
return err
|
||||
}
|
||||
return err
|
||||
return nil
|
||||
}
|
||||
|
||||
// diskUsage returns du information for the posix path, in a continuous routine.
|
||||
@ -421,7 +424,7 @@ func (s *posix) diskUsage(doneCh chan struct{}) {
|
||||
// Make a volume entry.
|
||||
func (s *posix) MakeVol(volume string) (err error) {
|
||||
defer func() {
|
||||
if err == syscall.EIO {
|
||||
if err == errFaultyDisk {
|
||||
atomic.AddInt32(&s.ioErrCount, 1)
|
||||
}
|
||||
}()
|
||||
@ -451,6 +454,8 @@ func (s *posix) MakeVol(volume string) (err error) {
|
||||
}
|
||||
if os.IsPermission(err) {
|
||||
return errDiskAccessDenied
|
||||
} else if isSysErrIO(err) {
|
||||
return errFaultyDisk
|
||||
}
|
||||
return err
|
||||
}
|
||||
@ -462,7 +467,7 @@ func (s *posix) MakeVol(volume string) (err error) {
|
||||
// ListVols - list volumes.
|
||||
func (s *posix) ListVols() (volsInfo []VolInfo, err error) {
|
||||
defer func() {
|
||||
if err == syscall.EIO {
|
||||
if err == errFaultyDisk {
|
||||
atomic.AddInt32(&s.ioErrCount, 1)
|
||||
}
|
||||
}()
|
||||
@ -475,8 +480,11 @@ func (s *posix) ListVols() (volsInfo []VolInfo, err error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
volsInfo, err = listVols((s.diskPath))
|
||||
volsInfo, err = listVols(s.diskPath)
|
||||
if err != nil {
|
||||
if isSysErrIO(err) {
|
||||
return nil, errFaultyDisk
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
for i, vol := range volsInfo {
|
||||
@ -505,11 +513,13 @@ func listVols(dirPath string) ([]VolInfo, error) {
|
||||
continue
|
||||
}
|
||||
var fi os.FileInfo
|
||||
fi, err = os.Stat((pathJoin(dirPath, entry)))
|
||||
fi, err = os.Stat(pathJoin(dirPath, entry))
|
||||
if err != nil {
|
||||
// If the file does not exist, skip the entry.
|
||||
if os.IsNotExist(err) {
|
||||
continue
|
||||
} else if isSysErrIO(err) {
|
||||
return nil, errFaultyDisk
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
@ -526,7 +536,7 @@ func listVols(dirPath string) ([]VolInfo, error) {
|
||||
// StatVol - get volume info.
|
||||
func (s *posix) StatVol(volume string) (volInfo VolInfo, err error) {
|
||||
defer func() {
|
||||
if err == syscall.EIO {
|
||||
if err == errFaultyDisk {
|
||||
atomic.AddInt32(&s.ioErrCount, 1)
|
||||
}
|
||||
}()
|
||||
@ -550,6 +560,8 @@ func (s *posix) StatVol(volume string) (volInfo VolInfo, err error) {
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return VolInfo{}, errVolumeNotFound
|
||||
} else if isSysErrIO(err) {
|
||||
return VolInfo{}, errFaultyDisk
|
||||
}
|
||||
return VolInfo{}, err
|
||||
}
|
||||
@ -565,7 +577,7 @@ func (s *posix) StatVol(volume string) (volInfo VolInfo, err error) {
|
||||
// DeleteVol - delete a volume.
|
||||
func (s *posix) DeleteVol(volume string) (err error) {
|
||||
defer func() {
|
||||
if err == syscall.EIO {
|
||||
if err == errFaultyDisk {
|
||||
atomic.AddInt32(&s.ioErrCount, 1)
|
||||
}
|
||||
}()
|
||||
@ -591,8 +603,9 @@ func (s *posix) DeleteVol(volume string) (err error) {
|
||||
return errVolumeNotEmpty
|
||||
} else if os.IsPermission(err) {
|
||||
return errDiskAccessDenied
|
||||
} else if isSysErrIO(err) {
|
||||
return errFaultyDisk
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@ -602,7 +615,7 @@ func (s *posix) DeleteVol(volume string) (err error) {
|
||||
// If an entry is a directory it will be returned with a trailing "/".
|
||||
func (s *posix) ListDir(volume, dirPath string, count int) (entries []string, err error) {
|
||||
defer func() {
|
||||
if err == syscall.EIO {
|
||||
if err == errFaultyDisk {
|
||||
atomic.AddInt32(&s.ioErrCount, 1)
|
||||
}
|
||||
}()
|
||||
@ -625,6 +638,8 @@ func (s *posix) ListDir(volume, dirPath string, count int) (entries []string, er
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return nil, errVolumeNotFound
|
||||
} else if isSysErrIO(err) {
|
||||
return nil, errFaultyDisk
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
@ -644,7 +659,7 @@ func (s *posix) ListDir(volume, dirPath string, count int) (entries []string, er
|
||||
// not use this on large files as it would cause server to crash.
|
||||
func (s *posix) ReadAll(volume, path string) (buf []byte, err error) {
|
||||
defer func() {
|
||||
if err == syscall.EIO {
|
||||
if err == errFaultyDisk {
|
||||
atomic.AddInt32(&s.ioErrCount, 1)
|
||||
}
|
||||
}()
|
||||
@ -666,6 +681,8 @@ func (s *posix) ReadAll(volume, path string) (buf []byte, err error) {
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return nil, errVolumeNotFound
|
||||
} else if isSysErrIO(err) {
|
||||
return nil, errFaultyDisk
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
@ -694,6 +711,8 @@ func (s *posix) ReadAll(volume, path string) (buf []byte, err error) {
|
||||
}
|
||||
}
|
||||
return nil, pathErr
|
||||
} else if isSysErrIO(err) {
|
||||
return nil, errFaultyDisk
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
@ -715,7 +734,7 @@ func (s *posix) ReadAll(volume, path string) (buf []byte, err error) {
|
||||
// semantics are same as io.ReadFull.
|
||||
func (s *posix) ReadFile(volume, path string, offset int64, buffer []byte, verifier *BitrotVerifier) (n int64, err error) {
|
||||
defer func() {
|
||||
if err == syscall.EIO {
|
||||
if err == errFaultyDisk {
|
||||
atomic.AddInt32(&s.ioErrCount, 1)
|
||||
}
|
||||
}()
|
||||
@ -737,6 +756,8 @@ func (s *posix) ReadFile(volume, path string, offset int64, buffer []byte, verif
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return 0, errVolumeNotFound
|
||||
} else if isSysErrIO(err) {
|
||||
return 0, errFaultyDisk
|
||||
}
|
||||
return 0, err
|
||||
}
|
||||
@ -756,6 +777,8 @@ func (s *posix) ReadFile(volume, path string, offset int64, buffer []byte, verif
|
||||
return 0, errFileAccessDenied
|
||||
} else if isSysErrNotDir(err) {
|
||||
return 0, errFileAccessDenied
|
||||
} else if isSysErrIO(err) {
|
||||
return 0, errFaultyDisk
|
||||
}
|
||||
return 0, err
|
||||
}
|
||||
@ -807,7 +830,7 @@ func (s *posix) ReadFile(volume, path string, offset int64, buffer []byte, verif
|
||||
|
||||
func (s *posix) createFile(volume, path string) (f *os.File, err error) {
|
||||
defer func() {
|
||||
if err == syscall.EIO {
|
||||
if err == errFaultyDisk {
|
||||
atomic.AddInt32(&s.ioErrCount, 1)
|
||||
}
|
||||
}()
|
||||
@ -829,6 +852,8 @@ func (s *posix) createFile(volume, path string) (f *os.File, err error) {
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return nil, errVolumeNotFound
|
||||
} else if isSysErrIO(err) {
|
||||
return nil, errFaultyDisk
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
@ -859,6 +884,8 @@ func (s *posix) createFile(volume, path string) (f *os.File, err error) {
|
||||
return nil, errFileAccessDenied
|
||||
} else if os.IsPermission(err) {
|
||||
return nil, errFileAccessDenied
|
||||
} else if isSysErrIO(err) {
|
||||
return nil, errFaultyDisk
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
@ -875,7 +902,7 @@ func (s *posix) PrepareFile(volume, path string, fileSize int64) (err error) {
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err == syscall.EIO {
|
||||
if err == errFaultyDisk {
|
||||
atomic.AddInt32(&s.ioErrCount, 1)
|
||||
}
|
||||
}()
|
||||
@ -886,6 +913,9 @@ func (s *posix) PrepareFile(volume, path string, fileSize int64) (err error) {
|
||||
|
||||
// Validate if disk is indeed free.
|
||||
if err = checkDiskFree(s.diskPath, fileSize); err != nil {
|
||||
if isSysErrIO(err) {
|
||||
return errFaultyDisk
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
@ -907,7 +937,7 @@ func (s *posix) PrepareFile(volume, path string, fileSize int64) (err error) {
|
||||
case isSysErrNoSpace(e):
|
||||
err = errDiskFull
|
||||
case isSysErrIO(e):
|
||||
err = e
|
||||
err = errFaultyDisk
|
||||
default:
|
||||
// For errors: EBADF, EINTR, EINVAL, ENODEV, EPERM, ESPIPE and ETXTBSY
|
||||
// Appending was failed anyway, returns unexpected error
|
||||
@ -922,7 +952,7 @@ func (s *posix) PrepareFile(volume, path string, fileSize int64) (err error) {
|
||||
// path this call explicitly creates it.
|
||||
func (s *posix) AppendFile(volume, path string, buf []byte) (err error) {
|
||||
defer func() {
|
||||
if err == syscall.EIO {
|
||||
if err == errFaultyDisk {
|
||||
atomic.AddInt32(&s.ioErrCount, 1)
|
||||
}
|
||||
}()
|
||||
@ -944,7 +974,7 @@ func (s *posix) AppendFile(volume, path string, buf []byte) (err error) {
|
||||
// StatFile - get file info.
|
||||
func (s *posix) StatFile(volume, path string) (file FileInfo, err error) {
|
||||
defer func() {
|
||||
if err == syscall.EIO {
|
||||
if err == errFaultyDisk {
|
||||
atomic.AddInt32(&s.ioErrCount, 1)
|
||||
}
|
||||
}()
|
||||
@ -979,10 +1009,10 @@ func (s *posix) StatFile(volume, path string) (file FileInfo, err error) {
|
||||
// File is really not found.
|
||||
if os.IsNotExist(err) {
|
||||
return FileInfo{}, errFileNotFound
|
||||
}
|
||||
|
||||
// File path cannot be verified since one of the parents is a file.
|
||||
if isSysErrNotDir(err) {
|
||||
} else if isSysErrIO(err) {
|
||||
return FileInfo{}, errFaultyDisk
|
||||
} else if isSysErrNotDir(err) {
|
||||
// File path cannot be verified since one of the parents is a file.
|
||||
return FileInfo{}, errFileNotFound
|
||||
}
|
||||
|
||||
@ -1023,6 +1053,8 @@ func deleteFile(basePath, deletePath string) error {
|
||||
return errFileNotFound
|
||||
} else if os.IsPermission(err) {
|
||||
return errFileAccessDenied
|
||||
} else if isSysErrIO(err) {
|
||||
return errFaultyDisk
|
||||
}
|
||||
return err
|
||||
}
|
||||
@ -1041,7 +1073,7 @@ func deleteFile(basePath, deletePath string) error {
|
||||
// DeleteFile - delete a file at path.
|
||||
func (s *posix) DeleteFile(volume, path string) (err error) {
|
||||
defer func() {
|
||||
if err == syscall.EIO {
|
||||
if err == errFaultyDisk {
|
||||
atomic.AddInt32(&s.ioErrCount, 1)
|
||||
}
|
||||
}()
|
||||
@ -1063,6 +1095,8 @@ func (s *posix) DeleteFile(volume, path string) (err error) {
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return errVolumeNotFound
|
||||
} else if isSysErrIO(err) {
|
||||
return errFaultyDisk
|
||||
}
|
||||
return err
|
||||
}
|
||||
@ -1081,7 +1115,7 @@ func (s *posix) DeleteFile(volume, path string) (err error) {
|
||||
// RenameFile - rename source path to destination path atomically.
|
||||
func (s *posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err error) {
|
||||
defer func() {
|
||||
if err == syscall.EIO {
|
||||
if err == errFaultyDisk {
|
||||
atomic.AddInt32(&s.ioErrCount, 1)
|
||||
}
|
||||
}()
|
||||
@ -1107,6 +1141,8 @@ func (s *posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err e
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return errVolumeNotFound
|
||||
} else if isSysErrIO(err) {
|
||||
return errFaultyDisk
|
||||
}
|
||||
return err
|
||||
}
|
||||
@ -1114,6 +1150,8 @@ func (s *posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err e
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return errVolumeNotFound
|
||||
} else if isSysErrIO(err) {
|
||||
return errFaultyDisk
|
||||
}
|
||||
}
|
||||
|
||||
@ -1136,7 +1174,9 @@ func (s *posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err e
|
||||
// we still need to allow overwriting an empty directory since it represents
|
||||
// an object empty directory.
|
||||
_, err = os.Stat(dstFilePath)
|
||||
|
||||
if isSysErrIO(err) {
|
||||
return errFaultyDisk
|
||||
}
|
||||
if err == nil && !isDirEmpty(dstFilePath) {
|
||||
return errFileAccessDenied
|
||||
}
|
||||
@ -1146,6 +1186,9 @@ func (s *posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err e
|
||||
}
|
||||
|
||||
if err = renameAll(srcFilePath, dstFilePath); err != nil {
|
||||
if isSysErrIO(err) {
|
||||
return errFaultyDisk
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -86,7 +86,7 @@ func filterMatchingPrefix(entries []string, prefixEntry string) []string {
|
||||
}
|
||||
|
||||
// "listDir" function of type listDirFunc returned by listDirFactory() - explained below.
|
||||
type listDirFunc func(bucket, prefixDir, prefixEntry string) (entries []string, delayIsLeaf bool, err error)
|
||||
type listDirFunc func(bucket, prefixDir, prefixEntry string) (entries []string, delayIsLeaf bool)
|
||||
|
||||
// A function isLeaf of type isLeafFunc is used to detect if an entry is a leaf entry. There are four scenarios
|
||||
// where isLeaf should behave differently:
|
||||
@ -141,16 +141,8 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker
|
||||
markerBase = markerSplit[1]
|
||||
}
|
||||
}
|
||||
entries, delayIsLeaf, err := listDir(bucket, prefixDir, entryPrefixMatch)
|
||||
if err != nil {
|
||||
select {
|
||||
case <-endWalkCh:
|
||||
return errWalkAbort
|
||||
case resultCh <- treeWalkResult{err: err}:
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
entries, delayIsLeaf := listDir(bucket, prefixDir, entryPrefixMatch)
|
||||
// When isleaf check is delayed, make sure that it is set correctly here.
|
||||
if delayIsLeaf && isLeaf == nil {
|
||||
return errInvalidArgument
|
||||
|
@ -196,7 +196,7 @@ func TestTreeWalk(t *testing.T) {
|
||||
return len(entries) == 0
|
||||
}
|
||||
|
||||
listDir := listDirFactory(context.Background(), isLeaf, xlTreeWalkIgnoredErrs, disk)
|
||||
listDir := listDirFactory(context.Background(), isLeaf, disk)
|
||||
// Simple test for prefix based walk.
|
||||
testTreeWalkPrefix(t, listDir, isLeaf, isLeafDir)
|
||||
// Simple test when marker is set.
|
||||
@ -240,7 +240,7 @@ func TestTreeWalkTimeout(t *testing.T) {
|
||||
return len(entries) == 0
|
||||
}
|
||||
|
||||
listDir := listDirFactory(context.Background(), isLeaf, xlTreeWalkIgnoredErrs, disk)
|
||||
listDir := listDirFactory(context.Background(), isLeaf, disk)
|
||||
|
||||
// TreeWalk pool with 2 seconds timeout for tree-walk go routines.
|
||||
pool := newTreeWalkPool(2 * time.Second)
|
||||
@ -315,7 +315,7 @@ func TestListDir(t *testing.T) {
|
||||
// create listDir function.
|
||||
listDir := listDirFactory(context.Background(), func(volume, prefix string) bool {
|
||||
return !hasSuffix(prefix, slashSeparator)
|
||||
}, xlTreeWalkIgnoredErrs, disk1, disk2)
|
||||
}, disk1, disk2)
|
||||
|
||||
// Create file1 in fsDir1 and file2 in fsDir2.
|
||||
disks := []StorageAPI{disk1, disk2}
|
||||
@ -327,10 +327,7 @@ func TestListDir(t *testing.T) {
|
||||
}
|
||||
|
||||
// Should list "file1" from fsDir1.
|
||||
entries, _, err := listDir(volume, "", "")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
entries, _ := listDir(volume, "", "")
|
||||
if len(entries) != 2 {
|
||||
t.Fatal("Expected the number of entries to be 2")
|
||||
}
|
||||
@ -348,10 +345,7 @@ func TestListDir(t *testing.T) {
|
||||
}
|
||||
|
||||
// Should list "file2" from fsDir2.
|
||||
entries, _, err = listDir(volume, "", "")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
entries, _ = listDir(volume, "", "")
|
||||
if len(entries) != 1 {
|
||||
t.Fatal("Expected the number of entries to be 1")
|
||||
}
|
||||
@ -362,13 +356,6 @@ func TestListDir(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
// None of the disks are available, should get
|
||||
// errDiskNotFound. Since errDiskNotFound is an ignored error,
|
||||
// we should get nil.
|
||||
_, _, err = listDir(volume, "", "")
|
||||
if err != nil {
|
||||
t.Errorf("expected nil error but found %v.", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRecursiveWalk - tests if treeWalk returns entries correctly with and
|
||||
@ -400,7 +387,7 @@ func TestRecursiveTreeWalk(t *testing.T) {
|
||||
}
|
||||
|
||||
// Create listDir function.
|
||||
listDir := listDirFactory(context.Background(), isLeaf, xlTreeWalkIgnoredErrs, disk1)
|
||||
listDir := listDirFactory(context.Background(), isLeaf, disk1)
|
||||
|
||||
// Create the namespace.
|
||||
var files = []string{
|
||||
@ -515,7 +502,7 @@ func TestSortedness(t *testing.T) {
|
||||
}
|
||||
|
||||
// Create listDir function.
|
||||
listDir := listDirFactory(context.Background(), isLeaf, xlTreeWalkIgnoredErrs, disk1)
|
||||
listDir := listDirFactory(context.Background(), isLeaf, disk1)
|
||||
|
||||
// Create the namespace.
|
||||
var files = []string{
|
||||
@ -598,7 +585,7 @@ func TestTreeWalkIsEnd(t *testing.T) {
|
||||
}
|
||||
|
||||
// Create listDir function.
|
||||
listDir := listDirFactory(context.Background(), isLeaf, xlTreeWalkIgnoredErrs, disk1)
|
||||
listDir := listDirFactory(context.Background(), isLeaf, disk1)
|
||||
|
||||
// Create the namespace.
|
||||
var files = []string{
|
||||
|
@ -445,6 +445,7 @@ func TestCeilFrac(t *testing.T) {
|
||||
// Test if isErrIgnored works correctly.
|
||||
func TestIsErrIgnored(t *testing.T) {
|
||||
var errIgnored = fmt.Errorf("ignored error")
|
||||
ignoredErrs := append(baseIgnoredErrs, errIgnored)
|
||||
var testCases = []struct {
|
||||
err error
|
||||
ignored bool
|
||||
@ -457,9 +458,13 @@ func TestIsErrIgnored(t *testing.T) {
|
||||
err: errIgnored,
|
||||
ignored: true,
|
||||
},
|
||||
{
|
||||
err: errFaultyDisk,
|
||||
ignored: true,
|
||||
},
|
||||
}
|
||||
for i, testCase := range testCases {
|
||||
if ok := IsErrIgnored(testCase.err, errIgnored); ok != testCase.ignored {
|
||||
if ok := IsErrIgnored(testCase.err, ignoredErrs...); ok != testCase.ignored {
|
||||
t.Errorf("Test: %d, Expected %t, got %t", i+1, testCase.ignored, ok)
|
||||
}
|
||||
}
|
||||
|
@ -622,8 +622,8 @@ func (s *xlSets) CopyObject(ctx context.Context, srcBucket, srcObject, destBucke
|
||||
// Returns function "listDir" of the type listDirFunc.
|
||||
// isLeaf - is used by listDir function to check if an entry is a leaf or non-leaf entry.
|
||||
// disks - used for doing disk.ListDir(). Sets passes set of disks.
|
||||
func listDirSetsFactory(ctx context.Context, isLeaf isLeafFunc, isLeafDir isLeafDirFunc, treeWalkIgnoredErrs []error, sets ...[]StorageAPI) listDirFunc {
|
||||
listDirInternal := func(bucket, prefixDir, prefixEntry string, disks []StorageAPI) (mergedEntries []string, err error) {
|
||||
func listDirSetsFactory(ctx context.Context, isLeaf isLeafFunc, isLeafDir isLeafDirFunc, sets ...[]StorageAPI) listDirFunc {
|
||||
listDirInternal := func(bucket, prefixDir, prefixEntry string, disks []StorageAPI) (mergedEntries []string) {
|
||||
for _, disk := range disks {
|
||||
if disk == nil {
|
||||
continue
|
||||
@ -631,15 +631,10 @@ func listDirSetsFactory(ctx context.Context, isLeaf isLeafFunc, isLeafDir isLeaf
|
||||
|
||||
var entries []string
|
||||
var newEntries []string
|
||||
var err error
|
||||
entries, err = disk.ListDir(bucket, prefixDir, -1)
|
||||
if err != nil {
|
||||
// For any reason disk was deleted or goes offline, continue
|
||||
// and list from other disks if possible.
|
||||
if IsErrIgnored(err, treeWalkIgnoredErrs...) {
|
||||
continue
|
||||
}
|
||||
logger.LogIf(ctx, err)
|
||||
return nil, err
|
||||
continue
|
||||
}
|
||||
|
||||
// Find elements in entries which are not in mergedEntries
|
||||
@ -658,18 +653,13 @@ func listDirSetsFactory(ctx context.Context, isLeaf isLeafFunc, isLeafDir isLeaf
|
||||
sort.Strings(mergedEntries)
|
||||
}
|
||||
}
|
||||
return mergedEntries, nil
|
||||
return mergedEntries
|
||||
}
|
||||
|
||||
// listDir - lists all the entries at a given prefix and given entry in the prefix.
|
||||
listDir := func(bucket, prefixDir, prefixEntry string) (mergedEntries []string, delayIsLeaf bool, err error) {
|
||||
listDir := func(bucket, prefixDir, prefixEntry string) (mergedEntries []string, delayIsLeaf bool) {
|
||||
for _, disks := range sets {
|
||||
var entries []string
|
||||
entries, err = listDirInternal(bucket, prefixDir, prefixEntry, disks)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
entries := listDirInternal(bucket, prefixDir, prefixEntry, disks)
|
||||
var newEntries []string
|
||||
// Find elements in entries which are not in mergedEntries
|
||||
for _, entry := range entries {
|
||||
@ -688,7 +678,7 @@ func listDirSetsFactory(ctx context.Context, isLeaf isLeafFunc, isLeafDir isLeaf
|
||||
}
|
||||
}
|
||||
mergedEntries, delayIsLeaf = filterListEntries(bucket, prefixDir, mergedEntries, prefixEntry, isLeaf)
|
||||
return mergedEntries, delayIsLeaf, nil
|
||||
return mergedEntries, delayIsLeaf
|
||||
}
|
||||
return listDir
|
||||
}
|
||||
@ -731,7 +721,7 @@ func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimi
|
||||
setDisks = append(setDisks, set.getLoadBalancedDisks())
|
||||
}
|
||||
|
||||
listDir := listDirSetsFactory(ctx, isLeaf, isLeafDir, xlTreeWalkIgnoredErrs, setDisks...)
|
||||
listDir := listDirSetsFactory(ctx, isLeaf, isLeafDir, setDisks...)
|
||||
walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, isLeafDir, endWalkCh)
|
||||
}
|
||||
|
||||
@ -1266,13 +1256,15 @@ func (s *xlSets) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) {
|
||||
// isLeaf - is used by listDir function to check if an entry is a leaf or non-leaf entry.
|
||||
// disks - used for doing disk.ListDir(). Sets passes set of disks.
|
||||
func listDirSetsHealFactory(isLeaf isLeafFunc, sets ...[]StorageAPI) listDirFunc {
|
||||
listDirInternal := func(bucket, prefixDir, prefixEntry string, disks []StorageAPI) (mergedEntries []string, err error) {
|
||||
listDirInternal := func(bucket, prefixDir, prefixEntry string, disks []StorageAPI) (mergedEntries []string) {
|
||||
for _, disk := range disks {
|
||||
if disk == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
var entries []string
|
||||
var newEntries []string
|
||||
var err error
|
||||
entries, err = disk.ListDir(bucket, prefixDir, -1)
|
||||
if err != nil {
|
||||
continue
|
||||
@ -1305,18 +1297,14 @@ func listDirSetsHealFactory(isLeaf isLeafFunc, sets ...[]StorageAPI) listDirFunc
|
||||
sort.Strings(mergedEntries)
|
||||
}
|
||||
}
|
||||
return mergedEntries, nil
|
||||
return mergedEntries
|
||||
|
||||
}
|
||||
|
||||
// listDir - lists all the entries at a given prefix and given entry in the prefix.
|
||||
listDir := func(bucket, prefixDir, prefixEntry string) (mergedEntries []string, delayIsLeaf bool, err error) {
|
||||
listDir := func(bucket, prefixDir, prefixEntry string) (mergedEntries []string, delayIsLeaf bool) {
|
||||
for _, disks := range sets {
|
||||
var entries []string
|
||||
entries, err = listDirInternal(bucket, prefixDir, prefixEntry, disks)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
entries := listDirInternal(bucket, prefixDir, prefixEntry, disks)
|
||||
|
||||
var newEntries []string
|
||||
// Find elements in entries which are not in mergedEntries
|
||||
@ -1335,7 +1323,7 @@ func listDirSetsHealFactory(isLeaf isLeafFunc, sets ...[]StorageAPI) listDirFunc
|
||||
sort.Strings(mergedEntries)
|
||||
}
|
||||
}
|
||||
return mergedEntries, false, nil
|
||||
return mergedEntries, false
|
||||
}
|
||||
return listDir
|
||||
}
|
||||
|
@ -51,6 +51,8 @@ func (xl xlObjects) parentDirIsObject(ctx context.Context, bucket, parent string
|
||||
return isParentDirObject(parent)
|
||||
}
|
||||
|
||||
var xlTreeWalkIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied, errVolumeNotFound, errFileNotFound)
|
||||
|
||||
// isObject - returns `true` if the prefix is an object i.e if
|
||||
// `xl.json` exists at the leaf, false otherwise.
|
||||
func (xl xlObjects) isObject(bucket, prefix string) (ok bool) {
|
||||
|
@ -24,23 +24,19 @@ import (
|
||||
// Returns function "listDir" of the type listDirFunc.
|
||||
// isLeaf - is used by listDir function to check if an entry is a leaf or non-leaf entry.
|
||||
// disks - used for doing disk.ListDir()
|
||||
func listDirFactory(ctx context.Context, isLeaf isLeafFunc, treeWalkIgnoredErrs []error, disks ...StorageAPI) listDirFunc {
|
||||
func listDirFactory(ctx context.Context, isLeaf isLeafFunc, disks ...StorageAPI) listDirFunc {
|
||||
// Returns sorted merged entries from all the disks.
|
||||
listDir := func(bucket, prefixDir, prefixEntry string) (mergedEntries []string, delayIsLeaf bool, err error) {
|
||||
listDir := func(bucket, prefixDir, prefixEntry string) (mergedEntries []string, delayIsLeaf bool) {
|
||||
for _, disk := range disks {
|
||||
if disk == nil {
|
||||
continue
|
||||
}
|
||||
var entries []string
|
||||
var newEntries []string
|
||||
var err error
|
||||
entries, err = disk.ListDir(bucket, prefixDir, -1)
|
||||
if err != nil {
|
||||
// For any reason disk was deleted or goes offline, continue
|
||||
// and list from other disks if possible.
|
||||
if IsErrIgnored(err, treeWalkIgnoredErrs...) {
|
||||
continue
|
||||
}
|
||||
return nil, false, err
|
||||
continue
|
||||
}
|
||||
|
||||
// Find elements in entries which are not in mergedEntries
|
||||
@ -60,7 +56,7 @@ func listDirFactory(ctx context.Context, isLeaf isLeafFunc, treeWalkIgnoredErrs
|
||||
}
|
||||
}
|
||||
mergedEntries, delayIsLeaf = filterListEntries(bucket, prefixDir, mergedEntries, prefixEntry, isLeaf)
|
||||
return mergedEntries, delayIsLeaf, nil
|
||||
return mergedEntries, delayIsLeaf
|
||||
}
|
||||
return listDir
|
||||
}
|
||||
@ -79,7 +75,7 @@ func (xl xlObjects) listObjects(ctx context.Context, bucket, prefix, marker, del
|
||||
endWalkCh = make(chan struct{})
|
||||
isLeaf := xl.isObject
|
||||
isLeafDir := xl.isObjectDir
|
||||
listDir := listDirFactory(ctx, isLeaf, xlTreeWalkIgnoredErrs, xl.getLoadBalancedDisks()...)
|
||||
listDir := listDirFactory(ctx, isLeaf, xl.getLoadBalancedDisks()...)
|
||||
walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, isLeafDir, endWalkCh)
|
||||
}
|
||||
|
||||
|
@ -49,9 +49,6 @@ type xlObjects struct {
|
||||
listPool *treeWalkPool
|
||||
}
|
||||
|
||||
// list of all errors that can be ignored in tree walk operation in XL
|
||||
var xlTreeWalkIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied, errVolumeNotFound, errFileNotFound)
|
||||
|
||||
// Shutdown function for object storage interface.
|
||||
func (xl xlObjects) Shutdown(ctx context.Context) error {
|
||||
// Add any object layer shutdown activities here.
|
||||
|
Loading…
x
Reference in New Issue
Block a user