Mirror of https://github.com/minio/minio.git (synced 2025-11-10 05:59:43 -05:00)
Flat multipart backend implementation for Erasure backend (#5447)
committed by kannappanr
parent 0e4431725c
commit 9083bc152e
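The diff below replaces the nested multipart layout -- ".minio.sys/multipart/bucket/object/uploadID" plus a quorum-maintained `uploads.json` per object -- with a flat layout keyed by a hash of the object name: ".minio.sys/multipart/<SHA-256(bucket/object)>/<uploadID>". A minimal, self-contained sketch of that path computation, assuming getSHA256Hash in the diff is a plain hex-encoded SHA-256 of its input; the bucket, object, and upload ID values here are illustrative:

package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"path"
)

func main() {
	bucket, object := "mybucket", "photos/cat.png"
	uploadID := "3f1e42c0-6d2f-4a8e-9c1b-7d9a4c2e5b10" // hypothetical UUID

	// SHA-256 of "bucket/object" yields the flat directory name,
	// mirroring xl.getMultipartSHADir in the diff.
	sum := sha256.Sum256([]byte(path.Join(bucket, object)))
	shaDir := hex.EncodeToString(sum[:])

	// Each upload lives directly under that directory,
	// mirroring xl.getUploadIDDir in the diff.
	fmt.Println(path.Join(".minio.sys/multipart", shaDir, uploadID))
}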
@@ -20,6 +20,7 @@ import (
 	"encoding/hex"
 	"fmt"
 	"path"
+	"sort"
 	"strings"
 	"sync"
 	"time"
@@ -29,158 +30,17 @@ import (
 	"github.com/minio/minio/pkg/mimedb"
 )
 
-// updateUploadJSON - add or remove upload ID info in all `uploads.json`.
-func (xl xlObjects) updateUploadJSON(bucket, object, uploadID string, initiated time.Time, writeQuorum int, isRemove bool) error {
-	uploadsPath := path.Join(bucket, object, uploadsJSONFile)
-	tmpUploadsPath := mustGetUUID()
-
-	// slice to store errors from disks
-	errs := make([]error, len(xl.getDisks()))
-	// slice to store if it is a delete operation on a disk
-	isDelete := make([]bool, len(xl.getDisks()))
-
-	wg := sync.WaitGroup{}
-	for index, disk := range xl.getDisks() {
-		if disk == nil {
-			errs[index] = errors.Trace(errDiskNotFound)
-			continue
-		}
-		// Update `uploads.json` in a go routine.
-		wg.Add(1)
-		go func(index int, disk StorageAPI) {
-			defer wg.Done()
-
-			// read and parse uploads.json on this disk
-			uploadsJSON, err := readUploadsJSON(bucket, object, disk)
-			if errors.Cause(err) == errFileNotFound {
-				// If the file is not found, we assume a
-				// default (empty) upload info.
-				uploadsJSON, err = newUploadsV1("xl"), nil
-			}
-			// If we have a read error, we store the error and
-			// exit.
-			if err != nil {
-				errs[index] = err
-				return
-			}
-
-			if !isRemove {
-				// Add the uploadID
-				uploadsJSON.AddUploadID(uploadID, initiated)
-			} else {
-				// Remove the upload ID
-				uploadsJSON.RemoveUploadID(uploadID)
-				if len(uploadsJSON.Uploads) == 0 {
-					isDelete[index] = true
-				}
-			}
-
-			// For delete, rename to tmp, for the
-			// possibility of recovery in case of quorum
-			// failure.
-			if !isDelete[index] {
-				errs[index] = writeUploadJSON(&uploadsJSON, uploadsPath, tmpUploadsPath, disk)
-			} else {
-				wErr := disk.RenameFile(minioMetaMultipartBucket, uploadsPath, minioMetaTmpBucket, tmpUploadsPath)
-				if wErr != nil {
-					errs[index] = errors.Trace(wErr)
-				}
-			}
-		}(index, disk)
-	}
-
-	// Wait for all the writes to finish.
-	wg.Wait()
-
-	err := reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, writeQuorum)
-	if errors.Cause(err) == errXLWriteQuorum {
-		// No quorum. Perform cleanup on the minority of disks
-		// on which the operation succeeded.
-
-		// There are two cases:
-		//
-		// 1. uploads.json file was updated -> we delete the
-		//    file that we successfully overwrote on the
-		//    minority of disks, so that the failed quorum
-		//    operation is not partially visible.
-		//
-		// 2. uploads.json was deleted -> in this case, since
-		//    the delete failed, we restore from tmp.
-		for index, disk := range xl.getDisks() {
-			if disk == nil || errs[index] != nil {
-				continue
-			}
-			wg.Add(1)
-			go func(index int, disk StorageAPI) {
-				defer wg.Done()
-				if !isDelete[index] {
-					_ = disk.DeleteFile(
-						minioMetaMultipartBucket,
-						uploadsPath,
-					)
-				} else {
-					_ = disk.RenameFile(
-						minioMetaTmpBucket, tmpUploadsPath,
-						minioMetaMultipartBucket, uploadsPath,
-					)
-				}
-			}(index, disk)
-		}
-		wg.Wait()
-		return err
-	}
-
-	// we do have quorum, so in case of a delete uploads.json
-	// operation, we purge from tmp.
-	for index, disk := range xl.getDisks() {
-		if disk == nil || !isDelete[index] {
-			continue
-		}
-		wg.Add(1)
-		go func(index int, disk StorageAPI) {
-			defer wg.Done()
-			// isDelete[index] = true at this point.
-			_ = disk.DeleteFile(minioMetaTmpBucket, tmpUploadsPath)
-		}(index, disk)
-	}
-	wg.Wait()
-	return err
+func (xl xlObjects) getUploadIDDir(bucket, object, uploadID string) string {
+	return pathJoin(xl.getMultipartSHADir(bucket, object), uploadID)
 }
 
-// addUploadID - add upload ID and its initiated time to 'uploads.json'.
-func (xl xlObjects) addUploadID(bucket, object string, uploadID string, initiated time.Time, writeQuorum int) error {
-	return xl.updateUploadJSON(bucket, object, uploadID, initiated, writeQuorum, false)
-}
-
-// removeUploadID - remove upload ID in 'uploads.json'.
-func (xl xlObjects) removeUploadID(bucket, object string, uploadID string, writeQuorum int) error {
-	return xl.updateUploadJSON(bucket, object, uploadID, time.Time{}, writeQuorum, true)
-}
-
-// Returns if the prefix is a multipart upload.
-func (xl xlObjects) isMultipartUpload(bucket, prefix string) bool {
-	for _, disk := range xl.getLoadBalancedDisks() {
-		if disk == nil {
-			continue
-		}
-		_, err := disk.StatFile(bucket, pathJoin(prefix, uploadsJSONFile))
-		if err == nil {
-			return true
-		}
-		// For any reason disk was deleted or went offline, continue
-		if errors.IsErrIgnored(err, objMetadataOpIgnoredErrs...) {
-			continue
-		}
-		break
-	}
-	return false
+func (xl xlObjects) getMultipartSHADir(bucket, object string) string {
+	return getSHA256Hash([]byte(pathJoin(bucket, object)))
 }
 
 // isUploadIDExists - verify if a given uploadID exists and is valid.
 func (xl xlObjects) isUploadIDExists(bucket, object, uploadID string) bool {
-	uploadIDPath := path.Join(bucket, object, uploadID)
-	return xl.isObject(minioMetaMultipartBucket, uploadIDPath)
+	return xl.isObject(minioMetaMultipartBucket, xl.getUploadIDDir(bucket, object, uploadID))
 }
 
 // Removes part given by partName belonging to a multipart upload from minioMetaBucket
@@ -206,7 +66,7 @@ func (xl xlObjects) removeObjectPart(bucket, object, uploadID, partName string)
 // statPart - returns fileInfo structure for a successful stat on part file.
 func (xl xlObjects) statPart(bucket, object, uploadID, partName string) (fileInfo FileInfo, err error) {
 	var ignoredErrs []error
-	partNamePath := path.Join(bucket, object, uploadID, partName)
+	partNamePath := path.Join(xl.getUploadIDDir(bucket, object, uploadID), partName)
 	for _, disk := range xl.getLoadBalancedDisks() {
 		if disk == nil {
 			ignoredErrs = append(ignoredErrs, errDiskNotFound)
@@ -271,174 +131,6 @@ func commitXLMetadata(disks []StorageAPI, srcBucket, srcPrefix, dstBucket, dstPr
 	return evalDisks(disks, mErrs), err
 }
 
-// listMultipartUploadsCleanup - lists all multipart uploads. Called by xl.cleanupStaleMultipartUpload()
-func (xl xlObjects) listMultipartUploadsCleanup(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) {
-	result := ListMultipartsInfo{
-		IsTruncated: true,
-		MaxUploads:  maxUploads,
-		KeyMarker:   keyMarker,
-		Prefix:      prefix,
-		Delimiter:   delimiter,
-	}
-
-	recursive := true
-	if delimiter == slashSeparator {
-		recursive = false
-	}
-
-	// Not using path.Join() as it strips off the trailing '/'.
-	multipartPrefixPath := pathJoin(bucket, prefix)
-	if prefix == "" {
-		// Should have a trailing "/" if prefix is "".
-		// For ex. multipartPrefixPath should be "multipart/bucket/" if prefix is "".
-		multipartPrefixPath += slashSeparator
-	}
-	multipartMarkerPath := ""
-	if keyMarker != "" {
-		multipartMarkerPath = pathJoin(bucket, keyMarker)
-	}
-	var uploads []MultipartInfo
-	var err error
-	var eof bool
-	// List all upload ids for the keyMarker, starting from
-	// uploadIDMarker first.
-	if uploadIDMarker != "" {
-		// hold lock on keyMarker path
-		keyMarkerLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket,
-			pathJoin(bucket, keyMarker))
-		if err = keyMarkerLock.GetRLock(globalListingTimeout); err != nil {
-			return lmi, err
-		}
-		for _, disk := range xl.getLoadBalancedDisks() {
-			if disk == nil {
-				continue
-			}
-			uploads, _, err = xl.listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, disk)
-			if err == nil {
-				break
-			}
-			if errors.IsErrIgnored(err, objMetadataOpIgnoredErrs...) {
-				continue
-			}
-			break
-		}
-		keyMarkerLock.RUnlock()
-		if err != nil {
-			return lmi, err
-		}
-		maxUploads = maxUploads - len(uploads)
-	}
-	var walkerCh chan treeWalkResult
-	var walkerDoneCh chan struct{}
-	heal := false // true only for xl.ListObjectsHeal
-	// Validate if we need to list further depending on maxUploads.
-	if maxUploads > 0 {
-		walkerCh, walkerDoneCh = xl.listPool.Release(listParams{minioMetaMultipartBucket, recursive, multipartMarkerPath, multipartPrefixPath, heal})
-		if walkerCh == nil {
-			walkerDoneCh = make(chan struct{})
-			isLeaf := xl.isMultipartUpload
-			listDir := listDirFactory(isLeaf, xlTreeWalkIgnoredErrs, xl.getLoadBalancedDisks()...)
-			walkerCh = startTreeWalk(minioMetaMultipartBucket, multipartPrefixPath, multipartMarkerPath, recursive, listDir, isLeaf, walkerDoneCh)
-		}
-		// Collect uploads until the remaining maxUploads count reaches 0.
-		for maxUploads > 0 {
-			walkResult, ok := <-walkerCh
-			if !ok {
-				// Closed channel.
-				eof = true
-				break
-			}
-			// For any walk error return right away.
-			if walkResult.err != nil {
-				return lmi, walkResult.err
-			}
-			entry := strings.TrimPrefix(walkResult.entry, retainSlash(bucket))
-			// For an entry looking like a directory, store it and
-			// continue the loop; no need to fetch uploads.
-			if hasSuffix(walkResult.entry, slashSeparator) {
-				uploads = append(uploads, MultipartInfo{
-					Object: entry,
-				})
-				maxUploads--
-				if maxUploads == 0 {
-					eof = true
-					break
-				}
-				continue
-			}
-			var newUploads []MultipartInfo
-			var end bool
-			uploadIDMarker = ""
-
-			// For the new object entry we get all its
-			// pending uploadIDs.
-			entryLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket,
-				pathJoin(bucket, entry))
-			if err = entryLock.GetRLock(globalListingTimeout); err != nil {
-				return lmi, err
-			}
-			var disk StorageAPI
-			for _, disk = range xl.getLoadBalancedDisks() {
-				if disk == nil {
-					continue
-				}
-				newUploads, end, err = xl.listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads, disk)
-				if err == nil {
-					break
-				}
-				if errors.IsErrIgnored(err, objMetadataOpIgnoredErrs...) {
-					continue
-				}
-				break
-			}
-			entryLock.RUnlock()
-			if err != nil {
-				if errors.IsErrIgnored(err, xlTreeWalkIgnoredErrs...) {
-					continue
-				}
-				return lmi, err
-			}
-			uploads = append(uploads, newUploads...)
-			maxUploads -= len(newUploads)
-			if end && walkResult.end {
-				eof = true
-				break
-			}
-		}
-	}
-	// For all received uploads fill in the multiparts result.
-	for _, upload := range uploads {
-		var objectName string
-		var uploadID string
-		if hasSuffix(upload.Object, slashSeparator) {
-			// All directory entries are common prefixes.
-			uploadID = "" // For common prefixes, upload ids are empty.
-			objectName = upload.Object
-			result.CommonPrefixes = append(result.CommonPrefixes, objectName)
-		} else {
-			uploadID = upload.UploadID
-			objectName = upload.Object
-			result.Uploads = append(result.Uploads, upload)
-		}
-		result.NextKeyMarker = objectName
-		result.NextUploadIDMarker = uploadID
-	}
-
-	if !eof {
-		// Save the go-routine state in the pool so that it can continue from where it left off on
-		// the next request.
-		xl.listPool.Set(listParams{bucket, recursive, result.NextKeyMarker, prefix, heal}, walkerCh, walkerDoneCh)
-	}
-
-	result.IsTruncated = !eof
-	// Result is not truncated, reset the markers.
-	if !result.IsTruncated {
-		result.NextKeyMarker = ""
-		result.NextUploadIDMarker = ""
-	}
-	return result, nil
-}
-
 // ListMultipartUploads - lists all the pending multipart
 // uploads for a particular object in a bucket.
 //
@@ -446,14 +138,11 @@ func (xl xlObjects) listMultipartUploadsCleanup(bucket, prefix, keyMarker, uploa
 // not support prefix based listing, this is a deliberate attempt
 // towards simplification of multipart APIs.
 // The resulting ListMultipartsInfo structure is unmarshalled directly as XML.
-func (xl xlObjects) ListMultipartUploads(bucket, object, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) {
+func (xl xlObjects) ListMultipartUploads(bucket, object, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, e error) {
 	if err := checkListMultipartArgs(bucket, object, keyMarker, uploadIDMarker, delimiter, xl); err != nil {
-		return lmi, err
+		return result, err
 	}
 
-	result := ListMultipartsInfo{}
-
 	result.IsTruncated = true
 	result.MaxUploads = maxUploads
 	result.KeyMarker = keyMarker
 	result.Prefix = object
@@ -463,31 +152,22 @@ func (xl xlObjects) ListMultipartUploads(bucket, object, keyMarker, uploadIDMark
 		if disk == nil {
 			continue
 		}
-		// Hold the lock so that two parallel complete-multipart-uploads
-		// do not leave a stale uploads.json behind.
-		objectMPartPathLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object))
-		if err := objectMPartPathLock.GetRLock(globalListingTimeout); err != nil {
-			return lmi, err
-		}
-		defer objectMPartPathLock.RUnlock()
-		uploads, _, err := xl.listMultipartUploadIDs(bucket, object, uploadIDMarker, maxUploads, disk)
+		uploadIDs, err := disk.ListDir(minioMetaMultipartBucket, xl.getMultipartSHADir(bucket, object))
 		if err != nil {
-			return lmi, err
+			if err == errFileNotFound {
+				return result, nil
+			}
+			return result, errors.Trace(err)
 		}
 
-		result.NextKeyMarker = object
-		// Loop through all the received uploads and fill in the multiparts result.
-		for _, upload := range uploads {
-			uploadID := upload.UploadID
-			result.Uploads = append(result.Uploads, upload)
-			result.NextUploadIDMarker = uploadID
+		for i := range uploadIDs {
+			uploadIDs[i] = strings.TrimSuffix(uploadIDs[i], slashSeparator)
 		}
 
-		result.IsTruncated = len(uploads) == maxUploads
-
-		if !result.IsTruncated {
-			result.NextKeyMarker = ""
-			result.NextUploadIDMarker = ""
+		sort.Strings(uploadIDs)
+		for _, uploadID := range uploadIDs {
+			if len(result.Uploads) == maxUploads {
+				break
+			}
+			result.Uploads = append(result.Uploads, MultipartInfo{Object: object, UploadID: uploadID})
 		}
 		break
 	}
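With the flat layout, the rewritten ListMultipartUploads above no longer takes listing locks or walks a tree: it lists one directory on the first responsive disk, trims the trailing slash that directory listings report for each upload ID, sorts, and stops at maxUploads. A minimal, self-contained sketch of that trim-sort-truncate step; the upload IDs are illustrative:

package main

import (
	"fmt"
	"sort"
	"strings"
)

func main() {
	// Entries as a ListDir-style call might report them, each upload ID
	// directory ending in a slash (values are made up).
	uploadIDs := []string{
		"9c1b7d9a-4c2e-5b10-3f1e-42c06d2f4a8e/",
		"3f1e42c0-6d2f-4a8e-9c1b-7d9a4c2e5b10/",
	}
	for i := range uploadIDs {
		uploadIDs[i] = strings.TrimSuffix(uploadIDs[i], "/")
	}
	sort.Strings(uploadIDs)

	// Cap the page at maxUploads, as the loop above does.
	maxUploads := 1
	if len(uploadIDs) > maxUploads {
		uploadIDs = uploadIDs[:maxUploads]
	}
	fmt.Println(uploadIDs)
}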
@@ -527,17 +207,8 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st
 	xlMeta.Stat.ModTime = UTCNow()
 	xlMeta.Meta = meta
 
-	// This lock needs to be held for any changes to the directory
-	// contents of ".minio.sys/multipart/object/"
-	objectMPartPathLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket,
-		pathJoin(bucket, object))
-	if err := objectMPartPathLock.GetLock(globalOperationTimeout); err != nil {
-		return "", err
-	}
-	defer objectMPartPathLock.Unlock()
-
 	uploadID := mustGetUUID()
-	uploadIDPath := path.Join(bucket, object, uploadID)
+	uploadIDPath := xl.getUploadIDDir(bucket, object, uploadID)
 	tempUploadIDPath := uploadID
 
 	// Write updated `xl.json` to all disks.
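newMultipartUpload keeps writing the fresh xl.json under the tmp bucket (tempUploadIDPath above) before it is moved into the upload directory. A generic, self-contained sketch of that write-to-tmp-then-rename pattern, assuming a filesystem where rename is atomic; the file name and payload are illustrative:

package main

import (
	"os"
	"path/filepath"
)

// atomicWrite stages data in a temporary file and renames it into place,
// so readers never observe a partially written file.
func atomicWrite(dir, name string, data []byte) error {
	tmp := filepath.Join(dir, ".tmp-"+name)
	if err := os.WriteFile(tmp, data, 0o644); err != nil {
		return err
	}
	return os.Rename(tmp, filepath.Join(dir, name))
}

func main() {
	if err := atomicWrite(os.TempDir(), "xl.json", []byte(`{"version":"1"}`)); err != nil {
		panic(err)
	}
}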
@@ -556,11 +227,6 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st
 		return "", toObjectErr(rErr, minioMetaMultipartBucket, uploadIDPath)
 	}
 
-	initiated := UTCNow()
-	// Create or update 'uploads.json'
-	if err = xl.addUploadID(bucket, object, uploadID, initiated, writeQuorum); err != nil {
-		return "", err
-	}
 	// Return success.
 	return uploadID, nil
 }
@@ -637,17 +303,9 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, d
 		return pi, toObjectErr(errors.Trace(errInvalidArgument))
 	}
 
-	// Hold the lock so that two parallel complete-multipart-uploads
-	// do not leave a stale uploads.json behind.
-	objectMPartPathLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object))
-	if err := objectMPartPathLock.GetLock(globalOperationTimeout); err != nil {
-		return pi, err
-	}
-	defer objectMPartPathLock.Unlock()
-
 	var partsMetadata []xlMetaV1
 	var errs []error
-	uploadIDPath := pathJoin(bucket, object, uploadID)
+	uploadIDPath := xl.getUploadIDDir(bucket, object, uploadID)
 
 	// pre-check upload id lock.
 	preUploadIDLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath)
@@ -803,16 +461,35 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, d
 	}, nil
 }
 
-// listObjectParts - wrapper reading `xl.json` for a given object and
-// uploadID. Lists all the parts captured inside `xl.json` content.
-func (xl xlObjects) listObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (lpi ListPartsInfo, e error) {
-	result := ListPartsInfo{}
+// ListObjectParts - lists all previously uploaded parts for a given
+// object and uploadID. Takes additional input of part-number-marker
+// to indicate where the listing should begin from.
+//
+// Implements S3 compatible ListObjectParts API. The resulting
+// ListPartsInfo structure is marshalled directly into XML and
+// replied back to the client.
+func (xl xlObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (result ListPartsInfo, e error) {
+	if err := checkListPartsArgs(bucket, object, xl); err != nil {
+		return result, err
+	}
+	// Hold lock so that there is no competing
+	// abort-multipart-upload or complete-multipart-upload.
+	uploadIDLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket,
+		xl.getUploadIDDir(bucket, object, uploadID))
+	if err := uploadIDLock.GetLock(globalListingTimeout); err != nil {
+		return result, err
+	}
+	defer uploadIDLock.Unlock()
 
-	uploadIDPath := path.Join(bucket, object, uploadID)
+	if !xl.isUploadIDExists(bucket, object, uploadID) {
+		return result, errors.Trace(InvalidUploadID{UploadID: uploadID})
+	}
+
+	uploadIDPath := xl.getUploadIDDir(bucket, object, uploadID)
+
 	xlParts, xlMeta, err := xl.readXLMetaParts(minioMetaMultipartBucket, uploadIDPath)
 	if err != nil {
-		return lpi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
+		return result, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
 	}
 
 	// Populate the result stub.
@@ -844,7 +521,7 @@ func (xl xlObjects) listObjectParts(bucket, object, uploadID string, partNumberM
 		var fi FileInfo
 		fi, err = xl.statPart(bucket, object, uploadID, part.Name)
 		if err != nil {
-			return lpi, toObjectErr(err, minioMetaBucket, path.Join(uploadID, part.Name))
+			return result, toObjectErr(err, minioMetaBucket, path.Join(uploadID, part.Name))
 		}
 		result.Parts = append(result.Parts, PartInfo{
 			PartNumber: part.Number,
@@ -868,40 +545,6 @@ func (xl xlObjects) listObjectParts(bucket, object, uploadID string, partNumberM
 	return result, nil
 }
 
-// ListObjectParts - lists all previously uploaded parts for a given
-// object and uploadID. Takes additional input of part-number-marker
-// to indicate where the listing should begin from.
-//
-// Implements S3 compatible ListObjectParts API. The resulting
-// ListPartsInfo structure is unmarshalled directly into XML and
-// replied back to the client.
-func (xl xlObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (lpi ListPartsInfo, e error) {
-	if err := checkListPartsArgs(bucket, object, xl); err != nil {
-		return lpi, err
-	}
-	// Hold the lock so that two parallel complete-multipart-uploads
-	// do not leave a stale uploads.json behind.
-	objectMPartPathLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object))
-	if err := objectMPartPathLock.GetRLock(globalListingTimeout); err != nil {
-		return lpi, errors.Trace(err)
-	}
-	defer objectMPartPathLock.RUnlock()
-	// Hold lock so that there is no competing
-	// abort-multipart-upload or complete-multipart-upload.
-	uploadIDLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket,
-		pathJoin(bucket, object, uploadID))
-	if err := uploadIDLock.GetLock(globalListingTimeout); err != nil {
-		return lpi, err
-	}
-	defer uploadIDLock.Unlock()
-
-	if !xl.isUploadIDExists(bucket, object, uploadID) {
-		return lpi, errors.Trace(InvalidUploadID{UploadID: uploadID})
-	}
-	result, err := xl.listObjectParts(bucket, object, uploadID, partNumberMarker, maxParts)
-	return result, err
-}
-
 // CompleteMultipartUpload - completes an ongoing multipart
 // transaction after receiving all the parts indicated by the client.
 // Returns an md5sum calculated by concatenating all the individual
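The promoted ListObjectParts above pages with partNumberMarker and maxParts. A minimal, self-contained sketch of that marker-based pagination contract -- not MinIO code; the part numbers are illustrative:

package main

import "fmt"

// page skips parts up to and including the marker, emits at most maxParts,
// and reports truncation together with the next marker to resume from.
func page(parts []int, marker, maxParts int) (out []int, nextMarker int, truncated bool) {
	for _, p := range parts {
		if p <= marker {
			continue
		}
		if len(out) == maxParts {
			truncated = true
			break
		}
		out = append(out, p)
	}
	if truncated {
		nextMarker = out[len(out)-1]
	}
	return out, nextMarker, truncated
}

func main() {
	fmt.Println(page([]int{1, 2, 3, 4, 5}, 2, 2)) // [3 4] 4 true
}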
@@ -918,14 +561,16 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
 		return oi, err
 	}
 	defer destLock.Unlock()
+
+	uploadIDPath := xl.getUploadIDDir(bucket, object, uploadID)
+
 	// Hold lock so that
 	//
 	// 1) no one aborts this multipart upload
 	//
 	// 2) no one does a parallel complete-multipart-upload on this
 	//    multipart upload
-	uploadIDLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket,
-		pathJoin(bucket, object, uploadID))
+	uploadIDLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath)
 	if err := uploadIDLock.GetLock(globalOperationTimeout); err != nil {
 		return oi, err
 	}
@@ -947,8 +592,6 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
 		return oi, err
 	}
 
-	uploadIDPath := pathJoin(bucket, object, uploadID)
-
 	// Read metadata associated with the object from all disks.
 	partsMetadata, errs := readAllXLMetadata(xl.getDisks(), minioMetaMultipartBucket, uploadIDPath)
 
@@ -1034,7 +677,6 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
 	// Save successfully calculated md5sum.
 	xlMeta.Meta["etag"] = s3MD5
 
-	uploadIDPath = path.Join(bucket, object, uploadID)
 	tempUploadIDPath := uploadID
 
 	// Update all xl metadata, make sure to not modify fields like
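The s3MD5 stored in xlMeta.Meta["etag"] above is the S3-style multipart ETag. A minimal, self-contained sketch of the conventional computation, assuming the standard S3 scheme of MD5-ing the concatenated per-part MD5 digests and appending the part count; the part contents are illustrative:

package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
)

func main() {
	parts := [][]byte{[]byte("part-1 data"), []byte("part-2 data")}

	var concat []byte
	for _, p := range parts {
		sum := md5.Sum(p) // per-part MD5, as carried in each part's ETag
		concat = append(concat, sum[:]...)
	}
	final := md5.Sum(concat)
	fmt.Printf("%s-%d\n", hex.EncodeToString(final[:]), len(parts))
}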
@@ -1090,21 +732,6 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
 		return oi, toObjectErr(err, bucket, object)
 	}
 
-	// Hold the lock so that two parallel
-	// complete-multipart-uploads do not leave a stale
-	// uploads.json behind.
-	objectMPartPathLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket,
-		pathJoin(bucket, object))
-	if err = objectMPartPathLock.GetLock(globalOperationTimeout); err != nil {
-		return oi, toObjectErr(err, bucket, object)
-	}
-	defer objectMPartPathLock.Unlock()
-
-	// remove entry from uploads.json with quorum
-	if err = xl.removeUploadID(bucket, object, uploadID, writeQuorum); err != nil {
-		return oi, toObjectErr(err, minioMetaMultipartBucket, path.Join(bucket, object))
-	}
-
 	// Success, return object info.
 	return xlMeta.ToObjectInfo(bucket, object), nil
 }
@@ -1137,46 +764,6 @@ func (xl xlObjects) cleanupUploadedParts(uploadIDPath string, writeQuorum int) e
 	return reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, writeQuorum)
 }
 
-// abortMultipartUpload - wrapper for purging an ongoing multipart
-// transaction, deletes uploadID entry from `uploads.json` and purges
-// the directory at '.minio.sys/multipart/bucket/object/uploadID' holding
-// all the upload parts.
-func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err error) {
-	// Construct uploadIDPath.
-	uploadIDPath := path.Join(bucket, object, uploadID)
-
-	// Read metadata associated with the object from all disks.
-	partsMetadata, errs := readAllXLMetadata(xl.getDisks(), minioMetaMultipartBucket, uploadIDPath)
-
-	// get Quorum for this object
-	_, writeQuorum, err := objectQuorumFromMeta(xl, partsMetadata, errs)
-	if err != nil {
-		return toObjectErr(err, bucket, object)
-	}
-
-	// Cleanup all uploaded parts.
-	if err = xl.cleanupUploadedParts(uploadIDPath, writeQuorum); err != nil {
-		return toObjectErr(err, bucket, object)
-	}
-
-	// hold lock so we don't compete with a complete or abort
-	// multipart request.
-	objectMPartPathLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket,
-		pathJoin(bucket, object))
-	if err = objectMPartPathLock.GetLock(globalOperationTimeout); err != nil {
-		return toObjectErr(err, bucket, object)
-	}
-	defer objectMPartPathLock.Unlock()
-
-	// remove entry from uploads.json with quorum
-	if err = xl.removeUploadID(bucket, object, uploadID, writeQuorum); err != nil {
-		return toObjectErr(err, bucket, object)
-	}
-
-	// Successfully purged.
-	return nil
-}
-
 // AbortMultipartUpload - aborts an ongoing multipart operation
 // signified by the input uploadID. This is an atomic operation and
 // doesn't require clients to initiate multiple such requests.
@@ -1192,10 +779,11 @@ func (xl xlObjects) AbortMultipartUpload(bucket, object, uploadID string) error
 	if err := checkAbortMultipartArgs(bucket, object, xl); err != nil {
 		return err
 	}
+	// Construct uploadIDPath.
+	uploadIDPath := xl.getUploadIDDir(bucket, object, uploadID)
 	// Hold lock so that there is no competing
 	// complete-multipart-upload or put-object-part.
-	uploadIDLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket,
-		pathJoin(bucket, object, uploadID))
+	uploadIDLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath)
 	if err := uploadIDLock.GetLock(globalOperationTimeout); err != nil {
 		return err
 	}
@@ -1204,5 +792,76 @@ func (xl xlObjects) AbortMultipartUpload(bucket, object, uploadID string) error
 	if !xl.isUploadIDExists(bucket, object, uploadID) {
 		return errors.Trace(InvalidUploadID{UploadID: uploadID})
 	}
-	return xl.abortMultipartUpload(bucket, object, uploadID)
+
+	// Read metadata associated with the object from all disks.
+	partsMetadata, errs := readAllXLMetadata(xl.getDisks(), minioMetaMultipartBucket, uploadIDPath)
+
+	// get Quorum for this object
+	_, writeQuorum, err := objectQuorumFromMeta(xl, partsMetadata, errs)
+	if err != nil {
+		return toObjectErr(err, bucket, object)
+	}
+
+	// Cleanup all uploaded parts.
+	if err = xl.cleanupUploadedParts(uploadIDPath, writeQuorum); err != nil {
+		return toObjectErr(err, bucket, object)
+	}
+
+	// Successfully purged.
+	return nil
 }
+
+// Clean-up the old multipart uploads. Should be run in a Go routine.
+func (xl xlObjects) cleanupStaleMultipartUploads(cleanupInterval, expiry time.Duration, doneCh chan struct{}) {
+	ticker := time.NewTicker(cleanupInterval)
+
+	for {
+		select {
+		case <-doneCh:
+			ticker.Stop()
+			return
+		case <-ticker.C:
+			var disk StorageAPI
+			for _, d := range xl.getLoadBalancedDisks() {
+				if d != nil {
+					disk = d
+					break
+				}
+			}
+			if disk == nil {
+				continue
+			}
+			xl.cleanupStaleMultipartUploadsOnDisk(disk, expiry)
+		}
+	}
+}
+
+// Remove the old multipart uploads on the given disk.
+func (xl xlObjects) cleanupStaleMultipartUploadsOnDisk(disk StorageAPI, expiry time.Duration) {
+	now := time.Now()
+	shaDirs, err := disk.ListDir(minioMetaMultipartBucket, "")
+	if err != nil {
+		return
+	}
+	for _, shaDir := range shaDirs {
+		uploadIDDirs, err := disk.ListDir(minioMetaMultipartBucket, shaDir)
+		if err != nil {
+			continue
+		}
+		for _, uploadIDDir := range uploadIDDirs {
+			uploadIDPath := pathJoin(shaDir, uploadIDDir)
+			fi, err := disk.StatFile(minioMetaMultipartBucket, pathJoin(uploadIDPath, xlMetaJSONFile))
+			if err != nil {
+				continue
+			}
+			if now.Sub(fi.ModTime) > expiry {
+				// The quorum value would need to be figured out using readAllXLMetadata()
+				// and objectQuorumFromMeta(), but we can avoid these calls as we do not care
+				// whether xl.cleanupUploadedParts() meets quorum when it removes files. We
+				// ignore the error from xl.cleanupUploadedParts() as we can't return it to
+				// any client. Hence we set quorum to 0.
+				quorum := 0
+				xl.cleanupUploadedParts(uploadIDPath, quorum)
+			}
+		}
+	}
+}
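The two cleanup functions above form a background sweeper: a ticker-driven loop that exits on a done channel and, on every tick, scans the flat sha/uploadID directories on one disk, purging uploads whose xl.json has not been modified within expiry. A minimal, self-contained sketch of the same ticker/done-channel shape; the interval and the per-tick work are illustrative:

package main

import (
	"fmt"
	"time"
)

func cleanupLoop(interval time.Duration, doneCh chan struct{}) {
	ticker := time.NewTicker(interval)
	for {
		select {
		case <-doneCh:
			ticker.Stop()
			return
		case <-ticker.C:
			// The real code picks the first usable disk and calls
			// cleanupStaleMultipartUploadsOnDisk(disk, expiry) here.
			fmt.Println("sweep stale multipart uploads")
		}
	}
}

func main() {
	doneCh := make(chan struct{})
	go cleanupLoop(200*time.Millisecond, doneCh)
	time.Sleep(time.Second) // let a few sweeps run
	close(doneCh)
}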