Mirror of https://github.com/minio/minio.git (synced 2024-12-24 22:25:54 -05:00)
xl: Use random UUID during complete multipart upload (#7527)
A user reported the following error log:

API: CompleteMultipartUpload(bucket=vertica, object=perf-dss-v03/cc2/02596813aecd4e476d810148586c2a3300d00000013557ef_0.gt)
Time: 15:44:07 UTC 04/11/2019
RequestID: 159475EFF4DEDFFB
RemoteHost: 172.26.87.184
UserAgent: vertica-v9.1.1-5
Error: open /data/.minio.sys/tmp/100bb3ec-6c0d-4a37-8b36-65241050eb02/xl.json: file exists
       1: cmd/xl-v1-metadata.go:448:cmd.writeXLMetadata()
       2: cmd/xl-v1-metadata.go:501:cmd.writeUniqueXLMetadata.func1()

This can happen when CompleteMultipartUpload fails to reach write quorum: a write-quorum failure is returned as a 500 HTTP response, so the S3 client retries the request. The retried CompleteMultipartUpload then fails because it does not actually use a random UUID under the .minio.sys/tmp/ directory; it reuses the upload ID, so the temporary path left over from the first attempt already exists.

This commit fixes the behavior by choosing a random UUID for the temporary path used to generate xl.json.
parent ae002aa724
commit 27ef1262bf
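To make the failure mode concrete, here is a minimal standalone Go sketch (not MinIO code; writeTmpMeta and randomUUIDish are illustrative stand-ins for the storage layer and mustGetUUID). It shows that an exclusive create under a temp path derived from the upload ID collides on the client's retry, while a fresh random name per attempt cannot collide:

package main

import (
	"crypto/rand"
	"fmt"
	"os"
	"path/filepath"
)

// randomUUIDish returns a random hex string; a stand-in for mustGetUUID().
func randomUUIDish() string {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", b)
}

// writeTmpMeta simulates writing xl.json under a tmp directory with an
// exclusive create, mirroring the "file exists" failure mode from the log.
func writeTmpMeta(tmpRoot, id string) error {
	dir := filepath.Join(tmpRoot, id)
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return err
	}
	f, err := os.OpenFile(filepath.Join(dir, "xl.json"), os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0o644)
	if err != nil {
		return err // second attempt with the same id fails: "file exists"
	}
	return f.Close()
}

func main() {
	tmpRoot, _ := os.MkdirTemp("", "miniotmp")
	defer os.RemoveAll(tmpRoot)

	uploadID := "100bb3ec-6c0d-4a37-8b36-65241050eb02"

	// Old behavior: the original request and the client retry both use the
	// upload ID as the tmp path, so the retry fails with "file exists".
	fmt.Println("attempt 1 (uploadID):", writeTmpMeta(tmpRoot, uploadID))
	fmt.Println("attempt 2 (uploadID):", writeTmpMeta(tmpRoot, uploadID))

	// New behavior: every attempt picks a fresh random name, so a retry
	// never collides with leftovers of a failed attempt.
	fmt.Println("attempt 1 (random):  ", writeTmpMeta(tmpRoot, randomUUIDish()))
	fmt.Println("attempt 2 (random):  ", writeTmpMeta(tmpRoot, randomUUIDish()))
}

Running the sketch prints a "file exists" error for the second upload-ID attempt and nil for both random-name attempts.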
@@ -425,6 +425,10 @@ func (xl xlObjects) healObject(ctx context.Context, bucket string, object string
 		}
 	}
 
+	// Cleanup in case of xl.json writing failure
+	writeQuorum := latestMeta.Erasure.DataBlocks + 1
+	defer xl.deleteObject(ctx, minioMetaTmpBucket, tmpID, writeQuorum, false)
+
 	// Generate and write `xl.json` generated from other disks.
 	outDatedDisks, aErr := writeUniqueXLMetadata(ctx, outDatedDisks, minioMetaTmpBucket, tmpID,
 		partsMetadata, diskCount(outDatedDisks))
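The hunk above, and several of the multipart and copy-object hunks further down, rely on the same write-to-temp / defer-cleanup / rename pattern: metadata is written under a random temp name, a deferred delete removes it if anything fails, and the delete becomes a harmless no-op once the rename has moved it into place. A standalone sketch of the idea (illustrative only; commitMetadata is not a MinIO function):

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// commitMetadata writes data to a randomly named file under tmpDir and then
// renames it to dstPath; the temp file is always cleaned up on failure.
func commitMetadata(tmpDir, dstPath string, data []byte) error {
	tmpFile, err := os.CreateTemp(tmpDir, "xl-*.json") // random temp name
	if err != nil {
		return err
	}
	// Cleanup in case of writing failure; after a successful rename the
	// temp path no longer exists and this remove is a harmless no-op.
	defer os.Remove(tmpFile.Name())

	if _, err = tmpFile.Write(data); err != nil {
		tmpFile.Close()
		return err
	}
	if err = tmpFile.Close(); err != nil {
		return err
	}
	return os.Rename(tmpFile.Name(), dstPath)
}

func main() {
	tmpDir, _ := os.MkdirTemp("", "tmpbucket")
	dstDir, _ := os.MkdirTemp("", "multipartbucket")
	defer os.RemoveAll(tmpDir)
	defer os.RemoveAll(dstDir)

	err := commitMetadata(tmpDir, filepath.Join(dstDir, "xl.json"), []byte(`{"version":"1.0.1"}`))
	fmt.Println("commit error:", err)
}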
@@ -425,14 +425,6 @@ func (xl xlObjects) readXLMetaStat(ctx context.Context, bucket, object string) (
 	return statInfo{}, nil, reduceReadQuorumErrs(ctx, ignoredErrs, nil, readQuorum)
 }
 
-// deleteXLMetadata - deletes `xl.json` on a single disk.
-func deleteXLMetdata(ctx context.Context, disk StorageAPI, bucket, prefix string) error {
-	jsonFile := path.Join(prefix, xlMetaJSONFile)
-	err := disk.DeleteFile(bucket, jsonFile)
-	logger.LogIf(ctx, err)
-	return err
-}
-
 // writeXLMetadata - writes `xl.json` to a single disk.
 func writeXLMetadata(ctx context.Context, disk StorageAPI, bucket, prefix string, xlMeta xlMetaV1) error {
 	jsonFile := path.Join(prefix, xlMetaJSONFile)
@@ -450,27 +442,6 @@ func writeXLMetadata(ctx context.Context, disk StorageAPI, bucket, prefix string
 	return err
 }
 
-// deleteAllXLMetadata - deletes all partially written `xl.json` depending on errs.
-func deleteAllXLMetadata(ctx context.Context, disks []StorageAPI, bucket, prefix string, errs []error) {
-	var wg = &sync.WaitGroup{}
-	// Delete all the `xl.json` left over.
-	for index, disk := range disks {
-		if disk == nil {
-			continue
-		}
-		// Undo rename object in parallel.
-		wg.Add(1)
-		go func(index int, disk StorageAPI) {
-			defer wg.Done()
-			if errs[index] != nil {
-				return
-			}
-			_ = deleteXLMetdata(ctx, disk, bucket, prefix)
-		}(index, disk)
-	}
-	wg.Wait()
-}
-
 // Rename `xl.json` content to destination location for each disk in order.
 func renameXLMetadata(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, quorum int) ([]StorageAPI, error) {
 	isDir := false
@@ -510,10 +481,6 @@ func writeUniqueXLMetadata(ctx context.Context, disks []StorageAPI, bucket, pref
 	wg.Wait()
 
 	err := reduceWriteQuorumErrs(ctx, mErrs, objectOpIgnoredErrs, quorum)
-	if err == errXLWriteQuorum {
-		// Delete all `xl.json` successfully renamed.
-		deleteAllXLMetadata(ctx, disks, bucket, prefix, mErrs)
-	}
 	return evalDisks(disks, mErrs), err
 }
 
@@ -548,9 +515,5 @@ func writeSameXLMetadata(ctx context.Context, disks []StorageAPI, bucket, prefix
 	wg.Wait()
 
 	err := reduceWriteQuorumErrs(ctx, mErrs, objectOpIgnoredErrs, writeQuorum)
-	if err == errXLWriteQuorum {
-		// Delete all `xl.json` successfully renamed.
-		deleteAllXLMetadata(ctx, disks, bucket, prefix, mErrs)
-	}
 	return evalDisks(disks, mErrs), err
 }
@@ -136,10 +136,6 @@ func commitXLMetadata(ctx context.Context, disks []StorageAPI, srcBucket, srcPre
 	wg.Wait()
 
 	err := reduceWriteQuorumErrs(ctx, mErrs, objectOpIgnoredErrs, quorum)
-	if err == errXLWriteQuorum {
-		// Delete all `xl.json` successfully renamed.
-		deleteAllXLMetadata(ctx, disks, dstBucket, dstPrefix, mErrs)
-	}
 	return evalDisks(disks, mErrs), err
 }
 
@@ -217,15 +213,16 @@ func (xl xlObjects) newMultipartUpload(ctx context.Context, bucket string, objec
 	uploadIDPath := xl.getUploadIDDir(bucket, object, uploadID)
 	tempUploadIDPath := uploadID
 
+	// Delete the tmp path later in case we fail to commit (ignore
+	// returned errors) - this will be a no-op in case of a commit
+	// success.
+	defer xl.deleteObject(ctx, minioMetaTmpBucket, tempUploadIDPath, writeQuorum, false)
+
 	// Write updated `xl.json` to all disks.
 	disks, err := writeSameXLMetadata(ctx, xl.getDisks(), minioMetaTmpBucket, tempUploadIDPath, xlMeta, writeQuorum)
 	if err != nil {
 		return "", toObjectErr(err, minioMetaTmpBucket, tempUploadIDPath)
 	}
-	// delete the tmp path later in case we fail to rename (ignore
-	// returned errors) - this will be a no-op in case of a rename
-	// success.
-	defer xl.deleteObject(ctx, minioMetaTmpBucket, tempUploadIDPath, writeQuorum, false)
 
 	// Attempt to rename temp upload object to actual upload path object
 	_, rErr := rename(ctx, disks, minioMetaTmpBucket, tempUploadIDPath, minioMetaMultipartBucket, uploadIDPath, true, writeQuorum, nil)
@@ -453,8 +450,10 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID
 	}
 
 	// Write all the checksum metadata.
-	newUUID := mustGetUUID()
-	tempXLMetaPath := newUUID
+	tempXLMetaPath := mustGetUUID()
+
+	// Cleanup in case of xl.json writing failure
+	defer xl.deleteObject(ctx, minioMetaTmpBucket, tempXLMetaPath, writeQuorum, false)
 
 	// Writes a unique `xl.json` each disk carrying new checksum related information.
 	if onlineDisks, err = writeUniqueXLMetadata(ctx, onlineDisks, minioMetaTmpBucket, tempXLMetaPath, partsMetadata, writeQuorum); err != nil {
@@ -732,8 +731,6 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
 	// Save the consolidated actual size.
 	xlMeta.Meta[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(objectActualSize, 10)
 
-	tempUploadIDPath := uploadID
-
 	// Update all xl metadata, make sure to not modify fields like
 	// checksum which are different on each disks.
 	for index := range partsMetadata {
@@ -742,13 +739,18 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
 		partsMetadata[index].Parts = xlMeta.Parts
 	}
 
+	tempXLMetaPath := mustGetUUID()
+
+	// Cleanup in case of failure
+	defer xl.deleteObject(ctx, minioMetaTmpBucket, tempXLMetaPath, writeQuorum, false)
+
 	// Write unique `xl.json` for each disk.
-	if onlineDisks, err = writeUniqueXLMetadata(ctx, onlineDisks, minioMetaTmpBucket, tempUploadIDPath, partsMetadata, writeQuorum); err != nil {
-		return oi, toObjectErr(err, minioMetaTmpBucket, tempUploadIDPath)
+	if onlineDisks, err = writeUniqueXLMetadata(ctx, onlineDisks, minioMetaTmpBucket, tempXLMetaPath, partsMetadata, writeQuorum); err != nil {
+		return oi, toObjectErr(err, minioMetaTmpBucket, tempXLMetaPath)
 	}
 
 	var rErr error
-	onlineDisks, rErr = commitXLMetadata(ctx, onlineDisks, minioMetaTmpBucket, tempUploadIDPath, minioMetaMultipartBucket, uploadIDPath, writeQuorum)
+	onlineDisks, rErr = commitXLMetadata(ctx, onlineDisks, minioMetaTmpBucket, tempXLMetaPath, minioMetaMultipartBucket, uploadIDPath, writeQuorum)
 	if rErr != nil {
 		return oi, toObjectErr(rErr, minioMetaMultipartBucket, uploadIDPath)
 	}
@@ -103,6 +103,9 @@ func (xl xlObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBuc
 
 	tempObj := mustGetUUID()
 
+	// Cleanup in case of xl.json writing failure
+	defer xl.deleteObject(ctx, minioMetaTmpBucket, tempObj, writeQuorum, false)
+
 	// Write unique `xl.json` for each disk.
 	if onlineDisks, err = writeUniqueXLMetadata(ctx, storageDisks, minioMetaTmpBucket, tempObj, metaArr, writeQuorum); err != nil {
 		return oi, toObjectErr(err, srcBucket, srcObject)