refactor objectQuorumFromMeta() to search for parity quorum (#15844)

This commit is contained in:
Anis Elleuch
2022-10-13 00:42:45 +01:00
committed by GitHub
parent 97112c69be
commit 783dd875f7
3 changed files with 109 additions and 73 deletions

View File

@@ -394,21 +394,73 @@ func writeUniqueFileInfo(ctx context.Context, disks []StorageAPI, bucket, prefix
return evalDisks(disks, mErrs), err
}
func commonParity(parities []int) int {
occMap := make(map[int]int)
for _, p := range parities {
occMap[p]++
}
var maxOcc, commonParity int
for parity, occ := range occMap {
if parity == -1 {
// Ignore non defined parity
continue
}
if occ >= maxOcc {
maxOcc = occ
commonParity = parity
}
}
if maxOcc == 0 {
// Did not found anything useful
return -1
}
return commonParity
}
func listObjectParities(partsMetadata []FileInfo, errs []error) (parities []int) {
parities = make([]int, len(partsMetadata))
for index, metadata := range partsMetadata {
if errs[index] != nil {
parities[index] = -1
continue
}
parities[index] = metadata.Erasure.ParityBlocks
}
return
}
// Returns per object readQuorum and writeQuorum
// readQuorum is the min required disks to read data.
// writeQuorum is the min required disks to write data.
func objectQuorumFromMeta(ctx context.Context, partsMetaData []FileInfo, errs []error, defaultParityCount int) (objectReadQuorum, objectWriteQuorum int, err error) {
// get the latest updated Metadata and a count of all the latest updated FileInfo(s)
latestFileInfo, err := getLatestFileInfo(ctx, partsMetaData, defaultParityCount, errs)
if err != nil {
return 0, 0, err
// There should be atleast half correct entries, if not return failure
expectedRQuorum := len(partsMetaData) / 2
if defaultParityCount == 0 {
// if parity count is '0', we expected all entries to be present.
expectedRQuorum = len(partsMetaData)
}
if latestFileInfo.Deleted {
// special case when parity is '0'
if defaultParityCount == 0 {
return len(partsMetaData), len(partsMetaData), nil
}
reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, expectedRQuorum)
if reducedErr != nil {
return -1, -1, reducedErr
}
// special case when parity is '0'
if defaultParityCount == 0 {
return len(partsMetaData), len(partsMetaData), nil
}
parities := listObjectParities(partsMetaData, errs)
parityBlocks := commonParity(parities)
if parityBlocks < 0 {
return -1, -1, errErasureReadQuorum
}
if parityBlocks == 0 {
// For delete markers do not use 'defaultParityCount' as it is not expected to be the case.
// Use maximum allowed read quorum instead, writeQuorum+1 is returned for compatibility sake
// but there are no callers that shall be using this.
@@ -416,23 +468,7 @@ func objectQuorumFromMeta(ctx context.Context, partsMetaData []FileInfo, errs []
return readQuorum, readQuorum + 1, nil
}
parityBlocks := globalStorageClass.GetParityForSC(latestFileInfo.Metadata[xhttp.AmzStorageClass])
if parityBlocks < 0 {
parityBlocks = defaultParityCount
}
// For erasure code upgraded objects choose the parity
// blocks saved internally, instead of 'defaultParityCount'
if _, ok := latestFileInfo.Metadata[minIOErasureUpgraded]; ok {
if latestFileInfo.Erasure.ParityBlocks != 0 {
parityBlocks = latestFileInfo.Erasure.ParityBlocks
}
}
dataBlocks := latestFileInfo.Erasure.DataBlocks
if dataBlocks == 0 {
dataBlocks = len(partsMetaData) - parityBlocks
}
dataBlocks := len(partsMetaData) - parityBlocks
writeQuorum := dataBlocks
if dataBlocks == parityBlocks {