diff --git a/cmd/erasure-common.go b/cmd/erasure-common.go
index 0a2bd6748..1f9ddf4ca 100644
--- a/cmd/erasure-common.go
+++ b/cmd/erasure-common.go
@@ -47,38 +47,6 @@ func (er erasureObjects) getLoadBalancedLocalDisks() (newDisks []StorageAPI) {
 	return newDisks
 }
 
-func (er erasureObjects) getOnlineDisks() (newDisks []StorageAPI) {
-	disks := er.getDisks()
-	var wg sync.WaitGroup
-	var mu sync.Mutex
-	for _, i := range hashOrder(UTCNow().String(), len(disks)) {
-		i := i
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-			if disks[i-1] == nil {
-				return
-			}
-			di, err := disks[i-1].DiskInfo(context.Background())
-			if err != nil || di.Healing {
-				// - Do not consume disks which are not reachable
-				//   unformatted or simply not accessible for some reason.
-				//
-				// - Do not consume disks which are being healed
-				//
-				// - Future: skip busy disks
-				return
-			}
-
-			mu.Lock()
-			newDisks = append(newDisks, disks[i-1])
-			mu.Unlock()
-		}()
-	}
-	wg.Wait()
-	return newDisks
-}
-
 // getLoadBalancedDisks - fetches load balanced (sufficiently randomized) disk slice.
 // ensures to skip disks if they are not healing and online.
 func (er erasureObjects) getLoadBalancedDisks(optimized bool) []StorageAPI {
diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go
index 9cab3c6b6..841607cc3 100644
--- a/cmd/metacache-set.go
+++ b/cmd/metacache-set.go
@@ -351,18 +351,18 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
 		// All operations are performed without locks, so we must be careful and allow for failures.
 		// Read metadata associated with the object from a disk.
 		if retries > 0 {
-			disks := er.getOnlineDisks()
-			if len(disks) == 0 {
-				time.Sleep(retryDelay)
-				retries++
-				continue
-			}
-
-			_, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(0), "", false)
-			if err != nil {
-				time.Sleep(retryDelay)
-				retries++
-				continue
+			for _, disk := range er.getDisks() {
+				if disk == nil {
+					continue
+				}
+				_, err := disk.ReadVersion(ctx, minioMetaBucket,
+					o.objectPath(0), "", false)
+				if err != nil {
+					time.Sleep(retryDelay)
+					retries++
+					continue
+				}
+				break
 			}
 		}
 
@@ -421,20 +421,21 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
 
 		if retries > 0 {
 			// Load from one disk only
-			disks := er.getOnlineDisks()
-			if len(disks) == 0 {
-				time.Sleep(retryDelay)
-				retries++
-				continue
-			}
-
-			_, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(partN), "", false)
-			if err != nil {
-				time.Sleep(retryDelay)
-				retries++
-				continue
+			for _, disk := range er.getDisks() {
+				if disk == nil {
+					continue
+				}
+				_, err := disk.ReadVersion(ctx, minioMetaBucket,
+					o.objectPath(partN), "", false)
+				if err != nil {
+					time.Sleep(retryDelay)
+					retries++
+					continue
+				}
+				break
 			}
 		}
+
 		// Load first part metadata...
 		fi, metaArr, onlineDisks, err = er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(partN), ObjectOptions{}, true)
 		if err != nil {
@@ -512,7 +513,8 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions, resul
 
 	askDisks := o.AskDisks
 	listingQuorum := o.AskDisks - 1
-	disks := er.getOnlineDisks()
+	disks := er.getDisks()
+	var fallbackDisks []StorageAPI
 
 	// Special case: ask all disks if the drive count is 4
 	if askDisks == -1 || er.setDriveCount == 4 {
@@ -527,6 +529,7 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions, resul
 		rand.Shuffle(len(disks), func(i, j int) {
 			disks[i], disks[j] = disks[j], disks[i]
 		})
+		fallbackDisks = disks[askDisks:]
 		disks = disks[:askDisks]
 	}
 
@@ -539,13 +542,14 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions, resul
 
 	ctxDone := ctx.Done()
 	return listPathRaw(ctx, listPathRawOptions{
-		disks:        disks,
-		bucket:       o.Bucket,
-		path:         o.BaseDir,
-		recursive:    o.Recursive,
-		filterPrefix: o.FilterPrefix,
-		minDisks:     listingQuorum,
-		forwardTo:    o.Marker,
+		disks:         disks,
+		fallbackDisks: fallbackDisks,
+		bucket:        o.Bucket,
+		path:          o.BaseDir,
+		recursive:     o.Recursive,
+		filterPrefix:  o.FilterPrefix,
+		minDisks:      listingQuorum,
+		forwardTo:     o.Marker,
 		agreed: func(entry metaCacheEntry) {
 			select {
 			case <-ctxDone:
@@ -710,9 +714,10 @@ func (er *erasureObjects) saveMetaCacheStream(ctx context.Context, mc *metaCache
 }
 
 type listPathRawOptions struct {
-	disks        []StorageAPI
-	bucket, path string
-	recursive    bool
+	disks         []StorageAPI
+	fallbackDisks []StorageAPI
+	bucket, path  string
+	recursive     bool
 
 	// Only return results with this prefix.
 	filterPrefix string
@@ -752,10 +757,18 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
 	if len(disks) == 0 {
 		return fmt.Errorf("listPathRaw: 0 drives provided")
 	}
 
+	// Cancel upstream if we finish before we expect.
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
+	fallback := func(err error) bool {
+		if err == nil {
+			return false
+		}
+		return err.Error() == errUnformattedDisk.Error() ||
+			err.Error() == errVolumeNotFound.Error()
+	}
 	askDisks := len(disks)
 	readers := make([]*metacacheReader, askDisks)
 	for i := range disks {
@@ -768,15 +781,44 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
 
 		// Send request to each disk.
 		go func() {
-			werr := d.WalkDir(ctx, WalkDirOptions{
-				Bucket:         opts.bucket,
-				BaseDir:        opts.path,
-				Recursive:      opts.recursive,
-				ReportNotFound: opts.reportNotFound,
-				FilterPrefix:   opts.filterPrefix,
-				ForwardTo:      opts.forwardTo,
-			}, w)
+			var werr error
+			if d == nil {
+				werr = errDiskNotFound
+			} else {
+				werr = d.WalkDir(ctx, WalkDirOptions{
+					Bucket:         opts.bucket,
+					BaseDir:        opts.path,
+					Recursive:      opts.recursive,
+					ReportNotFound: opts.reportNotFound,
+					FilterPrefix:   opts.filterPrefix,
+					ForwardTo:      opts.forwardTo,
+				}, w)
+			}
+
+			// fallback only when set.
+			if len(opts.fallbackDisks) > 0 && fallback(werr) {
+				// This fallback is only set when
+				// askDisks is less than total
+				// number of disks per set.
+				for _, fd := range opts.fallbackDisks {
+					if fd == nil {
+						continue
+					}
+					werr = fd.WalkDir(ctx, WalkDirOptions{
+						Bucket:         opts.bucket,
+						BaseDir:        opts.path,
+						Recursive:      opts.recursive,
+						ReportNotFound: opts.reportNotFound,
+						FilterPrefix:   opts.filterPrefix,
+						ForwardTo:      opts.forwardTo,
+					}, w)
+					if werr == nil {
+						break
+					}
+				}
+			}
 			w.CloseWithError(werr)
+
 			if werr != io.EOF && werr != nil &&
 				werr.Error() != errFileNotFound.Error() &&
 				werr.Error() != errVolumeNotFound.Error() &&
@@ -795,10 +837,8 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
 		for i := range topEntries {
 			topEntries[i] = metaCacheEntry{}
 		}
-		select {
-		case <-ctx.Done():
+		if contextCanceled(ctx) {
 			return ctx.Err()
-		default:
 		}
 		for i, r := range readers {
 			if errs[i] != nil {
diff --git a/cmd/metacache-walk.go b/cmd/metacache-walk.go
index 67349f74f..78af572a4 100644
--- a/cmd/metacache-walk.go
+++ b/cmd/metacache-walk.go
@@ -22,7 +22,6 @@ import (
 	"io"
 	"net/http"
 	"net/url"
-	"os"
 	"sort"
 	"strconv"
 	"strings"
@@ -66,8 +65,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 	}
 
 	// Stat a volume entry.
-	_, err = os.Lstat(volumeDir)
-	if err != nil {
+	if err = Access(volumeDir); err != nil {
 		if osIsNotExist(err) {
 			return errVolumeNotFound
 		} else if isSysErrIO(err) {
@@ -100,7 +98,8 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 				metadata: metadata,
 			}
 		} else {
-			if st, err := Lstat(pathJoin(volumeDir, opts.BaseDir, xlStorageFormatFile)); err == nil && st.Mode().IsRegular() {
+			st, sterr := Lstat(pathJoin(volumeDir, opts.BaseDir, xlStorageFormatFile))
+			if sterr == nil && st.Mode().IsRegular() {
 				return errFileNotFound
 			}
 		}
@@ -118,7 +117,6 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 				forward = forward[:idx]
 			}
 		}
-
 		if contextCanceled(ctx) {
 			return ctx.Err()
 		}
@@ -134,6 +132,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 			// Forward some errors?
 			return nil
 		}
+		if len(entries) == 0 {
+			return nil
+		}
 		dirObjects := make(map[string]struct{})
 		for i, entry := range entries {
 			if len(prefix) > 0 && !strings.HasPrefix(entry, prefix) {
@@ -245,8 +246,6 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 			case osIsNotExist(err):
 				meta.metadata, err = xioutil.ReadFile(pathJoin(volumeDir, meta.name, xlStorageFormatFileV1))
 				if err == nil {
-					// Maybe rename? Would make it inconsistent across disks though.
-					// os.Rename(pathJoin(volumeDir, meta.name, xlStorageFormatFileV1), pathJoin(volumeDir, meta.name, xlStorageFormatFile))
 					// It was an object
 					out <- meta
 					continue
@@ -265,6 +264,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 				logger.LogIf(ctx, err)
 			}
 		}
+
 		// If directory entry left on stack, pop it now.
 		for len(dirStack) > 0 {
 			pop := dirStack[len(dirStack)-1]
diff --git a/cmd/utils.go b/cmd/utils.go
index 81d76e72b..875a6efa6 100644
--- a/cmd/utils.go
+++ b/cmd/utils.go
@@ -513,6 +513,7 @@ func newCustomHTTPProxyTransport(tlsConfig *tls.Config, dialTimeout time.Duratio
 		Proxy:               http.ProxyFromEnvironment,
 		DialContext:         xhttp.DialContextWithDNSCache(globalDNSCache, xhttp.NewInternodeDialContext(dialTimeout)),
 		MaxIdleConnsPerHost: 1024,
+		MaxConnsPerHost:     1024,
 		WriteBufferSize:     16 << 10, // 16KiB moving up from 4KiB default
 		ReadBufferSize:      16 << 10, // 16KiB moving up from 4KiB default
 		IdleConnTimeout:     15 * time.Second,
diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go
index 6a97a255b..2a1a55a57 100644
--- a/cmd/xl-storage.go
+++ b/cmd/xl-storage.go
@@ -519,7 +519,6 @@ func (s *xlStorage) DiskInfo(context.Context) (info DiskInfo, err error) {
 	dcinfo.UsedInodes = di.Files - di.Ffree
 	dcinfo.FreeInodes = di.Ffree
 	dcinfo.FSType = di.FSType
-
 	diskID, err := s.GetDiskID()
 	if errors.Is(err, errUnformattedDisk) {
 		// if we found an unformatted disk then
@@ -530,7 +529,6 @@
 		// returned any error other than fresh disk
 		dcinfo.Healing = s.Healing() != nil
 	}
-
 	dcinfo.ID = diskID
 	return dcinfo, err
 }
@@ -553,28 +551,7 @@ func (s *xlStorage) getVolDir(volume string) (string, error) {
 	return volumeDir, nil
 }
 
-// GetDiskID - returns the cached disk uuid
-func (s *xlStorage) GetDiskID() (string, error) {
-	s.RLock()
-	diskID := s.diskID
-	fileInfo := s.formatFileInfo
-	lastCheck := s.formatLastCheck
-	s.RUnlock()
-
-	// check if we have a valid disk ID that is less than 1 second old.
-	if fileInfo != nil && diskID != "" && time.Since(lastCheck) <= time.Second {
-		return diskID, nil
-	}
-
-	s.Lock()
-	// If somebody else updated the disk ID and changed the time, return what they got.
-	if !lastCheck.IsZero() && !s.formatLastCheck.Equal(lastCheck) && diskID != "" {
-		s.Unlock()
-		// Somebody else got the lock first.
-		return diskID, nil
-	}
-	s.Unlock()
-
+func (s *xlStorage) checkFormatJSON() (os.FileInfo, error) {
 	formatFile := pathJoin(s.diskPath, minioMetaBucket, formatConfigFile)
 	fi, err := Lstat(formatFile)
 	if err != nil {
@@ -582,20 +559,41 @@ func (s *xlStorage) GetDiskID() (string, error) {
 		if osIsNotExist(err) {
 			if err = Access(s.diskPath); err == nil {
 				// Disk is present but missing `format.json`
-				return "", errUnformattedDisk
+				return nil, errUnformattedDisk
 			}
 			if osIsNotExist(err) {
-				return "", errDiskNotFound
+				return nil, errDiskNotFound
 			} else if osIsPermission(err) {
-				return "", errDiskAccessDenied
+				return nil, errDiskAccessDenied
 			}
 			logger.LogIf(GlobalContext, err) // log unexpected errors
-			return "", errCorruptedFormat
+			return nil, errCorruptedFormat
 		} else if osIsPermission(err) {
-			return "", errDiskAccessDenied
+			return nil, errDiskAccessDenied
 		}
 		logger.LogIf(GlobalContext, err) // log unexpected errors
-		return "", errCorruptedFormat
+		return nil, errCorruptedFormat
+	}
+	return fi, nil
+}
+
+// GetDiskID - returns the cached disk uuid
+func (s *xlStorage) GetDiskID() (string, error) {
+	s.RLock()
+	diskID := s.diskID
+	fileInfo := s.formatFileInfo
+	lastCheck := s.formatLastCheck
+
+	// check if we have a valid disk ID that is less than 1 second old.
+	if fileInfo != nil && diskID != "" && time.Since(lastCheck) <= time.Second {
+		s.RUnlock()
+		return diskID, nil
+	}
+	s.RUnlock()
+
+	fi, err := s.checkFormatJSON()
+	if err != nil {
+		return "", err
 	}
 
 	if xioutil.SameFile(fi, fileInfo) && diskID != "" {
@@ -606,6 +604,7 @@ func (s *xlStorage) GetDiskID() (string, error) {
 		return diskID, nil
 	}
 
+	formatFile := pathJoin(s.diskPath, minioMetaBucket, formatConfigFile)
 	b, err := xioutil.ReadFile(formatFile)
 	if err != nil {
 		// If the disk is still not initialized.
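
Reviewer note (illustration only, not part of the patch): the listPathRaw change above walks the selected primary disks first and retries a walk on the fallbackDisks only when the primary fails with errUnformattedDisk or errVolumeNotFound. Below is a minimal standalone Go sketch of that primary-then-fallback pattern; the names walker and walkWithFallback are hypothetical and do not exist in the MinIO codebase.

package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the sentinel errors that trigger a fallback in the patch.
var (
	errUnformattedDisk = errors.New("unformatted disk found")
	errVolumeNotFound  = errors.New("volume not found")
)

// walker stands in for a per-disk WalkDir call over a prefix.
type walker func(prefix string) error

// walkWithFallback runs the primary walk first; only errors that mean the
// primary disk cannot serve listings at all trigger retries on the fallbacks.
func walkWithFallback(primary walker, fallbacks []walker, prefix string) error {
	shouldFallback := func(err error) bool {
		return errors.Is(err, errUnformattedDisk) || errors.Is(err, errVolumeNotFound)
	}

	err := primary(prefix)
	if err == nil || !shouldFallback(err) {
		return err // success, or an error the fallbacks cannot fix
	}
	for _, fb := range fallbacks {
		if fb == nil {
			continue // offline disk, skip it
		}
		if err = fb(prefix); err == nil {
			break // first healthy fallback wins
		}
	}
	return err
}

func main() {
	bad := walker(func(string) error { return errVolumeNotFound })
	good := walker(func(p string) error { fmt.Println("walked", p); return nil })
	// The bad primary triggers the fallback; the nil entry is skipped.
	fmt.Println(walkWithFallback(bad, []walker{nil, good}, "bucket/prefix/"))
}

The design point mirrors the patch: unrelated errors (for example permission problems) are returned as-is, so fallbacks are consulted only when the primary disk is unformatted or the volume is missing.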