From 90987df9b40fd43660be2abd524f100c72b7ef9b Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 26 Apr 2016 17:57:16 -0700 Subject: [PATCH] objectapi: Simplify ListMultipart combine recursive and non-recursive. (#1390) Fixes #1365 --- object-api-multipart.go | 245 +++++++++++++++++++++------------------- xl-v1.go | 4 +- 2 files changed, 131 insertions(+), 118 deletions(-) diff --git a/object-api-multipart.go b/object-api-multipart.go index 4b85e6603..ab00226b1 100644 --- a/object-api-multipart.go +++ b/object-api-multipart.go @@ -26,6 +26,7 @@ import ( "strconv" "strings" + "github.com/Sirupsen/logrus" "github.com/minio/minio/pkg/probe" "github.com/skyrings/skyring-common/tools/uuid" ) @@ -47,15 +48,20 @@ func (o objectAPI) isBucketExist(bucketName string) (bool, error) { return true, nil } -// checkLeafDirectory - verifies if a given path is leaf directory if -// yes returns all the files inside it. -func (o objectAPI) checkLeafDirectory(prefixPath string) (isLeaf bool, fis []FileInfo) { +// listLeafEntries - lists all entries if a given prefixPath is a leaf +// directory, returns error if any - returns empty list if prefixPath +// is not a leaf directory. +func (o objectAPI) listLeafEntries(prefixPath string) (entries []FileInfo, e error) { var allFileInfos []FileInfo var markerPath string for { fileInfos, eof, e := o.storage.ListFiles(minioMetaVolume, prefixPath, markerPath, false, 1000) if e != nil { - break + log.WithFields(logrus.Fields{ + "prefixPath": prefixPath, + "markerPath": markerPath, + }).Errorf("%s", e) + return nil, e } allFileInfos = append(allFileInfos, fileInfos...) if eof { @@ -65,16 +71,96 @@ func (o objectAPI) checkLeafDirectory(prefixPath string) (isLeaf bool, fis []Fil } for _, fileInfo := range allFileInfos { if fileInfo.Mode.IsDir() { - isLeaf = false - return isLeaf, nil + // If a directory is found, doesn't return anything. + return nil, nil } fileName := path.Base(fileInfo.Name) if !strings.Contains(fileName, ".") { - fis = append(fis, fileInfo) + entries = append(entries, fileInfo) } } - isLeaf = true - return isLeaf, fis + return entries, nil +} + +// listMetaVolumeFiles - list all files at a given prefix inside minioMetaVolume. +func (o objectAPI) listMetaVolumeFiles(prefixPath string, markerPath string, recursive bool, maxKeys int) (allFileInfos []FileInfo, eof bool, e error) { + // newMaxKeys tracks the size of entries which are going to be + // returned back. + var newMaxKeys int + + // Following loop gathers and filters out special files inside + // minio meta volume. + for { + var fileInfos []FileInfo + // List files up to maxKeys-newMaxKeys, since we are skipping entries for special files. + fileInfos, eof, e = o.storage.ListFiles(minioMetaVolume, prefixPath, markerPath, recursive, maxKeys-newMaxKeys) + if e != nil { + log.WithFields(logrus.Fields{ + "prefixPath": prefixPath, + "markerPath": markerPath, + "recursive": recursive, + "maxKeys": maxKeys, + }).Errorf("%s", e) + return nil, true, e + } + // Loop through and validate individual file. + for _, fi := range fileInfos { + var entries []FileInfo + // List all the entries if fi.Name is a leaf directory, if + // fi.Name is not a leaf directory them the resulting + // entries are empty. + entries, e = o.listLeafEntries(fi.Name) + if e != nil { + log.WithFields(logrus.Fields{ + "prefixPath": fi.Name, + }).Errorf("%s", e) + return nil, false, e + } + // Set markerPath for next batch of listing. + markerPath = fi.Name + if len(entries) > 0 { + for _, entry := range entries { + // Skip the entries for erasure parts if any. + if strings.Contains(path.Base(entry.Name), ".") { + continue + } + allFileInfos = append(allFileInfos, entry) + } + } else { + // Skip special files. + specialFile := path.Base(fi.Name) + if strings.Contains(specialFile, ".") { + // Contains partnumber and md5sum info, skip this. + continue + } + allFileInfos = append(allFileInfos, fi) + } + newMaxKeys++ + // If we have reached the maxKeys, it means we have listed + // everything that was requested. Return right here. + if newMaxKeys == maxKeys { + // Returns all the entries until maxKeys entries. + // + // eof is deliberately set as false since most of the + // time if newMaxKeys == maxKeys, there are most + // probably more than 1000 multipart sessions in + // progress. + // + // Setting this here allows us to set proper Markers + // so that the subsequent call returns the next set of + // entries. + eof = false + return allFileInfos, eof, nil + } + } + // If we have reached eof then we break out. + if eof { + break + } + } + + // Return entries here. + return allFileInfos, eof, nil } // ListMultipartUploads - list multipart uploads. @@ -122,124 +208,51 @@ func (o objectAPI) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarke if delimiter == slashSeparator { recursive = false } + result.IsTruncated = true result.MaxUploads = maxUploads - newMaxUploads := 0 + // Not using path.Join() as it strips off the trailing '/'. // Also bucket should always be followed by '/' even if prefix is empty. prefixPath := pathJoin(bucket, prefix) - if recursive { - keyMarkerPath := "" - if keyMarker != "" { - keyMarkerPath = path.Join(bucket, keyMarker, uploadIDMarker) - } - outerLoop: - for { - fileInfos, eof, e := o.storage.ListFiles(minioMetaVolume, prefixPath, keyMarkerPath, recursive, maxUploads-newMaxUploads) - if e != nil { - return ListMultipartsInfo{}, probe.NewError(e) - } - for _, fi := range fileInfos { - keyMarkerPath = fi.Name - // fi.Name will look like bucket/object/uploadID, extract object and uploadID. - uploadID := path.Base(fi.Name) - objectName := strings.TrimPrefix(path.Dir(fi.Name), retainSlash(bucket)) - if strings.Contains(uploadID, ".") { - // Contains partnumber and md5sum info, skip this. - continue - } - result.Uploads = append(result.Uploads, uploadMetadata{ - Object: objectName, - UploadID: uploadID, - Initiated: fi.ModTime, - }) - result.NextKeyMarker = objectName - result.NextUploadIDMarker = uploadID - newMaxUploads++ - if newMaxUploads == maxUploads { - if eof { - result.IsTruncated = false - } - break outerLoop - } - } - if eof { - result.IsTruncated = false - break - } - } - if !result.IsTruncated { - result.NextKeyMarker = "" - result.NextUploadIDMarker = "" - } - return result, nil + keyMarkerPath := "" + if keyMarker != "" { + keyMarkerPath = path.Join(bucket, keyMarker, uploadIDMarker) } - - var fileInfos []FileInfo - // read all the "fileInfos" in the prefix - for { - marker := "" - fis, eof, e := o.storage.ListFiles(minioMetaVolume, prefixPath, marker, recursive, 1000) - if e != nil { - return ListMultipartsInfo{}, probe.NewError(e) - } - for _, fi := range fis { - marker = fi.Name - if fi.Mode.IsDir() { - fileInfos = append(fileInfos, fi) - } - } - if eof { - break - } + // List all the multipart files at prefixPath, starting with + // marker keyMarkerPath. + fileInfos, eof, e := o.listMetaVolumeFiles(prefixPath, keyMarkerPath, recursive, maxUploads) + if e != nil { + log.WithFields(logrus.Fields{ + "prefixPath": prefixPath, + "markerPath": keyMarkerPath, + "recursive": recursive, + "maxUploads": maxUploads, + }).Errorf("listMetaVolumeFiles failed with %s", e) + return ListMultipartsInfo{}, probe.NewError(e) } - // Create "uploads" slice from "fileInfos" slice. - var uploads []uploadMetadata for _, fi := range fileInfos { - leaf, entries := o.checkLeafDirectory(fi.Name) - objectName := strings.TrimPrefix(fi.Name, retainSlash(bucket)) - if leaf { - for _, entry := range entries { - if strings.Contains(path.Base(entry.Name), ".") { - continue - } - uploads = append(uploads, uploadMetadata{ - Object: path.Dir(objectName), - UploadID: path.Base(entry.Name), - Initiated: entry.ModTime, - }) - } + var objectName string + var uploadID string + if fi.Mode.IsDir() { + objectName = strings.TrimPrefix(fi.Name, retainSlash(bucket)) + // For a directory entry + result.CommonPrefixes = append(result.CommonPrefixes, objectName) continue + } else { + uploadID = path.Base(fi.Name) + objectName = strings.TrimPrefix(path.Dir(fi.Name), retainSlash(bucket)) + result.Uploads = append(result.Uploads, uploadMetadata{ + Object: objectName, + UploadID: uploadID, + Initiated: fi.ModTime, + }) } - uploads = append(uploads, uploadMetadata{ - Object: objectName, - }) + result.NextKeyMarker = objectName + result.NextUploadIDMarker = uploadID } - index := 0 - for i, upload := range uploads { - index = i - if upload.Object > keyMarker { - break - } - if uploads[index].Object == keyMarker && upload.UploadID > uploadIDMarker { - break - } - } - for ; index < len(uploads); index++ { - if (len(result.Uploads) + len(result.CommonPrefixes)) == maxUploads { - break - } - result.NextKeyMarker = uploads[index].Object - if strings.HasSuffix(uploads[index].Object, slashSeparator) { - // for a directory entry - result.CommonPrefixes = append(result.CommonPrefixes, uploads[index].Object) - continue - } - result.NextUploadIDMarker = uploads[index].UploadID - result.Uploads = append(result.Uploads, uploads[index]) - } - if index == len(uploads) { - result.IsTruncated = false + result.IsTruncated = !eof + if !result.IsTruncated { result.NextKeyMarker = "" result.NextUploadIDMarker = "" } diff --git a/xl-v1.go b/xl-v1.go index 5fbfd07ae..54d8c3119 100644 --- a/xl-v1.go +++ b/xl-v1.go @@ -531,7 +531,7 @@ func (xl XL) StatFile(volume, path string) (FileInfo, error) { log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Errorf("getReadableDisks failed with %s", err) + }).Errorf("listOnlineDisks failed with %s", err) return FileInfo{}, err } @@ -540,7 +540,7 @@ func (xl XL) StatFile(volume, path string) (FileInfo, error) { log.WithFields(logrus.Fields{ "volume": volume, "path": path, - }).Errorf("doHealFile failed with %s", err) + }).Errorf("healFile failed with %s", err) return FileInfo{}, err } }