mirror of
https://github.com/minio/minio.git
synced 2025-11-07 12:52:58 -05:00
readParts: Return error when quorum unavailable (#20389)
readParts requires that both part.N and part.N.meta files be present. This change addresses an issue with how an error to return to the upper layers was picked from most drives where a UploadPart operation had failed.
This commit is contained in:
committed by
GitHub
parent
b6b7cddc9c
commit
a0f9e9f661
@@ -985,27 +985,25 @@ func readParts(ctx context.Context, disks []StorageAPI, bucket string, partMetaP
|
||||
}
|
||||
|
||||
partInfosInQuorum := make([]ObjectPartInfo, len(partMetaPaths))
|
||||
partMetaQuorumMap := make(map[string]int, len(partNumbers))
|
||||
for pidx := range partMetaPaths {
|
||||
// partMetaQuorumMap uses
|
||||
// - path/to/part.N as key to collate errors from failed drives.
|
||||
// - part ETag to collate part metadata
|
||||
partMetaQuorumMap := make(map[string]int, len(partNumbers))
|
||||
var pinfos []*ObjectPartInfo
|
||||
for idx := range disks {
|
||||
if len(objectPartInfos[idx]) == 0 {
|
||||
if len(objectPartInfos[idx]) != len(partMetaPaths) {
|
||||
partMetaQuorumMap[partMetaPaths[pidx]]++
|
||||
continue
|
||||
}
|
||||
|
||||
pinfo := objectPartInfos[idx][pidx]
|
||||
if pinfo == nil {
|
||||
partMetaQuorumMap[partMetaPaths[pidx]]++
|
||||
continue
|
||||
}
|
||||
|
||||
if pinfo.ETag == "" {
|
||||
partMetaQuorumMap[partMetaPaths[pidx]]++
|
||||
} else {
|
||||
if pinfo != nil && pinfo.ETag != "" {
|
||||
pinfos = append(pinfos, pinfo)
|
||||
partMetaQuorumMap[pinfo.ETag]++
|
||||
continue
|
||||
}
|
||||
partMetaQuorumMap[partMetaPaths[pidx]]++
|
||||
}
|
||||
|
||||
var maxQuorum int
|
||||
@@ -1018,50 +1016,48 @@ func readParts(ctx context.Context, disks []StorageAPI, bucket string, partMetaP
|
||||
maxPartMeta = etag
|
||||
}
|
||||
}
|
||||
|
||||
var pinfo *ObjectPartInfo
|
||||
for _, pinfo = range pinfos {
|
||||
if pinfo != nil && maxETag != "" && pinfo.ETag == maxETag {
|
||||
// found is a representative ObjectPartInfo which either has the maximally occurring ETag or an error.
|
||||
var found *ObjectPartInfo
|
||||
for _, pinfo := range pinfos {
|
||||
if pinfo == nil {
|
||||
continue
|
||||
}
|
||||
if maxETag != "" && pinfo.ETag == maxETag {
|
||||
found = pinfo
|
||||
break
|
||||
}
|
||||
if maxPartMeta != "" && path.Base(maxPartMeta) == fmt.Sprintf("part.%d.meta", pinfo.Number) {
|
||||
if pinfo.ETag == "" && maxPartMeta != "" && path.Base(maxPartMeta) == fmt.Sprintf("part.%d.meta", pinfo.Number) {
|
||||
found = pinfo
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if pinfo != nil && pinfo.ETag != "" && partMetaQuorumMap[maxETag] >= readQuorum {
|
||||
partInfosInQuorum[pidx] = *pinfo
|
||||
if found != nil && found.ETag != "" && partMetaQuorumMap[maxETag] >= readQuorum {
|
||||
partInfosInQuorum[pidx] = *found
|
||||
continue
|
||||
}
|
||||
|
||||
if partMetaQuorumMap[maxPartMeta] == len(disks) {
|
||||
if pinfo != nil && pinfo.Error != "" {
|
||||
partInfosInQuorum[pidx] = ObjectPartInfo{Error: pinfo.Error}
|
||||
} else {
|
||||
partInfosInQuorum[pidx] = ObjectPartInfo{
|
||||
Error: InvalidPart{
|
||||
PartNumber: partNumbers[pidx],
|
||||
}.Error(),
|
||||
}
|
||||
}
|
||||
} else {
|
||||
partInfosInQuorum[pidx] = ObjectPartInfo{Error: errErasureReadQuorum.Error()}
|
||||
partInfosInQuorum[pidx] = ObjectPartInfo{
|
||||
Number: partNumbers[pidx],
|
||||
Error: InvalidPart{
|
||||
PartNumber: partNumbers[pidx],
|
||||
}.Error(),
|
||||
}
|
||||
|
||||
}
|
||||
return partInfosInQuorum, nil
|
||||
}
|
||||
|
||||
func errStrToPartErr(errStr string) error {
|
||||
if strings.Contains(errStr, "file not found") {
|
||||
return InvalidPart{}
|
||||
func objPartToPartErr(part ObjectPartInfo) error {
|
||||
if strings.Contains(part.Error, "file not found") {
|
||||
return InvalidPart{PartNumber: part.Number}
|
||||
}
|
||||
if strings.Contains(errStr, "Specified part could not be found") {
|
||||
return InvalidPart{}
|
||||
if strings.Contains(part.Error, "Specified part could not be found") {
|
||||
return InvalidPart{PartNumber: part.Number}
|
||||
}
|
||||
if strings.Contains(errStr, errErasureReadQuorum.Error()) {
|
||||
if strings.Contains(part.Error, errErasureReadQuorum.Error()) {
|
||||
return errErasureReadQuorum
|
||||
}
|
||||
return errors.New(errStr)
|
||||
return errors.New(part.Error)
|
||||
}
|
||||
|
||||
// CompleteMultipartUpload - completes an ongoing multipart
|
||||
@@ -1196,7 +1192,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
|
||||
|
||||
for idx, part := range partInfoFiles {
|
||||
if part.Error != "" {
|
||||
err = errStrToPartErr(part.Error)
|
||||
err = objPartToPartErr(part)
|
||||
bugLogIf(ctx, err)
|
||||
return oi, err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user