Return quorum error based on disks in abortMultipartUpload (#6362)
Fixes #4980
commit 384a862940
parent 029f52880b
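Summary of the change, as read from the hunks below: the dedicated cleanupUploadedParts helper is deleted, and all cleanup paths now go through xl.deleteObject, which gains two parameters, writeQuorum and isDir. deleteObject no longer computes the quorum internally; each caller supplies it (derived via objectQuorumFromMeta for regular objects, or len(disks)/2+1 for directories and stale multipart cleanup), so aborting a multipart upload now returns a quorum error that reflects the disks actually involved.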
@@ -223,7 +223,7 @@ func (xl xlObjects) newMultipartUpload(ctx context.Context, bucket string, objec
 	// delete the tmp path later in case we fail to rename (ignore
 	// returned errors) - this will be a no-op in case of a rename
 	// success.
-	defer xl.deleteObject(ctx, minioMetaTmpBucket, tempUploadIDPath)
+	defer xl.deleteObject(ctx, minioMetaTmpBucket, tempUploadIDPath, writeQuorum, false)
 
 	// Attempt to rename temp upload object to actual upload path object
 	_, rErr := renameObject(ctx, disks, minioMetaTmpBucket, tempUploadIDPath, minioMetaMultipartBucket, uploadIDPath, writeQuorum)
@@ -361,7 +361,7 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID
 	tmpPartPath := path.Join(tmpPart, partSuffix)
 
 	// Delete the temporary object part. If PutObjectPart succeeds there would be nothing to delete.
-	defer xl.deleteObject(ctx, minioMetaTmpBucket, tmpPart)
+	defer xl.deleteObject(ctx, minioMetaTmpBucket, tmpPart, writeQuorum, false)
 	if data.Size() > 0 {
 		if pErr := xl.prepareFile(ctx, minioMetaTmpBucket, tmpPartPath, data.Size(), onlineDisks, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, writeQuorum); err != nil {
 			return pi, toObjectErr(pErr, bucket, object)
@@ -752,7 +752,7 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
 	newUniqueID := mustGetUUID()
 
 	// Delete success renamed object.
-	defer xl.deleteObject(ctx, minioMetaTmpBucket, newUniqueID)
+	defer xl.deleteObject(ctx, minioMetaTmpBucket, newUniqueID, writeQuorum, false)
 
 	// NOTE: Do not use online disks slice here.
 	// The reason is that existing object should be purged
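The three hunks above follow the same pattern: a temporary object is scheduled for deletion up front with defer, and that delete now carries the write quorum computed earlier in the call. Below is a minimal, self-contained sketch of this defer-based cleanup idiom using only the standard library; the helper name and single-file setup are illustrative, not MinIO's code.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// writeViaTemp stages data in a temp file and renames it into place.
// The deferred remove is a no-op after a successful rename, mirroring
// the "delete the tmp path later" comment in the diff above.
func writeViaTemp(dir, name string, data []byte) error {
	tmp := filepath.Join(dir, name+".tmp")
	// Schedule cleanup up front; ignore the error, as the diff does.
	defer os.Remove(tmp)

	if err := os.WriteFile(tmp, data, 0o644); err != nil {
		return err
	}
	// On success the temp file no longer exists, so the defer is a no-op.
	return os.Rename(tmp, filepath.Join(dir, name))
}

func main() {
	dir, _ := os.MkdirTemp("", "xl")
	defer os.RemoveAll(dir)
	fmt.Println(writeViaTemp(dir, "object", []byte("hello"))) // <nil>
}
```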
@@ -792,34 +792,6 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
 	return xlMeta.ToObjectInfo(bucket, object), nil
 }
 
-// Wrapper which removes all the uploaded parts.
-func (xl xlObjects) cleanupUploadedParts(ctx context.Context, uploadIDPath string, writeQuorum int) error {
-	var errs = make([]error, len(xl.getDisks()))
-	var wg = &sync.WaitGroup{}
-
-	// Cleanup uploadID for all disks.
-	for index, disk := range xl.getDisks() {
-		if disk == nil {
-			errs[index] = errDiskNotFound
-			continue
-		}
-		wg.Add(1)
-		// Cleanup each uploadID in a routine.
-		go func(index int, disk StorageAPI) {
-			defer wg.Done()
-			err := cleanupDir(ctx, disk, minioMetaMultipartBucket, uploadIDPath)
-			if err != nil {
-				errs[index] = err
-			}
-		}(index, disk)
-	}
-
-	// Wait for all the cleanups to finish.
-	wg.Wait()
-
-	return reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
-}
-
 // AbortMultipartUpload - aborts an ongoing multipart operation
 // signified by the input uploadID. This is an atomic operation
 // doesn't require clients to initiate multiple such requests.
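The removed cleanupUploadedParts helper shows the fan-out/quorum-reduce pattern this commit consolidates into deleteObject: issue the operation to every disk in parallel, collect per-disk errors, then reduce them against the write quorum. A simplified, self-contained sketch of that reduction follows; reduceQuorumErrs is a stand-in for MinIO's reduceWriteQuorumErrs, and it omits the objectOpIgnoredErrs allow-list the real function takes.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errDiskNotFound = errors.New("disk not found")

// reduceQuorumErrs: if at least quorum operations succeeded (nil error),
// the overall operation succeeds; otherwise the most common per-disk
// error is returned as the representative failure.
func reduceQuorumErrs(errs []error, quorum int) error {
	counts := make(map[string]int)
	success := 0
	for _, err := range errs {
		if err == nil {
			success++
			continue
		}
		counts[err.Error()]++
	}
	if success >= quorum {
		return nil
	}
	var top string
	for msg, n := range counts {
		if n > counts[top] {
			top = msg
		}
	}
	return errors.New(top)
}

func main() {
	// Fan out a delete to 4 hypothetical disks, one of which is offline.
	ops := []func() error{
		func() error { return nil },
		func() error { return nil },
		func() error { return errDiskNotFound },
		func() error { return nil },
	}
	errs := make([]error, len(ops))
	var wg sync.WaitGroup
	for i, op := range ops {
		wg.Add(1)
		go func(i int, op func() error) {
			defer wg.Done()
			errs[i] = op()
		}(i, op)
	}
	wg.Wait()
	fmt.Println(reduceQuorumErrs(errs, len(ops)/2+1)) // <nil>: 3 of 4 meets quorum
}
```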
@@ -859,7 +831,7 @@ func (xl xlObjects) AbortMultipartUpload(ctx context.Context, bucket, object, up
 	}
 
 	// Cleanup all uploaded parts.
-	if err = xl.cleanupUploadedParts(ctx, uploadIDPath, writeQuorum); err != nil {
+	if err = xl.deleteObject(ctx, minioMetaMultipartBucket, uploadIDPath, writeQuorum, false); err != nil {
 		return toObjectErr(err, bucket, object)
 	}
 
@@ -911,12 +883,7 @@ func (xl xlObjects) cleanupStaleMultipartUploadsOnDisk(ctx context.Context, disk
 			continue
 		}
 		if now.Sub(fi.ModTime) > expiry {
-			// Quorum value will need to be figured out using readAllXLMetadata() and objectQuorumFromMeta()
-			// But we can avoid these calls as we do not care if xl.cleanupUploadedParts() meets quorum
-			// when it removes files. We igore the error message from xl.cleanupUploadedParts() as we can't
-			// return it to any client. Hence we set quorum to 0.
-			quorum := 0
-			xl.cleanupUploadedParts(ctx, uploadIDPath, quorum)
+			xl.deleteObject(ctx, minioMetaMultipartBucket, uploadIDPath, len(xl.getDisks())/2+1, false)
 		}
 	}
 }
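With the helper gone, the stale-upload sweeper can no longer pass a dummy quorum of 0; it now uses the same simple-majority default that the directory paths use. A quick illustration of that arithmetic, with a hypothetical helper name:

```go
package main

import "fmt"

// defaultWriteQuorum mirrors the len(xl.getDisks())/2+1 expression in the
// hunk above: a simple majority of the erasure set.
func defaultWriteQuorum(numDisks int) int {
	return numDisks/2 + 1
}

func main() {
	fmt.Println(defaultWriteQuorum(4), defaultWriteQuorum(16)) // 3 9
}
```

The remaining hunks are in the object-layer code (putObject, deleteObject, DeleteObject), where the same signature change is threaded through.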
@@ -594,7 +594,7 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string,
 	// Delete temporary object in the event of failure.
 	// If PutObject succeeded there would be no temporary
 	// object to delete.
-	defer xl.deleteObject(ctx, minioMetaTmpBucket, tempObj)
+	defer xl.deleteObject(ctx, minioMetaTmpBucket, tempObj, writeQuorum, false)
 
 	// This is a special case with size as '0' and object ends with
 	// a slash separator, we treat it like a valid operation and
@@ -763,7 +763,7 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string,
 	newUniqueID := mustGetUUID()
 
 	// Delete successfully renamed object.
-	defer xl.deleteObject(ctx, minioMetaTmpBucket, newUniqueID)
+	defer xl.deleteObject(ctx, minioMetaTmpBucket, newUniqueID, writeQuorum, false)
 
 	// NOTE: Do not use online disks slice here.
 	// The reason is that existing object should be purged
@@ -822,36 +822,32 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string,
 // deleteObject - wrapper for delete object, deletes an object from
 // all the disks in parallel, including `xl.json` associated with the
 // object.
-func (xl xlObjects) deleteObject(ctx context.Context, bucket, object string) error {
+func (xl xlObjects) deleteObject(ctx context.Context, bucket, object string, writeQuorum int, isDir bool) error {
+	var disks []StorageAPI
+	var err error
+
+	tmpObj := mustGetUUID()
+	if bucket == minioMetaTmpBucket {
+		tmpObj = object
+		disks = xl.getDisks()
+	} else {
+		if isDir {
+			disks, err = renameObjectDir(ctx, xl.getDisks(), bucket, object, minioMetaTmpBucket, tmpObj, writeQuorum)
+		} else {
+			disks, err = renameObject(ctx, xl.getDisks(), bucket, object, minioMetaTmpBucket, tmpObj, writeQuorum)
+		}
+		if err != nil {
+			return toObjectErr(err, bucket, object)
+		}
+	}
+
 	// Initialize sync waitgroup.
 	var wg = &sync.WaitGroup{}
 
-	var writeQuorum int
-	var err error
-
-	isDir := hasSuffix(object, slashSeparator)
-
-	if !isDir {
-		// Read metadata associated with the object from all disks.
-		metaArr, errs := readAllXLMetadata(ctx, xl.getDisks(), bucket, object)
-		// get Quorum for this object
-		_, writeQuorum, err = objectQuorumFromMeta(ctx, xl, metaArr, errs)
-		if err != nil {
-			return err
-		}
-		err = reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
-		if err != nil {
-			return err
-		}
-	} else {
-		// WriteQuorum is defaulted to N/2 + 1 for directories
-		writeQuorum = len(xl.getDisks())/2 + 1
-	}
-
 	// Initialize list of errors.
-	var dErrs = make([]error, len(xl.getDisks()))
+	var dErrs = make([]error, len(disks))
 
-	for index, disk := range xl.getDisks() {
+	for index, disk := range disks {
 		if disk == nil {
 			dErrs[index] = errDiskNotFound
 			continue
@@ -863,9 +859,9 @@ func (xl xlObjects) deleteObject(ctx context.Context, bucket, object string) err
 		if isDir {
 			// DeleteFile() simply tries to remove a directory
 			// and will succeed only if that directory is empty.
-			e = disk.DeleteFile(bucket, object)
+			e = disk.DeleteFile(minioMetaTmpBucket, tmpObj)
 		} else {
-			e = cleanupDir(ctx, disk, bucket, object)
+			e = cleanupDir(ctx, disk, minioMetaTmpBucket, tmpObj)
 		}
 		if e != nil && e != errVolumeNotFound {
 			dErrs[index] = e
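Taken together, the two hunks above change deleteObject's strategy: instead of deleting in place, the object is first renamed into the tmp bucket under the caller-supplied quorum, and the per-disk deletes then run against that tmp name. The rename makes the namespace change quorum-checked and visible before any slow recursive removal happens. A rough single-disk sketch of the idea using only the standard library; the helper names are illustrative, not MinIO's.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// deleteViaRename moves the object out of the namespace first, then
// reclaims the space. If the rename fails, the object is untouched;
// if only the final removal fails, the object is already invisible.
func deleteViaRename(bucketDir, object, trashDir string) error {
	tmp := filepath.Join(trashDir, "uuid-placeholder") // MinIO uses mustGetUUID()
	if err := os.Rename(filepath.Join(bucketDir, object), tmp); err != nil {
		return err
	}
	// Equivalent of the cleanupDir()/DeleteFile() pass on the tmp name.
	return os.RemoveAll(tmp)
}

func main() {
	bucket, _ := os.MkdirTemp("", "bucket")
	trash, _ := os.MkdirTemp("", "trash")
	defer os.RemoveAll(bucket)
	defer os.RemoveAll(trash)

	_ = os.WriteFile(filepath.Join(bucket, "obj"), []byte("x"), 0o644)
	fmt.Println(deleteViaRename(bucket, "obj", trash)) // <nil>
}
```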
@@ -897,18 +893,22 @@ func (xl xlObjects) DeleteObject(ctx context.Context, bucket, object string) (er
 
 	if hasSuffix(object, slashSeparator) {
 		// Delete the object on all disks.
-		if err = xl.deleteObject(ctx, bucket, object); err != nil {
+		if err = xl.deleteObject(ctx, bucket, object, len(xl.getDisks())/2+1, true); err != nil {
 			return toObjectErr(err, bucket, object)
 		}
 	}
 
 	// Validate object exists.
 	if !xl.isObject(bucket, object) {
 		return ObjectNotFound{bucket, object}
 	} // else proceed to delete the object.
+	// Read metadata associated with the object from all disks.
+	partsMetadata, errs := readAllXLMetadata(ctx, xl.getDisks(), bucket, object)
+
+	// get Quorum for this object
+	_, writeQuorum, err := objectQuorumFromMeta(ctx, xl, partsMetadata, errs)
+	if err != nil {
+		return toObjectErr(err, bucket, object)
+	}
+
 	// Delete the object on all disks.
-	if err = xl.deleteObject(ctx, bucket, object); err != nil {
+	if err = xl.deleteObject(ctx, bucket, object, writeQuorum, false); err != nil {
 		return toObjectErr(err, bucket, object)
 	}
 
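DeleteObject now derives the write quorum from the object's own erasure metadata before delegating. The exact rule lives in objectQuorumFromMeta; the sketch below is a plausible simplification of how a quorum can be derived from an erasure layout, stated as an assumption rather than a quote of MinIO's implementation.

```go
package main

import "fmt"

// writeQuorumFromLayout sketches deriving a write quorum from an object's
// erasure layout. When data and parity shard counts are equal, a bare
// majority could leave two conflicting halves possible, so one extra disk
// is required. This mirrors the general idea behind objectQuorumFromMeta,
// not its exact implementation.
func writeQuorumFromLayout(dataBlocks, parityBlocks int) int {
	if dataBlocks == parityBlocks {
		return dataBlocks + 1
	}
	return dataBlocks
}

func main() {
	fmt.Println(writeQuorumFromLayout(8, 8))  // 9 on a 16-disk set
	fmt.Println(writeQuorumFromLayout(10, 6)) // 10
}
```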