1
0
mirror of https://github.com/minio/minio.git synced 2025-04-04 03:40:30 -04:00

api: refactor list object handling in fs backend

When a list object request arrives, a goroutine is created for the given
parameters if one is not already available; otherwise the existing goroutine
is reused.  Each goroutine stays alive for 15 seconds awaiting a continuation
list object request, after which it exits.

Fixes 
This commit is contained in:
Bala.FA 2016-02-18 14:08:58 +05:30
parent 5cb546d288
commit c70bc2209e
3 changed files with 388 additions and 294 deletions

245
pkg/fs/dir.go Normal file

@ -0,0 +1,245 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fs
import (
"io"
"os"
"path/filepath"
"sort"
"strings"
"time"
)
const (
// listObjectsLimit - maximum list objects limit
listObjectsLimit = 1000
)
// IsDirEmpty - returns whether given directory is empty or not
func IsDirEmpty(dirname string) (status bool, err error) {
f, err := os.Open(dirname)
if err == nil {
defer f.Close()
if _, err = f.Readdirnames(1); err == io.EOF {
status = true
err = nil
}
}
return
}
// IsDirExist - returns whether given directory is exist or not.
// It uses Lstat, so a symlink to a directory does not count as a directory.
func IsDirExist(dirname string) (bool, error) {
	fi, err := os.Lstat(dirname)
	if err != nil {
		return false, err
	}
	return fi.IsDir(), nil
}
// byName implements sort.Interface for sorting os.FileInfo list
type byName []os.FileInfo
func (f byName) Len() int { return len(f) }
func (f byName) Swap(i, j int) { f[i], f[j] = f[j], f[i] }
func (f byName) Less(i, j int) bool {
n1 := f[i].Name()
if f[i].IsDir() {
n1 = n1 + string(os.PathSeparator)
}
n2 := f[j].Name()
if f[j].IsDir() {
n2 = n2 + string(os.PathSeparator)
}
return n1 < n2
}
// ObjectInfo - object info
type ObjectInfo struct {
Name string
ModifiedTime time.Time
Checksum string
Size int64
IsDir bool
Err error
}
// readDir - read 'scanDir' directory. It returns list of ObjectInfo where
// each object name is appended with 'namePrefix'
func readDir(scanDir, namePrefix string) (objInfos []ObjectInfo) {
f, err := os.Open(scanDir)
if err != nil {
objInfos = append(objInfos, ObjectInfo{Err: err})
return
}
fis, err := f.Readdir(-1)
if err != nil {
f.Close()
objInfos = append(objInfos, ObjectInfo{Err: err})
return
}
f.Close()
sort.Sort(byName(fis))
// make []ObjectInfo from []FileInfo
for _, fi := range fis {
name := fi.Name()
if namePrefix != "" {
name = namePrefix + "/" + name
}
if fi.IsDir() {
name += "/"
}
objInfos = append(objInfos, ObjectInfo{
Name: name,
ModifiedTime: fi.ModTime(),
Checksum: "",
Size: fi.Size(),
IsDir: fi.IsDir(),
})
}
return
}
// ObjectInfoChannel - object info channel.
// Wraps the stream produced by treeWalk with a one-entry look-ahead
// buffer so a consumer can tell whether more entries are pending.
type ObjectInfoChannel struct {
	ch <-chan ObjectInfo // stream of entries produced by the walker goroutine
	objInfo *ObjectInfo // look-ahead buffer filled by Read; nil when empty
	closed bool // set once ch has been drained and closed
	timeoutCh <-chan struct{} // receives a value when the walker timed out sending
	timedOut bool // intended cache for IsTimedOut; NOTE(review): IsTimedOut has a value receiver, so this assignment never persists
}
// Read returns the next ObjectInfo from the channel. The boolean result
// is false once the underlying channel has been fully drained. Read keeps
// a one-entry look-ahead so IsClosed can report whether a value is still
// pending after each call.
func (oic *ObjectInfoChannel) Read() (ObjectInfo, bool) {
	if oic.closed {
		return ObjectInfo{}, false
	}
	// First read: prime the look-ahead buffer.
	if oic.objInfo == nil {
		oi, ok := <-oic.ch
		if !ok {
			oic.closed = true
			return ObjectInfo{}, false
		}
		oic.objInfo = &oi
	}
	result := *oic.objInfo
	oic.objInfo = nil
	// Pre-fetch the following entry so we know whether this was the last.
	if oi, ok := <-oic.ch; ok {
		oic.objInfo = &oi
	} else {
		oic.closed = true
	}
	return result, true
}
// IsClosed - return whether channel is closed or not.
// A buffered look-ahead entry means there is still data to deliver,
// so the channel only counts as closed once that buffer is empty too.
func (oic ObjectInfoChannel) IsClosed() bool {
	return oic.objInfo == nil && oic.closed
}
// IsTimedOut - return whether channel is closed due to timeout.
//
// Bug fix: the receiver is now a pointer. With the original value
// receiver, the assignment to oic.timedOut below mutated a copy, so the
// cached result was silently dropped and every call re-polled timeoutCh.
// Callers invoke this on addressable values, so the change is compatible.
func (oic *ObjectInfoChannel) IsTimedOut() bool {
	if oic.timedOut {
		return true
	}
	select {
	case _, ok := <-oic.timeoutCh:
		if ok {
			// The walker signalled that a send timed out; cache it.
			oic.timedOut = true
			return true
		}
		// timeoutCh was closed normally: the walker finished without timing out.
		return false
	default:
		// No timeout signal pending.
		return false
	}
}
// treeWalk - walk into 'scanDir' recursively when 'recursive' is true.
// It uses 'bucketDir' to get name prefix for object name. The traversal
// runs in its own goroutine; entries are delivered through the returned
// ObjectInfoChannel, and the walker aborts if the consumer stays away
// for more than 15 seconds.
func treeWalk(scanDir, bucketDir string, recursive bool) ObjectInfoChannel {
	// Buffered to listObjectsLimit so a full page of results can be
	// produced before any consumer reads.
	objectInfoCh := make(chan ObjectInfo, listObjectsLimit)
	timeoutCh := make(chan struct{}, 1)
	// goroutine - retrieves directory entries, makes ObjectInfo and sends into the channel
	go func() {
		defer close(objectInfoCh)
		defer close(timeoutCh)
		// send function - returns true if ObjectInfo is sent
		// within (time.Second * 15) else false on time-out
		send := func(oi ObjectInfo) bool {
			timer := time.After(time.Second * 15)
			select {
			case objectInfoCh <- oi:
				return true
			case <-timer:
				// Notify the consumer side (IsTimedOut) that the walk gave up.
				timeoutCh <- struct{}{}
				return false
			}
		}
		// namePrefix is scanDir relative to bucketDir, slash-separated,
		// without a leading "/".
		namePrefix := strings.Replace(filepath.ToSlash(scanDir), filepath.ToSlash(bucketDir), "", 1)
		if strings.HasPrefix(namePrefix, "/") {
			/* remove beginning "/" */
			namePrefix = namePrefix[1:]
		}
		// Iterative depth-first traversal: objInfos is a work list; entries
		// of a visited sub-directory are prepended so output stays sorted.
		for objInfos := readDir(scanDir, namePrefix); len(objInfos) > 0; {
			var objInfo ObjectInfo
			objInfo, objInfos = objInfos[0], objInfos[1:]
			if !send(objInfo) {
				// Consumer timed out; abandon the walk.
				return
			}
			if objInfo.IsDir && recursive {
				// Descend into the directory just emitted.
				scanDir := filepath.Join(bucketDir, filepath.FromSlash(objInfo.Name))
				namePrefix = strings.Replace(filepath.ToSlash(scanDir), filepath.ToSlash(bucketDir), "", 1)
				if strings.HasPrefix(namePrefix, "/") {
					/* remove beginning "/" */
					namePrefix = namePrefix[1:]
				}
				objInfos = append(readDir(scanDir, namePrefix), objInfos...)
			}
		}
	}()
	return ObjectInfoChannel{ch: objectInfoCh, timeoutCh: timeoutCh}
}

