diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go index 02257e47f..ee5ec6928 100644 --- a/cmd/xl-storage-disk-id-check.go +++ b/cmd/xl-storage-disk-id-check.go @@ -509,9 +509,7 @@ func (p *xlStorageDiskIDCheck) CheckParts(ctx context.Context, volume string, pa } defer done(0, &err) - return xioutil.WithDeadline[*CheckPartsResp](ctx, globalDriveConfig.GetMaxTimeout(), func(ctx context.Context) (res *CheckPartsResp, err error) { - return p.storage.CheckParts(ctx, volume, path, fi) - }) + return p.storage.CheckParts(ctx, volume, path, fi) } func (p *xlStorageDiskIDCheck) DeleteBulk(ctx context.Context, volume string, paths ...string) (err error) { diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 77937210d..f655ea5e3 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -2367,6 +2367,41 @@ func (s *xlStorage) AppendFile(ctx context.Context, volume string, path string, return nil } +// checkPart is a light check of an existing and size of a part, without doing a bitrot operation +// For any unexpected error, return checkPartUnknown (zero) +func (s *xlStorage) checkPart(volumeDir, path, dataDir string, partNum int, expectedSize int64, skipAccessCheck bool) (resp int) { + partPath := pathJoin(path, dataDir, fmt.Sprintf("part.%d", partNum)) + filePath := pathJoin(volumeDir, partPath) + st, err := Lstat(filePath) + if err != nil { + if osIsNotExist(err) { + if !skipAccessCheck { + // Stat a volume entry. + if verr := Access(volumeDir); verr != nil { + if osIsNotExist(verr) { + resp = checkPartVolumeNotFound + } + return + } + } + } + if osErrToFileErr(err) == errFileNotFound { + resp = checkPartFileNotFound + } + return + } + if st.Mode().IsDir() { + resp = checkPartFileNotFound + return + } + // Check if shard is truncated. + if st.Size() < expectedSize { + resp = checkPartFileCorrupt + return + } + return checkPartSuccess +} + // CheckParts check if path has necessary parts available. func (s *xlStorage) CheckParts(ctx context.Context, volume string, path string, fi FileInfo) (*CheckPartsResp, error) { volumeDir, err := s.getVolDir(volume) @@ -2385,36 +2420,12 @@ func (s *xlStorage) CheckParts(ctx context.Context, volume string, path string, } for i, part := range fi.Parts { - partPath := pathJoin(path, fi.DataDir, fmt.Sprintf("part.%d", part.Number)) - filePath := pathJoin(volumeDir, partPath) - st, err := Lstat(filePath) + resp.Results[i], err = xioutil.WithDeadline[int](ctx, globalDriveConfig.GetMaxTimeout(), func(ctx context.Context) (int, error) { + return s.checkPart(volumeDir, path, fi.DataDir, part.Number, fi.Erasure.ShardFileSize(part.Size), skipAccessChecks(volume)), nil + }) if err != nil { - if osIsNotExist(err) { - if !skipAccessChecks(volume) { - // Stat a volume entry. - if verr := Access(volumeDir); verr != nil { - if osIsNotExist(verr) { - resp.Results[i] = checkPartVolumeNotFound - } - continue - } - } - } - if osErrToFileErr(err) == errFileNotFound { - resp.Results[i] = checkPartFileNotFound - } - continue + return nil, err } - if st.Mode().IsDir() { - resp.Results[i] = checkPartFileNotFound - continue - } - // Check if shard is truncated. - if st.Size() < fi.Erasure.ShardFileSize(part.Size) { - resp.Results[i] = checkPartFileCorrupt - continue - } - resp.Results[i] = checkPartSuccess } return &resp, nil @@ -2546,17 +2557,7 @@ func (s *xlStorage) Delete(ctx context.Context, volume string, path string, dele } func skipAccessChecks(volume string) (ok bool) { - for _, prefix := range []string{ - minioMetaTmpDeletedBucket, - minioMetaTmpBucket, - minioMetaMultipartBucket, - minioMetaBucket, - } { - if strings.HasPrefix(volume, prefix) { - return true - } - } - return ok + return strings.HasPrefix(volume, minioMetaBucket) } // RenameData - rename source path to destination path atomically, metadata and data directory.