mirror of
https://github.com/minio/minio.git
synced 2025-01-23 04:33:15 -05:00
fix: use renameAll instead of deleteObject() for purging temporary files (#14096)
This PR simplifies few things - Multipart parts are renamed, upon failure are unrenamed() keep this multipart specific behavior it is needed and works fine. - AbortMultipart should blindly delete once lock is acquired instead of re-reading metadata and calculating quorum, abort is a delete() operation and client has no business looking for errors on this. - Skip Access() calls to folders that are operating on `.minio.sys/multipart` folder as well.
This commit is contained in:
parent
38ccc4f672
commit
f546636c52
@ -547,7 +547,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
|
||||
|
||||
}
|
||||
|
||||
defer er.deleteObject(context.Background(), minioMetaTmpBucket, tmpID, len(storageDisks)/2+1)
|
||||
defer er.renameAll(context.Background(), minioMetaTmpBucket, tmpID)
|
||||
|
||||
// Rename from tmp location to the actual location.
|
||||
for i, disk := range outDatedDisks {
|
||||
|
@ -392,6 +392,53 @@ func (er erasureObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObjec
|
||||
return partInfo, nil
|
||||
}
|
||||
|
||||
func undoRenamePart(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, errs []error) {
|
||||
// Undo rename object on disks where RenameFile succeeded.
|
||||
g := errgroup.WithNErrs(len(disks))
|
||||
for index, disk := range disks {
|
||||
if disk == nil {
|
||||
continue
|
||||
}
|
||||
index := index
|
||||
g.Go(func() error {
|
||||
if errs[index] == nil {
|
||||
_ = disks[index].RenameFile(context.TODO(), dstBucket, dstEntry, srcBucket, srcEntry)
|
||||
}
|
||||
return nil
|
||||
}, index)
|
||||
}
|
||||
g.Wait()
|
||||
}
|
||||
|
||||
// renamePart - renames multipart part to its relevant location under uploadID.
|
||||
func renamePart(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, writeQuorum int) ([]StorageAPI, error) {
|
||||
g := errgroup.WithNErrs(len(disks))
|
||||
|
||||
// Rename file on all underlying storage disks.
|
||||
for index := range disks {
|
||||
index := index
|
||||
g.Go(func() error {
|
||||
if disks[index] == nil {
|
||||
return errDiskNotFound
|
||||
}
|
||||
return disks[index].RenameFile(ctx, srcBucket, srcEntry, dstBucket, dstEntry)
|
||||
}, index)
|
||||
}
|
||||
|
||||
// Wait for all renames to finish.
|
||||
errs := g.Wait()
|
||||
|
||||
// We can safely allow RenameFile errors up to len(er.getDisks()) - writeQuorum
|
||||
// otherwise return failure. Cleanup successful renames.
|
||||
err := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
|
||||
if err == errErasureWriteQuorum {
|
||||
// Undo all the partial rename operations.
|
||||
undoRenamePart(disks, srcBucket, srcEntry, dstBucket, dstEntry, errs)
|
||||
}
|
||||
|
||||
return evalDisks(disks, errs), err
|
||||
}
|
||||
|
||||
// PutObjectPart - reads incoming stream and internally erasure codes
|
||||
// them. This call is similar to single put operation but it is part
|
||||
// of the multipart transaction.
|
||||
@ -481,7 +528,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
||||
var online int
|
||||
defer func() {
|
||||
if online != len(onlineDisks) {
|
||||
er.deleteObject(context.Background(), minioMetaTmpBucket, tmpPart, writeQuorum)
|
||||
er.renameAll(context.Background(), minioMetaTmpBucket, tmpPart)
|
||||
}
|
||||
}()
|
||||
|
||||
@ -555,7 +602,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
||||
|
||||
// Rename temporary part file to its final location.
|
||||
partPath := pathJoin(uploadIDPath, fi.DataDir, partSuffix)
|
||||
onlineDisks, err = rename(wctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, false, writeQuorum, nil)
|
||||
onlineDisks, err = renamePart(wctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, writeQuorum)
|
||||
if err != nil {
|
||||
return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
|
||||
}
|
||||
@ -978,21 +1025,8 @@ func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, objec
|
||||
return toObjectErr(err, bucket, object, uploadID)
|
||||
}
|
||||
|
||||
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
|
||||
|
||||
// Read metadata associated with the object from all disks.
|
||||
partsMetadata, errs := readAllFileInfo(ctx, er.getDisks(), minioMetaMultipartBucket, uploadIDPath, "", false)
|
||||
|
||||
// get Quorum for this object
|
||||
_, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount)
|
||||
if err != nil {
|
||||
return toObjectErr(err, bucket, object, uploadID)
|
||||
}
|
||||
|
||||
// Cleanup all uploaded parts.
|
||||
if err = er.deleteObject(ctx, minioMetaMultipartBucket, uploadIDPath, writeQuorum); err != nil {
|
||||
return toObjectErr(err, bucket, object, uploadID)
|
||||
}
|
||||
er.renameAll(ctx, minioMetaMultipartBucket, er.getUploadIDDir(bucket, object, uploadID))
|
||||
|
||||
// Successfully purged.
|
||||
return nil
|
||||
|
@ -523,31 +523,6 @@ func (er erasureObjects) getObjectInfoAndQuorum(ctx context.Context, bucket, obj
|
||||
return objInfo, wquorum, nil
|
||||
}
|
||||
|
||||
func undoRename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isDir bool, errs []error) {
|
||||
// Undo rename object on disks where RenameFile succeeded.
|
||||
|
||||
// If srcEntry/dstEntry are objects then add a trailing slash to copy
|
||||
// over all the parts inside the object directory
|
||||
if isDir {
|
||||
srcEntry = retainSlash(srcEntry)
|
||||
dstEntry = retainSlash(dstEntry)
|
||||
}
|
||||
g := errgroup.WithNErrs(len(disks))
|
||||
for index, disk := range disks {
|
||||
if disk == nil {
|
||||
continue
|
||||
}
|
||||
index := index
|
||||
g.Go(func() error {
|
||||
if errs[index] == nil {
|
||||
_ = disks[index].RenameFile(context.TODO(), dstBucket, dstEntry, srcBucket, srcEntry)
|
||||
}
|
||||
return nil
|
||||
}, index)
|
||||
}
|
||||
g.Wait()
|
||||
}
|
||||
|
||||
// Similar to rename but renames data from srcEntry to dstEntry at dataDir
|
||||
func renameData(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry string, metadata []FileInfo, dstBucket, dstEntry string, writeQuorum int) ([]StorageAPI, error) {
|
||||
defer NSUpdated(dstBucket, dstEntry)
|
||||
@ -588,46 +563,6 @@ func renameData(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry str
|
||||
return evalDisks(disks, errs), err
|
||||
}
|
||||
|
||||
// rename - common function that renamePart and renameObject use to rename
|
||||
// the respective underlying storage layer representations.
|
||||
func rename(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isDir bool, writeQuorum int, ignoredErr []error) ([]StorageAPI, error) {
|
||||
if isDir {
|
||||
dstEntry = retainSlash(dstEntry)
|
||||
srcEntry = retainSlash(srcEntry)
|
||||
}
|
||||
defer NSUpdated(dstBucket, dstEntry)
|
||||
|
||||
g := errgroup.WithNErrs(len(disks))
|
||||
|
||||
// Rename file on all underlying storage disks.
|
||||
for index := range disks {
|
||||
index := index
|
||||
g.Go(func() error {
|
||||
if disks[index] == nil {
|
||||
return errDiskNotFound
|
||||
}
|
||||
if err := disks[index].RenameFile(ctx, srcBucket, srcEntry, dstBucket, dstEntry); err != nil {
|
||||
if !IsErrIgnored(err, ignoredErr...) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}, index)
|
||||
}
|
||||
|
||||
// Wait for all renames to finish.
|
||||
errs := g.Wait()
|
||||
|
||||
// We can safely allow RenameFile errors up to len(er.getDisks()) - writeQuorum
|
||||
// otherwise return failure. Cleanup successful renames.
|
||||
err := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
|
||||
if err == errErasureWriteQuorum {
|
||||
// Undo all the partial rename operations.
|
||||
undoRename(disks, srcBucket, srcEntry, dstBucket, dstEntry, isDir, errs)
|
||||
}
|
||||
return evalDisks(disks, errs), err
|
||||
}
|
||||
|
||||
func (er erasureObjects) putMetacacheObject(ctx context.Context, key string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
|
||||
data := r.Reader
|
||||
|
||||
@ -904,7 +839,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
|
||||
var online int
|
||||
defer func() {
|
||||
if online != len(onlineDisks) {
|
||||
er.deleteObject(context.Background(), minioMetaTmpBucket, tempObj, writeQuorum)
|
||||
er.renameAll(context.Background(), minioMetaTmpBucket, tempObj)
|
||||
}
|
||||
}()
|
||||
|
||||
@ -1070,42 +1005,6 @@ func (er erasureObjects) deleteObjectVersion(ctx context.Context, bucket, object
|
||||
return reduceWriteQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, writeQuorum)
|
||||
}
|
||||
|
||||
// deleteObject - wrapper for delete object, deletes an object from
|
||||
// all the disks in parallel, including `xl.meta` associated with the
|
||||
// object.
|
||||
func (er erasureObjects) deleteObject(ctx context.Context, bucket, object string, writeQuorum int) error {
|
||||
var err error
|
||||
|
||||
disks := er.getDisks()
|
||||
tmpObj := mustGetUUID()
|
||||
if bucket == minioMetaTmpBucket {
|
||||
tmpObj = object
|
||||
} else {
|
||||
// Rename the current object while requiring write quorum, but also consider
|
||||
// that a non found object in a given disk as a success since it already
|
||||
// confirms that the object doesn't have a part in that disk (already removed)
|
||||
disks, err = rename(ctx, disks, bucket, object, minioMetaTmpBucket, tmpObj, true, writeQuorum,
|
||||
[]error{errFileNotFound})
|
||||
if err != nil {
|
||||
return toObjectErr(err, bucket, object)
|
||||
}
|
||||
}
|
||||
|
||||
g := errgroup.WithNErrs(len(disks))
|
||||
for index := range disks {
|
||||
index := index
|
||||
g.Go(func() error {
|
||||
if disks[index] == nil {
|
||||
return errDiskNotFound
|
||||
}
|
||||
return disks[index].Delete(ctx, minioMetaTmpBucket, tmpObj, true)
|
||||
}, index)
|
||||
}
|
||||
|
||||
// return errors if any during deletion
|
||||
return reduceWriteQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, writeQuorum)
|
||||
}
|
||||
|
||||
// DeleteObjects deletes objects/versions in bulk, this function will still automatically split objects list
|
||||
// into smaller bulks if some object names are found to be duplicated in the delete list, splitting
|
||||
// into smaller bulks will avoid holding twice the write lock of the duplicated object names.
|
||||
|
@ -1984,12 +1984,12 @@ func (s *xlStorage) Delete(ctx context.Context, volume string, path string, recu
|
||||
return s.deleteFile(volumeDir, filePath, recursive)
|
||||
}
|
||||
|
||||
func skipAccessChecks(volume string) bool {
|
||||
func skipAccessChecks(volume string) (ok bool) {
|
||||
switch volume {
|
||||
case minioMetaTmpBucket, minioMetaBucket, minioMetaMultipartBucket:
|
||||
return true
|
||||
case minioMetaTmpBucket, minioMetaBucket, minioMetaMultipartBucket, minioMetaTmpDeletedBucket:
|
||||
ok = true
|
||||
}
|
||||
return false
|
||||
return ok
|
||||
}
|
||||
|
||||
// RenameData - rename source path to destination path atomically, metadata and data directory.
|
||||
@ -2249,10 +2249,6 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
|
||||
s.deleteFile(dstVolumeDir, legacyDataPath, true)
|
||||
}
|
||||
s.deleteFile(dstVolumeDir, dstDataPath, false)
|
||||
// Looks like srcFilePath is missing usually at .minio.sys/ ignore it.
|
||||
if !errors.Is(err, errFileNotFound) {
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
return osErrToFileErr(err)
|
||||
}
|
||||
}
|
||||
@ -2264,12 +2260,6 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
|
||||
s.deleteFile(dstVolumeDir, legacyDataPath, true)
|
||||
}
|
||||
s.deleteFile(dstVolumeDir, dstFilePath, false)
|
||||
|
||||
// Looks like srcFilePath is missing usually at .minio.sys/ ignore it.
|
||||
if !errors.Is(err, errFileNotFound) {
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
|
||||
return osErrToFileErr(err)
|
||||
}
|
||||
|
||||
@ -2287,8 +2277,6 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
|
||||
s.deleteFile(dstVolumeDir, legacyDataPath, true)
|
||||
}
|
||||
s.deleteFile(dstVolumeDir, dstFilePath, false)
|
||||
|
||||
logger.LogIf(ctx, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -2311,25 +2299,27 @@ func (s *xlStorage) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolum
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Stat a volume entry.
|
||||
if err = Access(srcVolumeDir); err != nil {
|
||||
if osIsNotExist(err) {
|
||||
return errVolumeNotFound
|
||||
} else if isSysErrIO(err) {
|
||||
return errFaultyDisk
|
||||
if !skipAccessChecks(srcVolume) {
|
||||
// Stat a volume entry.
|
||||
if err = Access(srcVolumeDir); err != nil {
|
||||
if osIsNotExist(err) {
|
||||
return errVolumeNotFound
|
||||
} else if isSysErrIO(err) {
|
||||
return errFaultyDisk
|
||||
}
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
if err = Access(dstVolumeDir); err != nil {
|
||||
if osIsNotExist(err) {
|
||||
return errVolumeNotFound
|
||||
} else if isSysErrIO(err) {
|
||||
return errFaultyDisk
|
||||
if !skipAccessChecks(dstVolume) {
|
||||
if err = Access(dstVolumeDir); err != nil {
|
||||
if osIsNotExist(err) {
|
||||
return errVolumeNotFound
|
||||
} else if isSysErrIO(err) {
|
||||
return errFaultyDisk
|
||||
}
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
srcIsDir := HasSuffix(srcPath, SlashSeparator)
|
||||
dstIsDir := HasSuffix(dstPath, SlashSeparator)
|
||||
// Either src and dst have to be directories or files, else return error.
|
||||
|
Loading…
x
Reference in New Issue
Block a user