allow concurrent aborts on active uploadParts() (#21229)

allow aborting on active uploads in progress, however fail these
uploads subsequently during commit phase and return appropriate errors
This commit is contained in:
jiuker 2025-04-25 13:41:04 +08:00 committed by GitHub
parent b7540169a2
commit ddd9a84cd7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 60 additions and 18 deletions

View File

@ -527,7 +527,7 @@ func (er erasureObjects) NewMultipartUpload(ctx context.Context, bucket, object
}
// renamePart - renames multipart part to its relevant location under uploadID.
func (er erasureObjects) renamePart(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, optsMeta []byte, writeQuorum int) ([]StorageAPI, error) {
func (er erasureObjects) renamePart(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, optsMeta []byte, writeQuorum int, skipParent string) ([]StorageAPI, error) {
paths := []string{
dstEntry,
dstEntry + ".meta",
@ -545,7 +545,7 @@ func (er erasureObjects) renamePart(ctx context.Context, disks []StorageAPI, src
if disks[index] == nil {
return errDiskNotFound
}
return disks[index].RenamePart(ctx, srcBucket, srcEntry, dstBucket, dstEntry, optsMeta)
return disks[index].RenamePart(ctx, srcBucket, srcEntry, dstBucket, dstEntry, optsMeta, skipParent)
}, index)
}
@ -754,8 +754,11 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
ctx = rlkctx.Context()
defer uploadIDRLock.RUnlock(rlkctx)
onlineDisks, err = er.renamePart(ctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, partFI, writeQuorum)
onlineDisks, err = er.renamePart(ctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, partFI, writeQuorum, uploadIDPath)
if err != nil {
if errors.Is(err, errUploadIDNotFound) {
return pi, toObjectErr(errUploadIDNotFound, bucket, object, uploadID)
}
if errors.Is(err, errFileNotFound) {
// An in-quorum errFileNotFound means that client stream
// prematurely closed and we do not find any xl.meta or

View File

@ -208,11 +208,11 @@ func (d *naughtyDisk) RenameData(ctx context.Context, srcVolume, srcPath string,
return d.disk.RenameData(ctx, srcVolume, srcPath, fi, dstVolume, dstPath, opts)
}
func (d *naughtyDisk) RenamePart(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string, meta []byte) error {
func (d *naughtyDisk) RenamePart(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string, meta []byte, skipParent string) error {
if err := d.calcError(); err != nil {
return err
}
return d.disk.RenamePart(ctx, srcVolume, srcPath, dstVolume, dstPath, meta)
return d.disk.RenamePart(ctx, srcVolume, srcPath, dstVolume, dstPath, meta, skipParent)
}
func (d *naughtyDisk) ReadParts(ctx context.Context, bucket string, partMetaPaths ...string) ([]*ObjectPartInfo, error) {

View File

@ -508,6 +508,7 @@ type RenamePartHandlerParams struct {
DstVolume string `msg:"dv"`
DstFilePath string `msg:"dp"`
Meta []byte `msg:"m"`
SkipParent string `msg:"kp"`
}
// ReadAllHandlerParams are parameters for ReadAllHandler.

View File

@ -6219,6 +6219,12 @@ func (z *RenamePartHandlerParams) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "Meta")
return
}
case "kp":
z.SkipParent, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "SkipParent")
return
}
default:
err = dc.Skip()
if err != nil {
@ -6232,9 +6238,9 @@ func (z *RenamePartHandlerParams) DecodeMsg(dc *msgp.Reader) (err error) {
// EncodeMsg implements msgp.Encodable
func (z *RenamePartHandlerParams) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 6
// map header, size 7
// write "id"
err = en.Append(0x86, 0xa2, 0x69, 0x64)
err = en.Append(0x87, 0xa2, 0x69, 0x64)
if err != nil {
return
}
@ -6293,15 +6299,25 @@ func (z *RenamePartHandlerParams) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "Meta")
return
}
// write "kp"
err = en.Append(0xa2, 0x6b, 0x70)
if err != nil {
return
}
err = en.WriteString(z.SkipParent)
if err != nil {
err = msgp.WrapError(err, "SkipParent")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *RenamePartHandlerParams) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 6
// map header, size 7
// string "id"
o = append(o, 0x86, 0xa2, 0x69, 0x64)
o = append(o, 0x87, 0xa2, 0x69, 0x64)
o = msgp.AppendString(o, z.DiskID)
// string "sv"
o = append(o, 0xa2, 0x73, 0x76)
@ -6318,6 +6334,9 @@ func (z *RenamePartHandlerParams) MarshalMsg(b []byte) (o []byte, err error) {
// string "m"
o = append(o, 0xa1, 0x6d)
o = msgp.AppendBytes(o, z.Meta)
// string "kp"
o = append(o, 0xa2, 0x6b, 0x70)
o = msgp.AppendString(o, z.SkipParent)
return
}
@ -6375,6 +6394,12 @@ func (z *RenamePartHandlerParams) UnmarshalMsg(bts []byte) (o []byte, err error)
err = msgp.WrapError(err, "Meta")
return
}
case "kp":
z.SkipParent, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "SkipParent")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
@ -6389,7 +6414,7 @@ func (z *RenamePartHandlerParams) UnmarshalMsg(bts []byte) (o []byte, err error)
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *RenamePartHandlerParams) Msgsize() (s int) {
s = 1 + 3 + msgp.StringPrefixSize + len(z.DiskID) + 3 + msgp.StringPrefixSize + len(z.SrcVolume) + 3 + msgp.StringPrefixSize + len(z.SrcFilePath) + 3 + msgp.StringPrefixSize + len(z.DstVolume) + 3 + msgp.StringPrefixSize + len(z.DstFilePath) + 2 + msgp.BytesPrefixSize + len(z.Meta)
s = 1 + 3 + msgp.StringPrefixSize + len(z.DiskID) + 3 + msgp.StringPrefixSize + len(z.SrcVolume) + 3 + msgp.StringPrefixSize + len(z.SrcFilePath) + 3 + msgp.StringPrefixSize + len(z.DstVolume) + 3 + msgp.StringPrefixSize + len(z.DstFilePath) + 2 + msgp.BytesPrefixSize + len(z.Meta) + 3 + msgp.StringPrefixSize + len(z.SkipParent)
return
}

