Independent Multipart Uploads (#15346)

Do completely independent multipart uploads.

In distributed mode, a lock was held to merge each multipart 
upload as it was added. This lock was highly contested and 
retries are expensive (timewise) in distributed mode.

Instead, each part adds its metadata information uniquely. 
This eliminates the per object lock required for each to merge.
The metadata is read back and merged by "CompleteMultipartUpload" 
without locks when constructing final object.

Co-authored-by: Harshavardhana <harsha@minio.io>
This commit is contained in:
Klaus Post
2022-07-19 08:35:29 -07:00
committed by GitHub
parent 242d06274a
commit f939d1c183
17 changed files with 2134 additions and 931 deletions

View File

@@ -76,6 +76,30 @@ func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object
return err
}
// Removes part.meta given by partName belonging to a mulitpart upload from minioMetaBucket
func (er erasureObjects) removePartMeta(bucket, object, uploadID, dataDir string, partNumber int) {
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
curpartPath := pathJoin(uploadIDPath, dataDir, fmt.Sprintf("part.%d", partNumber))
storageDisks := er.getDisks()
g := errgroup.WithNErrs(len(storageDisks))
for index, disk := range storageDisks {
if disk == nil {
continue
}
index := index
g.Go(func() error {
_ = storageDisks[index].Delete(context.TODO(), minioMetaMultipartBucket, curpartPath+".meta", DeleteOptions{
Recursive: false,
Force: false,
})
return nil
}, index)
}
g.Wait()
}
// Removes part given by partName belonging to a mulitpart upload from minioMetaBucket
func (er erasureObjects) removeObjectPart(bucket, object, uploadID, dataDir string, partNumber int) {
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
@@ -96,6 +120,11 @@ func (er erasureObjects) removeObjectPart(bucket, object, uploadID, dataDir stri
Recursive: false,
Force: false,
})
_ = storageDisks[index].Delete(context.TODO(), minioMetaMultipartBucket, curpartPath+".meta", DeleteOptions{
Recursive: false,
Force: false,
})
return nil
}, index)
}
@@ -453,6 +482,47 @@ func renamePart(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, ds
return evalDisks(disks, errs), err
}
// writeAllDisks - writes 'b' to all provided disks.
// If write cannot reach quorum, the files will be deleted from all disks.
func writeAllDisks(ctx context.Context, disks []StorageAPI, dstBucket, dstEntry string, b []byte, writeQuorum int) ([]StorageAPI, error) {
g := errgroup.WithNErrs(len(disks))
// Write file to all underlying storage disks.
for index := range disks {
index := index
g.Go(func() error {
if disks[index] == nil {
return errDiskNotFound
}
return disks[index].WriteAll(ctx, dstBucket, dstEntry, b)
}, index)
}
// Wait for all renames to finish.
errs := g.Wait()
// We can safely allow RenameFile errors up to len(er.getDisks()) - writeQuorum
// otherwise return failure. Cleanup successful renames.
err := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
if err == errErasureWriteQuorum {
// Remove all written
g := errgroup.WithNErrs(len(disks))
for index := range disks {
if disks[index] == nil || errs[index] != nil {
continue
}
index := index
g.Go(func() error {
return disks[index].Delete(ctx, dstBucket, dstEntry, DeleteOptions{Force: true})
}, index)
}
// Ignore these errors.
g.WaitErr()
}
return evalDisks(disks, errs), err
}
// PutObjectPart - reads incoming stream and internally erasure codes
// them. This call is similar to single put operation but it is part
// of the multipart transaction.
@@ -479,11 +549,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
return PartInfo{}, err
}
rctx := rlkctx.Context()
defer func() {
if uploadIDRLock != nil {
uploadIDRLock.RUnlock(rlkctx.Cancel)
}
}()
defer uploadIDRLock.RUnlock(rlkctx.Cancel)
data := r.Reader
// Validate input data size and it can never be less than zero.
@@ -507,10 +573,6 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
partsMetadata, errs = readAllFileInfo(rctx, storageDisks, minioMetaMultipartBucket,
uploadIDPath, "", false)
// Unlock upload id locks before, so others can get it.
uploadIDRLock.RUnlock(rlkctx.Cancel)
uploadIDRLock = nil
// get Quorum for this object
_, writeQuorum, err := objectQuorumFromMeta(pctx, partsMetadata, errs, er.defaultParityCount)
if err != nil {
@@ -617,50 +679,21 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
}
}
// Acquire write lock to update metadata.
uploadIDWLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
wlkctx, err := uploadIDWLock.GetLock(pctx, globalOperationTimeout)
if err != nil {
return PartInfo{}, err
}
wctx := wlkctx.Context()
defer uploadIDWLock.Unlock(wlkctx.Cancel)
// Validates if upload ID exists.
if err = er.checkUploadIDExists(wctx, bucket, object, uploadID); err != nil {
return pi, toObjectErr(err, bucket, object, uploadID)
}
// Rename temporary part file to its final location.
partPath := pathJoin(uploadIDPath, fi.DataDir, partSuffix)
onlineDisks, err = renamePart(wctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, writeQuorum)
onlineDisks, err = renamePart(ctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, writeQuorum)
if err != nil {
return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
}
// Read metadata again because it might be updated with parallel upload of another part.
partsMetadata, errs = readAllFileInfo(wctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, "", false)
reducedErr = reduceWriteQuorumErrs(wctx, errs, objectOpIgnoredErrs, writeQuorum)
if reducedErr == errErasureWriteQuorum {
return pi, toObjectErr(reducedErr, bucket, object)
}
// Get current highest version based on re-read partsMetadata.
onlineDisks, modTime = listOnlineDisks(onlineDisks, partsMetadata, errs)
// Pick one from the first valid metadata.
fi, err = pickValidFileInfo(wctx, partsMetadata, modTime, writeQuorum)
if err != nil {
return pi, err
}
// Once part is successfully committed, proceed with updating erasure metadata.
fi.ModTime = UTCNow()
md5hex := r.MD5CurrentHexString()
if opts.PreserveETag != "" {
md5hex = opts.PreserveETag
}
// Once part is successfully committed, proceed with saving erasure metadata for part.
fi.ModTime = UTCNow()
var index []byte
if opts.IndexCB != nil {
index = opts.IndexCB()
@@ -669,27 +702,18 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
// Add the current part.
fi.AddObjectPart(partID, md5hex, n, data.ActualSize(), index)
for i, disk := range onlineDisks {
if disk == OfflineDisk {
continue
}
partsMetadata[i].Size = fi.Size
partsMetadata[i].ModTime = fi.ModTime
partsMetadata[i].Parts = fi.Parts
partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{
PartNumber: partID,
Algorithm: DefaultBitrotAlgorithm,
Hash: bitrotWriterSum(writers[i]),
})
// Save part info as partPath+".meta"
fiMsg, err := fi.MarshalMsg(nil)
if err != nil {
return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
}
// Writes update `xl.meta` format for each disk.
if _, err = writeUniqueFileInfo(wctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil {
return pi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
// Write part metadata to all disks.
onlineDisks, err = writeAllDisks(ctx, onlineDisks, minioMetaMultipartBucket, partPath+".meta", fiMsg, writeQuorum)
if err != nil {
return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
}
online = countOnlineDisks(onlineDisks)
// Return success.
return PartInfo{
PartNumber: partID,
@@ -704,6 +728,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
// by callers to verify object states
// - encrypted
// - compressed
// Does not contain currently uploaded parts by design.
func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (MultipartInfo, error) {
auditObjectErasureSet(ctx, object, &er)
@@ -795,7 +820,7 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
return result, toObjectErr(reducedErr, minioMetaMultipartBucket, uploadIDPath)
}
_, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
onlineDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
// Pick one from the first valid metadata.
fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, writeQuorum)
@@ -803,6 +828,24 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
return result, err
}
// Read Part info for all parts
partPath := pathJoin(uploadIDPath, fi.DataDir) + "/"
req := ReadMultipleReq{
Bucket: minioMetaMultipartBucket,
Prefix: partPath,
MaxSize: 1 << 20, // Each part should realistically not be > 1MiB.
}
// Parts are 1 based, so index 0 is part one, etc.
for i := 1; i <= maxPartsList; i++ {
req.Files = append(req.Files, fmt.Sprintf("part.%d.meta", i))
}
partInfoFiles, err := readMultipleFiles(ctx, onlineDisks, req, writeQuorum)
if err != nil {
return result, err
}
// Populate the result stub.
result.Bucket = bucket
result.Object = object
@@ -812,7 +855,7 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
result.UserDefined = cloneMSS(fi.Metadata)
// For empty number of parts or maxParts as zero, return right here.
if len(fi.Parts) == 0 || maxParts == 0 {
if len(partInfoFiles) == 0 || maxParts == 0 {
return result, nil
}
@@ -821,6 +864,28 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
maxParts = maxPartsList
}
var partFI FileInfo
for i, part := range partInfoFiles {
if part.Error != "" || !part.Exists {
continue
}
_, err := partFI.UnmarshalMsg(part.Data)
if err != nil {
// Maybe crash or similar.
logger.LogIf(ctx, err)
continue
}
if len(partFI.Parts) != 1 {
logger.LogIf(ctx, fmt.Errorf("unexpected part count: %d", len(partFI.Parts)))
continue
}
addPart := partFI.Parts[0]
// Add the current part.
fi.AddObjectPart(i+1, addPart.ETag, addPart.Size, addPart.ActualSize, addPart.Index)
}
// Only parts with higher part numbers will be listed.
partIdx := objectPartIndex(fi.Parts, partNumberMarker)
parts := fi.Parts
@@ -860,17 +925,17 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, parts []CompletePart, opts ObjectOptions) (oi ObjectInfo, err error) {
auditObjectErasureSet(ctx, object, &er)
// Hold read-locks to verify uploaded parts, also disallows
// parallel part uploads as well.
// Hold write locks to verify uploaded parts, also disallows any
// parallel PutObjectPart() requests.
uploadIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID))
rlkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
wlkctx, err := uploadIDLock.GetLock(ctx, globalOperationTimeout)
if err != nil {
return oi, err
}
rctx := rlkctx.Context()
defer uploadIDLock.RUnlock(rlkctx.Cancel)
wctx := wlkctx.Context()
defer uploadIDLock.Unlock(wlkctx.Cancel)
if err = er.checkUploadIDExists(rctx, bucket, object, uploadID); err != nil {
if err = er.checkUploadIDExists(wctx, bucket, object, uploadID); err != nil {
return oi, toObjectErr(err, bucket, object, uploadID)
}
@@ -879,15 +944,15 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
storageDisks := er.getDisks()
// Read metadata associated with the object from all disks.
partsMetadata, errs := readAllFileInfo(rctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false)
partsMetadata, errs := readAllFileInfo(wctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false)
// get Quorum for this object
_, writeQuorum, err := objectQuorumFromMeta(rctx, partsMetadata, errs, er.defaultParityCount)
_, writeQuorum, err := objectQuorumFromMeta(wctx, partsMetadata, errs, er.defaultParityCount)
if err != nil {
return oi, toObjectErr(err, bucket, object)
}
reducedErr := reduceWriteQuorumErrs(rctx, errs, objectOpIgnoredErrs, writeQuorum)
reducedErr := reduceWriteQuorumErrs(wctx, errs, objectOpIgnoredErrs, writeQuorum)
if reducedErr == errErasureWriteQuorum {
return oi, toObjectErr(reducedErr, bucket, object)
}
@@ -895,11 +960,59 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
onlineDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
// Pick one from the first valid metadata.
fi, err := pickValidFileInfo(rctx, partsMetadata, modTime, writeQuorum)
fi, err := pickValidFileInfo(wctx, partsMetadata, modTime, writeQuorum)
if err != nil {
return oi, err
}
// Read Part info for all parts
partPath := pathJoin(uploadIDPath, fi.DataDir) + "/"
req := ReadMultipleReq{
Bucket: minioMetaMultipartBucket,
Prefix: partPath,
MaxSize: 1 << 20, // Each part should realistically not be > 1MiB.
}
for _, part := range parts {
req.Files = append(req.Files, fmt.Sprintf("part.%d.meta", part.PartNumber))
}
partInfoFiles, err := readMultipleFiles(ctx, onlineDisks, req, writeQuorum)
if err != nil {
return oi, err
}
if len(partInfoFiles) != len(parts) {
// Should only happen through internal error
err := fmt.Errorf("unexpected part result count: %d, want %d", len(partInfoFiles), len(parts))
logger.LogIf(ctx, err)
return oi, toObjectErr(err, bucket, object)
}
var partFI FileInfo
for i, part := range partInfoFiles {
partID := parts[i].PartNumber
if part.Error != "" || !part.Exists {
return oi, InvalidPart{
PartNumber: partID,
}
}
_, err := partFI.UnmarshalMsg(part.Data)
if err != nil {
// Maybe crash or similar.
logger.LogIf(ctx, err)
return oi, InvalidPart{
PartNumber: partID,
}
}
if len(partFI.Parts) != 1 {
logger.LogIf(ctx, fmt.Errorf("unexpected part count: %d", len(partFI.Parts)))
return oi, InvalidPart{
PartNumber: partID,
}
}
addPart := partFI.Parts[0]
// Add the current part.
fi.AddObjectPart(partID, addPart.ETag, addPart.Size, addPart.ActualSize, addPart.Index)
}
// Calculate full object size.
var objectSize int64
@@ -939,7 +1052,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
return oi, invp
}
// All parts except the last part has to be atleast 5MB.
// All parts except the last part has to be at least 5MB.
if (i < len(parts)-1) && !isMinAllowedPartSize(currentFI.Parts[partIdx].ActualSize) {
return oi, PartTooSmall{
PartNumber: part.PartNumber,
@@ -1018,6 +1131,11 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
}
}
// Remove part.meta which is not needed anymore.
for _, part := range fi.Parts {
er.removePartMeta(bucket, object, uploadID, fi.DataDir, part.Number)
}
// Rename the multipart object to final location.
if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath,
partsMetadata, bucket, object, writeQuorum); err != nil {