https://github.com/minio/minio.git
commit ae95384dd8 (parent 0f0dcf0c5e)
@@ -27,7 +27,6 @@ import (
     "time"
 
     "github.com/minio/madmin-go/v2"
-    xhttp "github.com/minio/minio/internal/http"
     "github.com/minio/minio/internal/logger"
     "github.com/minio/pkg/sync/errgroup"
 )
@@ -408,31 +407,12 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
     availableDisks, dataErrs, diskMTime := disksWithAllParts(ctx, onlineDisks, partsMetadata,
         errs, latestMeta, bucket, object, scanMode)
 
-    var (
-        reencode, recreate bool
-        erasureWriter, erasureReader Erasure
-    )
-
+    var erasure Erasure
+    var recreate bool
     if !latestMeta.Deleted && !latestMeta.IsRemote() {
-        erasureReader, err = NewErasure(ctx, latestMeta.Erasure.DataBlocks, latestMeta.Erasure.ParityBlocks, latestMeta.Erasure.BlockSize)
-        if err != nil {
-            return result, err
-        }
-
-        parity := erasureReader.parityBlocks
-        if opts.UpdateParity && bucket != minioMetaBucket {
-            p := globalStorageClass.GetParityForSC(latestMeta.Metadata[xhttp.AmzStorageClass])
-            if p < 0 {
-                return result, fmt.Errorf("invalid parity number: %d", p)
-            }
-            if p != parity {
-                reencode = true
-                parity = p
-            }
-        }
-
         // Initialize erasure coding
-        erasureWriter, err = NewErasure(ctx, len(storageDisks)-parity, parity, latestMeta.Erasure.BlockSize)
+        erasure, err = NewErasure(ctx, latestMeta.Erasure.DataBlocks,
+            latestMeta.Erasure.ParityBlocks, latestMeta.Erasure.BlockSize)
         if err != nil {
             return result, err
         }
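Side note on the change above: the deleted branch let healing re-encode an object when opts.UpdateParity requested a parity count different from the one stored in its metadata; after this commit, healing always rebuilds with the stored DataBlocks/ParityBlocks layout. A minimal sketch of the dropped decision, with illustrative names (storedParity, configuredParity) rather than MinIO's API:

package main

import "fmt"

// needsReencode mirrors the removed check: keep the stored parity unless
// the storage-class configuration supplies a different, valid value.
// storedParity and configuredParity are illustrative parameter names.
func needsReencode(storedParity, configuredParity int) (parity int, reencode bool) {
    if configuredParity < 0 || configuredParity == storedParity {
        return storedParity, false // invalid or unchanged: keep the on-disk layout
    }
    return configuredParity, true // shard layout changes: decode old, encode new
}

func main() {
    parity, reencode := needsReencode(4, 2)
    fmt.Println(parity, reencode) // 2 true
}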
@@ -444,7 +424,7 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
         recreate = (opts.Recreate &&
             !latestMeta.InlineData() &&
             len(latestMeta.Parts) == 1 &&
-            erasureWriter.ShardFileSize(latestMeta.Parts[0].ActualSize) < smallFileThreshold)
+            erasure.ShardFileSize(latestMeta.Parts[0].ActualSize) < smallFileThreshold)
     }
 
     result.ObjectSize, err = latestMeta.GetActualSize()
@@ -473,7 +453,7 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
             driveState = madmin.DriveStateCorrupt
         }
 
-        if reencode || shouldHealObjectOnDisk(errs[i], dataErrs[i], partsMetadata[i], latestMeta, recreate) {
+        if shouldHealObjectOnDisk(errs[i], dataErrs[i], partsMetadata[i], latestMeta, recreate) {
             outDatedDisks[i] = storageDisks[i]
             disksToHealCount++
             result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{
@@ -521,7 +501,7 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
         return result, nil
     }
 
-    if !latestMeta.XLV1 && !latestMeta.Deleted && !recreate && !reencode && disksToHealCount > latestMeta.Erasure.ParityBlocks {
+    if !latestMeta.XLV1 && !latestMeta.Deleted && !recreate && disksToHealCount > latestMeta.Erasure.ParityBlocks {
         // When disk to heal count is greater than parity blocks we should simply error out.
         err := fmt.Errorf("more drives are expected to heal than parity, returned errors: %v (dataErrs %v) -> %s/%s(%s)", errs, dataErrs, bucket, object, versionID)
         logger.LogIf(ctx, err)
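The guard above rests on the basic Reed-Solomon property that an object striped into DataBlocks data and ParityBlocks parity shards can be reconstructed from any DataBlocks intact shards, so at most ParityBlocks drives may be missing or outdated. A toy illustration of that arithmetic (canHeal is my name, not MinIO's):

package main

import "fmt"

// canHeal sketches the invariant: reconstruction needs at least
// dataBlocks intact shards out of dataBlocks+parityBlocks total,
// which is equivalent to disksToHeal <= parityBlocks.
func canHeal(dataBlocks, parityBlocks, disksToHeal int) bool {
    total := dataBlocks + parityBlocks
    intact := total - disksToHeal
    return intact >= dataBlocks
}

func main() {
    fmt.Println(canHeal(8, 4, 3)) // true: 9 of 12 shards still readable
    fmt.Println(canHeal(8, 4, 5)) // false: only 7 readable, 8 needed
}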
@@ -608,10 +588,6 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
         inlineBuffers = make([]*bytes.Buffer, len(outDatedDisks))
     }
 
-    // Heal each part. The healed data will be written
-    // to .minio.sys/tmp/uuid/ which needs to be renamed
-    // later to the final location.
-
     erasureInfo := latestMeta.Erasure
     for partIndex := 0; partIndex < len(latestMeta.Parts); partIndex++ {
         partSize := latestMeta.Parts[partIndex].Size
@@ -620,20 +596,17 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
         partNumber := latestMeta.Parts[partIndex].Number
         partIdx := latestMeta.Parts[partIndex].Index
         partChecksums := latestMeta.Parts[partIndex].Checksums
+        tillOffset := erasure.ShardFileOffset(0, partSize, partSize)
         readers := make([]io.ReaderAt, len(latestDisks))
         checksumAlgo := erasureInfo.GetChecksumInfo(partNumber).Algorithm
-
-        origPartErasureSize := erasureReader.ShardFileOffset(0, partSize, partSize)
-        newPartErasureSize := erasureWriter.ShardFileOffset(0, partSize, partSize)
-
         for i, disk := range latestDisks {
             if disk == OfflineDisk {
                 continue
             }
             checksumInfo := copyPartsMetadata[i].Erasure.GetChecksumInfo(partNumber)
             partPath := pathJoin(object, srcDataDir, fmt.Sprintf("part.%d", partNumber))
-            readers[i] = newBitrotReader(disk, copyPartsMetadata[i].Data, bucket, partPath, origPartErasureSize, checksumAlgo,
-                checksumInfo.Hash, erasureReader.ShardSize())
+            readers[i] = newBitrotReader(disk, copyPartsMetadata[i].Data, bucket, partPath, tillOffset, checksumAlgo,
+                checksumInfo.Hash, erasure.ShardSize())
         }
         writers := make([]io.Writer, len(outDatedDisks))
         for i, disk := range outDatedDisks {
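tillOffset above is the extent, within each per-drive shard file, that corresponds to reading the whole part; the bitrot readers and writers are bounded by it. Below is a rough re-derivation of that shard arithmetic under the assumption that every blockSize block is striped across dataBlocks shards; ceilDiv, shardSize, and shardFileSize are illustrative stand-ins, not the Erasure methods themselves:

package main

import "fmt"

// ceilDiv rounds the quotient a/b up to the next integer.
func ceilDiv(a, b int64) int64 { return (a + b - 1) / b }

// shardSize is the per-shard slice of one erasure-coded block.
func shardSize(blockSize, dataBlocks int64) int64 {
    return ceilDiv(blockSize, dataBlocks)
}

// shardFileSize is the total bytes one shard file holds for an object
// of size totalLength: full blocks plus a final partial block.
func shardFileSize(blockSize, dataBlocks, totalLength int64) int64 {
    if totalLength == 0 {
        return 0
    }
    numBlocks := totalLength / blockSize
    size := numBlocks * shardSize(blockSize, dataBlocks)
    if last := totalLength % blockSize; last != 0 {
        size += ceilDiv(last, dataBlocks)
    }
    return size
}

func main() {
    // A 10 MiB part with 1 MiB blocks striped over 8 data shards:
    fmt.Println(shardFileSize(1<<20, 8, 10<<20)) // 1310720 (10 * 131072)
}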
@@ -642,35 +615,18 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
             }
             partPath := pathJoin(tmpID, dstDataDir, fmt.Sprintf("part.%d", partNumber))
             if len(inlineBuffers) > 0 {
-                inlineBuffers[i] = bytes.NewBuffer(make([]byte, 0, erasureWriter.ShardFileSize(latestMeta.Size)+32))
-                writers[i] = newStreamingBitrotWriterBuffer(inlineBuffers[i], DefaultBitrotAlgorithm, erasureWriter.ShardSize())
+                inlineBuffers[i] = bytes.NewBuffer(make([]byte, 0, erasure.ShardFileSize(latestMeta.Size)+32))
+                writers[i] = newStreamingBitrotWriterBuffer(inlineBuffers[i], DefaultBitrotAlgorithm, erasure.ShardSize())
             } else {
                 writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, partPath,
-                    newPartErasureSize, DefaultBitrotAlgorithm, erasureWriter.ShardSize())
+                    tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize())
             }
         }
 
-        if reencode {
-            rd, rw := io.Pipe()
-            go func() {
-                n, err := erasureReader.Decode(ctx, rw, readers, 0, partSize, partSize, nil)
-                if err == nil && n != partSize {
-                    err = errors.New("unexpected content")
-                }
-                rw.CloseWithError(err)
-            }()
-            buffer := er.bp.Get()
-            n, encodeErr := erasureWriter.Encode(ctx, rd, writers, buffer, erasureWriter.dataBlocks)
-            er.bp.Put(buffer)
-            if encodeErr == nil && n != partSize {
-                encodeErr = errors.New("unexpected content")
-            }
-            rd.CloseWithError(encodeErr)
-            err = encodeErr
-        } else {
-            err = erasureReader.Heal(ctx, writers, readers, partSize)
-        }
-
+        // Heal each part. erasure.Heal() will write the healed
+        // part to .minio/tmp/uuid/ which needs to be renamed
+        // later to the final location.
+        err = erasure.Heal(ctx, writers, readers, partSize)
         closeBitrotReaders(readers)
         closeBitrotWriters(writers)
         if err != nil {
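For reference, the branch deleted above streamed each part through an io.Pipe: a goroutine decoded with the old erasure layout into the write end while the caller re-encoded from the read end, so no full copy of the part was ever held in memory. Reduced to plain readers and writers, the pattern looks like this (the decode/encode callbacks are placeholders for the Erasure.Decode/Erasure.Encode calls in the removed code):

package main

import (
    "fmt"
    "io"
    "strings"
)

// reencodeStream sketches the removed pipe pattern: decode runs in a
// goroutine feeding the pipe, encode consumes it, and errors from the
// producer side are propagated to the consumer via CloseWithError.
func reencodeStream(decode func(io.Writer) error, encode func(io.Reader) error) error {
    rd, rw := io.Pipe()
    go func() {
        // CloseWithError(nil) closes the pipe cleanly with EOF.
        rw.CloseWithError(decode(rw))
    }()
    err := encode(rd)
    rd.CloseWithError(err)
    return err
}

func main() {
    src := "object data"
    err := reencodeStream(
        func(w io.Writer) error { _, err := io.Copy(w, strings.NewReader(src)); return err },
        func(r io.Reader) error { _, err := io.Copy(io.Discard, r); return err },
    )
    fmt.Println(err) // <nil>
}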
@@ -692,13 +648,6 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
             continue
         }
 
-        if reencode {
-            partsMetadata[i].SetErasureParityUpdated(erasureReader.parityBlocks, erasureWriter.parityBlocks)
-        }
-
-        partsMetadata[i].Erasure.ParityBlocks = erasureWriter.parityBlocks
-        partsMetadata[i].Erasure.DataBlocks = erasureWriter.dataBlocks
-
         partsMetadata[i].DataDir = dstDataDir
         partsMetadata[i].AddObjectPart(partNumber, "", partSize, partActualSize, partModTime, partIdx, partChecksums)
         partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{
@@ -18,7 +18,6 @@
 package cmd
 
 import (
-    "fmt"
     "time"
 )
 
@@ -298,14 +297,6 @@ func (fi *FileInfo) SetInlineData() {
     fi.Metadata[ReservedMetadataPrefixLower+"inline-data"] = "true"
 }
 
-// SetErasureParityUpdated adds trace information about an object parity update during healing
-func (fi *FileInfo) SetErasureParityUpdated(old, new int) {
-    if fi.Metadata == nil {
-        fi.Metadata = make(map[string]string, 1)
-    }
-    fi.Metadata[ReservedMetadataPrefixLower+"erasure-parity-update"] = fmt.Sprintf("%d->%d", old, new)
-}
-
 // VersionPurgeStatusKey denotes purge status in metadata
 const (
     VersionPurgeStatusKey = ReservedMetadataPrefixLower + "purgestatus"
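The removed SetErasureParityUpdated helper recorded the parity transition as an "old->new" string under a reserved metadata key, purely as a healing trace. A toy reconstruction using a plain map instead of FileInfo (the reservedPrefix constant is an assumption standing in for MinIO's ReservedMetadataPrefixLower):

package main

import "fmt"

// reservedPrefix is an assumed stand-in for ReservedMetadataPrefixLower.
const reservedPrefix = "x-minio-internal-"

func main() {
    meta := make(map[string]string, 1)
    oldParity, newParity := 4, 2
    // The removed helper formatted the transition exactly like this.
    meta[reservedPrefix+"erasure-parity-update"] = fmt.Sprintf("%d->%d", oldParity, newParity)
    fmt.Println(meta) // map[x-minio-internal-erasure-parity-update:4->2]
}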