From 6af761c86c05f9e0f9bf00cc26ff838292f99d58 Mon Sep 17 00:00:00 2001 From: Bala FA Date: Sat, 9 Apr 2016 00:16:03 +0530 Subject: [PATCH] enhance multipart functions to use fsDirent (#1304) * backend/fs: scanMultipartDir returns directories only for non-recursive listing * backend/fs: enhance multipart functions to use fsDirent --- fs-dir-common.go | 55 +++++++------- fs-dir-nix.go | 75 ++++++++++++++++-- fs-dir-others.go | 57 ++++++++++++-- fs-multipart-dir.go | 181 ++++++++++++-------------------------------- fs-multipart.go | 84 ++++++++++++-------- 5 files changed, 249 insertions(+), 203 deletions(-) diff --git a/fs-dir-common.go b/fs-dir-common.go index eedaf6897..0716bf146 100644 --- a/fs-dir-common.go +++ b/fs-dir-common.go @@ -26,31 +26,34 @@ import ( // fsDirent carries directory entries. type fsDirent struct { - name string - modifiedTime time.Time // On Solaris and older unix distros this is empty. - size int64 // On Solaris and older unix distros this is empty. - isDir bool + name string + modTime time.Time // On Solaris and older unix distros this is empty. + size int64 // On Solaris and older unix distros this is empty. + mode os.FileMode } -// byDirentNames is a collection satisfying sort.Interface. 
-type byDirentNames []fsDirent - -func (d byDirentNames) Len() int { return len(d) } -func (d byDirentNames) Swap(i, j int) { d[i], d[j] = d[j], d[i] } -func (d byDirentNames) Less(i, j int) bool { - n1 := d[i].name - if d[i].isDir { - n1 = n1 + string(os.PathSeparator) - } - - n2 := d[j].name - if d[j].isDir { - n2 = n2 + string(os.PathSeparator) - } - - return n1 < n2 +// IsDir - returns true if fsDirent is a directory +func (ent fsDirent) IsDir() bool { + return ent.mode.IsDir() } +// IsSymlink - returns true if fsDirent is a symbolic link +func (ent fsDirent) IsSymlink() bool { + return ent.mode&os.ModeSymlink == os.ModeSymlink +} + +// IsRegular - returns true if fsDirent is a regular file +func (ent fsDirent) IsRegular() bool { + return ent.mode.IsRegular() +} + +// byDirentName is a collection satisfying sort.Interface. +type byDirentName []fsDirent + +func (d byDirentName) Len() int { return len(d) } +func (d byDirentName) Swap(i, j int) { d[i], d[j] = d[j], d[i] } +func (d byDirentName) Less(i, j int) bool { return d[i].name < d[j].name } + // Using sort.Search() internally to jump to the file entry containing the prefix. func searchDirents(dirents []fsDirent, x string) int { processFunc := func(i int) bool { @@ -85,7 +88,7 @@ func treeWalk(bucketDir, prefixDir, entryPrefixMatch, marker string, recursive b objectInfo := ObjectInfo{} // Convert to full object name. objectInfo.Name = filepath.Join(prefixDir, dirent.name) - if dirent.modifiedTime.IsZero() && dirent.size == 0 { + if dirent.modTime.IsZero() && dirent.size == 0 { // ModifiedTime and Size are zero, Stat() and figure out // the actual values that need to be set. fi, err := os.Stat(filepath.Join(bucketDir, prefixDir, dirent.name)) @@ -99,9 +102,9 @@ func treeWalk(bucketDir, prefixDir, entryPrefixMatch, marker string, recursive b } else { // If ModifiedTime or Size are set then use them // without attempting another Stat operation. 
- objectInfo.ModifiedTime = dirent.modifiedTime + objectInfo.ModifiedTime = dirent.modTime objectInfo.Size = dirent.size - objectInfo.IsDir = dirent.isDir + objectInfo.IsDir = dirent.IsDir() } if objectInfo.IsDir { // Add os.PathSeparator suffix again for directories as @@ -135,13 +138,13 @@ func treeWalk(bucketDir, prefixDir, entryPrefixMatch, marker string, recursive b dirents = dirents[searchDirents(dirents, markerDir):] *count += len(dirents) for i, dirent := range dirents { - if i == 0 && markerDir == dirent.name && !dirent.isDir { + if i == 0 && markerDir == dirent.name && !dirent.IsDir() { // If the first entry is not a directory // we need to skip this entry. *count-- continue } - if dirent.isDir && recursive { + if dirent.IsDir() && recursive { // If the entry is a directory, we will need recurse into it. markerArg := "" if dirent.name == markerDir { diff --git a/fs-dir-nix.go b/fs-dir-nix.go index 1a4890efd..132c8d681 100644 --- a/fs-dir-nix.go +++ b/fs-dir-nix.go @@ -20,6 +20,7 @@ package main import ( "os" + "path/filepath" "runtime" "sort" "strings" @@ -28,7 +29,10 @@ import ( ) const ( - // Large enough buffer size for ReadDirent() syscall + // readDirentBufSize for syscall.ReadDirent() to hold multiple + // directory entries in one buffer. golang source uses 4096 as + // buffer size whereas we want 25 times larger to save lots of + // entries to avoid multiple syscall.ReadDirent() call. readDirentBufSize = 4096 * 25 ) @@ -65,9 +69,30 @@ func parseDirents(buf []byte) []fsDirent { if name == "." || name == ".." 
{ continue } + + var mode os.FileMode + switch dirent.Type { + case syscall.DT_BLK, syscall.DT_WHT: + mode = os.ModeDevice + case syscall.DT_CHR: + mode = os.ModeDevice | os.ModeCharDevice + case syscall.DT_DIR: + mode = os.ModeDir + case syscall.DT_FIFO: + mode = os.ModeNamedPipe + case syscall.DT_LNK: + mode = os.ModeSymlink + case syscall.DT_REG: + mode = 0 + case syscall.DT_SOCK: + mode = os.ModeSocket + case syscall.DT_UNKNOWN: + mode = 0xffffffff + } + dirents = append(dirents, fsDirent{ - name: name, - isDir: (dirent.Type == syscall.DT_DIR), + name: name, + mode: mode, }) } return dirents @@ -91,7 +116,7 @@ func readDirAll(readDirPath, entryPrefixMatch string) ([]fsDirent, error) { break } for _, dirent := range parseDirents(buf[:nbuf]) { - if dirent.isDir { + if dirent.IsDir() { dirent.name += string(os.PathSeparator) dirent.size = 0 } @@ -100,6 +125,46 @@ func readDirAll(readDirPath, entryPrefixMatch string) ([]fsDirent, error) { } } } - sort.Sort(byDirentNames(dirents)) + sort.Sort(byDirentName(dirents)) + return dirents, nil +} + +// scans the directory dirPath, calling filter() on each directory +// entry. Entries for which filter() returns true are stored, lexically +// sorted using sort.Sort(). If filter is nil, all entries are selected. +// If namesOnly is true, dirPath is not prepended to the entry name. 
+func scandir(dirPath string, filter func(fsDirent) bool, namesOnly bool) ([]fsDirent, error) { + buf := make([]byte, readDirentBufSize) + d, err := os.Open(dirPath) + if err != nil { + return nil, err + } + defer d.Close() + + fd := int(d.Fd()) + dirents := []fsDirent{} + for { + nbuf, err := syscall.ReadDirent(fd, buf) + if err != nil { + return nil, err + } + if nbuf <= 0 { + break + } + for _, dirent := range parseDirents(buf[:nbuf]) { + if !namesOnly { + dirent.name = filepath.Join(dirPath, dirent.name) + } + if dirent.IsDir() { + dirent.name += string(os.PathSeparator) + } + if filter == nil || filter(dirent) { + dirents = append(dirents, dirent) + } + } + } + + sort.Sort(byDirentName(dirents)) + return dirents, nil } diff --git a/fs-dir-others.go b/fs-dir-others.go index 9e987a717..b8e331284 100644 --- a/fs-dir-others.go +++ b/fs-dir-others.go @@ -21,6 +21,7 @@ package main import ( "io" "os" + "path/filepath" "sort" "strings" ) @@ -43,12 +44,12 @@ func readDirAll(readDirPath, entryPrefixMatch string) ([]fsDirent, error) { } for _, fi := range fis { dirent := fsDirent{ - name: fi.Name(), - size: fi.Size(), - modifiedTime: fi.ModTime(), - isDir: fi.IsDir(), + name: fi.Name(), + modTime: fi.ModTime(), + size: fi.Size(), + mode: fi.Mode(), } - if dirent.isDir { + if dirent.IsDir() { dirent.name += string(os.PathSeparator) dirent.size = 0 } @@ -58,6 +59,50 @@ func readDirAll(readDirPath, entryPrefixMatch string) ([]fsDirent, error) { } } // Sort dirents. - sort.Sort(byDirentNames(dirents)) + sort.Sort(byDirentName(dirents)) + return dirents, nil +} + +// scans the directory dirPath, calling filter() on each directory +// entry. Entries for which filter() returns true are stored, lexically +// sorted using sort.Sort(). If filter is nil, all entries are selected. +// If namesOnly is true, dirPath is not prepended to the entry name. 
+func scandir(dirPath string, filter func(fsDirent) bool, namesOnly bool) ([]fsDirent, error) { + d, err := os.Open(dirPath) + if err != nil { + return nil, err + } + defer d.Close() + + var dirents []fsDirent + for { + fis, err := d.Readdir(1000) + if err != nil { + if err == io.EOF { + break + } + return nil, err + } + for _, fi := range fis { + dirent := fsDirent{ + name: fi.Name(), + modTime: fi.ModTime(), + size: fi.Size(), + mode: fi.Mode(), + } + if !namesOnly { + dirent.name = filepath.Join(dirPath, dirent.name) + } + if dirent.IsDir() { + dirent.name += string(os.PathSeparator) + } + if filter == nil || filter(dirent) { + dirents = append(dirents, dirent) + } + } + } + + sort.Sort(byDirentName(dirents)) + return dirents, nil } diff --git a/fs-multipart-dir.go b/fs-multipart-dir.go index 36a953fe2..1266b2335 100644 --- a/fs-multipart-dir.go +++ b/fs-multipart-dir.go @@ -18,118 +18,12 @@ package main import ( "errors" - "io" "os" "path/filepath" - "sort" "strings" "time" ) -// DirEntry - directory entry -type DirEntry struct { - Name string - Size int64 - Mode os.FileMode - ModTime time.Time -} - -// IsDir - returns true if DirEntry is a directory -func (entry DirEntry) IsDir() bool { - return entry.Mode.IsDir() -} - -// IsSymlink - returns true if DirEntry is a symbolic link -func (entry DirEntry) IsSymlink() bool { - return entry.Mode&os.ModeSymlink == os.ModeSymlink -} - -// IsRegular - returns true if DirEntry is a regular file -func (entry DirEntry) IsRegular() bool { - return entry.Mode.IsRegular() -} - -// sort interface for DirEntry slice -type byEntryName []DirEntry - -func (f byEntryName) Len() int { return len(f) } -func (f byEntryName) Swap(i, j int) { f[i], f[j] = f[j], f[i] } -func (f byEntryName) Less(i, j int) bool { return f[i].Name < f[j].Name } - -func filteredReaddir(dirname string, filter func(DirEntry) bool, appendPath bool) ([]DirEntry, error) { - result := []DirEntry{} - - d, err := os.Open(dirname) - if err != nil { - return result, 
err - } - - defer d.Close() - - for { - fis, err := d.Readdir(1000) - if err != nil { - if err == io.EOF { - break - } - - return result, err - } - - for _, fi := range fis { - name := fi.Name() - if appendPath { - name = filepath.Join(dirname, name) - } - - if fi.IsDir() { - name += string(os.PathSeparator) - } - - entry := DirEntry{Name: name, Size: fi.Size(), Mode: fi.Mode(), ModTime: fi.ModTime()} - - if filter == nil || filter(entry) { - result = append(result, entry) - } - } - } - - sort.Sort(byEntryName(result)) - - return result, nil -} - -func filteredReaddirnames(dirname string, filter func(string) bool) ([]string, error) { - result := []string{} - d, err := os.Open(dirname) - if err != nil { - return result, err - } - - defer d.Close() - - for { - names, err := d.Readdirnames(1000) - if err != nil { - if err == io.EOF { - break - } - - return result, err - } - - for _, name := range names { - if filter == nil || filter(name) { - result = append(result, name) - } - } - } - - sort.Strings(result) - - return result, nil -} - func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, recursive bool) multipartObjectInfoChannel { objectInfoCh := make(chan multipartObjectInfo, listObjectsLimit) timeoutCh := make(chan struct{}, 1) @@ -218,39 +112,49 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, } for { - entries, err := filteredReaddir(scanDir, - func(entry DirEntry) bool { - if entry.IsDir() || (entry.IsRegular() && strings.HasSuffix(entry.Name, uploadIDSuffix)) { - return strings.HasPrefix(entry.Name, prefixPath) && entry.Name > markerPath + dirents, err := scandir(scanDir, + func(dirent fsDirent) bool { + if dirent.IsDir() || (dirent.IsRegular() && strings.HasSuffix(dirent.name, uploadIDSuffix)) { + return strings.HasPrefix(dirent.name, prefixPath) && dirent.name > markerPath } return false }, - true) + false) if err != nil { send(multipartObjectInfo{Err: err}) return } - var entry DirEntry - for 
len(entries) > 0 { - entry, entries = entries[0], entries[1:] + var dirent fsDirent + for len(dirents) > 0 { + dirent, dirents = dirents[0], dirents[1:] - if entry.IsRegular() { + if dirent.IsRegular() { // Handle uploadid file - name := strings.Replace(filepath.Dir(entry.Name), bucketDir, "", 1) + name := strings.Replace(filepath.Dir(dirent.name), bucketDir, "", 1) if name == "" { // This should not happen ie uploadid file should not be in bucket directory send(multipartObjectInfo{Err: errors.New("corrupted meta data")}) return } - uploadID := strings.Split(filepath.Base(entry.Name), uploadIDSuffix)[0] + uploadID := strings.Split(filepath.Base(dirent.name), uploadIDSuffix)[0] + + // In some OS modTime is empty and use os.Stat() to fill missing values + if dirent.modTime.IsZero() { + if fi, e := os.Stat(dirent.name); e == nil { + dirent.modTime = fi.ModTime() + } else { + send(multipartObjectInfo{Err: e}) + return + } + } objInfo := multipartObjectInfo{ Name: name, UploadID: uploadID, - ModifiedTime: entry.ModTime, + ModifiedTime: dirent.modTime, } if !send(objInfo) { @@ -260,21 +164,21 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, continue } - subentries, err := filteredReaddir(entry.Name, - func(entry DirEntry) bool { - return entry.IsDir() || (entry.IsRegular() && strings.HasSuffix(entry.Name, uploadIDSuffix)) + subDirents, err := scandir(dirent.name, + func(dirent fsDirent) bool { + return dirent.IsDir() || (dirent.IsRegular() && strings.HasSuffix(dirent.name, uploadIDSuffix)) }, - true) + false) if err != nil { send(multipartObjectInfo{Err: err}) return } subDirFound := false - uploadIDEntries := []DirEntry{} - // If subentries has a directory, then current entry needs to be sent - for _, subentry := range subentries { - if subentry.IsDir() { + uploadIDDirents := []fsDirent{} + // If subDirents has a directory, then current dirent needs to be sent + for _, subdirent := range subDirents { + if subdirent.IsDir() { subDirFound 
= true if recursive { @@ -282,15 +186,26 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, } } - if !recursive && subentry.IsRegular() { - uploadIDEntries = append(uploadIDEntries, subentry) + if !recursive && subdirent.IsRegular() { + uploadIDDirents = append(uploadIDDirents, subdirent) } } - if subDirFound || len(subentries) == 0 { + // send directory only for non-recursive listing + if !recursive && (subDirFound || len(subDirents) == 0) { + // In some OS modTime is empty and use os.Stat() to fill missing values + if dirent.modTime.IsZero() { + if fi, e := os.Stat(dirent.name); e == nil { + dirent.modTime = fi.ModTime() + } else { + send(multipartObjectInfo{Err: e}) + return + } + } + objInfo := multipartObjectInfo{ - Name: strings.Replace(entry.Name, bucketDir, "", 1), - ModifiedTime: entry.ModTime, + Name: strings.Replace(dirent.name, bucketDir, "", 1), + ModifiedTime: dirent.modTime, IsDir: true, } @@ -300,9 +215,9 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, } if recursive { - entries = append(subentries, entries...) + dirents = append(subDirents, dirents...) } else { - entries = append(uploadIDEntries, entries...) + dirents = append(uploadIDDirents, dirents...) } } diff --git a/fs-multipart.go b/fs-multipart.go index 02fbec44f..5fd1a343f 100644 --- a/fs-multipart.go +++ b/fs-multipart.go @@ -170,18 +170,18 @@ func (fs Filesystem) cleanupUploadID(bucket, object, uploadID string) error { metaObjectDir := filepath.Join(fs.path, configDir, bucket, object) uploadIDPrefix := uploadID + "." 
- names, e := filteredReaddirnames(metaObjectDir, - func(name string) bool { - return strings.HasPrefix(name, uploadIDPrefix) + dirents, e := scandir(metaObjectDir, + func(dirent fsDirent) bool { + return dirent.IsRegular() && strings.HasPrefix(dirent.name, uploadIDPrefix) }, - ) + true) if e != nil { return e } - for _, name := range names { - if e := os.Remove(filepath.Join(metaObjectDir, name)); e != nil { + for _, dirent := range dirents { + if e := os.Remove(filepath.Join(metaObjectDir, dirent.name)); e != nil { //return InternalError{Err: err} return e } @@ -515,9 +515,7 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa } recursive := true - skipDir := true if delimiter == "/" { - skipDir = false recursive = false } @@ -557,10 +555,7 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa continue } - if multipartObjInfo.IsDir && skipDir { - continue - } - + // Directories are listed only if recursive is false if multipartObjInfo.IsDir { result.CommonPrefixes = append(result.CommonPrefixes, multipartObjInfo.Name) } else { @@ -600,49 +595,72 @@ func (fs Filesystem) ListObjectParts(bucket, object, uploadID string, partNumber return ListPartsInfo{}, probe.NewError(InvalidUploadID{UploadID: uploadID}) } + // return empty ListPartsInfo + if maxParts == 0 { + return ListPartsInfo{}, nil + } + + if maxParts < 0 || maxParts > 1000 { + maxParts = 1000 + } + metaObjectDir := filepath.Join(fs.path, configDir, bucket, object) - entries, err := filteredReaddir(metaObjectDir, - func(entry DirEntry) bool { - if tokens := strings.Split(entry.Name, "."); len(tokens) == 3 { - if tokens[0] == uploadID { - if partNumber, err := strconv.Atoi(tokens[1]); err == nil { - if partNumber >= 1 && partNumber <= 10000 && partNumber > partNumberMarker { - return true - } - } + uploadIDPrefix := uploadID + "." 
+ + dirents, e := scandir(metaObjectDir, + func(dirent fsDirent) bool { + // Part file is a regular file and has to be started with 'UPLOADID.' + if !(dirent.IsRegular() && strings.HasPrefix(dirent.name, uploadIDPrefix)) { + return false + } + + // Valid part file has to be 'UPLOADID.PARTNUMBER.MD5SUM' + tokens := strings.Split(dirent.name, ".") + if len(tokens) != 3 { + return false + } + + if partNumber, err := strconv.Atoi(tokens[1]); err == nil { + if partNumber >= 1 && partNumber <= 10000 && partNumber > partNumberMarker { + return true } } return false }, - false, - ) - - if err != nil { - return ListPartsInfo{}, probe.NewError(err) + true) + if e != nil { + return ListPartsInfo{}, probe.NewError(e) } isTruncated := false - if maxParts <= 0 || maxParts > 1000 { - maxParts = 1000 - } nextPartNumberMarker := 0 parts := []partInfo{} - for i := range entries { + for i := range dirents { if i == maxParts { isTruncated = true break } - tokens := strings.Split(entries[i].Name, ".") + // In some OS modTime is empty and use os.Stat() to fill missing values + if dirents[i].modTime.IsZero() { + if fi, e := os.Stat(filepath.Join(metaObjectDir, dirents[i].name)); e == nil { + dirents[i].modTime = fi.ModTime() + dirents[i].size = fi.Size() + } else { + return ListPartsInfo{}, probe.NewError(e) + } + } + + tokens := strings.Split(dirents[i].name, ".") partNumber, _ := strconv.Atoi(tokens[1]) md5sum := tokens[2] parts = append(parts, partInfo{ PartNumber: partNumber, - LastModified: entries[i].ModTime, + LastModified: dirents[i].modTime, ETag: md5sum, - Size: entries[i].Size, + Size: dirents[i].size, }) }