diff --git a/pkg/fs/dir.go b/pkg/fs/dir.go
new file mode 100644
index 000000000..f082fda7b
--- /dev/null
+++ b/pkg/fs/dir.go
@@ -0,0 +1,251 @@
+/*
+ * Minio Cloud Storage, (C) 2016 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package fs
+
+import (
+	"io"
+	"os"
+	"path/filepath"
+	"sort"
+	"strings"
+	"time"
+)
+
+const (
+	// listObjectsLimit - maximum list objects limit
+	listObjectsLimit = 1000
+)
+
+// IsDirEmpty - returns whether given directory is empty or not
+func IsDirEmpty(dirname string) (status bool, err error) {
+	f, err := os.Open(dirname)
+	if err == nil {
+		defer f.Close()
+		if _, err = f.Readdirnames(1); err == io.EOF {
+			status = true
+			err = nil
+		}
+	}
+
+	return
+}
+
+// IsDirExist - returns whether given path is an existing directory (symbolic links are not followed)
+func IsDirExist(dirname string) (status bool, err error) {
+	fi, err := os.Lstat(dirname)
+	if err == nil {
+		status = fi.IsDir()
+	}
+
+	return
+}
+
+// byName implements sort.Interface for sorting os.FileInfo list
+type byName []os.FileInfo
+
+func (f byName) Len() int      { return len(f) }
+func (f byName) Swap(i, j int) { f[i], f[j] = f[j], f[i] }
+func (f byName) Less(i, j int) bool {
+	n1 := f[i].Name()
+	if f[i].IsDir() {
+		n1 = n1 + string(os.PathSeparator)
+	}
+
+	n2 := f[j].Name()
+	if f[j].IsDir() {
+		n2 = n2 + string(os.PathSeparator)
+	}
+
+	return n1 < n2
+}
+
+// ObjectInfo - object info
+type ObjectInfo struct {
+	Name         string
+	ModifiedTime time.Time
+	Checksum     string
+	Size         int64
+	IsDir        bool
+	Err          error
+}
+
+// readDir - reads 'scanDir' directory.  It returns a sorted list of ObjectInfo
+// where each object name is prefixed with 'namePrefix'
+func readDir(scanDir, namePrefix string) (objInfos []ObjectInfo) {
+	f, err := os.Open(scanDir)
+	if err != nil {
+		objInfos = append(objInfos, ObjectInfo{Err: err})
+		return
+	}
+
+	fis, err := f.Readdir(-1)
+	if err != nil {
+		f.Close()
+		objInfos = append(objInfos, ObjectInfo{Err: err})
+		return
+	}
+
+	f.Close()
+	sort.Sort(byName(fis))
+
+	// make []ObjectInfo from []FileInfo
+	for _, fi := range fis {
+		name := fi.Name()
+		if namePrefix != "" {
+			name = namePrefix + "/" + name
+		}
+
+		if fi.IsDir() {
+			name += "/"
+		}
+
+		objInfos = append(objInfos, ObjectInfo{
+			Name:         name,
+			ModifiedTime: fi.ModTime(),
+			Checksum:     "",
+			Size:         fi.Size(),
+			IsDir:        fi.IsDir(),
+		})
+	}
+
+	return
+}
+
+// ObjectInfoChannel - object info channel
+type ObjectInfoChannel struct {
+	ch        <-chan ObjectInfo
+	objInfo   *ObjectInfo
+	closed    bool
+	timeoutCh <-chan struct{}
+	timedOut  bool
+}
+
+// Read - returns the next ObjectInfo and true, or a zero ObjectInfo and false
+// once the channel is drained.  It reads one entry ahead so that IsClosed()
+// can tell whether the entry just returned was the last one.
+func (oic *ObjectInfoChannel) Read() (ObjectInfo, bool) {
+	if oic.closed {
+		return ObjectInfo{}, false
+	}
+
+	if oic.objInfo == nil {
+		// first read
+		if oi, ok := <-oic.ch; ok {
+			oic.objInfo = &oi
+		} else {
+			oic.closed = true
+			return ObjectInfo{}, false
+		}
+	}
+
+	retObjInfo := *oic.objInfo
+	status := true
+	oic.objInfo = nil
+
+	// read once more to know whether it was last read
+	if oi, ok := <-oic.ch; ok {
+		oic.objInfo = &oi
+	} else {
+		oic.closed = true
+	}
+
+	return retObjInfo, status
+}
+
+// IsClosed - returns whether the channel is closed and no read-ahead entry is pending
+func (oic *ObjectInfoChannel) IsClosed() bool {
+	if oic.objInfo != nil {
+		return false
+	}
+
+	return oic.closed
+}
+
+// IsTimedOut - returns whether the channel is closed due to timeout
+func (oic *ObjectInfoChannel) IsTimedOut() bool {
+	if oic.timedOut {
+		return true
+	}
+
+	select {
+	case _, ok := <-oic.timeoutCh:
+		if ok {
+			// the signal is delivered only once, hence remember it on the receiver
+			oic.timedOut = true
+			return true
+		}
+		return false
+	default:
+		return false
+	}
+}
+
+// treeWalk - walks into 'scanDir' recursively when 'recursive' is true.
+// It uses 'bucketDir' to get name prefix for object name.
+func treeWalk(scanDir, bucketDir string, recursive bool) ObjectInfoChannel {
+	objectInfoCh := make(chan ObjectInfo, listObjectsLimit)
+	timeoutCh := make(chan struct{}, 1)
+
+	// goroutine - retrieves directory entries, makes ObjectInfo and sends into the channel
+	go func() {
+		defer close(objectInfoCh)
+		defer close(timeoutCh)
+
+		// send function - returns true if ObjectInfo is sent
+		// within (time.Second * 15) else false on time-out
+		send := func(oi ObjectInfo) bool {
+			timer := time.NewTimer(time.Second * 15)
+			// release the timer as soon as send returns instead of keeping it pending for 15 seconds
+			defer timer.Stop()
+			select {
+			case objectInfoCh <- oi:
+				return true
+			case <-timer.C:
+				timeoutCh <- struct{}{}
+				return false
+			}
+		}
+
+		namePrefix := strings.TrimPrefix(filepath.ToSlash(scanDir), filepath.ToSlash(bucketDir))
+		if strings.HasPrefix(namePrefix, "/") {
+			/* remove beginning "/" */
+			namePrefix = namePrefix[1:]
+		}
+
+		for objInfos := readDir(scanDir, namePrefix); len(objInfos) > 0; {
+			var objInfo ObjectInfo
+			objInfo, objInfos = objInfos[0], objInfos[1:]
+			if !send(objInfo) {
+				return
+			}
+
+			if objInfo.IsDir && recursive {
+				scanDir := filepath.Join(bucketDir, filepath.FromSlash(objInfo.Name))
+
+				namePrefix = strings.TrimPrefix(filepath.ToSlash(scanDir), filepath.ToSlash(bucketDir))
+				if strings.HasPrefix(namePrefix, "/") {
+					/* remove beginning "/" */
+					namePrefix = namePrefix[1:]
+				}
+
+				objInfos = append(readDir(scanDir, namePrefix), objInfos...)
+			}
+		}
+	}()
+
+	return ObjectInfoChannel{ch: objectInfoCh, timeoutCh: timeoutCh}
+}
diff --git a/pkg/fs/fs-bucket-listobjects.go b/pkg/fs/fs-bucket-listobjects.go
index 96ad95179..04e1bdbc7 100644
--- a/pkg/fs/fs-bucket-listobjects.go
+++ b/pkg/fs/fs-bucket-listobjects.go
@@ -17,313 +17,118 @@
 package fs
 
 import (
-	"errors"
-	"hash/fnv"
+	"fmt"
 	"net/url"
 	"os"
 	"path/filepath"
 	"strings"
-	"time"
 
-	"github.com/minio/minio/pkg/ioutils"
 	"github.com/minio/minio/pkg/probe"
 )
 
