mirror of
https://github.com/minio/minio.git
synced 2025-01-23 12:43:16 -05:00
correctly calculate read quorum based on the available fileInfo (#14000)
The current usage of assuming `default` parity of `4` is not correct for all objects stored on MinIO, objects in .minio.sys have maximum parity, healing won't trigger on these objects due to incorrect verification of quorum.
This commit is contained in:
parent
3e28af1723
commit
79df2c7ce7
@ -286,12 +286,10 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
|
||||
|
||||
// Initialize heal result object
|
||||
result = madmin.HealResultItem{
|
||||
Type: madmin.HealItemObject,
|
||||
Bucket: bucket,
|
||||
Object: object,
|
||||
DiskCount: len(storageDisks),
|
||||
ParityBlocks: er.defaultParityCount,
|
||||
DataBlocks: len(storageDisks) - er.defaultParityCount,
|
||||
Type: madmin.HealItemObject,
|
||||
Bucket: bucket,
|
||||
Object: object,
|
||||
DiskCount: len(storageDisks),
|
||||
}
|
||||
|
||||
if !opts.NoLock {
|
||||
@ -306,18 +304,27 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
|
||||
|
||||
// Re-read when we have lock...
|
||||
partsMetadata, errs := readAllFileInfo(ctx, storageDisks, bucket, object, versionID, true)
|
||||
if isAllNotFound(errs) {
|
||||
// Nothing to do, file is already gone.
|
||||
return er.defaultHealResult(FileInfo{}, storageDisks, storageEndpoints,
|
||||
errs, bucket, object, versionID), nil
|
||||
}
|
||||
|
||||
if _, err = getLatestFileInfo(ctx, partsMetadata, errs); err != nil {
|
||||
readQuorum, _, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount)
|
||||
if err != nil {
|
||||
return er.purgeObjectDangling(ctx, bucket, object, versionID, partsMetadata, errs, []error{}, opts)
|
||||
}
|
||||
|
||||
result.ParityBlocks = result.DiskCount - readQuorum
|
||||
result.DataBlocks = readQuorum
|
||||
|
||||
// List of disks having latest version of the object er.meta
|
||||
// (by modtime).
|
||||
onlineDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
|
||||
|
||||
// Latest FileInfo for reference. If a valid metadata is not
|
||||
// present, it is as good as object not found.
|
||||
latestMeta, err := pickValidFileInfo(ctx, partsMetadata, modTime, result.DataBlocks)
|
||||
latestMeta, err := pickValidFileInfo(ctx, partsMetadata, modTime, readQuorum)
|
||||
if err != nil {
|
||||
return result, toObjectErr(err, bucket, object, versionID)
|
||||
}
|
||||
@ -392,13 +399,9 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
|
||||
}
|
||||
|
||||
if isAllNotFound(errs) {
|
||||
err = toObjectErr(errFileNotFound, bucket, object)
|
||||
if versionID != "" {
|
||||
err = toObjectErr(errFileVersionNotFound, bucket, object, versionID)
|
||||
}
|
||||
// File is fully gone, fileInfo is empty.
|
||||
return er.defaultHealResult(FileInfo{}, storageDisks, storageEndpoints, errs,
|
||||
bucket, object, versionID), err
|
||||
bucket, object, versionID), nil
|
||||
}
|
||||
|
||||
// If less than read quorum number of disks have all the parts
|
||||
@ -663,7 +666,7 @@ func (er erasureObjects) healObjectDir(ctx context.Context, bucket, object strin
|
||||
}
|
||||
if dryRun || danglingObject || isAllNotFound(errs) {
|
||||
// Nothing to do, file is already gone.
|
||||
return hr, toObjectErr(errFileNotFound, bucket, object)
|
||||
return hr, nil
|
||||
}
|
||||
for i, err := range errs {
|
||||
if err == errVolumeNotFound || err == errFileNotFound {
|
||||
@ -768,8 +771,15 @@ func statAllDirs(ctx context.Context, storageDisks []StorageAPI, bucket, prefix
|
||||
// A 0 length slice will always return false.
|
||||
func isAllNotFound(errs []error) bool {
|
||||
for _, err := range errs {
|
||||
if errors.Is(err, errFileNotFound) || errors.Is(err, errVolumeNotFound) || errors.Is(err, errFileVersionNotFound) {
|
||||
continue
|
||||
if err != nil {
|
||||
switch err.Error() {
|
||||
case errFileNotFound.Error():
|
||||
fallthrough
|
||||
case errVolumeNotFound.Error():
|
||||
fallthrough
|
||||
case errFileVersionNotFound.Error():
|
||||
continue
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
@ -329,6 +329,131 @@ func TestHealingDanglingObject(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestHealCorrectQuorum(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
resetGlobalHealState()
|
||||
defer resetGlobalHealState()
|
||||
|
||||
nDisks := 32
|
||||
fsDirs, err := getRandomDisks(nDisks)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
defer removeRoots(fsDirs)
|
||||
|
||||
pools := mustGetPoolEndpoints(fsDirs[:16]...)
|
||||
pools = append(pools, mustGetPoolEndpoints(fsDirs[16:]...)...)
|
||||
|
||||
// Everything is fine, should return nil
|
||||
objLayer, _, err := initObjectLayer(ctx, pools)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
bucket := getRandomBucketName()
|
||||
object := getRandomObjectName()
|
||||
data := bytes.Repeat([]byte("a"), 5*1024*1024)
|
||||
var opts ObjectOptions
|
||||
|
||||
err = objLayer.MakeBucketWithLocation(ctx, bucket, BucketOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to make a bucket - %v", err)
|
||||
}
|
||||
|
||||
// Create an object with multiple parts uploaded in decreasing
|
||||
// part number.
|
||||
uploadID, err := objLayer.NewMultipartUpload(ctx, bucket, object, opts)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create a multipart upload - %v", err)
|
||||
}
|
||||
|
||||
var uploadedParts []CompletePart
|
||||
for _, partID := range []int{2, 1} {
|
||||
pInfo, err1 := objLayer.PutObjectPart(ctx, bucket, object, uploadID, partID, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), opts)
|
||||
if err1 != nil {
|
||||
t.Fatalf("Failed to upload a part - %v", err1)
|
||||
}
|
||||
uploadedParts = append(uploadedParts, CompletePart{
|
||||
PartNumber: pInfo.PartNumber,
|
||||
ETag: pInfo.ETag,
|
||||
})
|
||||
}
|
||||
|
||||
_, err = objLayer.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, ObjectOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to complete multipart upload - %v", err)
|
||||
}
|
||||
|
||||
cfgFile := pathJoin(bucketConfigPrefix, bucket, ".test.bin")
|
||||
if err = saveConfig(ctx, objLayer, cfgFile, data); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
hopts := madmin.HealOpts{
|
||||
DryRun: false,
|
||||
Remove: true,
|
||||
ScanMode: madmin.HealNormalScan,
|
||||
}
|
||||
|
||||
// Test 1: Remove the object backend files from the first disk.
|
||||
z := objLayer.(*erasureServerPools)
|
||||
for _, set := range z.serverPools {
|
||||
er := set.sets[0]
|
||||
erasureDisks := er.getDisks()
|
||||
|
||||
fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
|
||||
nfi, err := getLatestFileInfo(ctx, fileInfos, errs)
|
||||
if errors.Is(err, errFileNotFound) {
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to getLatestFileInfo - %v", err)
|
||||
}
|
||||
|
||||
for i := 0; i < nfi.Erasure.ParityBlocks; i++ {
|
||||
erasureDisks[i].Delete(context.Background(), bucket, pathJoin(object, xlStorageFormatFile), false)
|
||||
}
|
||||
|
||||
// Try healing now, it should heal the content properly.
|
||||
_, err = objLayer.HealObject(ctx, bucket, object, "", hopts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
|
||||
if countErrs(errs, nil) != len(fileInfos) {
|
||||
t.Fatal("Expected all xl.meta healed, but partial heal detected")
|
||||
}
|
||||
|
||||
fileInfos, errs = readAllFileInfo(ctx, erasureDisks, minioMetaBucket, cfgFile, "", false)
|
||||
nfi, err = getLatestFileInfo(ctx, fileInfos, errs)
|
||||
if errors.Is(err, errFileNotFound) {
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to getLatestFileInfo - %v", err)
|
||||
}
|
||||
|
||||
for i := 0; i < nfi.Erasure.ParityBlocks; i++ {
|
||||
erasureDisks[i].Delete(context.Background(), minioMetaBucket, pathJoin(cfgFile, xlStorageFormatFile), false)
|
||||
}
|
||||
|
||||
// Try healing now, it should heal the content properly.
|
||||
_, err = objLayer.HealObject(ctx, minioMetaBucket, cfgFile, "", hopts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
fileInfos, errs = readAllFileInfo(ctx, erasureDisks, minioMetaBucket, cfgFile, "", false)
|
||||
if countErrs(errs, nil) != len(fileInfos) {
|
||||
t.Fatal("Expected all xl.meta healed, but partial heal detected")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestHealObjectCorruptedPools(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
Loading…
x
Reference in New Issue
Block a user