mirror of
https://github.com/minio/minio.git
synced 2025-11-07 12:52:58 -05:00
make sure listParts returns parts that are valid (#20390)
This commit is contained in:
@@ -795,6 +795,61 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (er erasureObjects) listParts(ctx context.Context, onlineDisks []StorageAPI, partPath string, readQuorum int) ([]int, error) {
|
||||
g := errgroup.WithNErrs(len(onlineDisks))
|
||||
|
||||
objectParts := make([][]string, len(onlineDisks))
|
||||
// List uploaded parts from drives.
|
||||
for index := range onlineDisks {
|
||||
index := index
|
||||
g.Go(func() (err error) {
|
||||
if onlineDisks[index] == nil {
|
||||
return errDiskNotFound
|
||||
}
|
||||
objectParts[index], err = onlineDisks[index].ListDir(ctx, minioMetaMultipartBucket, minioMetaMultipartBucket, partPath, -1)
|
||||
return err
|
||||
}, index)
|
||||
}
|
||||
|
||||
if err := reduceReadQuorumErrs(ctx, g.Wait(), objectOpIgnoredErrs, readQuorum); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
partQuorumMap := make(map[int]int)
|
||||
for _, driveParts := range objectParts {
|
||||
partsWithMetaCount := make(map[int]int, len(driveParts))
|
||||
// part files can be either part.N or part.N.meta
|
||||
for _, partPath := range driveParts {
|
||||
var partNum int
|
||||
if _, err := fmt.Sscanf(partPath, "part.%d", &partNum); err == nil {
|
||||
partsWithMetaCount[partNum]++
|
||||
continue
|
||||
}
|
||||
if _, err := fmt.Sscanf(partPath, "part.%d.meta", &partNum); err == nil {
|
||||
partsWithMetaCount[partNum]++
|
||||
}
|
||||
}
|
||||
// Include only part.N.meta files with corresponding part.N
|
||||
for partNum, cnt := range partsWithMetaCount {
|
||||
if cnt < 2 {
|
||||
continue
|
||||
}
|
||||
partQuorumMap[partNum]++
|
||||
}
|
||||
}
|
||||
|
||||
var partNums []int
|
||||
for partNum, count := range partQuorumMap {
|
||||
if count < readQuorum {
|
||||
continue
|
||||
}
|
||||
partNums = append(partNums, partNum)
|
||||
}
|
||||
|
||||
sort.Ints(partNums)
|
||||
return partNums, nil
|
||||
}
|
||||
|
||||
// ListObjectParts - lists all previously uploaded parts for a given
|
||||
// object and uploadID. Takes additional input of part-number-marker
|
||||
// to indicate where the listing should begin from.
|
||||
@@ -821,6 +876,14 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
|
||||
}
|
||||
|
||||
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
|
||||
if partNumberMarker < 0 {
|
||||
partNumberMarker = 0
|
||||
}
|
||||
|
||||
// Limit output to maxPartsList.
|
||||
if maxParts > maxPartsList {
|
||||
maxParts = maxPartsList
|
||||
}
|
||||
|
||||
// Populate the result stub.
|
||||
result.Bucket = bucket
|
||||
@@ -831,126 +894,77 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
|
||||
result.UserDefined = cloneMSS(fi.Metadata)
|
||||
result.ChecksumAlgorithm = fi.Metadata[hash.MinIOMultipartChecksum]
|
||||
|
||||
if partNumberMarker < 0 {
|
||||
partNumberMarker = 0
|
||||
}
|
||||
|
||||
// Limit output to maxPartsList.
|
||||
if maxParts > maxPartsList-partNumberMarker {
|
||||
maxParts = maxPartsList - partNumberMarker
|
||||
}
|
||||
|
||||
if maxParts == 0 {
|
||||
return result, nil
|
||||
}
|
||||
|
||||
onlineDisks := er.getDisks()
|
||||
readQuorum := fi.ReadQuorum(er.defaultRQuorum())
|
||||
// Read Part info for all parts
|
||||
partPath := pathJoin(uploadIDPath, fi.DataDir) + "/"
|
||||
req := ReadMultipleReq{
|
||||
Bucket: minioMetaMultipartBucket,
|
||||
Prefix: partPath,
|
||||
MaxSize: 1 << 20, // Each part should realistically not be > 1MiB.
|
||||
MaxResults: maxParts + 1,
|
||||
MetadataOnly: true,
|
||||
}
|
||||
partPath := pathJoin(uploadIDPath, fi.DataDir) + SlashSeparator
|
||||
|
||||
start := partNumberMarker + 1
|
||||
end := start + maxParts
|
||||
|
||||
// Parts are 1 based, so index 0 is part one, etc.
|
||||
for i := start; i <= end; i++ {
|
||||
req.Files = append(req.Files, fmt.Sprintf("part.%d.meta", i))
|
||||
}
|
||||
|
||||
var disk StorageAPI
|
||||
disks := er.getOnlineLocalDisks()
|
||||
if len(disks) == 0 {
|
||||
// using er.getOnlineLocalDisks() has one side-affect where
|
||||
// on a pooled setup all disks are remote, add a fallback
|
||||
disks = er.getOnlineDisks()
|
||||
}
|
||||
|
||||
for _, disk = range disks {
|
||||
if disk == nil {
|
||||
continue
|
||||
// List parts in quorum
|
||||
partNums, err := er.listParts(ctx, onlineDisks, partPath, readQuorum)
|
||||
if err != nil {
|
||||
// This means that fi.DataDir, is not yet populated so we
|
||||
// return an empty response.
|
||||
if errors.Is(err, errFileNotFound) {
|
||||
return result, nil
|
||||
}
|
||||
|
||||
if !disk.IsOnline() {
|
||||
continue
|
||||
}
|
||||
|
||||
break
|
||||
return result, toObjectErr(err, bucket, object, uploadID)
|
||||
}
|
||||
|
||||
g := errgroup.WithNErrs(len(req.Files)).WithConcurrency(32)
|
||||
|
||||
partsInfo := make([]*ObjectPartInfo, len(req.Files))
|
||||
for i, file := range req.Files {
|
||||
file := file
|
||||
partN := i + start
|
||||
i := i
|
||||
|
||||
g.Go(func() error {
|
||||
buf, err := disk.ReadAll(ctx, minioMetaMultipartBucket, pathJoin(partPath, file))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
pinfo := &ObjectPartInfo{}
|
||||
_, err = pinfo.UnmarshalMsg(buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if partN != pinfo.Number {
|
||||
return fmt.Errorf("part.%d.meta has incorrect corresponding part number: expected %d, got %d", partN, partN, pinfo.Number)
|
||||
}
|
||||
|
||||
partsInfo[i] = pinfo
|
||||
return nil
|
||||
}, i)
|
||||
if len(partNums) == 0 {
|
||||
return result, nil
|
||||
}
|
||||
|
||||
g.Wait()
|
||||
|
||||
for _, part := range partsInfo {
|
||||
if part != nil && part.Number != 0 && !part.ModTime.IsZero() {
|
||||
fi.AddObjectPart(part.Number, part.ETag, part.Size, part.ActualSize, part.ModTime, part.Index, part.Checksums)
|
||||
}
|
||||
start := objectPartIndexNums(partNums, partNumberMarker)
|
||||
if start != -1 {
|
||||
partNums = partNums[start+1:]
|
||||
}
|
||||
|
||||
// Only parts with higher part numbers will be listed.
|
||||
parts := fi.Parts
|
||||
result.Parts = make([]PartInfo, 0, len(parts))
|
||||
for _, part := range parts {
|
||||
result.Parts = make([]PartInfo, 0, len(partNums))
|
||||
partMetaPaths := make([]string, len(partNums))
|
||||
for i, part := range partNums {
|
||||
partMetaPaths[i] = pathJoin(partPath, fmt.Sprintf("part.%d.meta", part))
|
||||
}
|
||||
|
||||
// Read parts in quorum
|
||||
objParts, err := readParts(ctx, onlineDisks, minioMetaMultipartBucket, partMetaPaths,
|
||||
partNums, readQuorum)
|
||||
if err != nil {
|
||||
return result, toObjectErr(err, bucket, object, uploadID)
|
||||
}
|
||||
|
||||
count := maxParts
|
||||
for _, objPart := range objParts {
|
||||
result.Parts = append(result.Parts, PartInfo{
|
||||
PartNumber: part.Number,
|
||||
ETag: part.ETag,
|
||||
LastModified: part.ModTime,
|
||||
ActualSize: part.ActualSize,
|
||||
Size: part.Size,
|
||||
ChecksumCRC32: part.Checksums["CRC32"],
|
||||
ChecksumCRC32C: part.Checksums["CRC32C"],
|
||||
ChecksumSHA1: part.Checksums["SHA1"],
|
||||
ChecksumSHA256: part.Checksums["SHA256"],
|
||||
PartNumber: objPart.Number,
|
||||
LastModified: objPart.ModTime,
|
||||
ETag: objPart.ETag,
|
||||
Size: objPart.Size,
|
||||
ActualSize: objPart.ActualSize,
|
||||
ChecksumCRC32: objPart.Checksums["CRC32"],
|
||||
ChecksumCRC32C: objPart.Checksums["CRC32C"],
|
||||
ChecksumSHA1: objPart.Checksums["SHA1"],
|
||||
ChecksumSHA256: objPart.Checksums["SHA256"],
|
||||
})
|
||||
if len(result.Parts) >= maxParts {
|
||||
count--
|
||||
if count == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// If listed entries are more than maxParts, we set IsTruncated as true.
|
||||
if len(parts) > len(result.Parts) {
|
||||
if len(objParts) > len(result.Parts) {
|
||||
result.IsTruncated = true
|
||||
// Make sure to fill next part number marker if IsTruncated is
|
||||
// true for subsequent listing.
|
||||
nextPartNumberMarker := result.Parts[len(result.Parts)-1].PartNumber
|
||||
result.NextPartNumberMarker = nextPartNumberMarker
|
||||
// Make sure to fill next part number marker if IsTruncated is true for subsequent listing.
|
||||
result.NextPartNumberMarker = result.Parts[len(result.Parts)-1].PartNumber
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func readParts(ctx context.Context, disks []StorageAPI, bucket string, partMetaPaths []string, partNumbers []int, readQuorum int) ([]*ObjectPartInfo, error) {
|
||||
func readParts(ctx context.Context, disks []StorageAPI, bucket string, partMetaPaths []string, partNumbers []int, readQuorum int) ([]ObjectPartInfo, error) {
|
||||
g := errgroup.WithNErrs(len(disks))
|
||||
|
||||
objectPartInfos := make([][]*ObjectPartInfo, len(disks))
|
||||
@@ -970,7 +984,7 @@ func readParts(ctx context.Context, disks []StorageAPI, bucket string, partMetaP
|
||||
return nil, err
|
||||
}
|
||||
|
||||
partInfosInQuorum := make([]*ObjectPartInfo, len(partMetaPaths))
|
||||
partInfosInQuorum := make([]ObjectPartInfo, len(partMetaPaths))
|
||||
partMetaQuorumMap := make(map[string]int, len(partNumbers))
|
||||
for pidx := range partMetaPaths {
|
||||
var pinfos []*ObjectPartInfo
|
||||
@@ -1016,22 +1030,22 @@ func readParts(ctx context.Context, disks []StorageAPI, bucket string, partMetaP
|
||||
}
|
||||
|
||||
if pinfo != nil && pinfo.ETag != "" && partMetaQuorumMap[maxETag] >= readQuorum {
|
||||
partInfosInQuorum[pidx] = pinfo
|
||||
partInfosInQuorum[pidx] = *pinfo
|
||||
continue
|
||||
}
|
||||
|
||||
if partMetaQuorumMap[maxPartMeta] == len(disks) {
|
||||
if pinfo != nil && pinfo.Error != "" {
|
||||
partInfosInQuorum[pidx] = &ObjectPartInfo{Error: pinfo.Error}
|
||||
partInfosInQuorum[pidx] = ObjectPartInfo{Error: pinfo.Error}
|
||||
} else {
|
||||
partInfosInQuorum[pidx] = &ObjectPartInfo{
|
||||
partInfosInQuorum[pidx] = ObjectPartInfo{
|
||||
Error: InvalidPart{
|
||||
PartNumber: partNumbers[pidx],
|
||||
}.Error(),
|
||||
}
|
||||
}
|
||||
} else {
|
||||
partInfosInQuorum[pidx] = &ObjectPartInfo{Error: errErasureReadQuorum.Error()}
|
||||
partInfosInQuorum[pidx] = ObjectPartInfo{Error: errErasureReadQuorum.Error()}
|
||||
}
|
||||
}
|
||||
return partInfosInQuorum, nil
|
||||
|
||||
Reference in New Issue
Block a user