heal: Avoid deadline error with very large objects (#140) (#20586)

Healing a large object with the normal scan mode, where no part data
is read, can still fail after 30 seconds if the object has too many
parts, mainly when hard disks are being used. The reason is that a
single overall deadline covered the check of all parts; this commit
applies a deadline per part instead.
Anis Eleuch 2024-10-26 10:56:26 +01:00 committed by GitHub
parent 72a0d14195
commit f7e176d4ca
2 changed files with 41 additions and 42 deletions
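The idea behind the fix, sketched below with only the Go standard library: instead of one fixed deadline covering the whole loop over parts, each part check runs under its own deadline, so the total time budget grows with the number of parts. The names checkOnePart and perPartTimeout are illustrative, not MinIO APIs; the actual change uses the internal xioutil.WithDeadline helper shown in the diff.

package main

import (
	"context"
	"fmt"
	"time"
)

// checkOnePart stands in for the per-part existence/size check; it honors
// the context so a deadline can interrupt it (illustrative only).
func checkOnePart(ctx context.Context, partNum int) error {
	select {
	case <-time.After(10 * time.Millisecond): // simulate a slow drive
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// checkAllParts gives every part its own deadline instead of sharing one
// global deadline across the whole loop.
func checkAllParts(ctx context.Context, numParts int, perPartTimeout time.Duration) error {
	for p := 1; p <= numParts; p++ {
		partCtx, cancel := context.WithTimeout(ctx, perPartTimeout)
		err := checkOnePart(partCtx, p)
		cancel()
		if err != nil {
			return fmt.Errorf("part %d: %w", p, err)
		}
	}
	return nil
}

func main() {
	// An object with many parts no longer has to fit inside a single timeout.
	fmt.Println(checkAllParts(context.Background(), 100, 2*time.Second))
}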

cmd/xl-storage-disk-id-check.go

@@ -509,9 +509,7 @@ func (p *xlStorageDiskIDCheck) CheckParts(ctx context.Context, volume string, pa
 	}
 	defer done(0, &err)
 
-	return xioutil.WithDeadline[*CheckPartsResp](ctx, globalDriveConfig.GetMaxTimeout(), func(ctx context.Context) (res *CheckPartsResp, err error) {
-		return p.storage.CheckParts(ctx, volume, path, fi)
-	})
+	return p.storage.CheckParts(ctx, volume, path, fi)
 }
 
 func (p *xlStorageDiskIDCheck) DeleteBulk(ctx context.Context, volume string, paths ...string) (err error) {

cmd/xl-storage.go

@@ -2367,6 +2367,41 @@ func (s *xlStorage) AppendFile(ctx context.Context, volume string, path string,
 	return nil
 }
 
+// checkPart is a light check of the existence and size of a part, without doing a bitrot check.
+// For any unexpected error, it returns checkPartUnknown (zero).
+func (s *xlStorage) checkPart(volumeDir, path, dataDir string, partNum int, expectedSize int64, skipAccessCheck bool) (resp int) {
+	partPath := pathJoin(path, dataDir, fmt.Sprintf("part.%d", partNum))
+	filePath := pathJoin(volumeDir, partPath)
+	st, err := Lstat(filePath)
+	if err != nil {
+		if osIsNotExist(err) {
+			if !skipAccessCheck {
+				// Stat a volume entry.
+				if verr := Access(volumeDir); verr != nil {
+					if osIsNotExist(verr) {
+						resp = checkPartVolumeNotFound
+					}
+					return
+				}
+			}
+		}
+		if osErrToFileErr(err) == errFileNotFound {
+			resp = checkPartFileNotFound
+		}
+		return
+	}
+	if st.Mode().IsDir() {
+		resp = checkPartFileNotFound
+		return
+	}
+	// Check if shard is truncated.
+	if st.Size() < expectedSize {
+		resp = checkPartFileCorrupt
+		return
+	}
+	return checkPartSuccess
+}
+
 // CheckParts check if path has necessary parts available.
 func (s *xlStorage) CheckParts(ctx context.Context, volume string, path string, fi FileInfo) (*CheckPartsResp, error) {
 	volumeDir, err := s.getVolDir(volume)
@@ -2385,36 +2420,12 @@ func (s *xlStorage) CheckParts(ctx context.Context, volume string, path string,
 	}
 
 	for i, part := range fi.Parts {
-		partPath := pathJoin(path, fi.DataDir, fmt.Sprintf("part.%d", part.Number))
-		filePath := pathJoin(volumeDir, partPath)
-		st, err := Lstat(filePath)
-		if err != nil {
-			if osIsNotExist(err) {
-				if !skipAccessChecks(volume) {
-					// Stat a volume entry.
-					if verr := Access(volumeDir); verr != nil {
-						if osIsNotExist(verr) {
-							resp.Results[i] = checkPartVolumeNotFound
-						}
-						continue
-					}
-				}
-			}
-			if osErrToFileErr(err) == errFileNotFound {
-				resp.Results[i] = checkPartFileNotFound
-			}
-			continue
-		}
-		if st.Mode().IsDir() {
-			resp.Results[i] = checkPartFileNotFound
-			continue
-		}
-		// Check if shard is truncated.
-		if st.Size() < fi.Erasure.ShardFileSize(part.Size) {
-			resp.Results[i] = checkPartFileCorrupt
-			continue
-		}
-		resp.Results[i] = checkPartSuccess
+		resp.Results[i], err = xioutil.WithDeadline[int](ctx, globalDriveConfig.GetMaxTimeout(), func(ctx context.Context) (int, error) {
+			return s.checkPart(volumeDir, path, fi.DataDir, part.Number, fi.Erasure.ShardFileSize(part.Size), skipAccessChecks(volume)), nil
+		})
+		if err != nil {
+			return nil, err
+		}
 	}
 
 	return &resp, nil
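For readers unfamiliar with the wrapper used above, a per-call deadline helper of this kind can be sketched as follows. This is a generic illustration of the pattern, not the actual xioutil.WithDeadline implementation; note that the inner work keeps running in the background if it ignores the cancelled context.

package main

import (
	"context"
	"fmt"
	"time"
)

// withDeadline runs work under its own timeout and returns whichever comes
// first: the result or the context's deadline error. Sketch only; the real
// xioutil.WithDeadline in MinIO may differ in details.
func withDeadline[T any](ctx context.Context, timeout time.Duration, work func(ctx context.Context) (T, error)) (T, error) {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	type result struct {
		v   T
		err error
	}
	ch := make(chan result, 1) // buffered so the goroutine never blocks
	go func() {
		v, err := work(ctx)
		ch <- result{v, err}
	}()

	select {
	case r := <-ch:
		return r.v, r.err
	case <-ctx.Done():
		var zero T
		return zero, ctx.Err()
	}
}

func main() {
	// A part check that takes 10ms easily fits a 2s per-part budget.
	n, err := withDeadline[int](context.Background(), 2*time.Second, func(ctx context.Context) (int, error) {
		time.Sleep(10 * time.Millisecond)
		return 1, nil
	})
	fmt.Println(n, err)
}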
@@ -2546,17 +2557,7 @@ func (s *xlStorage) Delete(ctx context.Context, volume string, path string, dele
 }
 
 func skipAccessChecks(volume string) (ok bool) {
-	for _, prefix := range []string{
-		minioMetaTmpDeletedBucket,
-		minioMetaTmpBucket,
-		minioMetaMultipartBucket,
-		minioMetaBucket,
-	} {
-		if strings.HasPrefix(volume, prefix) {
-			return true
-		}
-	}
-	return ok
+	return strings.HasPrefix(volume, minioMetaBucket)
 }
 
 // RenameData - rename source path to destination path atomically, metadata and data directory.
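The skipAccessChecks simplification works because the other internal buckets are all nested under minioMetaBucket, so a single prefix check covers them. A minimal sketch, assuming the standard MinIO constant values (".minio.sys" and friends); the real definitions live in the MinIO source:

package main

import (
	"fmt"
	"strings"
)

// Assumed values mirroring MinIO's internal constants (illustrative).
const (
	minioMetaBucket           = ".minio.sys"
	minioMetaTmpBucket        = minioMetaBucket + "/tmp"
	minioMetaMultipartBucket  = minioMetaBucket + "/multipart"
	minioMetaTmpDeletedBucket = minioMetaTmpBucket + "/.trash"
)

func main() {
	for _, v := range []string{
		minioMetaTmpDeletedBucket,
		minioMetaTmpBucket,
		minioMetaMultipartBucket,
		minioMetaBucket,
		"user-bucket",
	} {
		// Only the internal buckets share the ".minio.sys" prefix.
		fmt.Printf("%-30s skip=%v\n", v, strings.HasPrefix(v, minioMetaBucket))
	}
}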