heal: Update object parity with the latest configured SC (#17187)

This commit is contained in:
Anis Eleuch 2023-05-16 05:32:13 +01:00 committed by GitHub
parent ef2fc0f99e
commit e2b7a08c10
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 77 additions and 17 deletions

View File

@ -27,6 +27,7 @@ import (
"time" "time"
"github.com/minio/madmin-go/v2" "github.com/minio/madmin-go/v2"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/logger"
"github.com/minio/pkg/sync/errgroup" "github.com/minio/pkg/sync/errgroup"
) )
@ -407,12 +408,31 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
availableDisks, dataErrs, diskMTime := disksWithAllParts(ctx, onlineDisks, partsMetadata, availableDisks, dataErrs, diskMTime := disksWithAllParts(ctx, onlineDisks, partsMetadata,
errs, latestMeta, bucket, object, scanMode) errs, latestMeta, bucket, object, scanMode)
var erasure Erasure var (
var recreate bool reencode, recreate bool
erasureWriter, erasureReader Erasure
)
if !latestMeta.Deleted && !latestMeta.IsRemote() { if !latestMeta.Deleted && !latestMeta.IsRemote() {
erasureReader, err = NewErasure(ctx, latestMeta.Erasure.DataBlocks, latestMeta.Erasure.ParityBlocks, latestMeta.Erasure.BlockSize)
if err != nil {
return result, err
}
parity := erasureReader.parityBlocks
if opts.UpdateParity && bucket != minioMetaBucket {
p := globalStorageClass.GetParityForSC(latestMeta.Metadata[xhttp.AmzStorageClass])
if p < 0 {
return result, fmt.Errorf("invalid parity number: %d", p)
}
if p != parity {
reencode = true
parity = p
}
}
// Initialize erasure coding // Initialize erasure coding
erasure, err = NewErasure(ctx, latestMeta.Erasure.DataBlocks, erasureWriter, err = NewErasure(ctx, len(storageDisks)-parity, parity, latestMeta.Erasure.BlockSize)
latestMeta.Erasure.ParityBlocks, latestMeta.Erasure.BlockSize)
if err != nil { if err != nil {
return result, err return result, err
} }
@ -424,7 +444,7 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
recreate = (opts.Recreate && recreate = (opts.Recreate &&
!latestMeta.InlineData() && !latestMeta.InlineData() &&
len(latestMeta.Parts) == 1 && len(latestMeta.Parts) == 1 &&
erasure.ShardFileSize(latestMeta.Parts[0].ActualSize) < smallFileThreshold) erasureWriter.ShardFileSize(latestMeta.Parts[0].ActualSize) < smallFileThreshold)
} }
result.ObjectSize, err = latestMeta.GetActualSize() result.ObjectSize, err = latestMeta.GetActualSize()
@ -453,7 +473,7 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
driveState = madmin.DriveStateCorrupt driveState = madmin.DriveStateCorrupt
} }
if shouldHealObjectOnDisk(errs[i], dataErrs[i], partsMetadata[i], latestMeta, recreate) { if reencode || shouldHealObjectOnDisk(errs[i], dataErrs[i], partsMetadata[i], latestMeta, recreate) {
outDatedDisks[i] = storageDisks[i] outDatedDisks[i] = storageDisks[i]
disksToHealCount++ disksToHealCount++
result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{ result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{
@ -501,7 +521,7 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
return result, nil return result, nil
} }
if !latestMeta.XLV1 && !latestMeta.Deleted && !recreate && disksToHealCount > latestMeta.Erasure.ParityBlocks { if !latestMeta.XLV1 && !latestMeta.Deleted && !recreate && !reencode && disksToHealCount > latestMeta.Erasure.ParityBlocks {
// When disk to heal count is greater than parity blocks we should simply error out. // When disk to heal count is greater than parity blocks we should simply error out.
err := fmt.Errorf("more drives are expected to heal than parity, returned errors: %v (dataErrs %v) -> %s/%s(%s)", errs, dataErrs, bucket, object, versionID) err := fmt.Errorf("more drives are expected to heal than parity, returned errors: %v (dataErrs %v) -> %s/%s(%s)", errs, dataErrs, bucket, object, versionID)
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
@ -588,6 +608,10 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
inlineBuffers = make([]*bytes.Buffer, len(outDatedDisks)) inlineBuffers = make([]*bytes.Buffer, len(outDatedDisks))
} }
// Heal each part. The healed data will be written
// to .minio.sys/tmp/uuid/ which needs to be renamed
// later to the final location.
erasureInfo := latestMeta.Erasure erasureInfo := latestMeta.Erasure
for partIndex := 0; partIndex < len(latestMeta.Parts); partIndex++ { for partIndex := 0; partIndex < len(latestMeta.Parts); partIndex++ {
partSize := latestMeta.Parts[partIndex].Size partSize := latestMeta.Parts[partIndex].Size
@ -596,17 +620,20 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
partNumber := latestMeta.Parts[partIndex].Number partNumber := latestMeta.Parts[partIndex].Number
partIdx := latestMeta.Parts[partIndex].Index partIdx := latestMeta.Parts[partIndex].Index
partChecksums := latestMeta.Parts[partIndex].Checksums partChecksums := latestMeta.Parts[partIndex].Checksums
tillOffset := erasure.ShardFileOffset(0, partSize, partSize)
readers := make([]io.ReaderAt, len(latestDisks)) readers := make([]io.ReaderAt, len(latestDisks))
checksumAlgo := erasureInfo.GetChecksumInfo(partNumber).Algorithm checksumAlgo := erasureInfo.GetChecksumInfo(partNumber).Algorithm
origPartErasureSize := erasureReader.ShardFileOffset(0, partSize, partSize)
newPartErasureSize := erasureWriter.ShardFileOffset(0, partSize, partSize)
for i, disk := range latestDisks { for i, disk := range latestDisks {
if disk == OfflineDisk { if disk == OfflineDisk {
continue continue
} }
checksumInfo := copyPartsMetadata[i].Erasure.GetChecksumInfo(partNumber) checksumInfo := copyPartsMetadata[i].Erasure.GetChecksumInfo(partNumber)
partPath := pathJoin(object, srcDataDir, fmt.Sprintf("part.%d", partNumber)) partPath := pathJoin(object, srcDataDir, fmt.Sprintf("part.%d", partNumber))
readers[i] = newBitrotReader(disk, copyPartsMetadata[i].Data, bucket, partPath, tillOffset, checksumAlgo, readers[i] = newBitrotReader(disk, copyPartsMetadata[i].Data, bucket, partPath, origPartErasureSize, checksumAlgo,
checksumInfo.Hash, erasure.ShardSize()) checksumInfo.Hash, erasureReader.ShardSize())
} }
writers := make([]io.Writer, len(outDatedDisks)) writers := make([]io.Writer, len(outDatedDisks))
for i, disk := range outDatedDisks { for i, disk := range outDatedDisks {
@ -615,18 +642,35 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
} }
partPath := pathJoin(tmpID, dstDataDir, fmt.Sprintf("part.%d", partNumber)) partPath := pathJoin(tmpID, dstDataDir, fmt.Sprintf("part.%d", partNumber))
if len(inlineBuffers) > 0 { if len(inlineBuffers) > 0 {
inlineBuffers[i] = bytes.NewBuffer(make([]byte, 0, erasure.ShardFileSize(latestMeta.Size)+32)) inlineBuffers[i] = bytes.NewBuffer(make([]byte, 0, erasureWriter.ShardFileSize(latestMeta.Size)+32))
writers[i] = newStreamingBitrotWriterBuffer(inlineBuffers[i], DefaultBitrotAlgorithm, erasure.ShardSize()) writers[i] = newStreamingBitrotWriterBuffer(inlineBuffers[i], DefaultBitrotAlgorithm, erasureWriter.ShardSize())
} else { } else {
writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, partPath, writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, partPath,
tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize()) newPartErasureSize, DefaultBitrotAlgorithm, erasureWriter.ShardSize())
} }
} }
// Heal each part. erasure.Heal() will write the healed if reencode {
// part to .minio/tmp/uuid/ which needs to be renamed rd, rw := io.Pipe()
// later to the final location. go func() {
err = erasure.Heal(ctx, writers, readers, partSize) n, err := erasureReader.Decode(ctx, rw, readers, 0, partSize, partSize, nil)
if err == nil && n != partSize {
err = errors.New("unexpected content")
}
rw.CloseWithError(err)
}()
buffer := er.bp.Get()
n, encodeErr := erasureWriter.Encode(ctx, rd, writers, buffer, erasureWriter.dataBlocks)
er.bp.Put(buffer)
if encodeErr == nil && n != partSize {
encodeErr = errors.New("unexpected content")
}
rd.CloseWithError(encodeErr)
err = encodeErr
} else {
err = erasureReader.Heal(ctx, writers, readers, partSize)
}
closeBitrotReaders(readers) closeBitrotReaders(readers)
closeBitrotWriters(writers) closeBitrotWriters(writers)
if err != nil { if err != nil {
@ -648,6 +692,13 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
continue continue
} }
if reencode {
partsMetadata[i].SetErasureParityUpdated(erasureReader.parityBlocks, erasureWriter.parityBlocks)
}
partsMetadata[i].Erasure.ParityBlocks = erasureWriter.parityBlocks
partsMetadata[i].Erasure.DataBlocks = erasureWriter.dataBlocks
partsMetadata[i].DataDir = dstDataDir partsMetadata[i].DataDir = dstDataDir
partsMetadata[i].AddObjectPart(partNumber, "", partSize, partActualSize, partModTime, partIdx, partChecksums) partsMetadata[i].AddObjectPart(partNumber, "", partSize, partActualSize, partModTime, partIdx, partChecksums)
partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{ partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{

View File

@ -18,6 +18,7 @@
package cmd package cmd
import ( import (
"fmt"
"time" "time"
) )
@ -297,6 +298,14 @@ func (fi *FileInfo) SetInlineData() {
fi.Metadata[ReservedMetadataPrefixLower+"inline-data"] = "true" fi.Metadata[ReservedMetadataPrefixLower+"inline-data"] = "true"
} }
// SetErasureParityUpdated records on the file's metadata that its erasure
// parity was changed during healing. The entry is stored under the reserved
// "erasure-parity-update" key with a value formatted as "<old>-><new>".
func (fi *FileInfo) SetErasureParityUpdated(old, new int) {
	const parityUpdateKey = ReservedMetadataPrefixLower + "erasure-parity-update"

	// Assigning into a nil map panics, so allocate the map on first use.
	if fi.Metadata == nil {
		fi.Metadata = make(map[string]string, 1)
	}

	transition := fmt.Sprintf("%d->%d", old, new)
	fi.Metadata[parityUpdateKey] = transition
}
// VersionPurgeStatusKey denotes purge status in metadata // VersionPurgeStatusKey denotes purge status in metadata
const ( const (
VersionPurgeStatusKey = ReservedMetadataPrefixLower + "purgestatus" VersionPurgeStatusKey = ReservedMetadataPrefixLower + "purgestatus"