Mirror of https://github.com/minio/minio.git (synced 2025-11-09 21:49:46 -05:00)
remove serializing WalkDir() across all buckets/prefixes on SSDs (#17707)
Slower drives are already knocked off via active monitoring when they are too slow, so we do not need to block calls arbitrarily. Serializing adds latency to calls that are already slow; remove it for SSDs/NVMes. Also, add a select with context when writing to the `out <-` channel, to avoid any potential blocking.
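The second change is the standard Go pattern for context-aware channel sends: instead of writing to the results channel unconditionally, the write sits inside a select that also watches the request context, so a cancelled or timed-out listing never leaves the walker blocked on `out <-`. A minimal standalone sketch of that pattern follows; the `sendWithContext` helper and the string channel are illustrative stand-ins, not MinIO's actual types, while the real change is the `send` closure visible in the diff below.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// sendWithContext writes v to out unless ctx is cancelled first.
// It mirrors the shape of the send closure added in the diff below.
func sendWithContext(ctx context.Context, out chan<- string, v string) error {
	select {
	case <-ctx.Done():
		// Caller went away (timeout, client disconnect); stop walking.
		return ctx.Err()
	case out <- v:
		return nil
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	out := make(chan string) // unbuffered: a send blocks until someone receives
	// Nobody reads from out, so the send can only return via ctx cancellation.
	if err := sendWithContext(ctx, out, "bucket/object"); errors.Is(err, context.DeadlineExceeded) {
		fmt.Println("send abandoned:", err)
	}
}

In the diff, the same shape is wrapped around metaCacheEntry values, and the closure also takes over the per-object accounting via objReturned.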
cmd/metacache-walk.go

@@ -98,6 +98,15 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
             objsReturned++
         }
     }
+    send := func(entry metaCacheEntry) error {
+        objReturned(entry.metadata)
+        select {
+        case <-ctx.Done():
+            return ctx.Err()
+        case out <- entry:
+        }
+        return nil
+    }
 
     // Fast exit track to check if we are listing an object with
     // a trailing slash, this will avoid to list the object content.
@@ -109,11 +118,12 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
             // if baseDir is already a directory object, consider it
             // as part of the list call, this is AWS S3 specific
             // behavior.
-            out <- metaCacheEntry{
+            if err := send(metaCacheEntry{
                 name:     opts.BaseDir,
                 metadata: metadata,
+            }); err != nil {
+                return err
             }
-            objReturned(metadata)
         } else {
             st, sterr := Lstat(pathJoin(volumeDir, opts.BaseDir, xlStorageFormatFile))
             if sterr == nil && st.Mode().IsRegular() {
@@ -143,19 +153,25 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
             return nil
         }
 
-        s.walkMu.Lock()
+        if s.walkMu != nil {
+            s.walkMu.Lock()
+        }
         entries, err := s.ListDir(ctx, opts.Bucket, current, -1)
-        s.walkMu.Unlock()
+        if s.walkMu != nil {
+            s.walkMu.Unlock()
+        }
         if err != nil {
             // Folder could have gone away in-between
             if err != errVolumeNotFound && err != errFileNotFound {
                 logger.LogOnceIf(ctx, err, "metacache-walk-scan-dir")
             }
             if opts.ReportNotFound && err == errFileNotFound && current == opts.BaseDir {
-                return errFileNotFound
+                err = errFileNotFound
+            } else {
+                err = nil
             }
-            // Forward some errors?
-            return nil
+            diskHealthCheckOK(ctx, err)
+            return err
         }
         diskHealthCheckOK(ctx, err)
         if len(entries) == 0 {
@@ -202,9 +218,13 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
         // If root was an object return it as such.
         if HasSuffix(entry, xlStorageFormatFile) {
             var meta metaCacheEntry
-            s.walkReadMu.Lock()
+            if s.walkReadMu != nil {
+                s.walkReadMu.Lock()
+            }
             meta.metadata, err = s.readMetadata(ctx, pathJoinBuf(&sb, volumeDir, current, entry))
-            s.walkReadMu.Unlock()
+            if s.walkReadMu != nil {
+                s.walkReadMu.Unlock()
+            }
             diskHealthCheckOK(ctx, err)
             if err != nil {
                 // It is totally possible that xl.meta was overwritten
@@ -219,17 +239,15 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
             meta.name = strings.TrimSuffix(meta.name, SlashSeparator)
             meta.name = pathJoinBuf(&sb, current, meta.name)
             meta.name = decodeDirObject(meta.name)
-
-            objReturned(meta.metadata)
-            out <- meta
+            if err := send(meta); err != nil {
+                return err
+            }
             return nil
         }
         // Check legacy.
         if HasSuffix(entry, xlStorageFormatFileV1) {
             var meta metaCacheEntry
-            s.walkReadMu.Lock()
             meta.metadata, err = xioutil.ReadFile(pathJoinBuf(&sb, volumeDir, current, entry))
-            s.walkReadMu.Unlock()
             diskHealthCheckOK(ctx, err)
             if err != nil {
                 if !IsErrIgnored(err, io.EOF, io.ErrUnexpectedEOF) {
@@ -240,9 +258,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
             meta.name = strings.TrimSuffix(entry, xlStorageFormatFileV1)
             meta.name = strings.TrimSuffix(meta.name, SlashSeparator)
             meta.name = pathJoinBuf(&sb, current, meta.name)
-            objReturned(meta.metadata)
-
-            out <- meta
+            if err := send(meta); err != nil {
+                return err
+            }
             return nil
         }
         // Skip all other files.
@@ -295,9 +313,13 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
                 meta.name = meta.name[:len(meta.name)-1] + globalDirSuffixWithSlash
             }
 
-            s.walkReadMu.Lock()
+            if s.walkReadMu != nil {
+                s.walkReadMu.Lock()
+            }
             meta.metadata, err = s.readMetadata(ctx, pathJoinBuf(&sb, volumeDir, meta.name, xlStorageFormatFile))
-            s.walkReadMu.Unlock()
+            if s.walkReadMu != nil {
+                s.walkReadMu.Unlock()
+            }
             diskHealthCheckOK(ctx, err)
             switch {
             case err == nil:
@@ -305,17 +327,17 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
             if isDirObj {
                 meta.name = strings.TrimSuffix(meta.name, globalDirSuffixWithSlash) + slashSeparator
             }
-            objReturned(meta.metadata)
-
-            out <- meta
+            if err := send(meta); err != nil {
+                return err
+            }
         case osIsNotExist(err), isSysErrIsDir(err):
             meta.metadata, err = xioutil.ReadFile(pathJoinBuf(&sb, volumeDir, meta.name, xlStorageFormatFileV1))
             diskHealthCheckOK(ctx, err)
             if err == nil {
                 // It was an object
-                objReturned(meta.metadata)
-
-                out <- meta
+                if err := send(meta); err != nil {
+                    return err
+                }
                 continue
             }
 
cmd/xl-storage.go

@@ -115,8 +115,8 @@ type xlStorage struct {
     formatData []byte
 
     // mutex to prevent concurrent read operations overloading walks.
-    walkMu     sync.Mutex
-    walkReadMu sync.Mutex
+    walkMu     *sync.Mutex
+    walkReadMu *sync.Mutex
 }
 
 // checkPathLength - returns error if given path name length more than 255
@@ -216,18 +216,17 @@ func newXLStorage(ep Endpoint, cleanUp bool) (s *xlStorage, err error) {
         return nil, err
     }
 
+    info, err := disk.GetInfo(path)
+    if err != nil {
+        return nil, err
+    }
+
     var rootDisk bool
     if !globalIsCICD && !globalIsErasureSD {
         if globalRootDiskThreshold > 0 {
             // Use MINIO_ROOTDISK_THRESHOLD_SIZE to figure out if
-            // this disk is a root disk.
-            info, err := disk.GetInfo(path)
-            if err != nil {
-                return nil, err
-            }
-
-            // treat those disks with size less than or equal to the
-            // threshold as rootDisks.
+            // this disk is a root disk. treat those disks with
+            // size less than or equal to the threshold as rootDisks.
             rootDisk = info.Total <= globalRootDiskThreshold
         } else {
             rootDisk, err = disk.IsRootDisk(path, SlashSeparator)
@@ -247,6 +246,12 @@ func newXLStorage(ep Endpoint, cleanUp bool) (s *xlStorage, err error) {
         diskIndex: -1,
     }
 
+    // We stagger listings only on HDDs.
+    if info.Rotational == nil || *info.Rotational {
+        s.walkMu = &sync.Mutex{}
+        s.walkReadMu = &sync.Mutex{}
+    }
+
     if cleanUp {
         bgFormatErasureCleanupTmp(s.diskPath) // cleanup any old data.
    }
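The last two hunks carry the SSD/NVMe exemption: the walk mutexes become pointers that are allocated only when the drive reports itself as rotational, and every lock site checks for nil first. A minimal standalone sketch of that nil-mutex gating pattern is below; the `walker`, `newWalker`, and `listDir` names are hypothetical stand-ins for MinIO's `xlStorage`, `newXLStorage`, and `ListDir`.

package main

import (
	"fmt"
	"sync"
)

// walker staggers directory listings only when the underlying drive is an HDD.
type walker struct {
	walkMu *sync.Mutex // nil on SSD/NVMe: no serialization
}

func newWalker(rotational bool) *walker {
	w := &walker{}
	if rotational {
		// Only HDDs get a mutex; seeks are expensive, so listings are staggered.
		w.walkMu = &sync.Mutex{}
	}
	return w
}

func (w *walker) listDir(dir string) []string {
	if w.walkMu != nil {
		w.walkMu.Lock()
		defer w.walkMu.Unlock()
	}
	// ... read the directory here ...
	return []string{dir + "/obj1", dir + "/obj2"}
}

func main() {
	hdd := newWalker(true)   // serialized listings
	nvme := newWalker(false) // concurrent listings
	fmt.Println(hdd.listDir("bucket"), nvme.listDir("bucket"))
}

Keeping the field a pointer makes the no-serialization case a cheap nil check rather than an extra boolean, at the cost of remembering the nil guard at every lock site.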