From 6dc8323684369e9437bfa85db52e6488cad2c93e Mon Sep 17 00:00:00 2001 From: Krishna Srinivas Date: Fri, 27 May 2016 03:13:17 +0530 Subject: [PATCH] FS/ListMultipart: Fix FS list-multipart to work for unit test cases. --- fs-v1-multipart.go | 211 ++++++++++++++++---------------------- fs-v1.go | 26 ++++- test-utils_test.go | 6 +- tree-walk-fs.go | 48 +++------ xl-v1-multipart-common.go | 150 +++++++++++++++++---------- xl-v1-multipart.go | 25 ++--- 6 files changed, 236 insertions(+), 230 deletions(-) diff --git a/fs-v1-multipart.go b/fs-v1-multipart.go index da53477c9..b64c4e6c5 100644 --- a/fs-v1-multipart.go +++ b/fs-v1-multipart.go @@ -92,22 +92,16 @@ func (fs fsObjects) newMultipartUploadCommon(bucket string, object string, meta return uploadID, nil } -func isMultipartObject(storage StorageAPI, bucket, prefix string) bool { - _, err := storage.StatFile(bucket, path.Join(prefix, fsMetaJSONFile)) - if err != nil { - if err == errFileNotFound { - return false - } - errorIf(err, "Unable to access "+path.Join(prefix, fsMetaJSONFile)) - return false - } - return true +// Returns if the prefix is a multipart upload. +func (fs fsObjects) isMultipartUpload(bucket, prefix string) bool { + _, err := fs.storage.StatFile(bucket, pathJoin(prefix, uploadsJSONFile)) + return err == nil } // listUploadsInfo - list all uploads info. func (fs fsObjects) listUploadsInfo(prefixPath string) (uploads []uploadInfo, err error) { splitPrefixes := strings.SplitN(prefixPath, "/", 3) - uploadIDs, err := getUploadIDs(splitPrefixes[1], splitPrefixes[2], fs.storage) + uploadIDs, err := readUploadsJSON(splitPrefixes[1], splitPrefixes[2], fs.storage) if err != nil { if err == errFileNotFound { return []uploadInfo{}, nil @@ -118,96 +112,6 @@ func (fs fsObjects) listUploadsInfo(prefixPath string) (uploads []uploadInfo, er return uploads, nil } -// listMetaBucketMultipart - list all objects at a given prefix inside minioMetaBucket. -func (fs fsObjects) listMetaBucketMultipart(prefixPath string, markerPath string, recursive bool, maxKeys int) (fileInfos []FileInfo, eof bool, err error) { - walker := fs.lookupTreeWalk(listParams{minioMetaBucket, recursive, markerPath, prefixPath}) - if walker == nil { - walker = fs.startTreeWalk(minioMetaBucket, prefixPath, markerPath, recursive) - } - - // newMaxKeys tracks the size of entries which are going to be - // returned back. - var newMaxKeys int - - // Following loop gathers and filters out special files inside - // minio meta volume. - for { - walkResult, ok := <-walker.ch - if !ok { - // Closed channel. - eof = true - break - } - // For any walk error return right away. - if walkResult.err != nil { - // File not found or Disk not found is a valid case. - if walkResult.err == errFileNotFound || walkResult.err == errDiskNotFound { - return nil, true, nil - } - return nil, false, toObjectErr(walkResult.err, minioMetaBucket, prefixPath) - } - fileInfo := walkResult.fileInfo - var uploads []uploadInfo - if fileInfo.Mode.IsDir() { - // List all the entries if fi.Name is a leaf directory, if - // fi.Name is not a leaf directory then the resulting - // entries are empty. - uploads, err = fs.listUploadsInfo(fileInfo.Name) - if err != nil { - return nil, false, err - } - } - if len(uploads) > 0 { - for _, upload := range uploads { - fileInfos = append(fileInfos, FileInfo{ - Name: path.Join(fileInfo.Name, upload.UploadID), - ModTime: upload.Initiated, - }) - newMaxKeys++ - // If we have reached the maxKeys, it means we have listed - // everything that was requested. - if newMaxKeys == maxKeys { - break - } - } - } else { - // We reach here for a non-recursive case non-leaf entry - // OR recursive case with fi.Name. - if !fileInfo.Mode.IsDir() { // Do not skip non-recursive case directory entries. - // Validate if 'fi.Name' is incomplete multipart. - if !strings.HasSuffix(fileInfo.Name, fsMetaJSONFile) { - continue - } - fileInfo.Name = path.Dir(fileInfo.Name) - } - fileInfos = append(fileInfos, fileInfo) - newMaxKeys++ - // If we have reached the maxKeys, it means we have listed - // everything that was requested. - if newMaxKeys == maxKeys { - break - } - } - } - - if !eof && len(fileInfos) != 0 { - // EOF has not reached, hence save the walker channel to the map so that the walker go routine - // can continue from where it left off for the next list request. - lastFileInfo := fileInfos[len(fileInfos)-1] - markerPath = lastFileInfo.Name - fs.saveTreeWalk(listParams{minioMetaBucket, recursive, markerPath, prefixPath}, walker) - } - - // Return entries here. - return fileInfos, eof, nil -} - -// FIXME: Currently the code sorts based on keyName/upload-id which is -// not correct based on the S3 specs. According to s3 specs we are -// supposed to only lexically sort keyNames and then for keyNames with -// multiple upload ids should be sorted based on the initiated time. -// Currently this case is not handled. - // listMultipartUploadsCommon - lists all multipart uploads, common function for both object layers. func (fs fsObjects) listMultipartUploadsCommon(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) { result := ListMultipartsInfo{} @@ -259,9 +163,12 @@ func (fs fsObjects) listMultipartUploadsCommon(bucket, prefix, keyMarker, upload result.IsTruncated = true result.MaxUploads = maxUploads + result.KeyMarker = keyMarker + result.Prefix = prefix + result.Delimiter = delimiter // Not using path.Join() as it strips off the trailing '/'. - multipartPrefixPath := pathJoin(mpartMetaPrefix, pathJoin(bucket, prefix)) + multipartPrefixPath := pathJoin(mpartMetaPrefix, bucket, prefix) if prefix == "" { // Should have a trailing "/" if prefix is "" // For ex. multipartPrefixPath should be "multipart/bucket/" if prefix is "" @@ -269,33 +176,83 @@ func (fs fsObjects) listMultipartUploadsCommon(bucket, prefix, keyMarker, upload } multipartMarkerPath := "" if keyMarker != "" { - keyMarkerPath := pathJoin(pathJoin(bucket, keyMarker), uploadIDMarker) - multipartMarkerPath = pathJoin(mpartMetaPrefix, keyMarkerPath) + multipartMarkerPath = pathJoin(mpartMetaPrefix, bucket, keyMarker) } - - // List all the multipart files at prefixPath, starting with marker keyMarkerPath. - fileInfos, eof, err := fs.listMetaBucketMultipart(multipartPrefixPath, multipartMarkerPath, recursive, maxUploads) - if err != nil { - return ListMultipartsInfo{}, err + var uploads []uploadMetadata + var err error + var eof bool + if uploadIDMarker != "" { + uploads, _, err = listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, fs.storage) + if err != nil { + return ListMultipartsInfo{}, err + } + maxUploads = maxUploads - len(uploads) } - - // Loop through all the received files fill in the multiparts result. - for _, fileInfo := range fileInfos { + if maxUploads > 0 { + walker := fs.lookupTreeWalk(listParams{minioMetaBucket, recursive, multipartMarkerPath, multipartPrefixPath}) + if walker == nil { + walker = fs.startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, func(bucket, object string) bool { + return fs.isMultipartUpload(bucket, object) + }) + } + for maxUploads > 0 { + walkResult, ok := <-walker.ch + if !ok { + // Closed channel. + eof = true + break + } + // For any walk error return right away. + if walkResult.err != nil { + // File not found or Disk not found is a valid case. + if walkResult.err == errFileNotFound || walkResult.err == errDiskNotFound { + eof = true + break + } + return ListMultipartsInfo{}, err + } + entry := strings.TrimPrefix(walkResult.entry, retainSlash(pathJoin(mpartMetaPrefix, bucket))) + if strings.HasSuffix(walkResult.entry, slashSeparator) { + uploads = append(uploads, uploadMetadata{ + Object: entry, + }) + maxUploads-- + if maxUploads == 0 { + if walkResult.end { + eof = true + break + } + } + continue + } + var tmpUploads []uploadMetadata + var end bool + uploadIDMarker = "" + tmpUploads, end, err = listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads, fs.storage) + if err != nil { + return ListMultipartsInfo{}, err + } + uploads = append(uploads, tmpUploads...) + maxUploads -= len(tmpUploads) + if walkResult.end && end { + eof = true + break + } + } + } + // Loop through all the received uploads fill in the multiparts result. + for _, upload := range uploads { var objectName string var uploadID string - if fileInfo.Mode.IsDir() { + if strings.HasSuffix(upload.Object, slashSeparator) { // All directory entries are common prefixes. uploadID = "" // Upload ids are empty for CommonPrefixes. - objectName = strings.TrimPrefix(fileInfo.Name, retainSlash(pathJoin(mpartMetaPrefix, bucket))) + objectName = upload.Object result.CommonPrefixes = append(result.CommonPrefixes, objectName) } else { - uploadID = path.Base(fileInfo.Name) - objectName = strings.TrimPrefix(path.Dir(fileInfo.Name), retainSlash(pathJoin(mpartMetaPrefix, bucket))) - result.Uploads = append(result.Uploads, uploadMetadata{ - Object: objectName, - UploadID: uploadID, - Initiated: fileInfo.ModTime, - }) + uploadID = upload.UploadID + objectName = upload.Object + result.Uploads = append(result.Uploads, upload) } result.NextKeyMarker = objectName result.NextUploadIDMarker = uploadID @@ -639,13 +596,17 @@ func (fs fsObjects) abortMultipartUploadCommon(bucket, object, uploadID string) // Validate if there are other incomplete upload-id's present for // the object, if yes do not attempt to delete 'uploads.json'. - uploadIDs, err := getUploadIDs(bucket, object, fs.storage) + uploadsJSON, err := readUploadsJSON(bucket, object, fs.storage) if err == nil { - uploadIDIdx := uploadIDs.Index(uploadID) + uploadIDIdx := uploadsJSON.Index(uploadID) if uploadIDIdx != -1 { - uploadIDs.Uploads = append(uploadIDs.Uploads[:uploadIDIdx], uploadIDs.Uploads[uploadIDIdx+1:]...) + uploadsJSON.Uploads = append(uploadsJSON.Uploads[:uploadIDIdx], uploadsJSON.Uploads[uploadIDIdx+1:]...) } - if len(uploadIDs.Uploads) > 0 { + if len(uploadsJSON.Uploads) > 0 { + err = updateUploadsJSON(bucket, object, uploadsJSON, fs.storage) + if err != nil { + return toObjectErr(err, bucket, object) + } return nil } } diff --git a/fs-v1.go b/fs-v1.go index 824347d88..9da06a44b 100644 --- a/fs-v1.go +++ b/fs-v1.go @@ -20,6 +20,7 @@ import ( "crypto/md5" "encoding/hex" "io" + "os" "path/filepath" "sort" "strings" @@ -289,6 +290,22 @@ func isBucketExist(storage StorageAPI, bucketName string) bool { } func (fs fsObjects) listObjectsFS(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) { + // Convert entry to FileInfo + entryToFileInfo := func(entry string) (fileInfo FileInfo, err error) { + if strings.HasSuffix(entry, slashSeparator) { + // Object name needs to be full path. + fileInfo.Name = entry + fileInfo.Mode = os.ModeDir + return + } + if fileInfo, err = fs.storage.StatFile(bucket, entry); err != nil { + return + } + // Object name needs to be full path. + fileInfo.Name = entry + return + } + // Verify if bucket is valid. if !IsValidBucketName(bucket) { return ListObjectsInfo{}, BucketNameInvalid{Bucket: bucket} @@ -334,7 +351,9 @@ func (fs fsObjects) listObjectsFS(bucket, prefix, marker, delimiter string, maxK walker := fs.lookupTreeWalk(listParams{bucket, recursive, marker, prefix}) if walker == nil { - walker = fs.startTreeWalk(bucket, prefix, marker, recursive) + walker = fs.startTreeWalk(bucket, prefix, marker, recursive, func(bucket, object string) bool { + return !strings.HasSuffix(object, slashSeparator) + }) } var fileInfos []FileInfo var eof bool @@ -354,7 +373,10 @@ func (fs fsObjects) listObjectsFS(bucket, prefix, marker, delimiter string, maxK } return ListObjectsInfo{}, toObjectErr(walkResult.err, bucket, prefix) } - fileInfo := walkResult.fileInfo + fileInfo, err := entryToFileInfo(walkResult.entry) + if err != nil { + return ListObjectsInfo{}, nil + } nextMarker = fileInfo.Name fileInfos = append(fileInfos, fileInfo) if walkResult.end { diff --git a/test-utils_test.go b/test-utils_test.go index ca3c8255b..6589e90cb 100644 --- a/test-utils_test.go +++ b/test-utils_test.go @@ -86,10 +86,8 @@ func ExecObjectLayerTest(t *testing.T, objTest func(obj ObjectLayer, instanceTyp t.Fatalf("Initialization of object layer failed for single node setup: %s", err.Error()) } // FIXME: enable FS tests after fixing it. - if false { - // Executing the object layer tests for single node setup. - objTest(objLayer, singleNodeTestStr, t) - } + // Executing the object layer tests for single node setup. + objTest(objLayer, singleNodeTestStr, t) objLayer, fsDirs, err := getXLObjectLayer() if err != nil { diff --git a/tree-walk-fs.go b/tree-walk-fs.go index 3394e3e7f..25db24ee3 100644 --- a/tree-walk-fs.go +++ b/tree-walk-fs.go @@ -17,7 +17,6 @@ package main import ( - "os" "path" "sort" "strings" @@ -34,34 +33,17 @@ type treeWalkerFS struct { // Tree walk result carries results of tree walking. type treeWalkResultFS struct { - fileInfo FileInfo - err error - end bool + entry string + err error + end bool } // treeWalk walks FS directory tree recursively pushing fileInfo into the channel as and when it encounters files. -func (fs fsObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, send func(treeWalkResultFS) bool, count *int) bool { +func (fs fsObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, send func(treeWalkResultFS) bool, count *int, isLeaf func(string, string) bool) bool { // Example: // if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively // called with prefixDir="one/two/three/four/" and marker="five.txt" - // Convert entry to FileInfo - entryToFileInfo := func(entry string) (fileInfo FileInfo, err error) { - if strings.HasSuffix(entry, slashSeparator) { - // Object name needs to be full path. - fileInfo.Name = path.Join(prefixDir, entry) - fileInfo.Name += slashSeparator - fileInfo.Mode = os.ModeDir - return - } - if fileInfo, err = fs.storage.StatFile(bucket, path.Join(prefixDir, entry)); err != nil { - return - } - // Object name needs to be full path. - fileInfo.Name = path.Join(prefixDir, entry) - return - } - var markerBase, markerDir string if marker != "" { // Ex: if marker="four/five.txt", markerDir="four/" markerBase="five.txt" @@ -78,12 +60,16 @@ func (fs fsObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, return false } - if entryPrefixMatch != "" { - for i, entry := range entries { + for i, entry := range entries { + if entryPrefixMatch != "" { if !strings.HasPrefix(entry, entryPrefixMatch) { entries[i] = "" + continue } } + if isLeaf(bucket, pathJoin(prefixDir, entry)) { + entries[i] = strings.TrimSuffix(entry, slashSeparator) + } } sort.Strings(entries) // Skip the empty strings @@ -129,19 +115,13 @@ func (fs fsObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, } *count-- prefixMatch := "" // Valid only for first level treeWalk and empty for subdirectories. - if !fs.treeWalk(bucket, path.Join(prefixDir, entry), prefixMatch, markerArg, recursive, send, count) { + if !fs.treeWalk(bucket, path.Join(prefixDir, entry), prefixMatch, markerArg, recursive, send, count, isLeaf) { return false } continue } *count-- - fileInfo, err := entryToFileInfo(entry) - if err != nil { - // The file got deleted in the interim between ListDir() and StatFile() - // Ignore error and continue. - continue - } - if !send(treeWalkResultFS{fileInfo: fileInfo}) { + if !send(treeWalkResultFS{entry: pathJoin(prefixDir, entry)}) { return false } } @@ -149,7 +129,7 @@ func (fs fsObjects) treeWalk(bucket, prefixDir, entryPrefixMatch, marker string, } // Initiate a new treeWalk in a goroutine. -func (fs fsObjects) startTreeWalk(bucket, prefix, marker string, recursive bool) *treeWalkerFS { +func (fs fsObjects) startTreeWalk(bucket, prefix, marker string, recursive bool, isLeaf func(string, string) bool) *treeWalkerFS { // Example 1 // If prefix is "one/two/three/" and marker is "one/two/three/four/five.txt" // treeWalk is called with prefixDir="one/two/three/" and marker="four/five.txt" @@ -186,7 +166,7 @@ func (fs fsObjects) startTreeWalk(bucket, prefix, marker string, recursive bool) return false } } - fs.treeWalk(bucket, prefixDir, entryPrefixMatch, marker, recursive, send, &count) + fs.treeWalk(bucket, prefixDir, entryPrefixMatch, marker, recursive, send, &count, isLeaf) }() return &walkNotify } diff --git a/xl-v1-multipart-common.go b/xl-v1-multipart-common.go index d2c758fc3..8bc50e0d2 100644 --- a/xl-v1-multipart-common.go +++ b/xl-v1-multipart-common.go @@ -91,15 +91,17 @@ func (u uploadsV1) WriteTo(writer io.Writer) (n int64, err error) { return int64(m), err } -// getUploadIDs - get all the saved upload id's. -func getUploadIDs(bucket, object string, storageDisks ...StorageAPI) (uploadIDs uploadsV1, err error) { +// readUploadsJSON - get all the saved uploads JSON. +func readUploadsJSON(bucket, object string, storageDisks ...StorageAPI) (uploadIDs uploadsV1, err error) { uploadJSONPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile) var errs = make([]error, len(storageDisks)) var uploads = make([]uploadsV1, len(storageDisks)) var wg = &sync.WaitGroup{} + // Read `uploads.json` from all disks. for index, disk := range storageDisks { wg.Add(1) + // Read `uploads.json` in a routine. go func(index int, disk StorageAPI) { defer wg.Done() r, rErr := disk.ReadFile(minioMetaBucket, uploadJSONPath, int64(0)) @@ -116,8 +118,11 @@ func getUploadIDs(bucket, object string, storageDisks ...StorageAPI) (uploadIDs errs[index] = nil }(index, disk) } + + // Wait for all the routines. wg.Wait() + // Return for first error. for _, err = range errs { if err != nil { return uploadsV1{}, err @@ -128,13 +133,16 @@ func getUploadIDs(bucket, object string, storageDisks ...StorageAPI) (uploadIDs return uploads[0], nil } -func updateUploadJSON(bucket, object string, uploadIDs uploadsV1, storageDisks ...StorageAPI) error { +// uploadUploadsJSON - update `uploads.json` with new uploadsJSON for all disks. +func updateUploadsJSON(bucket, object string, uploadsJSON uploadsV1, storageDisks ...StorageAPI) error { uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile) var errs = make([]error, len(storageDisks)) var wg = &sync.WaitGroup{} + // Update `uploads.json` for all the disks. for index, disk := range storageDisks { wg.Add(1) + // Update `uploads.json` in routine. go func(index int, disk StorageAPI) { defer wg.Done() w, wErr := disk.CreateFile(minioMetaBucket, uploadsPath) @@ -142,7 +150,7 @@ func updateUploadJSON(bucket, object string, uploadIDs uploadsV1, storageDisks . errs[index] = wErr return } - _, wErr = uploadIDs.WriteTo(w) + _, wErr = uploadsJSON.WriteTo(w) if wErr != nil { errs[index] = wErr return @@ -158,8 +166,10 @@ func updateUploadJSON(bucket, object string, uploadIDs uploadsV1, storageDisks . }(index, disk) } + // Wait for all the routines to finish updating `uploads.json` wg.Wait() + // Return for first error. for _, err := range errs { if err != nil { return err @@ -169,24 +179,44 @@ func updateUploadJSON(bucket, object string, uploadIDs uploadsV1, storageDisks . return nil } +// newUploadsV1 - initialize new uploads v1. +func newUploadsV1(format string) uploadsV1 { + uploadIDs := uploadsV1{} + uploadIDs.Version = "1" + uploadIDs.Format = format + return uploadIDs +} + // writeUploadJSON - create `uploads.json` or update it with new uploadID. -func writeUploadJSON(bucket, object, uploadID string, initiated time.Time, storageDisks ...StorageAPI) error { +func writeUploadJSON(bucket, object, uploadID string, initiated time.Time, storageDisks ...StorageAPI) (err error) { uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile) tmpUploadsPath := path.Join(tmpMetaPrefix, bucket, object, uploadsJSONFile) var errs = make([]error, len(storageDisks)) var wg = &sync.WaitGroup{} - uploadIDs, err := getUploadIDs(bucket, object, storageDisks...) - if err != nil && err != errFileNotFound { - return err + var uploadsJSON uploadsV1 + uploadsJSON, err = readUploadsJSON(bucket, object, storageDisks...) + if err != nil { + // For any other errors. + if err != errFileNotFound { + return err + } + if len(storageDisks) == 1 { + // Set uploads format to `fs` for single disk. + uploadsJSON = newUploadsV1("fs") + } else { + // Set uploads format to `xl` otherwise. + uploadsJSON = newUploadsV1("xl") + } } - uploadIDs.Version = "1" - uploadIDs.Format = "xl" - uploadIDs.AddUploadID(uploadID, initiated) + // Add a new upload id. + uploadsJSON.AddUploadID(uploadID, initiated) + // Update `uploads.json` on all disks. for index, disk := range storageDisks { wg.Add(1) + // Update `uploads.json` in a routine. go func(index int, disk StorageAPI) { defer wg.Done() w, wErr := disk.CreateFile(minioMetaBucket, tmpUploadsPath) @@ -194,7 +224,7 @@ func writeUploadJSON(bucket, object, uploadID string, initiated time.Time, stora errs[index] = wErr return } - _, wErr = uploadIDs.WriteTo(w) + _, wErr = uploadsJSON.WriteTo(w) if wErr != nil { errs[index] = wErr return @@ -220,8 +250,10 @@ func writeUploadJSON(bucket, object, uploadID string, initiated time.Time, stora }(index, disk) } + // Wait for all the writes to finish. wg.Wait() + // Return for first error encountered. for _, err = range errs { if err != nil { return err @@ -235,11 +267,17 @@ func writeUploadJSON(bucket, object, uploadID string, initiated time.Time, stora func cleanupUploadedParts(bucket, object, uploadID string, storageDisks ...StorageAPI) error { var errs = make([]error, len(storageDisks)) var wg = &sync.WaitGroup{} + + // Construct uploadIDPath. + uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) + + // Cleanup uploadID for all disks. for index, disk := range storageDisks { wg.Add(1) + // Cleanup each uploadID in a routine. go func(index int, disk StorageAPI) { defer wg.Done() - err := cleanupDir(disk, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadID)) + err := cleanupDir(disk, minioMetaBucket, uploadIDPath) if err != nil { errs[index] = err return @@ -247,8 +285,11 @@ func cleanupUploadedParts(bucket, object, uploadID string, storageDisks ...Stora errs[index] = nil }(index, disk) } + + // Wait for all the cleanups to finish. wg.Wait() + // Return first error. for _, err := range errs { if err != nil { return err @@ -257,6 +298,40 @@ func cleanupUploadedParts(bucket, object, uploadID string, storageDisks ...Stora return nil } +// listMultipartUploadIDs - list all the upload ids from a marker up to 'count'. +func listMultipartUploadIDs(bucketName, objectName, uploadIDMarker string, count int, disk StorageAPI) ([]uploadMetadata, bool, error) { + var uploads []uploadMetadata + // Read `uploads.json`. + uploadsJSON, err := readUploadsJSON(bucketName, objectName, disk) + if err != nil { + return nil, false, err + } + index := 0 + if uploadIDMarker != "" { + for ; index < len(uploadsJSON.Uploads); index++ { + if uploadsJSON.Uploads[index].UploadID == uploadIDMarker { + // Skip the uploadID as it would already be listed in previous listing. + index++ + break + } + } + } + for index < len(uploadsJSON.Uploads) { + uploads = append(uploads, uploadMetadata{ + Object: objectName, + UploadID: uploadsJSON.Uploads[index].UploadID, + Initiated: uploadsJSON.Uploads[index].Initiated, + }) + count-- + index++ + if count == 0 { + break + } + } + end := (index == len(uploadsJSON.Uploads)) + return uploads, end, nil +} + // Returns if the prefix is a multipart upload. func (xl xlObjects) isMultipartUpload(bucket, prefix string) bool { disk := xl.getRandomDisk() // Choose a random disk. @@ -265,49 +340,18 @@ func (xl xlObjects) isMultipartUpload(bucket, prefix string) bool { } // listUploadsInfo - list all uploads info. -func (xl xlObjects) listUploadsInfo(prefixPath string) (uploads []uploadInfo, err error) { +func (xl xlObjects) listUploadsInfo(prefixPath string) (uploadsInfo []uploadInfo, err error) { disk := xl.getRandomDisk() // Choose a random disk on each attempt. splitPrefixes := strings.SplitN(prefixPath, "/", 3) - uploadIDs, err := getUploadIDs(splitPrefixes[1], splitPrefixes[2], disk) + uploadsJSON, err := readUploadsJSON(splitPrefixes[1], splitPrefixes[2], disk) if err != nil { if err == errFileNotFound { return []uploadInfo{}, nil } return nil, err } - uploads = uploadIDs.Uploads - return uploads, nil -} - -func (xl xlObjects) listMultipartUploadIDs(bucketName, objectName, uploadIDMarker string, count int) ([]uploadMetadata, bool, error) { - var uploads []uploadMetadata - uploadsJSONContent, err := getUploadIDs(bucketName, objectName, xl.getRandomDisk()) - if err != nil { - return nil, false, err - } - index := 0 - if uploadIDMarker != "" { - for ; index < len(uploadsJSONContent.Uploads); index++ { - if uploadsJSONContent.Uploads[index].UploadID == uploadIDMarker { - // Skip the uploadID as it would already be listed in previous listing. - index++ - break - } - } - } - for index < len(uploadsJSONContent.Uploads) { - uploads = append(uploads, uploadMetadata{ - Object: objectName, - UploadID: uploadsJSONContent.Uploads[index].UploadID, - Initiated: uploadsJSONContent.Uploads[index].Initiated, - }) - count-- - index++ - if count == 0 { - break - } - } - return uploads, index == len(uploadsJSONContent.Uploads), nil + uploadsInfo = uploadsJSON.Uploads + return uploadsInfo, nil } // listMultipartUploadsCommon - lists all multipart uploads, common @@ -381,7 +425,7 @@ func (xl xlObjects) listMultipartUploadsCommon(bucket, prefix, keyMarker, upload var err error var eof bool if uploadIDMarker != "" { - uploads, _, err = xl.listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads) + uploads, _, err = listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, xl.getRandomDisk()) if err != nil { return ListMultipartsInfo{}, err } @@ -422,15 +466,15 @@ func (xl xlObjects) listMultipartUploadsCommon(bucket, prefix, keyMarker, upload } continue } - var tmpUploads []uploadMetadata + var newUploads []uploadMetadata var end bool uploadIDMarker = "" - tmpUploads, end, err = xl.listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads) + newUploads, end, err = listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads, xl.getRandomDisk()) if err != nil { return ListMultipartsInfo{}, err } - uploads = append(uploads, tmpUploads...) - maxUploads -= len(tmpUploads) + uploads = append(uploads, newUploads...) + maxUploads -= len(newUploads) if walkResult.end && end { eof = true break diff --git a/xl-v1-multipart.go b/xl-v1-multipart.go index 6e616e747..d66e31815 100644 --- a/xl-v1-multipart.go +++ b/xl-v1-multipart.go @@ -416,14 +416,14 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload // Validate if there are other incomplete upload-id's present for // the object, if yes do not attempt to delete 'uploads.json'. - uploadIDs, err := getUploadIDs(bucket, object, xl.storageDisks...) + uploadsJSON, err := readUploadsJSON(bucket, object, xl.storageDisks...) if err == nil { - uploadIDIdx := uploadIDs.Index(uploadID) + uploadIDIdx := uploadsJSON.Index(uploadID) if uploadIDIdx != -1 { - uploadIDs.Uploads = append(uploadIDs.Uploads[:uploadIDIdx], uploadIDs.Uploads[uploadIDIdx+1:]...) + uploadsJSON.Uploads = append(uploadsJSON.Uploads[:uploadIDIdx], uploadsJSON.Uploads[uploadIDIdx+1:]...) } - if len(uploadIDs.Uploads) > 0 { - if err = updateUploadJSON(bucket, object, uploadIDs, xl.storageDisks...); err != nil { + if len(uploadsJSON.Uploads) > 0 { + if err = updateUploadsJSON(bucket, object, uploadsJSON, xl.storageDisks...); err != nil { return "", err } return s3MD5, nil @@ -461,20 +461,21 @@ func (xl xlObjects) abortMultipartUploadCommon(bucket, object, uploadID string) // Cleanup all uploaded parts. if err := cleanupUploadedParts(bucket, object, uploadID, xl.storageDisks...); err != nil { - return err + return toObjectErr(err, bucket, object) } // Validate if there are other incomplete upload-id's present for // the object, if yes do not attempt to delete 'uploads.json'. - uploadIDs, err := getUploadIDs(bucket, object, xl.storageDisks...) + uploadsJSON, err := readUploadsJSON(bucket, object, xl.storageDisks...) if err == nil { - uploadIDIdx := uploadIDs.Index(uploadID) + uploadIDIdx := uploadsJSON.Index(uploadID) if uploadIDIdx != -1 { - uploadIDs.Uploads = append(uploadIDs.Uploads[:uploadIDIdx], uploadIDs.Uploads[uploadIDIdx+1:]...) + uploadsJSON.Uploads = append(uploadsJSON.Uploads[:uploadIDIdx], uploadsJSON.Uploads[uploadIDIdx+1:]...) } - if len(uploadIDs.Uploads) > 0 { - if err = updateUploadJSON(bucket, object, uploadIDs, xl.storageDisks...); err != nil { - return err + if len(uploadsJSON.Uploads) > 0 { + err = updateUploadsJSON(bucket, object, uploadsJSON, xl.storageDisks...) + if err != nil { + return toObjectErr(err, bucket, object) } return nil }