@ -17,313 +17,118 @@
package fs package fs
import ( import (
"errors" "fmt"
"hash/fnv"
"net/url" "net/url"
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"time"
"github.com/minio/minio/pkg/ioutils"
"github.com/minio/minio/pkg/probe" "github.com/minio/minio/pkg/probe"
) )
// listObjectsParams - list objects input parameters.
type listObjectsParams struct {
// Bucket name to list the objects for.
Bucket string
// list all objects with this parameter as common prefix.
Prefix string
// list all objects starting with object after marker in
// lexicographical order.
Marker string
// list all objects until the first occurrence of the delimtier
// after the prefix.
Delimiter string
// maximum number of objects returned per listObjects()
// operation.
MaxKeys int
}
// listServiceReq
type listServiceReq struct {
reqParams listObjectsParams
respCh chan ListObjectsResult
}
type listWorkerReq struct {
respCh chan ListObjectsResult
}
// listObjects - list objects lists objects up to maxKeys for a given prefix.
func (fs Filesystem) listObjects(bucket, prefix, marker, delimiter string, maxKeys int) (chan<- listWorkerReq, *probe.Error) {
quitWalker := make(chan bool)
reqCh := make(chan listWorkerReq)
walkerCh := make(chan ObjectMetadata, 2000)
go func() {
defer close(walkerCh)
var walkPath string
bucketPath := filepath.Join(fs.path, bucket)
// Bucket path prefix should always end with a separator.
bucketPathPrefix := bucketPath + string(os.PathSeparator)
prefixPath := bucketPathPrefix + prefix
st, e := os.Stat(prefixPath)
if e != nil {
if os.IsNotExist(e) {
walkPath = bucketPath
}
} else {
if st.IsDir() && !strings.HasSuffix(prefix, delimiter) {
walkPath = bucketPath
} else {
walkPath = prefixPath
}
}
ioutils.FTW(walkPath, func(path string, info os.FileInfo, e error) error {
// For any error return right here.
if e != nil {
return e
}
// Skip special temporary files, kept for multipart transaction.
if strings.Contains(path, "$multiparts") || strings.Contains(path, "$tmpobject") {
return nil
}
// We don't need to list the walk path if its a directory.
if path == walkPath && info.IsDir() {
return nil
}
// Skip all directories if there is no delimiter.
if info.IsDir() && delimiter == "" {
return nil
}
// For all incoming directories add a ending separator.
if info.IsDir() {
path = path + string(os.PathSeparator)
}
// Extract object name.
objectName := strings.TrimPrefix(path, bucketPathPrefix)
if strings.HasPrefix(objectName, prefix) {
object := ObjectMetadata{
Object: objectName,
LastModified: info.ModTime(),
Mode: info.Mode(),
Size: info.Size(),
}
select {
// Send object on walker channel.
case walkerCh <- object:
case <-quitWalker:
// Returning error ends the file tree Walk().
return errors.New("Quit list worker.")
}
// If delimiter is set, we stop if current path is a directory.
if delimiter != "" && info.IsDir() {
return ioutils.ErrSkipDir
}
}
return nil
})
}()
go func() {
for {
select {
// Timeout after 30 seconds if request did not arrive for
// the given list parameters.
case <-time.After(30 * time.Second):
quitWalker <- true // Quit file path walk if running.
// Send back the hash for this request.
fs.timeoutReqCh <- fnvSum(bucket, prefix, marker, delimiter)
return
case req, ok := <-reqCh:
if !ok {
// If the request channel is closed, no more
// requests return here.
return
}
resp := ListObjectsResult{}
var count int
for {
// We have read all the keys necessary by now. We
// cleanly break out.
if count == maxKeys {
if delimiter != "" {
// Set the next marker for the next request.
// This element is set only if you have delimiter set.
// If response does not include the NextMaker and it is
// truncated, you can use the value of the last Key in the
// response as the marker in the subsequent request to get the
// next set of object keys.
if len(resp.Objects) > 0 {
// NextMarker is only set when there
// are more than maxKeys worth of
// objects for a given prefix path.
resp.NextMarker = resp.Objects[len(resp.Objects)-1:][0].Object
}
}
resp.IsTruncated = len(walkerCh) > 0
break
}
object, walkerOK := <-walkerCh
// If the channel is closed return right here.
if !walkerOK {
break
}
// Verify if the object is lexically smaller than
// the marker, we will skip those objects.
if marker != "" {
if marker >= object.Object {
continue
} else {
// Reset marker so that we avoid comparing
// again and again in a loop unecessarily.
marker = ""
}
}
if delimiter != "" {
// Prefixes are only valid wth delimiters, and
// for filesystem backend they are only valid
// if they are directories.
if object.Mode.IsDir() {
resp.Prefixes = append(resp.Prefixes, object.Object)
} else {
// Rest of them are treated as objects.
resp.Objects = append(resp.Objects, object)
}
} else {
// In-case of no delimiters, there are no
// prefixes - all are considered to be objects.
resp.Objects = append(resp.Objects, object)
}
count++ // Bump the number.
}
// Set the marker right here for the new set of the
// values coming in the from the client.
marker = resp.NextMarker
req.respCh <- resp
}
}
}()
return reqCh, nil
}
// fnvSum calculates a hash for concatenation of all input strings.
func fnvSum(elements ...string) uint32 {
fnvHash := fnv.New32a()
for _, element := range elements {
fnvHash.Write([]byte(element))
}
return fnvHash.Sum32()
}
// listObjectsService - list objects service manages various incoming
// list object requests by delegating them to an existing listObjects
// routine or initializes a new listObjects routine.
func (fs *Filesystem) listObjectsService() *probe.Error {
// Initialize list service request channel.
listServiceReqCh := make(chan listServiceReq)
fs.listServiceReqCh = listServiceReqCh
// Initialize timeout request channel to receive request hashes of
// timed-out requests.
timeoutReqCh := make(chan uint32)
fs.timeoutReqCh = timeoutReqCh
// Initialize request hash to list worker map.
reqToListWorkerReqCh := make(map[uint32]chan<- listWorkerReq)
// Start service in a go routine.
go func() {
for {
select {
case reqHash := <-timeoutReqCh:
// For requests which have timed-out, close the worker
// channels proactively, this may happen for idle
// workers once in 10seconds.
listWorkerReqCh, ok := reqToListWorkerReqCh[reqHash]
if ok {
close(listWorkerReqCh)
}
delete(reqToListWorkerReqCh, reqHash)
case srvReq := <-listServiceReqCh:
// Save the params for readability.
bucket := srvReq.reqParams.Bucket
prefix := srvReq.reqParams.Prefix
marker := srvReq.reqParams.Marker
delimiter := srvReq.reqParams.Delimiter
maxKeys := srvReq.reqParams.MaxKeys
// Generate hash.
reqHash := fnvSum(bucket, prefix, marker, delimiter)
listWorkerReqCh, ok := reqToListWorkerReqCh[reqHash]
if !ok {
var err *probe.Error
listWorkerReqCh, err = fs.listObjects(bucket, prefix, marker, delimiter, maxKeys)
if err != nil {
srvReq.respCh <- ListObjectsResult{}
return
}
reqToListWorkerReqCh[reqHash] = listWorkerReqCh
}
respCh := make(chan ListObjectsResult)
listWorkerReqCh <- listWorkerReq{respCh}
resp, ok := <-respCh
if !ok {
srvReq.respCh <- ListObjectsResult{}
return
}
delete(reqToListWorkerReqCh, reqHash)
if !resp.IsTruncated {
close(listWorkerReqCh)
}
srvReq.respCh <- resp
}
}
}()
return nil
}
// ListObjects - lists all objects for a given prefix, returns up to // ListObjects - lists all objects for a given prefix, returns up to
// maxKeys number of objects per call. // maxKeys number of objects per call.
func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsResult, *probe.Error) { func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsResult, *probe.Error) {
result := ListObjectsResult{}
// Input validation. // Input validation.
if !IsValidBucketName(bucket) { if !IsValidBucketName(bucket) {
return ListObjectsResult{}, probe.NewError(BucketNameInvalid{Bucket: bucket}) return result, probe.NewError(BucketNameInvalid{Bucket: bucket})
} }
bucket = fs.denormalizeBucket(bucket) bucket = fs.denormalizeBucket(bucket)
rootPrefix := filepath.Join(fs.path, bucket)
// Check bucket exists. if status, err := IsDirExist(filepath.Join(fs.path, bucket)); !status {
if _, e := os.Stat(rootPrefix); e != nil { if err == nil {
if os.IsNotExist(e) { return result, probe.NewError(BucketNotFound{Bucket: bucket})
return ListObjectsResult{}, probe.NewError(BucketNotFound{Bucket: bucket}) } else if os.IsNotExist(err) {
return result, probe.NewError(BucketNotFound{Bucket: bucket})
} else {
return result, probe.NewError(err)
} }
return ListObjectsResult{}, probe.NewError(e)
} }
// Unescape the marker values. if delimiter != "" && delimiter != "/" {
markerUnescaped, e := url.QueryUnescape(marker) return result, probe.NewError(fmt.Errorf("delimiter '%s' is not supported", delimiter))
if e != nil {
return ListObjectsResult{}, probe.NewError(e)
} }
reqParams := listObjectsParams{} if marker != "" {
reqParams.Bucket = bucket if markerUnescaped, err := url.QueryUnescape(marker); err == nil {
reqParams.Prefix = filepath.FromSlash(prefix) marker = markerUnescaped
reqParams.Marker = filepath.FromSlash(markerUnescaped) } else {
reqParams.Delimiter = filepath.FromSlash(delimiter) return result, probe.NewError(err)
reqParams.MaxKeys = maxKeys
respCh := make(chan ListObjectsResult)
fs.listServiceReqCh <- listServiceReq{reqParams, respCh}
resp := <-respCh
for i := range resp.Prefixes {
resp.Prefixes[i] = filepath.ToSlash(resp.Prefixes[i])
} }
for i := range resp.Objects {
resp.Objects[i].Object = filepath.ToSlash(resp.Objects[i].Object) if !strings.HasPrefix(marker, prefix) {
return result, probe.NewError(fmt.Errorf("marker '%s' and prefix '%s' do not match", marker, prefix))
} }
return resp, nil }
if maxKeys <= 0 || maxKeys > listObjectsLimit {
maxKeys = listObjectsLimit
}
bucketDir := filepath.Join(fs.path, bucket)
recursive := true
skipDir := true
if delimiter == "/" {
skipDir = false
recursive = false
}
prefixDir := filepath.Dir(filepath.FromSlash(prefix))
rootDir := filepath.Join(bucketDir, prefixDir)
objectInfoCh := fs.popListObjectCh(ListObjectParams{bucket, delimiter, marker, prefix})
if objectInfoCh == nil {
ch := treeWalk(rootDir, bucketDir, recursive)
objectInfoCh = &ch
}
nextMarker := ""
for i := 0; i < maxKeys; {
objInfo, ok := objectInfoCh.Read()
if !ok {
// closed channel
return result, nil
}
if objInfo.Err != nil {
return ListObjectsResult{}, probe.NewError(objInfo.Err)
}
if strings.Contains(objInfo.Name, "$multiparts") || strings.Contains(objInfo.Name, "$tmpobject") {
continue
}
if objInfo.IsDir && skipDir {
continue
}
if strings.HasPrefix(objInfo.Name, prefix) {
if objInfo.Name > marker {
if objInfo.IsDir {
result.Prefixes = append(result.Prefixes, objInfo.Name)
} else {
result.Objects = append(result.Objects, ObjectMetadata{
Bucket: bucket,
Object: objInfo.Name,
LastModified: objInfo.ModifiedTime,
Size: objInfo.Size,
})
}
nextMarker = objInfo.Name
i++
}
}
}
if !objectInfoCh.IsClosed() {
result.IsTruncated = true
result.NextMarker = nextMarker
fs.pushListObjectCh(ListObjectParams{bucket, delimiter, nextMarker, prefix}, *objectInfoCh)
}
return result, nil
} }

