allow healing to prefer local disks over remote (#17788)

This commit is contained in:
Harshavardhana 2023-08-03 02:18:18 -07:00 committed by GitHub
parent 4a4950fe41
commit 45fb375c41
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 9 additions and 3 deletions

View File

@ -289,12 +289,15 @@ func (e Erasure) Decode(ctx context.Context, writer io.Writer, readers []io.Read
}
// Heal reads from readers, reconstruct shards and writes the data to the writers.
func (e Erasure) Heal(ctx context.Context, writers []io.Writer, readers []io.ReaderAt, totalLength int64) (derr error) {
func (e Erasure) Heal(ctx context.Context, writers []io.Writer, readers []io.ReaderAt, totalLength int64, prefer []bool) (derr error) {
if len(writers) != e.parityBlocks+e.dataBlocks {
return errInvalidArgument
}
reader := newParallelReader(readers, e, 0, totalLength)
if len(readers) == len(prefer) {
reader.preferReaders(prefer)
}
startBlock := int64(0)
endBlock := totalLength / e.blockSize

View File

@ -132,7 +132,7 @@ func TestErasureHeal(t *testing.T) {
}
// test case setup is complete - now call Heal()
err = erasure.Heal(context.Background(), staleWriters, readers, test.size)
err = erasure.Heal(context.Background(), staleWriters, readers, test.size, nil)
closeBitrotReaders(readers)
closeBitrotWriters(staleWriters)
if err != nil && !test.shouldFail {

View File

@ -661,6 +661,7 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
partChecksums := latestMeta.Parts[partIndex].Checksums
tillOffset := erasure.ShardFileOffset(0, partSize, partSize)
readers := make([]io.ReaderAt, len(latestDisks))
prefer := make([]bool, len(latestDisks))
checksumAlgo := erasureInfo.GetChecksumInfo(partNumber).Algorithm
for i, disk := range latestDisks {
if disk == OfflineDisk {
@ -670,6 +671,8 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
partPath := pathJoin(object, srcDataDir, fmt.Sprintf("part.%d", partNumber))
readers[i] = newBitrotReader(disk, copyPartsMetadata[i].Data, bucket, partPath, tillOffset, checksumAlgo,
checksumInfo.Hash, erasure.ShardSize())
prefer[i] = disk.Hostname() == ""
}
writers := make([]io.Writer, len(outDatedDisks))
for i, disk := range outDatedDisks {
@ -689,7 +692,7 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
// Heal each part. erasure.Heal() will write the healed
// part to .minio/tmp/uuid/ which needs to be renamed
// later to the final location.
err = erasure.Heal(ctx, writers, readers, partSize)
err = erasure.Heal(ctx, writers, readers, partSize, prefer)
closeBitrotReaders(readers)
closeBitrotWriters(writers)
if err != nil {