fix: listObjectParts to prefer local and single disks (#18309)
parent a7b1834772
commit 069d118329

ListObjectParts previously built one ReadMultipleReq, fanned it out to every online disk, and enforced write quorum on the responses. This commit reads the part metadata from a single disk instead, preferring the load-balanced local disks and falling back to the full disk set on pooled setups where every disk is remote. It also marks the part-metadata reads in ListObjectParts and CompleteMultipartUpload as MetadataOnly.
@@ -824,46 +824,8 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
 		return result, toObjectErr(err, bucket, object, uploadID)
 	}
 
-	onlineDisks := er.getDisks()
 	uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
 
-	if maxParts == 0 {
-		return result, nil
-	}
-
-	if partNumberMarker < 0 {
-		partNumberMarker = 0
-	}
-
-	// Limit output to maxPartsList.
-	if maxParts > maxPartsList-partNumberMarker {
-		maxParts = maxPartsList - partNumberMarker
-	}
-
-	// Read Part info for all parts
-	partPath := pathJoin(uploadIDPath, fi.DataDir) + "/"
-	req := ReadMultipleReq{
-		Bucket:     minioMetaMultipartBucket,
-		Prefix:     partPath,
-		MaxSize:    1 << 20, // Each part should realistically not be > 1MiB.
-		MaxResults: maxParts + 1,
-	}
-
-	start := partNumberMarker + 1
-	end := start + maxParts
-
-	// Parts are 1 based, so index 0 is part one, etc.
-	for i := start; i <= end; i++ {
-		req.Files = append(req.Files, fmt.Sprintf("part.%d.meta", i))
-	}
-
-	writeQuorum := fi.WriteQuorum(er.defaultWQuorum())
-
-	partInfoFiles, err := readMultipleFiles(ctx, onlineDisks, req, writeQuorum)
-	if err != nil {
-		return result, err
-	}
-
 	// Populate the result stub.
 	result.Bucket = bucket
 	result.Object = object
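For contrast with the new code in the next hunk, here is a minimal, self-contained sketch of the quorum-style read the removed block performed. The fetchFn type and readWithQuorum helper are invented stand-ins, not MinIO's readMultipleFiles or its actual quorum semantics; the point is only that every online disk gets asked and the call fails unless enough disks answer.

package main

import (
	"errors"
	"fmt"
)

// fetchFn stands in for a per-disk read; real MinIO disks implement a much
// richer StorageAPI interface.
type fetchFn func(file string) ([]byte, error)

// readWithQuorum asks every disk for the same file and succeeds only when at
// least quorum reads succeed. This is why the old path touched all online
// disks even though one copy of part.N.meta would have answered the listing.
func readWithQuorum(disks []fetchFn, file string, quorum int) ([]byte, error) {
	var data []byte
	success := 0
	for _, read := range disks {
		b, err := read(file)
		if err != nil {
			continue // an offline or faulty disk simply does not count
		}
		data = b
		success++
	}
	if success < quorum {
		return nil, errors.New("read quorum not met")
	}
	return data, nil
}

func main() {
	ok := func(string) ([]byte, error) { return []byte("part metadata"), nil }
	bad := func(string) ([]byte, error) { return nil, errors.New("disk offline") }

	disks := []fetchFn{ok, ok, bad, ok}
	data, err := readWithQuorum(disks, "part.1.meta", 3)
	fmt.Println(string(data), err) // 3 of 4 disks answered, quorum of 3 met
}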
@@ -873,13 +835,65 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
 	result.UserDefined = cloneMSS(fi.Metadata)
 	result.ChecksumAlgorithm = fi.Metadata[hash.MinIOMultipartChecksum]
 
-	// For empty number of parts or maxParts as zero, return right here.
-	if len(partInfoFiles) == 0 || maxParts == 0 {
+	if partNumberMarker < 0 {
+		partNumberMarker = 0
+	}
+
+	// Limit output to maxPartsList.
+	if maxParts > maxPartsList-partNumberMarker {
+		maxParts = maxPartsList - partNumberMarker
+	}
+
+	if maxParts == 0 {
 		return result, nil
 	}
 
-	for i, part := range partInfoFiles {
+	// Read Part info for all parts
+	partPath := pathJoin(uploadIDPath, fi.DataDir) + "/"
+	req := ReadMultipleReq{
+		Bucket:       minioMetaMultipartBucket,
+		Prefix:       partPath,
+		MaxSize:      1 << 20, // Each part should realistically not be > 1MiB.
+		MaxResults:   maxParts + 1,
+		MetadataOnly: true,
+	}
+
+	start := partNumberMarker + 1
+	end := start + maxParts
+
+	// Parts are 1 based, so index 0 is part one, etc.
+	for i := start; i <= end; i++ {
+		req.Files = append(req.Files, fmt.Sprintf("part.%d.meta", i))
+	}
+
+	var disk StorageAPI
+	disks := er.getLoadBalancedLocalDisks()
+	if len(disks) == 0 {
+		// using er.getLoadBalancedLocalDisks() has one side-effect where
+		// on a pooled setup all disks are remote, add a fallback
+		disks = er.getDisks()
+	}
+
+	ch := make(chan ReadMultipleResp) // channel is closed by ReadMultiple()
+
+	for _, disk = range disks {
+		if disk == nil {
+			continue
+		}
+		if !disk.IsOnline() {
+			continue
+		}
+		go func(disk StorageAPI) {
+			disk.ReadMultiple(ctx, req, ch) // ignore error since this function only ever returns an error i
+		}(disk)
+		break
+	}
+
+	var i int
+	for part := range ch {
 		partN := i + partNumberMarker + 1
+		i++
+
 		if part.Error != "" || !part.Exists {
 			continue
 		}
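The new path above replaces the fan-out with one reader. Here is a self-contained sketch of that pattern, using invented stand-in types rather than MinIO's real StorageAPI and ReadMultipleResp: prefer the local-disk set, fall back to all disks when the pool leaves none local, start a single goroutine on the first online disk, and consume its channel until the reader closes it.

package main

import "fmt"

// ReadMultipleResp is a reduced stand-in for MinIO's response type.
type ReadMultipleResp struct {
	File   string
	Exists bool
	Error  string
}

// StorageAPI is a reduced stand-in for MinIO's storage interface.
type StorageAPI interface {
	IsOnline() bool
	ReadMultiple(files []string, ch chan<- ReadMultipleResp)
}

type mockDisk struct {
	name   string
	online bool
}

func (d *mockDisk) IsOnline() bool { return d.online }

// ReadMultiple sends one response per file and closes the channel when done,
// the same contract the diff notes ("channel is closed by ReadMultiple()").
func (d *mockDisk) ReadMultiple(files []string, ch chan<- ReadMultipleResp) {
	defer close(ch)
	for _, f := range files {
		ch <- ReadMultipleResp{File: f, Exists: true}
	}
}

func main() {
	localDisks := []StorageAPI{&mockDisk{"local-1", false}, &mockDisk{"local-2", true}}
	allDisks := []StorageAPI{&mockDisk{"remote-1", true}}

	disks := localDisks
	if len(disks) == 0 {
		disks = allDisks // pooled setup: every disk is remote, fall back
	}

	files := []string{"part.5.meta", "part.6.meta"}
	ch := make(chan ReadMultipleResp)
	started := false
	for _, disk := range disks {
		if disk == nil || !disk.IsOnline() {
			continue
		}
		go disk.ReadMultiple(files, ch)
		started = true
		break // a single online disk serves the whole listing
	}
	if !started {
		close(ch) // avoid blocking the consumer when no disk is available
	}

	partNumberMarker, i := 4, 0
	for part := range ch {
		partN := i + partNumberMarker + 1 // parts are 1-based after the marker
		i++
		if part.Error != "" || !part.Exists {
			continue
		}
		fmt.Printf("%s -> part %d\n", part.File, partN)
	}
}

Running it prints "part.5.meta -> part 5" and "part.6.meta -> part 6": the offline local-1 is skipped and local-2 alone serves the request, which is the commit's intent.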
@@ -964,11 +978,12 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
 	// Read Part info for all parts
 	partPath := pathJoin(uploadIDPath, fi.DataDir) + "/"
 	req := ReadMultipleReq{
 		Bucket:       minioMetaMultipartBucket,
 		Prefix:       partPath,
 		MaxSize:      1 << 20, // Each part should realistically not be > 1MiB.
 		Files:        make([]string, 0, len(parts)),
 		AbortOn404:   true,
+		MetadataOnly: true,
 	}
 	for _, part := range parts {
 		req.Files = append(req.Files, fmt.Sprintf("part.%d.meta", part.PartNumber))
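The only functional change in this hunk is the added MetadataOnly: true. Going by the field name alone (an assumption; ReadMultipleReq's exact semantics are MinIO internals), it asks the disk to return just the parsed metadata of each part.N.meta file rather than any trailing payload. A toy illustration of what such a flag can save, with an entirely invented on-disk layout:

package main

import (
	"bytes"
	"fmt"
)

// A toy on-disk record: a small metadata header followed by a large data
// section. Real MinIO meta files use their own format; this is illustrative.
var record = append([]byte("meta:{etag,size}\n"), bytes.Repeat([]byte{0xAB}, 1<<20)...)

// readRecord returns only the header when metadataOnly is set, so the 1 MiB
// tail is never copied or sent anywhere.
func readRecord(metadataOnly bool) []byte {
	if metadataOnly {
		if i := bytes.IndexByte(record, '\n'); i >= 0 {
			return record[:i+1]
		}
	}
	return record
}

func main() {
	fmt.Println("full read:    ", len(readRecord(false)), "bytes")
	fmt.Println("metadata only:", len(readRecord(true)), "bytes")
}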