objectapi: Simplify ListMultipart combine recursive and non-recursive. (#1390)

Fixes #1365
This commit is contained in:
Harshavardhana 2016-04-26 17:57:16 -07:00 committed by Anand Babu (AB) Periasamy
parent ad1abc4486
commit 90987df9b4
2 changed files with 131 additions and 118 deletions

View File

@ -26,6 +26,7 @@ import (
"strconv"
"strings"
"github.com/Sirupsen/logrus"
"github.com/minio/minio/pkg/probe"
"github.com/skyrings/skyring-common/tools/uuid"
)
@ -47,15 +48,20 @@ func (o objectAPI) isBucketExist(bucketName string) (bool, error) {
return true, nil
}
// checkLeafDirectory - verifies if a given path is leaf directory if
// yes returns all the files inside it.
func (o objectAPI) checkLeafDirectory(prefixPath string) (isLeaf bool, fis []FileInfo) {
// listLeafEntries - lists all entries if a given prefixPath is a leaf
// directory, returns error if any - returns empty list if prefixPath
// is not a leaf directory.
func (o objectAPI) listLeafEntries(prefixPath string) (entries []FileInfo, e error) {
var allFileInfos []FileInfo
var markerPath string
for {
fileInfos, eof, e := o.storage.ListFiles(minioMetaVolume, prefixPath, markerPath, false, 1000)
if e != nil {
break
log.WithFields(logrus.Fields{
"prefixPath": prefixPath,
"markerPath": markerPath,
}).Errorf("%s", e)
return nil, e
}
allFileInfos = append(allFileInfos, fileInfos...)
if eof {
@ -65,16 +71,96 @@ func (o objectAPI) checkLeafDirectory(prefixPath string) (isLeaf bool, fis []Fil
}
for _, fileInfo := range allFileInfos {
if fileInfo.Mode.IsDir() {
isLeaf = false
return isLeaf, nil
// If a directory is found, doesn't return anything.
return nil, nil
}
fileName := path.Base(fileInfo.Name)
if !strings.Contains(fileName, ".") {
fis = append(fis, fileInfo)
entries = append(entries, fileInfo)
}
}
isLeaf = true
return isLeaf, fis
return entries, nil
}
// listMetaVolumeFiles - list all files at a given prefix inside minioMetaVolume.
func (o objectAPI) listMetaVolumeFiles(prefixPath string, markerPath string, recursive bool, maxKeys int) (allFileInfos []FileInfo, eof bool, e error) {
// newMaxKeys tracks the size of entries which are going to be
// returned back.
var newMaxKeys int
// Following loop gathers and filters out special files inside
// minio meta volume.
for {
var fileInfos []FileInfo
// List files up to maxKeys-newMaxKeys, since we are skipping entries for special files.
fileInfos, eof, e = o.storage.ListFiles(minioMetaVolume, prefixPath, markerPath, recursive, maxKeys-newMaxKeys)
if e != nil {
log.WithFields(logrus.Fields{
"prefixPath": prefixPath,
"markerPath": markerPath,
"recursive": recursive,
"maxKeys": maxKeys,
}).Errorf("%s", e)
return nil, true, e
}
// Loop through and validate individual file.
for _, fi := range fileInfos {
var entries []FileInfo
// List all the entries if fi.Name is a leaf directory, if
// fi.Name is not a leaf directory them the resulting
// entries are empty.
entries, e = o.listLeafEntries(fi.Name)
if e != nil {
log.WithFields(logrus.Fields{
"prefixPath": fi.Name,
}).Errorf("%s", e)
return nil, false, e
}
// Set markerPath for next batch of listing.
markerPath = fi.Name
if len(entries) > 0 {
for _, entry := range entries {
// Skip the entries for erasure parts if any.
if strings.Contains(path.Base(entry.Name), ".") {
continue
}
allFileInfos = append(allFileInfos, entry)
}
} else {
// Skip special files.
specialFile := path.Base(fi.Name)
if strings.Contains(specialFile, ".") {
// Contains partnumber and md5sum info, skip this.
continue
}
allFileInfos = append(allFileInfos, fi)
}
newMaxKeys++
// If we have reached the maxKeys, it means we have listed
// everything that was requested. Return right here.
if newMaxKeys == maxKeys {
// Returns all the entries until maxKeys entries.
//
// eof is deliberately set as false since most of the
// time if newMaxKeys == maxKeys, there are most
// probably more than 1000 multipart sessions in
// progress.
//
// Setting this here allows us to set proper Markers
// so that the subsequent call returns the next set of
// entries.
eof = false
return allFileInfos, eof, nil
}
}
// If we have reached eof then we break out.
if eof {
break
}
}
// Return entries here.
return allFileInfos, eof, nil
}
// ListMultipartUploads - list multipart uploads.
@ -122,124 +208,51 @@ func (o objectAPI) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarke
if delimiter == slashSeparator {
recursive = false
}
result.IsTruncated = true
result.MaxUploads = maxUploads
newMaxUploads := 0
// Not using path.Join() as it strips off the trailing '/'.
// Also bucket should always be followed by '/' even if prefix is empty.
prefixPath := pathJoin(bucket, prefix)
if recursive {
keyMarkerPath := ""
if keyMarker != "" {
keyMarkerPath = path.Join(bucket, keyMarker, uploadIDMarker)
}
outerLoop:
for {
fileInfos, eof, e := o.storage.ListFiles(minioMetaVolume, prefixPath, keyMarkerPath, recursive, maxUploads-newMaxUploads)
if e != nil {
return ListMultipartsInfo{}, probe.NewError(e)
}
for _, fi := range fileInfos {
keyMarkerPath = fi.Name
// fi.Name will look like bucket/object/uploadID, extract object and uploadID.
uploadID := path.Base(fi.Name)
objectName := strings.TrimPrefix(path.Dir(fi.Name), retainSlash(bucket))
if strings.Contains(uploadID, ".") {
// Contains partnumber and md5sum info, skip this.
continue
}
result.Uploads = append(result.Uploads, uploadMetadata{
Object: objectName,
UploadID: uploadID,
Initiated: fi.ModTime,
})
result.NextKeyMarker = objectName
result.NextUploadIDMarker = uploadID
newMaxUploads++
if newMaxUploads == maxUploads {
if eof {
result.IsTruncated = false
}
break outerLoop
}
}
if eof {
result.IsTruncated = false
break
}
}
if !result.IsTruncated {
result.NextKeyMarker = ""
result.NextUploadIDMarker = ""
}
return result, nil
keyMarkerPath := ""
if keyMarker != "" {
keyMarkerPath = path.Join(bucket, keyMarker, uploadIDMarker)
}
var fileInfos []FileInfo
// read all the "fileInfos" in the prefix
for {
marker := ""
fis, eof, e := o.storage.ListFiles(minioMetaVolume, prefixPath, marker, recursive, 1000)
if e != nil {
return ListMultipartsInfo{}, probe.NewError(e)
}
for _, fi := range fis {
marker = fi.Name
if fi.Mode.IsDir() {
fileInfos = append(fileInfos, fi)
}
}
if eof {
break
}
// List all the multipart files at prefixPath, starting with
// marker keyMarkerPath.
fileInfos, eof, e := o.listMetaVolumeFiles(prefixPath, keyMarkerPath, recursive, maxUploads)
if e != nil {
log.WithFields(logrus.Fields{
"prefixPath": prefixPath,
"markerPath": keyMarkerPath,
"recursive": recursive,
"maxUploads": maxUploads,
}).Errorf("listMetaVolumeFiles failed with %s", e)
return ListMultipartsInfo{}, probe.NewError(e)
}
// Create "uploads" slice from "fileInfos" slice.
var uploads []uploadMetadata
for _, fi := range fileInfos {
leaf, entries := o.checkLeafDirectory(fi.Name)
objectName := strings.TrimPrefix(fi.Name, retainSlash(bucket))
if leaf {
for _, entry := range entries {
if strings.Contains(path.Base(entry.Name), ".") {
continue
}
uploads = append(uploads, uploadMetadata{
Object: path.Dir(objectName),
UploadID: path.Base(entry.Name),
Initiated: entry.ModTime,
})
}
var objectName string
var uploadID string
if fi.Mode.IsDir() {
objectName = strings.TrimPrefix(fi.Name, retainSlash(bucket))
// For a directory entry
result.CommonPrefixes = append(result.CommonPrefixes, objectName)
continue
} else {
uploadID = path.Base(fi.Name)
objectName = strings.TrimPrefix(path.Dir(fi.Name), retainSlash(bucket))
result.Uploads = append(result.Uploads, uploadMetadata{
Object: objectName,
UploadID: uploadID,
Initiated: fi.ModTime,
})
}
uploads = append(uploads, uploadMetadata{
Object: objectName,
})
result.NextKeyMarker = objectName
result.NextUploadIDMarker = uploadID
}
index := 0
for i, upload := range uploads {
index = i
if upload.Object > keyMarker {
break
}
if uploads[index].Object == keyMarker && upload.UploadID > uploadIDMarker {
break
}
}
for ; index < len(uploads); index++ {
if (len(result.Uploads) + len(result.CommonPrefixes)) == maxUploads {
break
}
result.NextKeyMarker = uploads[index].Object
if strings.HasSuffix(uploads[index].Object, slashSeparator) {
// for a directory entry
result.CommonPrefixes = append(result.CommonPrefixes, uploads[index].Object)
continue
}
result.NextUploadIDMarker = uploads[index].UploadID
result.Uploads = append(result.Uploads, uploads[index])
}
if index == len(uploads) {
result.IsTruncated = false
result.IsTruncated = !eof
if !result.IsTruncated {
result.NextKeyMarker = ""
result.NextUploadIDMarker = ""
}

View File

@ -531,7 +531,7 @@ func (xl XL) StatFile(volume, path string) (FileInfo, error) {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Errorf("getReadableDisks failed with %s", err)
}).Errorf("listOnlineDisks failed with %s", err)
return FileInfo{}, err
}
@ -540,7 +540,7 @@ func (xl XL) StatFile(volume, path string) (FileInfo, error) {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Errorf("doHealFile failed with %s", err)
}).Errorf("healFile failed with %s", err)
return FileInfo{}, err
}
}