optimize listing operation concurrency (#12728)

- remove use of getOnlineDisks(); instead rely on fallbackDisks():
  when a disk returns errors like diskNotFound or unformattedDisk,
  list from the other fallback disks instead of paying the price of
  checking getOnlineDisks() up front (a sketch of the pattern follows
  after this message)

- optimize getDiskID() further to avoid taking large write locks when
  checking the formatLastCheck time window (also sketched below)

This change allows a more relaxed fallback for listing, tolerating
more failures while eventually gaining more consistent results, even
when using '3' disks by default.
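To illustrate the first change, here is a minimal, self-contained sketch
of the fallback pattern, under assumed stand-in names (Disk, fakeDisk,
walkWithFallback are illustrative, not MinIO's actual types; the real
implementation lives in listPathRaw and drives StorageAPI disks via
WalkDir):

package main

import (
	"errors"
	"fmt"
)

var (
	errUnformattedDisk = errors.New("unformatted disk found")
	errVolumeNotFound  = errors.New("volume not found")
)

// Disk is an illustrative stand-in for the WalkDir capability of a
// storage endpoint.
type Disk interface {
	WalkDir(path string) ([]string, error)
}

// fallbackWorthy mirrors the commit's fallback() closure: only errors
// that mean the disk itself is unusable (rather than the listed path
// being absent) should trigger a retry on a spare disk. The real code
// compares err.Error() strings because errors may have crossed an RPC
// boundary; locally errors.Is is the idiomatic equivalent.
func fallbackWorthy(err error) bool {
	return errors.Is(err, errUnformattedDisk) || errors.Is(err, errVolumeNotFound)
}

// walkWithFallback walks the primary disk first and, only on a
// fallback-worthy error, retries the same walk on each spare disk.
func walkWithFallback(primary Disk, fallbacks []Disk, path string) ([]string, error) {
	entries, err := primary.WalkDir(path)
	if !fallbackWorthy(err) {
		return entries, err // success, or an error no other disk can fix
	}
	for _, fd := range fallbacks {
		if fd == nil {
			continue
		}
		if entries, err = fd.WalkDir(path); err == nil {
			break
		}
	}
	return entries, err
}

// fakeDisk lets the sketch run without real storage.
type fakeDisk struct {
	entries []string
	err     error
}

func (d fakeDisk) WalkDir(string) ([]string, error) { return d.entries, d.err }

func main() {
	broken := fakeDisk{err: errUnformattedDisk}
	healthy := fakeDisk{entries: []string{"a.txt", "b.txt"}}
	entries, err := walkWithFallback(broken, []Disk{healthy}, "bucket/prefix")
	fmt.Println(entries, err) // [a.txt b.txt] <nil>
}

The point of the design: a nil or unformatted disk no longer costs a
DiskInfo round-trip per disk before every listing; the price is paid
only when a walk actually fails.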
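And a rough sketch of the second change, again with assumed stand-in
names (cachedID is not the real xlStorage type, which also compares the
cached format.json file info): the fast path holds only a read lock
while the cached value is fresh, and a write lock is taken only to
publish a refreshed value:

package main

import (
	"fmt"
	"sync"
	"time"
)

// cachedID caches an expensive-to-compute ID for up to one second.
type cachedID struct {
	mu        sync.RWMutex
	id        string
	lastCheck time.Time

	// refresh re-reads the authoritative value, e.g. format.json.
	refresh func() (string, error)
}

func (c *cachedID) Get() (string, error) {
	// Fast path: read lock only, so concurrent readers never serialize
	// on a write lock just to check the time window.
	c.mu.RLock()
	if c.id != "" && time.Since(c.lastCheck) <= time.Second {
		id := c.id
		c.mu.RUnlock()
		return id, nil
	}
	c.mu.RUnlock()

	// Slow path: refresh outside any lock, then publish under the
	// write lock. Concurrent refreshes may race, but they publish the
	// same value, so the result stays correct.
	id, err := c.refresh()
	if err != nil {
		return "", err
	}
	c.mu.Lock()
	c.id, c.lastCheck = id, time.Now()
	c.mu.Unlock()
	return id, nil
}

func main() {
	c := &cachedID{refresh: func() (string, error) { return "disk-uuid-1234", nil }}
	id, _ := c.Get() // first call refreshes
	fmt.Println(id)
	id, _ = c.Get() // within 1s: served from the cache under a read lock
	fmt.Println(id)
}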
Harshavardhana 2021-07-24 22:03:38 -07:00 committed by GitHub
parent de00b641da
commit e124d88788
5 changed files with 123 additions and 115 deletions


@@ -47,38 +47,6 @@ func (er erasureObjects) getLoadBalancedLocalDisks() (newDisks []StorageAPI) {
 	return newDisks
 }
 
-func (er erasureObjects) getOnlineDisks() (newDisks []StorageAPI) {
-	disks := er.getDisks()
-	var wg sync.WaitGroup
-	var mu sync.Mutex
-	for _, i := range hashOrder(UTCNow().String(), len(disks)) {
-		i := i
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-			if disks[i-1] == nil {
-				return
-			}
-			di, err := disks[i-1].DiskInfo(context.Background())
-			if err != nil || di.Healing {
-				// - Do not consume disks which are not reachable
-				//   unformatted or simply not accessible for some reason.
-				//
-				// - Do not consume disks which are being healed
-				//
-				// - Future: skip busy disks
-				return
-			}
-			mu.Lock()
-			newDisks = append(newDisks, disks[i-1])
-			mu.Unlock()
-		}()
-	}
-	wg.Wait()
-	return newDisks
-}
-
 // getLoadBalancedDisks - fetches load balanced (sufficiently randomized) disk slice.
 // ensures to skip disks if they are not healing and online.
 func (er erasureObjects) getLoadBalancedDisks(optimized bool) []StorageAPI {


@@ -351,18 +351,18 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
 		// All operations are performed without locks, so we must be careful and allow for failures.
 		// Read metadata associated with the object from a disk.
 		if retries > 0 {
-			disks := er.getOnlineDisks()
-			if len(disks) == 0 {
-				time.Sleep(retryDelay)
-				retries++
-				continue
-			}
-
-			_, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(0), "", false)
-			if err != nil {
-				time.Sleep(retryDelay)
-				retries++
-				continue
+			for _, disk := range er.getDisks() {
+				if disk == nil {
+					continue
+				}
+				_, err := disk.ReadVersion(ctx, minioMetaBucket,
+					o.objectPath(0), "", false)
+				if err != nil {
+					time.Sleep(retryDelay)
+					retries++
+					continue
+				}
+				break
 			}
 		}
 
@@ -421,20 +421,21 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
 		if retries > 0 {
 			// Load from one disk only
-			disks := er.getOnlineDisks()
-			if len(disks) == 0 {
-				time.Sleep(retryDelay)
-				retries++
-				continue
-			}
-
-			_, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(partN), "", false)
-			if err != nil {
-				time.Sleep(retryDelay)
-				retries++
-				continue
+			for _, disk := range er.getDisks() {
+				if disk == nil {
+					continue
+				}
+				_, err := disk.ReadVersion(ctx, minioMetaBucket,
+					o.objectPath(partN), "", false)
+				if err != nil {
+					time.Sleep(retryDelay)
+					retries++
+					continue
+				}
+				break
 			}
 		}
 
 		// Load first part metadata...
 		fi, metaArr, onlineDisks, err = er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(partN), ObjectOptions{}, true)
 		if err != nil {
@@ -512,7 +513,8 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions, resul
 	askDisks := o.AskDisks
 	listingQuorum := o.AskDisks - 1
-	disks := er.getOnlineDisks()
+	disks := er.getDisks()
+	var fallbackDisks []StorageAPI
 
 	// Special case: ask all disks if the drive count is 4
 	if askDisks == -1 || er.setDriveCount == 4 {
@@ -527,6 +529,7 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions, resul
 		rand.Shuffle(len(disks), func(i, j int) {
 			disks[i], disks[j] = disks[j], disks[i]
 		})
+		fallbackDisks = disks[askDisks:]
 		disks = disks[:askDisks]
 	}
 
@@ -539,13 +542,14 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions, resul
 	ctxDone := ctx.Done()
 	return listPathRaw(ctx, listPathRawOptions{
-		disks:        disks,
-		bucket:       o.Bucket,
-		path:         o.BaseDir,
-		recursive:    o.Recursive,
-		filterPrefix: o.FilterPrefix,
-		minDisks:     listingQuorum,
-		forwardTo:    o.Marker,
+		disks:         disks,
+		fallbackDisks: fallbackDisks,
+		bucket:        o.Bucket,
+		path:          o.BaseDir,
+		recursive:     o.Recursive,
+		filterPrefix:  o.FilterPrefix,
+		minDisks:      listingQuorum,
+		forwardTo:     o.Marker,
 		agreed: func(entry metaCacheEntry) {
 			select {
 			case <-ctxDone:
@@ -710,9 +714,10 @@ func (er *erasureObjects) saveMetaCacheStream(ctx context.Context, mc *metaCache
 }
 
 type listPathRawOptions struct {
-	disks        []StorageAPI
-	bucket, path string
-	recursive    bool
+	disks         []StorageAPI
+	fallbackDisks []StorageAPI
+	bucket, path  string
+	recursive     bool
 
 	// Only return results with this prefix.
 	filterPrefix string
@@ -752,10 +757,18 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
 	if len(disks) == 0 {
 		return fmt.Errorf("listPathRaw: 0 drives provided")
 	}
 
 	// Cancel upstream if we finish before we expect.
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
+	fallback := func(err error) bool {
+		if err == nil {
+			return false
+		}
+		return err.Error() == errUnformattedDisk.Error() ||
+			err.Error() == errVolumeNotFound.Error()
+	}
+
 	askDisks := len(disks)
 	readers := make([]*metacacheReader, askDisks)
 	for i := range disks {
@@ -768,15 +781,44 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
 
 		// Send request to each disk.
 		go func() {
-			werr := d.WalkDir(ctx, WalkDirOptions{
-				Bucket:         opts.bucket,
-				BaseDir:        opts.path,
-				Recursive:      opts.recursive,
-				ReportNotFound: opts.reportNotFound,
-				FilterPrefix:   opts.filterPrefix,
-				ForwardTo:      opts.forwardTo,
-			}, w)
+			var werr error
+			if d == nil {
+				werr = errDiskNotFound
+			} else {
+				werr = d.WalkDir(ctx, WalkDirOptions{
+					Bucket:         opts.bucket,
+					BaseDir:        opts.path,
+					Recursive:      opts.recursive,
+					ReportNotFound: opts.reportNotFound,
+					FilterPrefix:   opts.filterPrefix,
+					ForwardTo:      opts.forwardTo,
+				}, w)
+			}
+
+			// fallback only when set.
+			if len(opts.fallbackDisks) > 0 && fallback(werr) {
+				// This fallback is only set when
+				// askDisks is less than total
+				// number of disks per set.
+				for _, fd := range opts.fallbackDisks {
+					if fd == nil {
+						continue
+					}
+					werr = fd.WalkDir(ctx, WalkDirOptions{
+						Bucket:         opts.bucket,
+						BaseDir:        opts.path,
+						Recursive:      opts.recursive,
+						ReportNotFound: opts.reportNotFound,
+						FilterPrefix:   opts.filterPrefix,
+						ForwardTo:      opts.forwardTo,
+					}, w)
+					if werr == nil {
+						break
+					}
+				}
+			}
+
 			w.CloseWithError(werr)
 			if werr != io.EOF && werr != nil &&
 				werr.Error() != errFileNotFound.Error() &&
 				werr.Error() != errVolumeNotFound.Error() &&
@@ -795,10 +837,8 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
 		for i := range topEntries {
 			topEntries[i] = metaCacheEntry{}
 		}
-		select {
-		case <-ctx.Done():
+		if contextCanceled(ctx) {
 			return ctx.Err()
-		default:
 		}
 		for i, r := range readers {
 			if errs[i] != nil {


@@ -22,7 +22,6 @@ import (
 	"io"
 	"net/http"
 	"net/url"
-	"os"
 	"sort"
 	"strconv"
 	"strings"
@@ -66,8 +65,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 	}
 
 	// Stat a volume entry.
-	_, err = os.Lstat(volumeDir)
-	if err != nil {
+	if err = Access(volumeDir); err != nil {
 		if osIsNotExist(err) {
 			return errVolumeNotFound
 		} else if isSysErrIO(err) {
@@ -100,7 +98,8 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 				metadata: metadata,
 			}
 		} else {
-			if st, err := Lstat(pathJoin(volumeDir, opts.BaseDir, xlStorageFormatFile)); err == nil && st.Mode().IsRegular() {
+			st, sterr := Lstat(pathJoin(volumeDir, opts.BaseDir, xlStorageFormatFile))
+			if sterr == nil && st.Mode().IsRegular() {
 				return errFileNotFound
 			}
 		}
@@ -118,7 +117,6 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 				forward = forward[:idx]
 			}
 		}
-
 		if contextCanceled(ctx) {
 			return ctx.Err()
 		}
@@ -134,6 +132,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 			// Forward some errors?
 			return nil
 		}
+		if len(entries) == 0 {
+			return nil
+		}
 		dirObjects := make(map[string]struct{})
 		for i, entry := range entries {
 			if len(prefix) > 0 && !strings.HasPrefix(entry, prefix) {
@@ -245,8 +246,6 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 			case osIsNotExist(err):
 				meta.metadata, err = xioutil.ReadFile(pathJoin(volumeDir, meta.name, xlStorageFormatFileV1))
 				if err == nil {
-					// Maybe rename? Would make it inconsistent across disks though.
-					// os.Rename(pathJoin(volumeDir, meta.name, xlStorageFormatFileV1), pathJoin(volumeDir, meta.name, xlStorageFormatFile))
 					// It was an object
 					out <- meta
 					continue
@@ -265,6 +264,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 			logger.LogIf(ctx, err)
 		}
 	}
+
 	// If directory entry left on stack, pop it now.
 	for len(dirStack) > 0 {
 		pop := dirStack[len(dirStack)-1]


@@ -513,6 +513,7 @@ func newCustomHTTPProxyTransport(tlsConfig *tls.Config, dialTimeout time.Duratio
 		Proxy:               http.ProxyFromEnvironment,
 		DialContext:         xhttp.DialContextWithDNSCache(globalDNSCache, xhttp.NewInternodeDialContext(dialTimeout)),
 		MaxIdleConnsPerHost: 1024,
+		MaxConnsPerHost:     1024,
 		WriteBufferSize:     16 << 10, // 16KiB moving up from 4KiB default
 		ReadBufferSize:      16 << 10, // 16KiB moving up from 4KiB default
 		IdleConnTimeout:     15 * time.Second,
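For context on the transport change above: Go's http.Transport only caps
the idle pool via MaxIdleConnsPerHost, while MaxConnsPerHost additionally
caps dialing, active, and idle connections per host. A self-contained
illustration using only the standard library (the values mirror the diff,
but this snippet is not MinIO code):

package main

import (
	"fmt"
	"net/http"
	"time"
)

func main() {
	t := &http.Transport{
		// Cap the number of pooled idle connections per host.
		MaxIdleConnsPerHost: 1024,
		// Additionally cap total connections per host, counting
		// dialing, active, and idle states; 0 means no limit.
		MaxConnsPerHost: 1024,
		IdleConnTimeout: 15 * time.Second,
	}
	client := &http.Client{Transport: t}
	resp, err := client.Get("https://example.com")
	if err == nil {
		resp.Body.Close()
	}
	fmt.Println("transport configured; request err:", err)
}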


@@ -519,7 +519,6 @@ func (s *xlStorage) DiskInfo(context.Context) (info DiskInfo, err error) {
 	dcinfo.UsedInodes = di.Files - di.Ffree
 	dcinfo.FreeInodes = di.Ffree
 	dcinfo.FSType = di.FSType
-
 	diskID, err := s.GetDiskID()
 	if errors.Is(err, errUnformattedDisk) {
 		// if we found an unformatted disk then
@@ -530,7 +529,6 @@
 		// returned any error other than fresh disk
 		dcinfo.Healing = s.Healing() != nil
 	}
-
 	dcinfo.ID = diskID
 	return dcinfo, err
 }
@@ -553,28 +551,7 @@ func (s *xlStorage) getVolDir(volume string) (string, error) {
 	return volumeDir, nil
 }
 
-// GetDiskID - returns the cached disk uuid
-func (s *xlStorage) GetDiskID() (string, error) {
-	s.RLock()
-	diskID := s.diskID
-	fileInfo := s.formatFileInfo
-	lastCheck := s.formatLastCheck
-	s.RUnlock()
-
-	// check if we have a valid disk ID that is less than 1 second old.
-	if fileInfo != nil && diskID != "" && time.Since(lastCheck) <= time.Second {
-		return diskID, nil
-	}
-
-	s.Lock()
-	// If somebody else updated the disk ID and changed the time, return what they got.
-	if !lastCheck.IsZero() && !s.formatLastCheck.Equal(lastCheck) && diskID != "" {
-		s.Unlock()
-		// Somebody else got the lock first.
-		return diskID, nil
-	}
-	s.Unlock()
-
+func (s *xlStorage) checkFormatJSON() (os.FileInfo, error) {
 	formatFile := pathJoin(s.diskPath, minioMetaBucket, formatConfigFile)
 	fi, err := Lstat(formatFile)
 	if err != nil {
@@ -582,20 +559,41 @@ func (s *xlStorage) GetDiskID() (string, error) {
 		if osIsNotExist(err) {
 			if err = Access(s.diskPath); err == nil {
 				// Disk is present but missing `format.json`
-				return "", errUnformattedDisk
+				return nil, errUnformattedDisk
 			}
 			if osIsNotExist(err) {
-				return "", errDiskNotFound
+				return nil, errDiskNotFound
 			} else if osIsPermission(err) {
-				return "", errDiskAccessDenied
+				return nil, errDiskAccessDenied
 			}
 			logger.LogIf(GlobalContext, err) // log unexpected errors
-			return "", errCorruptedFormat
+			return nil, errCorruptedFormat
 		} else if osIsPermission(err) {
-			return "", errDiskAccessDenied
+			return nil, errDiskAccessDenied
 		}
 		logger.LogIf(GlobalContext, err) // log unexpected errors
-		return "", errCorruptedFormat
+		return nil, errCorruptedFormat
+	}
+	return fi, nil
+}
+
+// GetDiskID - returns the cached disk uuid
+func (s *xlStorage) GetDiskID() (string, error) {
+	s.RLock()
+	diskID := s.diskID
+	fileInfo := s.formatFileInfo
+	lastCheck := s.formatLastCheck
+
+	// check if we have a valid disk ID that is less than 1 second old.
+	if fileInfo != nil && diskID != "" && time.Since(lastCheck) <= time.Second {
+		s.RUnlock()
+		return diskID, nil
+	}
+	s.RUnlock()
+
+	fi, err := s.checkFormatJSON()
+	if err != nil {
+		return "", err
 	}
 
 	if xioutil.SameFile(fi, fileInfo) && diskID != "" {
@@ -606,6 +604,7 @@ func (s *xlStorage) GetDiskID() (string, error) {
 		return diskID, nil
 	}
 
+	formatFile := pathJoin(s.diskPath, minioMetaBucket, formatConfigFile)
 	b, err := xioutil.ReadFile(formatFile)
 	if err != nil {
 		// If the disk is still not initialized.