diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 13cdea10b..2dd7da21e 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -824,46 +824,8 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up return result, toObjectErr(err, bucket, object, uploadID) } - onlineDisks := er.getDisks() uploadIDPath := er.getUploadIDDir(bucket, object, uploadID) - if maxParts == 0 { - return result, nil - } - - if partNumberMarker < 0 { - partNumberMarker = 0 - } - - // Limit output to maxPartsList. - if maxParts > maxPartsList-partNumberMarker { - maxParts = maxPartsList - partNumberMarker - } - - // Read Part info for all parts - partPath := pathJoin(uploadIDPath, fi.DataDir) + "/" - req := ReadMultipleReq{ - Bucket: minioMetaMultipartBucket, - Prefix: partPath, - MaxSize: 1 << 20, // Each part should realistically not be > 1MiB. - MaxResults: maxParts + 1, - } - - start := partNumberMarker + 1 - end := start + maxParts - - // Parts are 1 based, so index 0 is part one, etc. - for i := start; i <= end; i++ { - req.Files = append(req.Files, fmt.Sprintf("part.%d.meta", i)) - } - - writeQuorum := fi.WriteQuorum(er.defaultWQuorum()) - - partInfoFiles, err := readMultipleFiles(ctx, onlineDisks, req, writeQuorum) - if err != nil { - return result, err - } - // Populate the result stub. result.Bucket = bucket result.Object = object @@ -873,13 +835,65 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up result.UserDefined = cloneMSS(fi.Metadata) result.ChecksumAlgorithm = fi.Metadata[hash.MinIOMultipartChecksum] - // For empty number of parts or maxParts as zero, return right here. - if len(partInfoFiles) == 0 || maxParts == 0 { + if partNumberMarker < 0 { + partNumberMarker = 0 + } + + // Limit output to maxPartsList. + if maxParts > maxPartsList-partNumberMarker { + maxParts = maxPartsList - partNumberMarker + } + + if maxParts == 0 { return result, nil } - for i, part := range partInfoFiles { + // Read Part info for all parts + partPath := pathJoin(uploadIDPath, fi.DataDir) + "/" + req := ReadMultipleReq{ + Bucket: minioMetaMultipartBucket, + Prefix: partPath, + MaxSize: 1 << 20, // Each part should realistically not be > 1MiB. + MaxResults: maxParts + 1, + MetadataOnly: true, + } + + start := partNumberMarker + 1 + end := start + maxParts + + // Parts are 1 based, so index 0 is part one, etc. + for i := start; i <= end; i++ { + req.Files = append(req.Files, fmt.Sprintf("part.%d.meta", i)) + } + + var disk StorageAPI + disks := er.getLoadBalancedLocalDisks() + if len(disks) == 0 { + // using er.getLoadBalancedLocalDisks() has one side-affect where + // on a pooled setup all disks are remote, add a fallback + disks = er.getDisks() + } + + ch := make(chan ReadMultipleResp) // channel is closed by ReadMultiple() + + for _, disk = range disks { + if disk == nil { + continue + } + if !disk.IsOnline() { + continue + } + go func(disk StorageAPI) { + disk.ReadMultiple(ctx, req, ch) // ignore error since this function only ever returns an error i + }(disk) + break + } + + var i int + for part := range ch { partN := i + partNumberMarker + 1 + i++ + if part.Error != "" || !part.Exists { continue } @@ -964,11 +978,12 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str // Read Part info for all parts partPath := pathJoin(uploadIDPath, fi.DataDir) + "/" req := ReadMultipleReq{ - Bucket: minioMetaMultipartBucket, - Prefix: partPath, - MaxSize: 1 << 20, // Each part should realistically not be > 1MiB. - Files: make([]string, 0, len(parts)), - AbortOn404: true, + Bucket: minioMetaMultipartBucket, + Prefix: partPath, + MaxSize: 1 << 20, // Each part should realistically not be > 1MiB. + Files: make([]string, 0, len(parts)), + AbortOn404: true, + MetadataOnly: true, } for _, part := range parts { req.Files = append(req.Files, fmt.Sprintf("part.%d.meta", part.PartNumber))