-// listObjectsParams - list objects input parameters.
-type listObjectsParams struct {
-	// Bucket name to list the objects for.
-	Bucket string
-	// list all objects with this parameter as common prefix.
-	Prefix string
-	// list all objects starting with object after marker in
-	// lexicographical order.
-	Marker string
-	// list all objects until the first occurrence of the delimtier
-	// after the prefix.
-	Delimiter string
-	// maximum number of objects returned per listObjects()
-	// operation.
-	MaxKeys int
-}
-
-// listServiceReq
-type listServiceReq struct {
-	reqParams listObjectsParams
-	respCh    chan ListObjectsResult
-}
-
-type listWorkerReq struct {
-	respCh chan ListObjectsResult
-}
-
-// listObjects - list objects lists objects up to maxKeys for a given prefix.
-func (fs Filesystem) listObjects(bucket, prefix, marker, delimiter string, maxKeys int) (chan<- listWorkerReq, *probe.Error) {
-	quitWalker := make(chan bool)
-	reqCh := make(chan listWorkerReq)
-	walkerCh := make(chan ObjectMetadata, 2000)
-	go func() {
-		defer close(walkerCh)
-		var walkPath string
-		bucketPath := filepath.Join(fs.path, bucket)
-		// Bucket path prefix should always end with a separator.
-		bucketPathPrefix := bucketPath + string(os.PathSeparator)
-		prefixPath := bucketPathPrefix + prefix
-		st, e := os.Stat(prefixPath)
-		if e != nil {
-			if os.IsNotExist(e) {
-				walkPath = bucketPath
-			}
-		} else {
-			if st.IsDir() && !strings.HasSuffix(prefix, delimiter) {
-				walkPath = bucketPath
-			} else {
-				walkPath = prefixPath
-			}
-		}
-		ioutils.FTW(walkPath, func(path string, info os.FileInfo, e error) error {
-			// For any error return right here.
-			if e != nil {
-				return e
-			}
-			// Skip special temporary files, kept for multipart transaction.
-			if strings.Contains(path, "$multiparts") || strings.Contains(path, "$tmpobject") {
-				return nil
-			}
-			// We don't need to list the walk path if its a directory.
-			if path == walkPath && info.IsDir() {
-				return nil
-			}
-			// Skip all directories if there is no delimiter.
-			if info.IsDir() && delimiter == "" {
-				return nil
-			}
-			// For all incoming directories add a ending separator.
-			if info.IsDir() {
-				path = path + string(os.PathSeparator)
-			}
-			// Extract object name.
-			objectName := strings.TrimPrefix(path, bucketPathPrefix)
-			if strings.HasPrefix(objectName, prefix) {
-				object := ObjectMetadata{
-					Object:       objectName,
-					LastModified: info.ModTime(),
-					Mode:         info.Mode(),
-					Size:         info.Size(),
-				}
-				select {
-				// Send object on walker channel.
-				case walkerCh <- object:
-				case <-quitWalker:
-					// Returning error ends the file tree Walk().
-					return errors.New("Quit list worker.")
-				}
-				// If delimiter is set, we stop if current path is a directory.
-				if delimiter != "" && info.IsDir() {
-					return ioutils.ErrSkipDir
-				}
-			}
-			return nil
-		})
-	}()
-
-	go func() {
-		for {
-			select {
-			// Timeout after 30 seconds if request did not arrive for
-			// the given list parameters.
-			case <-time.After(30 * time.Second):
-				quitWalker <- true // Quit file path walk if running.
-				// Send back the hash for this request.
-				fs.timeoutReqCh <- fnvSum(bucket, prefix, marker, delimiter)
-				return
-			case req, ok := <-reqCh:
-				if !ok {
-					// If the request channel is closed, no more
-					// requests return here.
-					return
-				}
-				resp := ListObjectsResult{}
-				var count int
-				for {
-					// We have read all the keys necessary by now. We
-					// cleanly break out.
-					if count == maxKeys {
-						if delimiter != "" {
-							// Set the next marker for the next request.
-							// This element is set only if you have delimiter set.
-							// If response does not include the NextMaker and it is
-							// truncated, you can use the value of the last Key in the
-							// response as the marker in the subsequent request to get the
-							// next set of object keys.
-							if len(resp.Objects) > 0 {
-								// NextMarker is only set when there
-								// are more than maxKeys worth of
-								// objects for a given prefix path.
-								resp.NextMarker = resp.Objects[len(resp.Objects)-1:][0].Object
-							}
-						}
-						resp.IsTruncated = len(walkerCh) > 0
-						break
-					}
-					object, walkerOK := <-walkerCh
-					// If the channel is closed return right here.
-					if !walkerOK {
-						break
-					}
-					// Verify if the object is lexically smaller than
-					// the marker, we will skip those objects.
-					if marker != "" {
-						if marker >= object.Object {
-							continue
-						} else {
-							// Reset marker so that we avoid comparing
-							// again and again in a loop unecessarily.
-							marker = ""
-						}
-					}
-					if delimiter != "" {
-						// Prefixes are only valid wth delimiters, and
-						// for filesystem backend they are only valid
-						// if they are directories.
-						if object.Mode.IsDir() {
-							resp.Prefixes = append(resp.Prefixes, object.Object)
-						} else {
-							// Rest of them are treated as objects.
-							resp.Objects = append(resp.Objects, object)
-						}
-					} else {
-						// In-case of no delimiters, there are no
-						// prefixes - all are considered to be objects.
-						resp.Objects = append(resp.Objects, object)
-					}
-					count++ // Bump the number.
-				}
-				// Set the marker right here for the new set of the
-				// values coming in the from the client.
-				marker = resp.NextMarker
-				req.respCh <- resp
-			}
-		}
-	}()
-	return reqCh, nil
-}
-
-// fnvSum calculates a hash for concatenation of all input strings.
-func fnvSum(elements ...string) uint32 {
-	fnvHash := fnv.New32a()
-	for _, element := range elements {
-		fnvHash.Write([]byte(element))
-	}
-	return fnvHash.Sum32()
-}
-
-// listObjectsService - list objects service manages various incoming
-// list object requests by delegating them to an existing listObjects
-// routine or initializes a new listObjects routine.
-func (fs *Filesystem) listObjectsService() *probe.Error {
-	// Initialize list service request channel.
-	listServiceReqCh := make(chan listServiceReq)
-	fs.listServiceReqCh = listServiceReqCh
-
-	// Initialize timeout request channel to receive request hashes of
-	// timed-out requests.
-	timeoutReqCh := make(chan uint32)
-	fs.timeoutReqCh = timeoutReqCh
-
-	// Initialize request hash to list worker map.
-	reqToListWorkerReqCh := make(map[uint32]chan<- listWorkerReq)
-
-	// Start service in a go routine.
-	go func() {
-		for {
-			select {
-			case reqHash := <-timeoutReqCh:
-				// For requests which have timed-out, close the worker
-				// channels proactively, this may happen for idle
-				// workers once in 10seconds.
-				listWorkerReqCh, ok := reqToListWorkerReqCh[reqHash]
-				if ok {
-					close(listWorkerReqCh)
-				}
-				delete(reqToListWorkerReqCh, reqHash)
-			case srvReq := <-listServiceReqCh:
-				// Save the params for readability.
-				bucket := srvReq.reqParams.Bucket
-				prefix := srvReq.reqParams.Prefix
-				marker := srvReq.reqParams.Marker
-				delimiter := srvReq.reqParams.Delimiter
-				maxKeys := srvReq.reqParams.MaxKeys
-
-				// Generate hash.
-				reqHash := fnvSum(bucket, prefix, marker, delimiter)
-				listWorkerReqCh, ok := reqToListWorkerReqCh[reqHash]
-				if !ok {
-					var err *probe.Error
-					listWorkerReqCh, err = fs.listObjects(bucket, prefix, marker, delimiter, maxKeys)
-					if err != nil {
-						srvReq.respCh <- ListObjectsResult{}
-						return
-					}
-					reqToListWorkerReqCh[reqHash] = listWorkerReqCh
-				}
-				respCh := make(chan ListObjectsResult)
-				listWorkerReqCh <- listWorkerReq{respCh}
-				resp, ok := <-respCh
-				if !ok {
-					srvReq.respCh <- ListObjectsResult{}
-					return
-				}
-				delete(reqToListWorkerReqCh, reqHash)
-				if !resp.IsTruncated {
-					close(listWorkerReqCh)
-				}
-				srvReq.respCh <- resp
-			}
-		}
-	}()
-	return nil
-}
-
 // ListObjects - lists all objects for a given prefix, returns up to
 // maxKeys number of objects per call.
 func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsResult, *probe.Error) {
+	result := ListObjectsResult{}
+
 	// Input validation.
 	if !IsValidBucketName(bucket) {
-		return ListObjectsResult{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
+		return result, probe.NewError(BucketNameInvalid{Bucket: bucket})
 	}
 
 	bucket = fs.denormalizeBucket(bucket)
-	rootPrefix := filepath.Join(fs.path, bucket)
-	// Check bucket exists.
-	if _, e := os.Stat(rootPrefix); e != nil {
-		if os.IsNotExist(e) {
-			return ListObjectsResult{}, probe.NewError(BucketNotFound{Bucket: bucket})
+
+	if status, err := IsDirExist(filepath.Join(fs.path, bucket)); !status {
+		if err == nil {
+			return result, probe.NewError(BucketNotFound{Bucket: bucket})
+		} else if os.IsNotExist(err) {
+			return result, probe.NewError(BucketNotFound{Bucket: bucket})
+		} else {
+			return result, probe.NewError(err)
 		}
-		return ListObjectsResult{}, probe.NewError(e)
 	}
 
-	// Unescape the marker values.
-	markerUnescaped, e := url.QueryUnescape(marker)
-	if e != nil {
-		return ListObjectsResult{}, probe.NewError(e)
+	if delimiter != "" && delimiter != "/" {
+		return result, probe.NewError(fmt.Errorf("delimiter '%s' is not supported", delimiter))
 	}
 
-	reqParams := listObjectsParams{}
-	reqParams.Bucket = bucket
-	reqParams.Prefix = filepath.FromSlash(prefix)
-	reqParams.Marker = filepath.FromSlash(markerUnescaped)
-	reqParams.Delimiter = filepath.FromSlash(delimiter)
-	reqParams.MaxKeys = maxKeys
+	if marker != "" {
+		if markerUnescaped, err := url.QueryUnescape(marker); err == nil {
+			marker = markerUnescaped
+		} else {
+			return result, probe.NewError(err)
+		}
 
-	respCh := make(chan ListObjectsResult)
-	fs.listServiceReqCh <- listServiceReq{reqParams, respCh}
-	resp := <-respCh
+		if !strings.HasPrefix(marker, prefix) {
+			return result, probe.NewError(fmt.Errorf("marker '%s' and prefix '%s' do not match", marker, prefix))
+		}
+	}
 
-	for i := range resp.Prefixes {
-		resp.Prefixes[i] = filepath.ToSlash(resp.Prefixes[i])
+	if maxKeys <= 0 || maxKeys > listObjectsLimit {
+		maxKeys = listObjectsLimit
 	}
-	for i := range resp.Objects {
-		resp.Objects[i].Object = filepath.ToSlash(resp.Objects[i].Object)
+
+	bucketDir := filepath.Join(fs.path, bucket)
+
+	recursive := true
+	skipDir := true
+	if delimiter == "/" {
+		skipDir = false
+		recursive = false
 	}
-	return resp, nil
+
+	prefixDir := filepath.Dir(filepath.FromSlash(prefix))
+	rootDir := filepath.Join(bucketDir, prefixDir)
+
+	objectInfoCh := fs.popListObjectCh(ListObjectParams{bucket, delimiter, marker, prefix})
+	if objectInfoCh == nil {
+		ch := treeWalk(rootDir, bucketDir, recursive)
+		objectInfoCh = &ch
+	}
+
+	nextMarker := ""
+	for i := 0; i < maxKeys; {
+		objInfo, ok := objectInfoCh.Read()
+		if !ok {
+			// closed channel
+			return result, nil
+		}
+
+		if objInfo.Err != nil {
+			return ListObjectsResult{}, probe.NewError(objInfo.Err)
+		}
+
+		if strings.Contains(objInfo.Name, "$multiparts") || strings.Contains(objInfo.Name, "$tmpobject") {
+			continue
+		}
+
+		if objInfo.IsDir && skipDir {
+			continue
+		}
+
+		if strings.HasPrefix(objInfo.Name, prefix) {
+			if objInfo.Name > marker {
+				if objInfo.IsDir {
+					result.Prefixes = append(result.Prefixes, objInfo.Name)
+				} else {
+					result.Objects = append(result.Objects, ObjectMetadata{
+						Bucket:       bucket,
+						Object:       objInfo.Name,
+						LastModified: objInfo.ModifiedTime,
+						Size:         objInfo.Size,
+					})
+				}
+				nextMarker = objInfo.Name
+				i++
+			}
+		}
+	}
+
+	if !objectInfoCh.IsClosed() {
+		result.IsTruncated = true
+		result.NextMarker = nextMarker
+		fs.pushListObjectCh(ListObjectParams{bucket, delimiter, nextMarker, prefix}, *objectInfoCh)
+	}
+
+	return result, nil
 }
diff --git a/pkg/fs/fs.go b/pkg/fs/fs.go
index e8a781e2e..bb24c4b7b 100644
--- a/pkg/fs/fs.go
+++ b/pkg/fs/fs.go
@@ -25,14 +25,59 @@ import (
 	"github.com/minio/minio/pkg/probe"
 )
 
+// ListObjectParams - list object params used for list object map
+type ListObjectParams struct {
+	bucket    string
+	delimiter string
+	marker    string
+	prefix    string
+}
+
 // Filesystem - local variables
 type Filesystem struct {
-	path             string
-	minFreeDisk      int64
-	rwLock           *sync.RWMutex
-	multiparts       *Multiparts
-	listServiceReqCh chan<- listServiceReq
-	timeoutReqCh     chan<- uint32
+	path               string
+	minFreeDisk        int64
+	rwLock             *sync.RWMutex
+	multiparts         *Multiparts
+	listObjectMap      map[ListObjectParams][]ObjectInfoChannel
+	listObjectMapMutex *sync.Mutex
+}
+
+// pushListObjectCh - saves the given channel under 'params' so that a later
+// popListObjectCh() call with the same params can return it.
+func (fs *Filesystem) pushListObjectCh(params ListObjectParams, ch ObjectInfoChannel) {
+	fs.listObjectMapMutex.Lock()
+	defer fs.listObjectMapMutex.Unlock()
+
+	// append() on the nil slice of a missing key creates the entry
+	fs.listObjectMap[params] = append(fs.listObjectMap[params], ch)
+}
+
+// popListObjectCh - removes and returns the first channel saved under 'params'
+// that has not timed out; returns nil if there is no such channel.
+func (fs *Filesystem) popListObjectCh(params ListObjectParams) *ObjectInfoChannel {
+	fs.listObjectMapMutex.Lock()
+	defer fs.listObjectMapMutex.Unlock()
+
+	if channels, ok := fs.listObjectMap[params]; ok {
+		for i, channel := range channels {
+			if !channel.IsTimedOut() {
+				chs := channels[i+1:]
+				if len(chs) > 0 {
+					fs.listObjectMap[params] = chs
+				} else {
+					delete(fs.listObjectMap, params)
+				}
+
+				return &channel
+			}
+		}
+
+		// As all channels are timed out, delete the map entry
+		delete(fs.listObjectMap, params)
+	}
+
+	return nil
 }
 
 // MultipartSession holds active session information
@@ -83,10 +128,9 @@ func New(rootPath string, minFreeDisk int64) (Filesystem, *probe.Error) {
 	// minium free disk required for i/o operations to succeed.
 	fs.minFreeDisk = minFreeDisk
 
-	// Start list goroutine.
-	if err = fs.listObjectsService(); err != nil {
-		return Filesystem{}, err.Trace(rootPath)
-	}
+	fs.listObjectMap = make(map[ListObjectParams][]ObjectInfoChannel)
+	fs.listObjectMapMutex = &sync.Mutex{}
+
 	// Return here.
 	return fs, nil
 }