Add support converting non-inlined to inlined (#15444)

This is a feature to allow for inode compaction on
large clusters that use a lot of small files spread
across a large heirarchy.
This commit is contained in:
Harshavardhana
2022-08-02 23:10:22 -07:00
committed by GitHub
parent e956369c4e
commit a6e0ec4e6f
6 changed files with 286 additions and 19 deletions

View File

@@ -243,7 +243,7 @@ func listAllBuckets(ctx context.Context, storageDisks []StorageAPI, healBuckets
// Only heal on disks where we are sure that healing is needed. We can expand
// this list as and when we figure out more errors can be added to this list safely.
func shouldHealObjectOnDisk(erErr, dataErr error, meta FileInfo, latestMeta FileInfo) bool {
func shouldHealObjectOnDisk(erErr, dataErr error, meta FileInfo, latestMeta FileInfo, doinline bool) bool {
switch {
case errors.Is(erErr, errFileNotFound) || errors.Is(erErr, errFileVersionNotFound):
return true
@@ -256,6 +256,10 @@ func shouldHealObjectOnDisk(erErr, dataErr error, meta FileInfo, latestMeta File
// always check first.
return true
}
if doinline {
// convert small files to 'inline'
return true
}
if !meta.Deleted && !meta.IsRemote() {
// If xl.meta was read fine but there may be problem with the part.N files.
if IsErr(dataErr, []error{
@@ -342,6 +346,26 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
availableDisks, dataErrs, diskMTime := disksWithAllParts(ctx, onlineDisks, partsMetadata,
errs, latestMeta, bucket, object, scanMode)
var erasure Erasure
var recreate bool
if !latestMeta.Deleted && !latestMeta.IsRemote() {
// Initialize erasure coding
erasure, err = NewErasure(ctx, latestMeta.Erasure.DataBlocks,
latestMeta.Erasure.ParityBlocks, latestMeta.Erasure.BlockSize)
if err != nil {
return result, err
}
// Is only 'true' if the opts.Recreate is true and
// the object shardSize < smallFileThreshold do not
// set this to 'true' arbitrarily and must be only
// 'true' with caller ask.
recreate = (opts.Recreate &&
!latestMeta.InlineData() &&
len(latestMeta.Parts) == 1 &&
erasure.ShardFileSize(latestMeta.Parts[0].ActualSize) < smallFileThreshold)
}
// Loop to find number of disks with valid data, per-drive
// data state and a list of outdated disks on which data needs
// to be healed.
@@ -368,7 +392,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
driveState = madmin.DriveStateCorrupt
}
if shouldHealObjectOnDisk(errs[i], dataErrs[i], partsMetadata[i], latestMeta) {
if shouldHealObjectOnDisk(errs[i], dataErrs[i], partsMetadata[i], latestMeta, recreate) {
outDatedDisks[i] = storageDisks[i]
disksToHealCount++
result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{
@@ -422,7 +446,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
return result, nil
}
if !latestMeta.XLV1 && !latestMeta.Deleted && disksToHealCount > latestMeta.Erasure.ParityBlocks {
if !latestMeta.XLV1 && !latestMeta.Deleted && !recreate && disksToHealCount > latestMeta.Erasure.ParityBlocks {
// When disk to heal count is greater than parity blocks we should simply error out.
err := fmt.Errorf("more disks are expected to heal than parity, returned errors: %v (dataErrs %v) -> %s/%s(%s)", errs, dataErrs, bucket, object, versionID)
logger.LogIf(ctx, err)
@@ -478,18 +502,9 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
}
var inlineBuffers []*bytes.Buffer
if latestMeta.InlineData() {
inlineBuffers = make([]*bytes.Buffer, len(outDatedDisks))
}
if !latestMeta.Deleted && !latestMeta.IsRemote() {
// Heal each part. erasureHealFile() will write the healed
// part to .minio/tmp/uuid/ which needs to be renamed later to
// the final location.
erasure, err := NewErasure(ctx, latestMeta.Erasure.DataBlocks,
latestMeta.Erasure.ParityBlocks, latestMeta.Erasure.BlockSize)
if err != nil {
return result, err
if latestMeta.InlineData() || recreate {
inlineBuffers = make([]*bytes.Buffer, len(outDatedDisks))
}
erasureInfo := latestMeta.Erasure
@@ -525,6 +540,10 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize())
}
}
// Heal each part. erasure.Heal() will write the healed
// part to .minio/tmp/uuid/ which needs to be renamed
// later to the final location.
err = erasure.Heal(ctx, writers, readers, partSize)
closeBitrotReaders(readers)
closeBitrotWriters(writers)
@@ -556,6 +575,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
})
if len(inlineBuffers) > 0 && inlineBuffers[i] != nil {
partsMetadata[i].Data = inlineBuffers[i].Bytes()
partsMetadata[i].SetInlineData()
} else {
partsMetadata[i].Data = nil
}
@@ -587,8 +607,9 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
return result, err
}
// Remove any remaining parts from outdated disks from before transition.
if partsMetadata[i].IsRemote() {
// - Remove any parts from healed disks after its been inlined.
// - Remove any remaining parts from outdated disks from before transition.
if recreate || partsMetadata[i].IsRemote() {
rmDataDir := partsMetadata[i].DataDir
disk.DeleteVol(ctx, pathJoin(bucket, encodeDirObject(object), rmDataDir), true)
}