xl walk: Limit walk concurrent IO (#12885)

We are observing heavy system loads, potentially
locking the system up for periods when concurrent
listing operations are performed.

We place a per-disk lock on walk IO operations.
This will minimize the impact of concurrent listing
operations on the entire system and de-prioritize
them compared to other operations.

Single list operations should remain largely unaffected.
This commit is contained in:
Klaus Post 2021-08-19 03:10:36 +02:00 committed by GitHub
parent ee028a4693
commit c25816eabc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 13 additions and 0 deletions

View File

@ -121,7 +121,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
if contextCanceled(ctx) { if contextCanceled(ctx) {
return ctx.Err() return ctx.Err()
} }
s.walkMu.Lock()
entries, err := s.ListDir(ctx, opts.Bucket, current, -1) entries, err := s.ListDir(ctx, opts.Bucket, current, -1)
s.walkMu.Unlock()
if err != nil { if err != nil {
// Folder could have gone away in-between // Folder could have gone away in-between
if err != errVolumeNotFound && err != errFileNotFound { if err != errVolumeNotFound && err != errFileNotFound {
@ -171,7 +173,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
// If root was an object return it as such. // If root was an object return it as such.
if HasSuffix(entry, xlStorageFormatFile) { if HasSuffix(entry, xlStorageFormatFile) {
var meta metaCacheEntry var meta metaCacheEntry
s.walkMu.Lock()
meta.metadata, err = s.readMetadata(pathJoin(volumeDir, current, entry)) meta.metadata, err = s.readMetadata(pathJoin(volumeDir, current, entry))
s.walkMu.Unlock()
if err != nil { if err != nil {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
continue continue
@ -186,7 +190,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
// Check legacy. // Check legacy.
if HasSuffix(entry, xlStorageFormatFileV1) { if HasSuffix(entry, xlStorageFormatFileV1) {
var meta metaCacheEntry var meta metaCacheEntry
s.walkMu.Lock()
meta.metadata, err = xioutil.ReadFile(pathJoin(volumeDir, current, entry)) meta.metadata, err = xioutil.ReadFile(pathJoin(volumeDir, current, entry))
s.walkMu.Unlock()
if err != nil { if err != nil {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
continue continue
@ -242,7 +248,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
meta.name = meta.name[:len(meta.name)-1] + globalDirSuffixWithSlash meta.name = meta.name[:len(meta.name)-1] + globalDirSuffixWithSlash
} }
s.walkMu.Lock()
meta.metadata, err = s.readMetadata(pathJoin(volumeDir, meta.name, xlStorageFormatFile)) meta.metadata, err = s.readMetadata(pathJoin(volumeDir, meta.name, xlStorageFormatFile))
s.walkMu.Unlock()
switch { switch {
case err == nil: case err == nil:
// It was an object // It was an object
@ -251,7 +259,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
} }
out <- meta out <- meta
case osIsNotExist(err): case osIsNotExist(err):
s.walkMu.Lock()
meta.metadata, err = xioutil.ReadFile(pathJoin(volumeDir, meta.name, xlStorageFormatFileV1)) meta.metadata, err = xioutil.ReadFile(pathJoin(volumeDir, meta.name, xlStorageFormatFileV1))
s.walkMu.Unlock()
if err == nil { if err == nil {
// It was an object // It was an object
out <- meta out <- meta

View File

@ -139,6 +139,9 @@ type xlStorage struct {
ctx context.Context ctx context.Context
sync.RWMutex sync.RWMutex
// mutex to prevent concurrent read operations overloading walks.
walkMu sync.Mutex
} }
// checkPathLength - returns error if given path name length more than 255 // checkPathLength - returns error if given path name length more than 255