diff --git a/cmd/metacache-walk.go b/cmd/metacache-walk.go index a0d5d27bb..876b7ec49 100644 --- a/cmd/metacache-walk.go +++ b/cmd/metacache-walk.go @@ -98,6 +98,15 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ objsReturned++ } } + send := func(entry metaCacheEntry) error { + objReturned(entry.metadata) + select { + case <-ctx.Done(): + return ctx.Err() + case out <- entry: + } + return nil + } // Fast exit track to check if we are listing an object with // a trailing slash, this will avoid to list the object content. @@ -109,11 +118,12 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ // if baseDir is already a directory object, consider it // as part of the list call, this is AWS S3 specific // behavior. - out <- metaCacheEntry{ + if err := send(metaCacheEntry{ name: opts.BaseDir, metadata: metadata, + }); err != nil { + return err } - objReturned(metadata) } else { st, sterr := Lstat(pathJoin(volumeDir, opts.BaseDir, xlStorageFormatFile)) if sterr == nil && st.Mode().IsRegular() { @@ -143,19 +153,25 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ return nil } - s.walkMu.Lock() + if s.walkMu != nil { + s.walkMu.Lock() + } entries, err := s.ListDir(ctx, opts.Bucket, current, -1) - s.walkMu.Unlock() + if s.walkMu != nil { + s.walkMu.Unlock() + } if err != nil { // Folder could have gone away in-between if err != errVolumeNotFound && err != errFileNotFound { logger.LogOnceIf(ctx, err, "metacache-walk-scan-dir") } if opts.ReportNotFound && err == errFileNotFound && current == opts.BaseDir { - return errFileNotFound + err = errFileNotFound + } else { + err = nil } - // Forward some errors? - return nil + diskHealthCheckOK(ctx, err) + return err } diskHealthCheckOK(ctx, err) if len(entries) == 0 { @@ -202,9 +218,13 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ // If root was an object return it as such. if HasSuffix(entry, xlStorageFormatFile) { var meta metaCacheEntry - s.walkReadMu.Lock() + if s.walkReadMu != nil { + s.walkReadMu.Lock() + } meta.metadata, err = s.readMetadata(ctx, pathJoinBuf(&sb, volumeDir, current, entry)) - s.walkReadMu.Unlock() + if s.walkReadMu != nil { + s.walkReadMu.Unlock() + } diskHealthCheckOK(ctx, err) if err != nil { // It is totally possible that xl.meta was overwritten @@ -219,17 +239,15 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ meta.name = strings.TrimSuffix(meta.name, SlashSeparator) meta.name = pathJoinBuf(&sb, current, meta.name) meta.name = decodeDirObject(meta.name) - - objReturned(meta.metadata) - out <- meta + if err := send(meta); err != nil { + return err + } return nil } // Check legacy. if HasSuffix(entry, xlStorageFormatFileV1) { var meta metaCacheEntry - s.walkReadMu.Lock() meta.metadata, err = xioutil.ReadFile(pathJoinBuf(&sb, volumeDir, current, entry)) - s.walkReadMu.Unlock() diskHealthCheckOK(ctx, err) if err != nil { if !IsErrIgnored(err, io.EOF, io.ErrUnexpectedEOF) { @@ -240,9 +258,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ meta.name = strings.TrimSuffix(entry, xlStorageFormatFileV1) meta.name = strings.TrimSuffix(meta.name, SlashSeparator) meta.name = pathJoinBuf(&sb, current, meta.name) - objReturned(meta.metadata) - - out <- meta + if err := send(meta); err != nil { + return err + } return nil } // Skip all other files. @@ -295,9 +313,13 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ meta.name = meta.name[:len(meta.name)-1] + globalDirSuffixWithSlash } - s.walkReadMu.Lock() + if s.walkReadMu != nil { + s.walkReadMu.Lock() + } meta.metadata, err = s.readMetadata(ctx, pathJoinBuf(&sb, volumeDir, meta.name, xlStorageFormatFile)) - s.walkReadMu.Unlock() + if s.walkReadMu != nil { + s.walkReadMu.Unlock() + } diskHealthCheckOK(ctx, err) switch { case err == nil: @@ -305,17 +327,17 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ if isDirObj { meta.name = strings.TrimSuffix(meta.name, globalDirSuffixWithSlash) + slashSeparator } - objReturned(meta.metadata) - - out <- meta + if err := send(meta); err != nil { + return err + } case osIsNotExist(err), isSysErrIsDir(err): meta.metadata, err = xioutil.ReadFile(pathJoinBuf(&sb, volumeDir, meta.name, xlStorageFormatFileV1)) diskHealthCheckOK(ctx, err) if err == nil { // It was an object - objReturned(meta.metadata) - - out <- meta + if err := send(meta); err != nil { + return err + } continue } diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index b0550fb7f..7342c2c3d 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -115,8 +115,8 @@ type xlStorage struct { formatData []byte // mutex to prevent concurrent read operations overloading walks. - walkMu sync.Mutex - walkReadMu sync.Mutex + walkMu *sync.Mutex + walkReadMu *sync.Mutex } // checkPathLength - returns error if given path name length more than 255 @@ -216,18 +216,17 @@ func newXLStorage(ep Endpoint, cleanUp bool) (s *xlStorage, err error) { return nil, err } + info, err := disk.GetInfo(path) + if err != nil { + return nil, err + } + var rootDisk bool if !globalIsCICD && !globalIsErasureSD { if globalRootDiskThreshold > 0 { // Use MINIO_ROOTDISK_THRESHOLD_SIZE to figure out if - // this disk is a root disk. - info, err := disk.GetInfo(path) - if err != nil { - return nil, err - } - - // treat those disks with size less than or equal to the - // threshold as rootDisks. + // this disk is a root disk. treat those disks with + // size less than or equal to the threshold as rootDisks. rootDisk = info.Total <= globalRootDiskThreshold } else { rootDisk, err = disk.IsRootDisk(path, SlashSeparator) @@ -247,6 +246,12 @@ func newXLStorage(ep Endpoint, cleanUp bool) (s *xlStorage, err error) { diskIndex: -1, } + // We stagger listings only on HDDs. + if info.Rotational == nil || *info.Rotational { + s.walkMu = &sync.Mutex{} + s.walkReadMu = &sync.Mutex{} + } + if cleanUp { bgFormatErasureCleanupTmp(s.diskPath) // cleanup any old data. } diff --git a/internal/disk/disk.go b/internal/disk/disk.go index 4baa478eb..d028df6d1 100644 --- a/internal/disk/disk.go +++ b/internal/disk/disk.go @@ -23,15 +23,20 @@ package disk // Files - total inodes available // Ffree - free inodes available // FSType - file system type +// Major - major dev id +// Minor - minor dev id +// Devname - device name type Info struct { - Total uint64 - Free uint64 - Used uint64 - Files uint64 - Ffree uint64 - FSType string - Major uint32 - Minor uint32 + Total uint64 + Free uint64 + Used uint64 + Files uint64 + Ffree uint64 + FSType string + Major uint32 + Minor uint32 + Name string + Rotational *bool } // DevID is the drive major and minor ids diff --git a/internal/disk/stat_linux.go b/internal/disk/stat_linux.go index e489cfd22..9fb879068 100644 --- a/internal/disk/stat_linux.go +++ b/internal/disk/stat_linux.go @@ -1,7 +1,7 @@ //go:build linux && !s390x && !arm && !386 // +build linux,!s390x,!arm,!386 -// Copyright (c) 2015-2021 MinIO, Inc. +// Copyright (c) 2015-2023 MinIO, Inc. // // This file is part of MinIO Object Storage stack // @@ -28,6 +28,7 @@ import ( "strings" "syscall" + "github.com/prometheus/procfs/blockdevice" "golang.org/x/sys/unix" ) @@ -47,14 +48,6 @@ func GetInfo(path string) (info Info, err error) { //nolint:unconvert FSType: getFSType(int64(s.Type)), } - // Check for overflows. - // https://github.com/minio/minio/issues/8035 - // XFS can show wrong values at times error out - // in such scenarios. - if info.Free > info.Total { - return info, fmt.Errorf("detected free space (%d) > total drive space (%d), fs corruption at (%s). please run 'fsck'", info.Free, info.Total, path) - } - info.Used = info.Total - info.Free st := syscall.Stat_t{} err = syscall.Stat(path, &st) @@ -65,6 +58,37 @@ func GetInfo(path string) (info Info, err error) { devID := uint64(st.Dev) // Needed to support multiple GOARCHs info.Major = unix.Major(devID) info.Minor = unix.Minor(devID) + + // Check for overflows. + // https://github.com/minio/minio/issues/8035 + // XFS can show wrong values at times error out + // in such scenarios. + if info.Free > info.Total { + return info, fmt.Errorf("detected free space (%d) > total drive space (%d), fs corruption at (%s). please run 'fsck'", info.Free, info.Total, path) + } + info.Used = info.Total - info.Free + + bfs, err := blockdevice.NewDefaultFS() + if err == nil { + diskstats, _ := bfs.ProcDiskstats() + for _, dstat := range diskstats { + // ignore all loop devices + if strings.HasPrefix(dstat.DeviceName, "loop") { + continue + } + qst, err := bfs.SysBlockDeviceQueueStats(dstat.DeviceName) + if err != nil { + continue + } + rot := qst.Rotational == 1 // Rotational is '1' if the device is HDD + if dstat.MajorNumber == info.Major && dstat.MinorNumber == info.Minor { + info.Name = dstat.DeviceName + info.Rotational = &rot + break + } + } + } + return info, nil }