de-couple walkMu and walkReadMu for some granularity (#13231)

This commit brings two locks instead of single lock for
WalkDir() calls on top of c25816eabc.

The main reason is to avoid contention between readMetadata()
and ListDir() calls, ListDir() can take time on prefixes that
are huge for readdir() but this shouldn't end up blocking
all readMetadata() operations, this allows for more room for
I/O while not overly penalizing all listing operations.
This commit is contained in:
Harshavardhana 2021-09-17 12:14:12 -07:00 committed by GitHub
parent 1fc0e9a6aa
commit 66fcd02aa2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 9 additions and 9 deletions

View File

@ -121,6 +121,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
if contextCanceled(ctx) { if contextCanceled(ctx) {
return ctx.Err() return ctx.Err()
} }
s.walkMu.Lock() s.walkMu.Lock()
entries, err := s.ListDir(ctx, opts.Bucket, current, -1) entries, err := s.ListDir(ctx, opts.Bucket, current, -1)
s.walkMu.Unlock() s.walkMu.Unlock()
@ -173,9 +174,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
// If root was an object return it as such. // If root was an object return it as such.
if HasSuffix(entry, xlStorageFormatFile) { if HasSuffix(entry, xlStorageFormatFile) {
var meta metaCacheEntry var meta metaCacheEntry
s.walkMu.Lock() s.walkReadMu.Lock()
meta.metadata, err = s.readMetadata(pathJoin(volumeDir, current, entry)) meta.metadata, err = s.readMetadata(pathJoin(volumeDir, current, entry))
s.walkMu.Unlock() s.walkReadMu.Unlock()
if err != nil { if err != nil {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
continue continue
@ -190,9 +191,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
// Check legacy. // Check legacy.
if HasSuffix(entry, xlStorageFormatFileV1) { if HasSuffix(entry, xlStorageFormatFileV1) {
var meta metaCacheEntry var meta metaCacheEntry
s.walkMu.Lock() s.walkReadMu.Lock()
meta.metadata, err = xioutil.ReadFile(pathJoin(volumeDir, current, entry)) meta.metadata, err = xioutil.ReadFile(pathJoin(volumeDir, current, entry))
s.walkMu.Unlock() s.walkReadMu.Unlock()
if err != nil { if err != nil {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
continue continue
@ -248,9 +249,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
meta.name = meta.name[:len(meta.name)-1] + globalDirSuffixWithSlash meta.name = meta.name[:len(meta.name)-1] + globalDirSuffixWithSlash
} }
s.walkMu.Lock() s.walkReadMu.Lock()
meta.metadata, err = s.readMetadata(pathJoin(volumeDir, meta.name, xlStorageFormatFile)) meta.metadata, err = s.readMetadata(pathJoin(volumeDir, meta.name, xlStorageFormatFile))
s.walkMu.Unlock() s.walkReadMu.Unlock()
switch { switch {
case err == nil: case err == nil:
// It was an object // It was an object
@ -259,9 +260,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
} }
out <- meta out <- meta
case osIsNotExist(err), isSysErrIsDir(err): case osIsNotExist(err), isSysErrIsDir(err):
s.walkMu.Lock()
meta.metadata, err = xioutil.ReadFile(pathJoin(volumeDir, meta.name, xlStorageFormatFileV1)) meta.metadata, err = xioutil.ReadFile(pathJoin(volumeDir, meta.name, xlStorageFormatFileV1))
s.walkMu.Unlock()
if err == nil { if err == nil {
// It was an object // It was an object
out <- meta out <- meta

View File

@ -142,7 +142,8 @@ type xlStorage struct {
sync.RWMutex sync.RWMutex
// mutex to prevent concurrent read operations overloading walks. // mutex to prevent concurrent read operations overloading walks.
walkMu sync.Mutex walkMu sync.Mutex
walkReadMu sync.Mutex
} }
// checkPathLength - returns error if given path name length more than 255 // checkPathLength - returns error if given path name length more than 255