View File

@ -95,7 +95,7 @@ type StorageAPI interface {
CreateFile(ctx context.Context, origvolume, olume, path string, size int64, reader io.Reader) error
ReadFileStream(ctx context.Context, volume, path string, offset, length int64) (io.ReadCloser, error)
RenameFile(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string) error
RenamePart(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string, meta []byte) error
RenamePart(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string, meta []byte, skipParent string) error
CheckParts(ctx context.Context, volume string, path string, fi FileInfo) (*CheckPartsResp, error)
Delete(ctx context.Context, volume string, path string, opts DeleteOptions) (err error)
VerifyFile(ctx context.Context, volume, path string, fi FileInfo) (*CheckPartsResp, error)

View File

@ -760,7 +760,7 @@ func (client *storageRESTClient) DeleteVersions(ctx context.Context, volume stri
}
// RenamePart - renames multipart part file
func (client *storageRESTClient) RenamePart(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string, meta []byte) (err error) {
func (client *storageRESTClient) RenamePart(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string, meta []byte, skipParent string) (err error) {
ctx, cancel := context.WithTimeout(ctx, globalDriveConfig.GetMaxTimeout())
defer cancel()
@ -771,6 +771,7 @@ func (client *storageRESTClient) RenamePart(ctx context.Context, srcVolume, srcP
DstVolume: dstVolume,
DstFilePath: dstPath,
Meta: meta,
SkipParent: skipParent,
})
return toStorageErr(err)
}

View File

@ -728,7 +728,7 @@ func (s *storageRESTServer) RenamePartHandler(p *RenamePartHandlerParams) (grid.
if !s.checkID(p.DiskID) {
return grid.NewNPErr(errDiskNotFound)
}
return grid.NewNPErr(s.getStorage().RenamePart(context.Background(), p.SrcVolume, p.SrcFilePath, p.DstVolume, p.DstFilePath, p.Meta))
return grid.NewNPErr(s.getStorage().RenamePart(context.Background(), p.SrcVolume, p.SrcFilePath, p.DstVolume, p.DstFilePath, p.Meta, p.SkipParent))
}
// CleanAbandonedDataHandler - Clean unused data directories.

View File

@ -456,7 +456,7 @@ func (p *xlStorageDiskIDCheck) ReadFileStream(ctx context.Context, volume, path
})
}
func (p *xlStorageDiskIDCheck) RenamePart(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string, meta []byte) (err error) {
func (p *xlStorageDiskIDCheck) RenamePart(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string, meta []byte, skipParent string) (err error) {
ctx, done, err := p.TrackDiskHealth(ctx, storageMetricRenamePart, srcVolume, srcPath, dstVolume, dstPath)
if err != nil {
return err
@ -464,7 +464,9 @@ func (p *xlStorageDiskIDCheck) RenamePart(ctx context.Context, srcVolume, srcPat
defer done(0, &err)
w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error { return p.storage.RenamePart(ctx, srcVolume, srcPath, dstVolume, dstPath, meta) })
return w.Run(func() error {
return p.storage.RenamePart(ctx, srcVolume, srcPath, dstVolume, dstPath, meta, skipParent)
})
}
func (p *xlStorageDiskIDCheck) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string) (err error) {

View File

@ -2917,7 +2917,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
// RenamePart - rename part path to destination path atomically, this is meant to be used
// only with multipart API
func (s *xlStorage) RenamePart(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string, meta []byte) (err error) {
func (s *xlStorage) RenamePart(ctx context.Context, srcVolume, srcPath, dstVolume, dstPath string, meta []byte, skipParent string) (err error) {
srcVolumeDir, err := s.getVolDir(srcVolume)
if err != nil {
return err
@ -2964,12 +2964,22 @@ func (s *xlStorage) RenamePart(ctx context.Context, srcVolume, srcPath, dstVolum
if err = checkPathLength(dstFilePath); err != nil {
return err
}
// when skipParent is from rpc. its ok for not adding another rpc HandlerID like HandlerRenamePart2
// For this case, skipParent is empty, destBaseDir is equal to dstVolumeDir, that behavior is the same as the previous one
destBaseDir := pathutil.Join(dstVolumeDir, skipParent)
if err = checkPathLength(destBaseDir); err != nil {
return err
}
if err = renameAll(srcFilePath, dstFilePath, dstVolumeDir); err != nil {
if err = renameAll(srcFilePath, dstFilePath, destBaseDir); err != nil {
if isSysErrNotEmpty(err) || isSysErrNotDir(err) {
return errFileAccessDenied
}
return osErrToFileErr(err)
err = osErrToFileErr(err)
if errors.Is(err, errFileNotFound) {
return errUploadIDNotFound
}
return err
}
if err = s.WriteAll(ctx, dstVolume, dstPath+".meta", meta); err != nil {