fix: legacy objects with 10MiB blockSize should use right buffers (#12459)

healing code was using incorrect buffers to heal older
objects with 10MiB erasure blockSize, incorrect calculation
of such buffers can lead to incorrect premature closure of
io.Pipe() during healing.

fixes #12410
This commit is contained in:
Harshavardhana 2021-06-07 10:06:06 -07:00 committed by GitHub
parent dd2831c1a0
commit 542fe4ea2e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 20 additions and 3 deletions

View File

@ -443,7 +443,10 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
} }
erasureInfo := latestMeta.Erasure erasureInfo := latestMeta.Erasure
bp := er.bp
if erasureInfo.BlockSize == blockSizeV1 {
bp = er.bpOld
}
for partIndex := 0; partIndex < len(latestMeta.Parts); partIndex++ { for partIndex := 0; partIndex < len(latestMeta.Parts); partIndex++ {
partSize := latestMeta.Parts[partIndex].Size partSize := latestMeta.Parts[partIndex].Size
partActualSize := latestMeta.Parts[partIndex].ActualSize partActualSize := latestMeta.Parts[partIndex].ActualSize
@ -457,7 +460,8 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
} }
checksumInfo := copyPartsMetadata[i].Erasure.GetChecksumInfo(partNumber) checksumInfo := copyPartsMetadata[i].Erasure.GetChecksumInfo(partNumber)
partPath := pathJoin(object, srcDataDir, fmt.Sprintf("part.%d", partNumber)) partPath := pathJoin(object, srcDataDir, fmt.Sprintf("part.%d", partNumber))
readers[i] = newBitrotReader(disk, partsMetadata[i].Data, bucket, partPath, tillOffset, checksumAlgo, checksumInfo.Hash, erasure.ShardSize()) readers[i] = newBitrotReader(disk, partsMetadata[i].Data, bucket, partPath, tillOffset, checksumAlgo,
checksumInfo.Hash, erasure.ShardSize())
} }
writers := make([]io.Writer, len(outDatedDisks)) writers := make([]io.Writer, len(outDatedDisks))
for i, disk := range outDatedDisks { for i, disk := range outDatedDisks {
@ -473,7 +477,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize()) tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize())
} }
} }
err = erasure.Heal(ctx, readers, writers, partSize, er.bp) err = erasure.Heal(ctx, readers, writers, partSize, bp)
closeBitrotReaders(readers) closeBitrotReaders(readers)
closeBitrotWriters(writers) closeBitrotWriters(writers)
if err != nil { if err != nil {

View File

@ -391,6 +391,14 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
// setCount * setDriveCount with each memory upto blockSizeV2. // setCount * setDriveCount with each memory upto blockSizeV2.
bp := bpool.NewBytePoolCap(n, blockSizeV2, blockSizeV2*2) bp := bpool.NewBytePoolCap(n, blockSizeV2, blockSizeV2*2)
// Initialize byte pool for all sets, bpool size is set to
// setCount * setDriveCount with each memory upto blockSizeV1
//
// Number of buffers, max 10GiB
m := (10 * humanize.GiByte) / (blockSizeV1 * 2)
bpOld := bpool.NewBytePoolCap(m, blockSizeV1, blockSizeV1*2)
for i := 0; i < setCount; i++ { for i := 0; i < setCount; i++ {
s.erasureDisks[i] = make([]StorageAPI, setDriveCount) s.erasureDisks[i] = make([]StorageAPI, setDriveCount)
} }
@ -440,6 +448,7 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
deletedCleanupSleeper: newDynamicSleeper(10, 2*time.Second), deletedCleanupSleeper: newDynamicSleeper(10, 2*time.Second),
nsMutex: mutex, nsMutex: mutex,
bp: bp, bp: bp,
bpOld: bpOld,
mrfOpCh: make(chan partialOperation, 10000), mrfOpCh: make(chan partialOperation, 10000),
} }
} }

View File

@ -74,6 +74,10 @@ type erasureObjects struct {
// Byte pools used for temporary i/o buffers. // Byte pools used for temporary i/o buffers.
bp *bpool.BytePoolCap bp *bpool.BytePoolCap
// Byte pools used for temporary i/o buffers,
// legacy objects.
bpOld *bpool.BytePoolCap
mrfOpCh chan partialOperation mrfOpCh chan partialOperation
deletedCleanupSleeper *dynamicSleeper deletedCleanupSleeper *dynamicSleeper