do not heal outdated disks > parityBlocks (#14976)

this PR also fixes a situation where incorrect
partsMetadata slice was used where fi.Data was
re-used from a single drive causing duplication
of the shards across all drives.

This happens for situations where shouldHeal()
returns true for all drives > parityBlocks.

To avoid this we should never attempt to heal on all
drives > parityBlocks, unless we are doing metadata
migration from xl.json -> xl.meta
This commit is contained in:
Harshavardhana 2022-05-25 15:17:10 -07:00 committed by GitHub
parent a4be0b88f6
commit dea8220eee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -418,10 +418,19 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
return result, nil return result, nil
} }
if !latestMeta.XLV1 && !latestMeta.Deleted && disksToHealCount > latestMeta.Erasure.ParityBlocks {
// When disk to heal count is greater than parity blocks we should simply error out.
err := fmt.Errorf("more disks are expected to heal than parity, returned errors: %v -> %s/%s(%s)", errs, bucket, object, versionID)
logger.LogIf(ctx, err)
return er.defaultHealResult(latestMeta, storageDisks, storageEndpoints, errs,
bucket, object, versionID), err
}
cleanFileInfo := func(fi FileInfo) FileInfo { cleanFileInfo := func(fi FileInfo) FileInfo {
// Returns a copy of the 'fi' with checksums and parts nil'ed. // Returns a copy of the 'fi' with erasure index, checksums and inline data niled.
nfi := fi nfi := fi
if !fi.IsRemote() { if !nfi.IsRemote() {
nfi.Data = nil
nfi.Erasure.Index = 0 nfi.Erasure.Index = 0
nfi.Erasure.Checksums = nil nfi.Erasure.Checksums = nil
} }
@ -432,12 +441,25 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
tmpID := mustGetUUID() tmpID := mustGetUUID()
migrateDataDir := mustGetUUID() migrateDataDir := mustGetUUID()
// Reorder so that we have data disks first and parity disks next.
latestDisks := shuffleDisks(availableDisks, latestMeta.Erasure.Distribution)
outDatedDisks = shuffleDisks(outDatedDisks, latestMeta.Erasure.Distribution)
partsMetadata = shufflePartsMetadata(partsMetadata, latestMeta.Erasure.Distribution)
copyPartsMetadata := make([]FileInfo, len(partsMetadata)) copyPartsMetadata := make([]FileInfo, len(partsMetadata))
for i := range latestDisks {
if latestDisks[i] == nil {
continue
}
copyPartsMetadata[i] = partsMetadata[i]
}
for i := range outDatedDisks { for i := range outDatedDisks {
if outDatedDisks[i] == nil { if outDatedDisks[i] == nil {
continue continue
} }
copyPartsMetadata[i] = partsMetadata[i] // Make sure to write the FileInfo information
// that is expected to be in quorum.
partsMetadata[i] = cleanFileInfo(latestMeta) partsMetadata[i] = cleanFileInfo(latestMeta)
} }
@ -456,12 +478,6 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
inlineBuffers = make([]*bytes.Buffer, len(outDatedDisks)) inlineBuffers = make([]*bytes.Buffer, len(outDatedDisks))
} }
// Reorder so that we have data disks first and parity disks next.
latestDisks := shuffleDisks(availableDisks, latestMeta.Erasure.Distribution)
outDatedDisks = shuffleDisks(outDatedDisks, latestMeta.Erasure.Distribution)
partsMetadata = shufflePartsMetadata(partsMetadata, latestMeta.Erasure.Distribution)
copyPartsMetadata = shufflePartsMetadata(copyPartsMetadata, latestMeta.Erasure.Distribution)
if !latestMeta.Deleted && !latestMeta.IsRemote() { if !latestMeta.Deleted && !latestMeta.IsRemote() {
// Heal each part. erasureHealFile() will write the healed // Heal each part. erasureHealFile() will write the healed
// part to .minio/tmp/uuid/ which needs to be renamed later to // part to .minio/tmp/uuid/ which needs to be renamed later to
@ -486,7 +502,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
} }
checksumInfo := copyPartsMetadata[i].Erasure.GetChecksumInfo(partNumber) checksumInfo := copyPartsMetadata[i].Erasure.GetChecksumInfo(partNumber)
partPath := pathJoin(object, srcDataDir, fmt.Sprintf("part.%d", partNumber)) partPath := pathJoin(object, srcDataDir, fmt.Sprintf("part.%d", partNumber))
readers[i] = newBitrotReader(disk, partsMetadata[i].Data, bucket, partPath, tillOffset, checksumAlgo, readers[i] = newBitrotReader(disk, copyPartsMetadata[i].Data, bucket, partPath, tillOffset, checksumAlgo,
checksumInfo.Hash, erasure.ShardSize()) checksumInfo.Hash, erasure.ShardSize())
} }
writers := make([]io.Writer, len(outDatedDisks)) writers := make([]io.Writer, len(outDatedDisks))