/* * Minio Cloud Storage, (C) 2016 Minio, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package fs import ( "io" "os" "path/filepath" "sort" "strings" "time" ) const ( // ListObjectsLimit - maximum list objects limit. listObjectsLimit = 1000 ) // IsDirEmpty - returns whether given directory is empty or not. func isDirEmpty(dirname string) (status bool, err error) { f, err := os.Open(dirname) if err == nil { defer f.Close() if _, err = f.Readdirnames(1); err == io.EOF { status = true err = nil } } return } // IsDirExist - returns whether given directory is exist or not. func isDirExist(dirname string) (status bool, err error) { fi, err := os.Lstat(dirname) if err == nil { status = fi.IsDir() } return } // ByName implements sort.Interface for sorting os.FileInfo list. type byName []os.FileInfo func (f byName) Len() int { return len(f) } func (f byName) Swap(i, j int) { f[i], f[j] = f[j], f[i] } func (f byName) Less(i, j int) bool { n1 := f[i].Name() if f[i].IsDir() { n1 = n1 + string(os.PathSeparator) } n2 := f[j].Name() if f[j].IsDir() { n2 = n2 + string(os.PathSeparator) } return n1 < n2 } // ObjectInfo - object info. type ObjectInfo struct { Bucket string Name string ModifiedTime time.Time ContentType string MD5Sum string Size int64 IsDir bool Err error } // Using sort.Search() internally to jump to the file entry containing the prefix. func searchFileInfos(fileInfos []os.FileInfo, x string) int { processFunc := func(i int) bool { return fileInfos[i].Name() >= x } return sort.Search(len(fileInfos), processFunc) } // ReadDir - read 'scanDir' directory. It returns list of ObjectInfo. // Each object name is appended with 'namePrefix'. func readDir(scanDir, namePrefix, queryPrefix string, isFirst bool) (objInfos []ObjectInfo) { f, err := os.Open(scanDir) if err != nil { objInfos = append(objInfos, ObjectInfo{Err: err}) return } fis, err := f.Readdir(-1) if err != nil { f.Close() objInfos = append(objInfos, ObjectInfo{Err: err}) return } // Close the directory. f.Close() // Sort files by Name. sort.Sort(byName(fis)) var prefixIndex int // Searching for entries with objectName containing prefix. // Binary search is used for efficient search. if queryPrefix != "" && isFirst { prefixIndex = searchFileInfos(fis, queryPrefix) if prefixIndex == len(fis) { return } if !strings.HasPrefix(fis[prefixIndex].Name(), queryPrefix) { return } fis = fis[prefixIndex:] } // Populate []ObjectInfo from []FileInfo. for _, fi := range fis { name := fi.Name() if queryPrefix != "" && isFirst { // If control is here then there is a queryPrefix, and there are objects which satisfies the prefix. // Since the result is sorted, the object names which satisfies query prefix would be stored one after the other. // Push the objectInfo only if its contains the prefix. // This ensures that the channel containing object Info would only has objects with the given queryPrefix. if !strings.HasPrefix(name, queryPrefix) { return } } size := fi.Size() modTime := fi.ModTime() isDir := fi.Mode().IsDir() // Add prefix if name prefix exists. if namePrefix != "" { name = namePrefix + "/" + name } // For directories explicitly end with '/'. if isDir { name += "/" // Size is set to '0' for directories explicitly. size = 0 } if fi.Mode()&os.ModeSymlink == os.ModeSymlink { // Handle symlink by doing an additional stat and follow the link. st, e := os.Stat(filepath.Join(scanDir, name)) if e != nil { objInfos = append(objInfos, ObjectInfo{Err: err}) return } size = st.Size() modTime = st.ModTime() isDir = st.Mode().IsDir() // For directories explicitly end with '/'. if isDir { name += "/" size = 0 // Size is set to '0' for directories explicitly. } } // Populate []ObjectInfo. objInfos = append(objInfos, ObjectInfo{ Name: name, ModifiedTime: modTime, MD5Sum: "", // TODO Size: size, IsDir: isDir, }) } return } // ObjectInfoChannel - object info channel. type ObjectInfoChannel struct { ch <-chan ObjectInfo objInfo *ObjectInfo closed bool timeoutCh <-chan struct{} timedOut bool } func (oic *ObjectInfoChannel) Read() (ObjectInfo, bool) { if oic.closed { return ObjectInfo{}, false } if oic.objInfo == nil { // First read. if oi, ok := <-oic.ch; ok { oic.objInfo = &oi } else { oic.closed = true return ObjectInfo{}, false } } retObjInfo := *oic.objInfo status := true oic.objInfo = nil // Read once more to know whether it was last read. if oi, ok := <-oic.ch; ok { oic.objInfo = &oi } else { oic.closed = true } return retObjInfo, status } // IsClosed - return whether channel is closed or not. func (oic ObjectInfoChannel) IsClosed() bool { if oic.objInfo != nil { return false } return oic.closed } // IsTimedOut - return whether channel is closed due to timeout. func (oic ObjectInfoChannel) IsTimedOut() bool { if oic.timedOut { return true } select { case _, ok := <-oic.timeoutCh: if ok { oic.timedOut = true return true } return false default: return false } } // TreeWalk - walk into 'scanDir' recursively when 'recursive' is true. // It uses 'bucketDir' to get name prefix for object name. func treeWalk(scanDir, bucketDir string, recursive bool, queryPrefix string) ObjectInfoChannel { objectInfoCh := make(chan ObjectInfo, listObjectsLimit) timeoutCh := make(chan struct{}, 1) // goroutine - retrieves directory entries, makes ObjectInfo and sends into the channel. go func() { defer close(objectInfoCh) defer close(timeoutCh) // Send function - returns true if ObjectInfo is sent. // Within (time.Second * 15) else false on time-out. send := func(oi ObjectInfo) bool { timer := time.After(time.Second * 15) select { case objectInfoCh <- oi: return true case <-timer: timeoutCh <- struct{}{} return false } } namePrefix := strings.Replace(filepath.ToSlash(scanDir), filepath.ToSlash(bucketDir), "", 1) if strings.HasPrefix(namePrefix, "/") { // Remove forward slash ("/") from beginning. namePrefix = namePrefix[1:] } // The last argument (isFisrt), is set to `true` only during the first run of the function. // This makes sure that the sub-directories inside the prefixDir are recursed // without being asserted for prefix in the object name. isFirst := true for objInfos := readDir(scanDir, namePrefix, queryPrefix, isFirst); len(objInfos) > 0; { var objInfo ObjectInfo objInfo, objInfos = objInfos[0], objInfos[1:] if !send(objInfo) { return } if objInfo.IsDir && recursive { scanDir := filepath.Join(bucketDir, filepath.FromSlash(objInfo.Name)) namePrefix = strings.Replace(filepath.ToSlash(scanDir), filepath.ToSlash(bucketDir), "", 1) if strings.HasPrefix(namePrefix, "/") { /* remove beginning "/" */ namePrefix = namePrefix[1:] } // The last argument is set to false in the further calls to readdir. isFirst = false objInfos = append(readDir(scanDir, namePrefix, queryPrefix, isFirst), objInfos...) } } }() return ObjectInfoChannel{ch: objectInfoCh, timeoutCh: timeoutCh} }