@ -25,14 +25,59 @@ import (
"github.com/minio/minio/pkg/probe" "github.com/minio/minio/pkg/probe"
) )
// ListObjectParams - list object params used for list object map.
// A saved walk channel is keyed by these four request parameters so a
// continuation request with the same parameters can resume the walk.
type ListObjectParams struct {
	bucket string // bucket being listed
	delimiter string // delimiter of the request
	marker string // continuation marker of the request
	prefix string // object name prefix of the request
}
// Filesystem - local variables // Filesystem - local variables
type Filesystem struct { type Filesystem struct {
path string path string
minFreeDisk int64 minFreeDisk int64
rwLock *sync.RWMutex rwLock *sync.RWMutex
multiparts *Multiparts multiparts *Multiparts
listServiceReqCh chan<- listServiceReq listObjectMap map[ListObjectParams][]ObjectInfoChannel
timeoutReqCh chan<- uint32 listObjectMapMutex *sync.Mutex
}
// pushListObjectCh saves an in-progress walk channel under 'params' so a
// follow-up continuation request can resume from it.
func (fs *Filesystem) pushListObjectCh(params ListObjectParams, ch ObjectInfoChannel) {
	fs.listObjectMapMutex.Lock()
	defer fs.listObjectMapMutex.Unlock()
	// append handles a missing key too: the nil slice grows to [ch].
	fs.listObjectMap[params] = append(fs.listObjectMap[params], ch)
}
// popListObjectCh returns a previously saved walk channel for 'params',
// or nil when none is usable. Timed-out channels found before a live one
// are discarded along with it; when every saved channel has timed out the
// whole map entry is removed.
func (fs *Filesystem) popListObjectCh(params ListObjectParams) *ObjectInfoChannel {
	fs.listObjectMapMutex.Lock()
	defer fs.listObjectMapMutex.Unlock()
	if channels, ok := fs.listObjectMap[params]; ok {
		for i, channel := range channels {
			if !channel.IsTimedOut() {
				// Keep only the channels after the one being handed out.
				chs := channels[i+1:]
				if len(chs) > 0 {
					fs.listObjectMap[params] = chs
				} else {
					delete(fs.listObjectMap, params)
				}
				// Safe: 'channel' is this iteration's copy and we return now.
				return &channel
			}
		}
		// As all channels are timed out, delete the map entry
		delete(fs.listObjectMap, params)
	}
	return nil
}
// MultipartSession holds active session information // MultipartSession holds active session information
@ -83,10 +128,9 @@ func New(rootPath string, minFreeDisk int64) (Filesystem, *probe.Error) {
// minium free disk required for i/o operations to succeed. // minium free disk required for i/o operations to succeed.
fs.minFreeDisk = minFreeDisk fs.minFreeDisk = minFreeDisk
// Start list goroutine. fs.listObjectMap = make(map[ListObjectParams][]ObjectInfoChannel)
if err = fs.listObjectsService(); err != nil { fs.listObjectMapMutex = &sync.Mutex{}
return Filesystem{}, err.Trace(rootPath)
}
// Return here. // Return here.
return fs, nil return fs, nil
} }