mirror of
https://github.com/minio/minio.git
synced 2024-12-24 06:05:55 -05:00
fix: quorum calculation mistake with reduced parity (#10186)
With reduced parity our write quorum should be same as read quorum, but code was still assuming ``` readQuorum+1 ``` In all situations which is not necessary.
This commit is contained in:
parent
d61eac080b
commit
b68bc75dad
@ -88,9 +88,10 @@ func (fi FileInfo) IsValid() bool {
|
||||
// for erasure coding information
|
||||
return true
|
||||
}
|
||||
data := fi.Erasure.DataBlocks
|
||||
parity := fi.Erasure.ParityBlocks
|
||||
return ((data >= parity) && (data != 0) && (parity != 0))
|
||||
dataBlocks := fi.Erasure.DataBlocks
|
||||
parityBlocks := fi.Erasure.ParityBlocks
|
||||
return ((dataBlocks >= parityBlocks) &&
|
||||
(dataBlocks != 0) && (parityBlocks != 0))
|
||||
}
|
||||
|
||||
// ToObjectInfo - Converts metadata to object info.
|
||||
@ -323,7 +324,18 @@ func objectQuorumFromMeta(ctx context.Context, er erasureObjects, partsMetaData
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
dataBlocks := latestFileInfo.Erasure.DataBlocks
|
||||
parityBlocks := globalStorageClass.GetParityForSC(latestFileInfo.Metadata[xhttp.AmzStorageClass])
|
||||
if parityBlocks == 0 {
|
||||
parityBlocks = dataBlocks
|
||||
}
|
||||
|
||||
writeQuorum := dataBlocks
|
||||
if dataBlocks == parityBlocks {
|
||||
writeQuorum = dataBlocks + 1
|
||||
}
|
||||
|
||||
// Since all the valid erasure code meta updated at the same time are equivalent, pass dataBlocks
|
||||
// from latestFileInfo to get the quorum
|
||||
return latestFileInfo.Erasure.DataBlocks, latestFileInfo.Erasure.DataBlocks + 1, nil
|
||||
return dataBlocks, writeQuorum, nil
|
||||
}
|
||||
|
@ -367,7 +367,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
||||
writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tmpPartPath, erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize())
|
||||
}
|
||||
|
||||
n, err := erasure.Encode(ctx, data, writers, buffer, fi.Erasure.DataBlocks+1)
|
||||
n, err := erasure.Encode(ctx, data, writers, buffer, writeQuorum)
|
||||
closeBitrotWriters(writers)
|
||||
if err != nil {
|
||||
return pi, toObjectErr(err, bucket, object)
|
||||
|
@ -670,7 +670,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
|
||||
writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tempErasureObj, erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize())
|
||||
}
|
||||
|
||||
n, erasureErr := erasure.Encode(ctx, data, writers, buffer, fi.Erasure.DataBlocks+1)
|
||||
n, erasureErr := erasure.Encode(ctx, data, writers, buffer, writeQuorum)
|
||||
closeBitrotWriters(writers)
|
||||
if erasureErr != nil {
|
||||
return ObjectInfo{}, toObjectErr(erasureErr, minioMetaTmpBucket, tempErasureObj)
|
||||
|
@ -420,6 +420,8 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
|
||||
|
||||
parts1, errs1 := readAllFileInfo(ctx, erasureDisks, bucket, object1, "")
|
||||
|
||||
parts1SC := globalStorageClass
|
||||
|
||||
// Object for test case 2 - No StorageClass defined, MetaData in PutObject requesting RRS Class
|
||||
object2 := "object2"
|
||||
metadata2 := make(map[string]string)
|
||||
@ -430,6 +432,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
|
||||
}
|
||||
|
||||
parts2, errs2 := readAllFileInfo(ctx, erasureDisks, bucket, object2, "")
|
||||
parts2SC := globalStorageClass
|
||||
|
||||
// Object for test case 3 - No StorageClass defined, MetaData in PutObject requesting Standard Storage Class
|
||||
object3 := "object3"
|
||||
@ -441,6 +444,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
|
||||
}
|
||||
|
||||
parts3, errs3 := readAllFileInfo(ctx, erasureDisks, bucket, object3, "")
|
||||
parts3SC := globalStorageClass
|
||||
|
||||
// Object for test case 4 - Standard StorageClass defined as Parity 6, MetaData in PutObject requesting Standard Storage Class
|
||||
object4 := "object4"
|
||||
@ -458,6 +462,11 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
|
||||
}
|
||||
|
||||
parts4, errs4 := readAllFileInfo(ctx, erasureDisks, bucket, object4, "")
|
||||
parts4SC := storageclass.Config{
|
||||
Standard: storageclass.StorageClass{
|
||||
Parity: 6,
|
||||
},
|
||||
}
|
||||
|
||||
// Object for test case 5 - RRS StorageClass defined as Parity 2, MetaData in PutObject requesting RRS Class
|
||||
// Reset global storage class flags
|
||||
@ -476,6 +485,11 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
|
||||
}
|
||||
|
||||
parts5, errs5 := readAllFileInfo(ctx, erasureDisks, bucket, object5, "")
|
||||
parts5SC := storageclass.Config{
|
||||
RRS: storageclass.StorageClass{
|
||||
Parity: 2,
|
||||
},
|
||||
}
|
||||
|
||||
// Object for test case 6 - RRS StorageClass defined as Parity 2, MetaData in PutObject requesting Standard Storage Class
|
||||
object6 := "object6"
|
||||
@ -493,6 +507,11 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
|
||||
}
|
||||
|
||||
parts6, errs6 := readAllFileInfo(ctx, erasureDisks, bucket, object6, "")
|
||||
parts6SC := storageclass.Config{
|
||||
RRS: storageclass.StorageClass{
|
||||
Parity: 2,
|
||||
},
|
||||
}
|
||||
|
||||
// Object for test case 7 - Standard StorageClass defined as Parity 5, MetaData in PutObject requesting RRS Class
|
||||
// Reset global storage class flags
|
||||
@ -511,25 +530,32 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
|
||||
}
|
||||
|
||||
parts7, errs7 := readAllFileInfo(ctx, erasureDisks, bucket, object7, "")
|
||||
parts7SC := storageclass.Config{
|
||||
Standard: storageclass.StorageClass{
|
||||
Parity: 5,
|
||||
},
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
parts []FileInfo
|
||||
errs []error
|
||||
expectedReadQuorum int
|
||||
expectedWriteQuorum int
|
||||
storageClassCfg storageclass.Config
|
||||
expectedError error
|
||||
}{
|
||||
{parts1, errs1, 8, 9, nil},
|
||||
{parts2, errs2, 14, 15, nil},
|
||||
{parts3, errs3, 8, 9, nil},
|
||||
{parts4, errs4, 10, 11, nil},
|
||||
{parts5, errs5, 14, 15, nil},
|
||||
{parts6, errs6, 8, 9, nil},
|
||||
{parts7, errs7, 14, 15, nil},
|
||||
{parts1, errs1, 8, 9, parts1SC, nil},
|
||||
{parts2, errs2, 14, 14, parts2SC, nil},
|
||||
{parts3, errs3, 8, 9, parts3SC, nil},
|
||||
{parts4, errs4, 10, 10, parts4SC, nil},
|
||||
{parts5, errs5, 14, 14, parts5SC, nil},
|
||||
{parts6, errs6, 8, 9, parts6SC, nil},
|
||||
{parts7, errs7, 14, 14, parts7SC, nil},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
tt := tt
|
||||
t.(*testing.T).Run("", func(t *testing.T) {
|
||||
globalStorageClass = tt.storageClassCfg
|
||||
actualReadQuorum, actualWriteQuorum, err := objectQuorumFromMeta(ctx, *xl, tt.parts, tt.errs)
|
||||
if tt.expectedError != nil && err == nil {
|
||||
t.Errorf("Expected %s, got %s", tt.expectedError, err)
|
||||
|
Loading…
Reference in New Issue
Block a user