heal: Dangling check to evaluate object parts separately (#19797)

This commit is contained in:
Anis Eleuch
2024-06-10 16:51:27 +01:00
committed by GitHub
parent 0662c90b5c
commit 789cbc6fb2
17 changed files with 614 additions and 183 deletions

View File

@@ -2312,18 +2312,25 @@ func (s *xlStorage) AppendFile(ctx context.Context, volume string, path string,
}
// CheckParts check if path has necessary parts available.
func (s *xlStorage) CheckParts(ctx context.Context, volume string, path string, fi FileInfo) error {
func (s *xlStorage) CheckParts(ctx context.Context, volume string, path string, fi FileInfo) (*CheckPartsResp, error) {
volumeDir, err := s.getVolDir(volume)
if err != nil {
return err
return nil, err
}
for _, part := range fi.Parts {
err = checkPathLength(pathJoin(volumeDir, path))
if err != nil {
return nil, err
}
resp := CheckPartsResp{
// By default, all results have an unknown status
Results: make([]int, len(fi.Parts)),
}
for i, part := range fi.Parts {
partPath := pathJoin(path, fi.DataDir, fmt.Sprintf("part.%d", part.Number))
filePath := pathJoin(volumeDir, partPath)
if err = checkPathLength(filePath); err != nil {
return err
}
st, err := Lstat(filePath)
if err != nil {
if osIsNotExist(err) {
@@ -2331,24 +2338,30 @@ func (s *xlStorage) CheckParts(ctx context.Context, volume string, path string,
// Stat a volume entry.
if verr := Access(volumeDir); verr != nil {
if osIsNotExist(verr) {
return errVolumeNotFound
resp.Results[i] = checkPartVolumeNotFound
}
return verr
continue
}
}
}
return osErrToFileErr(err)
if osErrToFileErr(err) == errFileNotFound {
resp.Results[i] = checkPartFileNotFound
}
continue
}
if st.Mode().IsDir() {
return errFileNotFound
resp.Results[i] = checkPartFileNotFound
continue
}
// Check if shard is truncated.
if st.Size() < fi.Erasure.ShardFileSize(part.Size) {
return errFileCorrupt
resp.Results[i] = checkPartFileCorrupt
continue
}
resp.Results[i] = checkPartSuccess
}
return nil
return &resp, nil
}
// deleteFile deletes a file or a directory if its empty unless recursive
@@ -2922,42 +2935,43 @@ func (s *xlStorage) bitrotVerify(ctx context.Context, partPath string, partSize
return bitrotVerify(diskHealthReader(ctx, file), fi.Size(), partSize, algo, sum, shardSize)
}
func (s *xlStorage) VerifyFile(ctx context.Context, volume, path string, fi FileInfo) (err error) {
func (s *xlStorage) VerifyFile(ctx context.Context, volume, path string, fi FileInfo) (*CheckPartsResp, error) {
volumeDir, err := s.getVolDir(volume)
if err != nil {
return err
return nil, err
}
if !skipAccessChecks(volume) {
// Stat a volume entry.
if err = Access(volumeDir); err != nil {
return convertAccessError(err, errVolumeAccessDenied)
return nil, convertAccessError(err, errVolumeAccessDenied)
}
}
resp := CheckPartsResp{
// By default, the result is unknown per part
Results: make([]int, len(fi.Parts)),
}
erasure := fi.Erasure
for _, part := range fi.Parts {
for i, part := range fi.Parts {
checksumInfo := erasure.GetChecksumInfo(part.Number)
partPath := pathJoin(volumeDir, path, fi.DataDir, fmt.Sprintf("part.%d", part.Number))
if err := s.bitrotVerify(ctx, partPath,
err := s.bitrotVerify(ctx, partPath,
erasure.ShardFileSize(part.Size),
checksumInfo.Algorithm,
checksumInfo.Hash, erasure.ShardSize()); err != nil {
if !IsErr(err, []error{
errFileNotFound,
errVolumeNotFound,
errFileCorrupt,
errFileAccessDenied,
errFileVersionNotFound,
}...) {
logger.GetReqInfo(ctx).AppendTags("disk", s.String())
storageLogOnceIf(ctx, err, partPath)
}
return err
checksumInfo.Hash, erasure.ShardSize())
resp.Results[i] = convPartErrToInt(err)
// Only log unknown errors
if resp.Results[i] == checkPartUnknown && err != errFileAccessDenied {
logger.GetReqInfo(ctx).AppendTags("disk", s.String())
storageLogOnceIf(ctx, err, partPath)
}
}
return nil
return &resp, nil
}
// ReadMultiple will read multiple files and send each back as response.