diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go index 8e2d9b4bc..faaf09946 100644 --- a/cmd/admin-heal-ops.go +++ b/cmd/admin-heal-ops.go @@ -22,7 +22,6 @@ import ( "fmt" "net/http" "sort" - "strings" "sync" "time" @@ -883,11 +882,6 @@ func (h *healSequence) healMinioSysMeta(objAPI ObjectLayer, metaPrefix string) f return errHealStopSignalled } - // Skip metacache entries healing - if strings.HasPrefix(object, "buckets/.minio.sys/.metacache/") { - return nil - } - err := h.queueHealTask(healSource{ bucket: bucket, object: object, diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go index 71f1042ee..f387aa848 100644 --- a/cmd/data-scanner.go +++ b/cmd/data-scanner.go @@ -544,6 +544,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo foundObjs := false dangling := false ctx, cancel := context.WithCancel(ctx) + err := listPathRaw(ctx, listPathRawOptions{ disks: f.disks, bucket: bucket, diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index 1a764d50c..bb7c4a128 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -1450,69 +1450,80 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re type HealObjectFn func(bucket, object, versionID string) error func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, healObject HealObjectFn) error { - // If listing did not return any entries upon first attempt, we - // return `ObjectNotFound`, to indicate the caller for any - // actions they may want to take as if `prefix` is missing. - err := toObjectErr(errFileNotFound, bucket, prefix) - for _, erasureSet := range z.serverPools { - for _, set := range erasureSet.sets { - var entryChs []FileInfoVersionsCh - var mu sync.Mutex + errCh := make(chan error) + ctx, cancel := context.WithCancel(ctx) + go func() { + defer close(errCh) + defer cancel() + + for _, erasureSet := range z.serverPools { var wg sync.WaitGroup - for _, disk := range set.getOnlineDisks() { - disk := disk + for _, set := range erasureSet.sets { + set := set wg.Add(1) go func() { defer wg.Done() - entryCh, err := disk.WalkVersions(ctx, bucket, prefix, "", true, ctx.Done()) - if err != nil { - // Disk walk returned error, ignore it. + + disks, _ := set.getOnlineDisksWithHealing() + if len(disks) == 0 { + errCh <- errors.New("HealObjects: No non-healing disks found") + cancel() + return + } + + healEntry := func(entry metaCacheEntry) { + if entry.isDir() { + return + } + fivs, err := entry.fileInfoVersions(bucket) + if err != nil { + errCh <- err + cancel() + return + } + waitForLowHTTPReq(globalHealConfig.IOCount, globalHealConfig.Sleep) + for _, version := range fivs.Versions { + if err := healObject(bucket, version.Name, version.VersionID); err != nil { + errCh <- err + cancel() + return + } + } + } + + // How to resolve partial results. 
+ resolver := metadataResolutionParams{ + dirQuorum: 1, + objQuorum: 1, + bucket: bucket, + } + + if err := listPathRaw(ctx, listPathRawOptions{ + disks: disks, + bucket: bucket, + path: baseDirFromPrefix(prefix), + recursive: true, + forwardTo: "", + minDisks: 1, + reportNotFound: false, + agreed: healEntry, + partial: func(entries metaCacheEntries, nAgreed int, errs []error) { + entry, ok := entries.resolve(&resolver) + if ok { + healEntry(*entry) + } + }, + finished: nil, + }); err != nil { + cancel() return } - mu.Lock() - entryChs = append(entryChs, FileInfoVersionsCh{ - Ch: entryCh, - }) - mu.Unlock() }() } wg.Wait() - - entriesValid := make([]bool, len(entryChs)) - entries := make([]FileInfoVersions, len(entryChs)) - - for { - entry, quorumCount, ok := lexicallySortedEntryVersions(entryChs, entries, entriesValid) - if !ok { - break - } - - // Remove empty directories if found - they have no meaning. - // Can be left over from highly concurrent put/remove. - if quorumCount > set.setDriveCount/2 && entry.IsEmptyDir { - if !opts.DryRun && opts.Remove { - set.deleteEmptyDir(ctx, bucket, entry.Name) - } - } - - // Indicate that first attempt was a success and subsequent loop - // knows that its not our first attempt at 'prefix' - err = nil - - if quorumCount == set.setDriveCount && opts.ScanMode == madmin.HealNormalScan { - continue - } - - for _, version := range entry.Versions { - if err := healObject(bucket, version.Name, version.VersionID); err != nil { - return toObjectErr(err, bucket, version.Name) - } - } - } } - } - - return err + }() + return <-errCh } func (z *erasureServerPools) HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error) { diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index ff22ce894..b2c908a30 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -973,150 +973,6 @@ func (s *erasureSets) CopyObject(ctx context.Context, srcBucket, srcObject, dstB return dstSet.putObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, putOpts) } -// FileInfoVersionsCh - file info versions channel -type FileInfoVersionsCh struct { - Ch chan FileInfoVersions - Prev FileInfoVersions - Valid bool -} - -// Pop - pops a cached entry if any, or from the cached channel. -func (f *FileInfoVersionsCh) Pop() (fi FileInfoVersions, ok bool) { - if f.Valid { - f.Valid = false - return f.Prev, true - } // No cached entries found, read from channel - f.Prev, ok = <-f.Ch - return f.Prev, ok -} - -// Push - cache an entry, for Pop() later. -func (f *FileInfoVersionsCh) Push(fi FileInfoVersions) { - f.Prev = fi - f.Valid = true -} - -// FileInfoCh - file info channel -type FileInfoCh struct { - Ch chan FileInfo - Prev FileInfo - Valid bool -} - -// Pop - pops a cached entry if any, or from the cached channel. -func (f *FileInfoCh) Pop() (fi FileInfo, ok bool) { - if f.Valid { - f.Valid = false - return f.Prev, true - } // No cached entries found, read from channel - f.Prev, ok = <-f.Ch - return f.Prev, ok -} - -// Push - cache an entry, for Pop() later. -func (f *FileInfoCh) Push(fi FileInfo) { - f.Prev = fi - f.Valid = true -} - -// Calculate lexically least entry across multiple FileInfo channels, -// returns the lexically common entry and the total number of times -// we found this entry. Additionally also returns a boolean -// to indicate if the caller needs to call this function -// again to list the next entry. 
It is callers responsibility -// if the caller wishes to list N entries to call lexicallySortedEntry -// N times until this boolean is 'false'. -func lexicallySortedEntryVersions(entryChs []FileInfoVersionsCh, entries []FileInfoVersions, entriesValid []bool) (FileInfoVersions, int, bool) { - for j := range entryChs { - entries[j], entriesValid[j] = entryChs[j].Pop() - } - - var isTruncated = false - for _, valid := range entriesValid { - if !valid { - continue - } - isTruncated = true - break - } - - var lentry FileInfoVersions - var found bool - for i, valid := range entriesValid { - if !valid { - continue - } - if !found { - lentry = entries[i] - found = true - continue - } - if entries[i].Name < lentry.Name { - lentry = entries[i] - } - } - - // We haven't been able to find any lexically least entry, - // this would mean that we don't have valid entry. - if !found { - return lentry, 0, isTruncated - } - - lexicallySortedEntryCount := 0 - for i, valid := range entriesValid { - if !valid { - continue - } - - // Entries are duplicated across disks, - // we should simply skip such entries. - if lentry.Name == entries[i].Name && lentry.LatestModTime.Equal(entries[i].LatestModTime) { - lexicallySortedEntryCount++ - continue - } - - // Push all entries which are lexically higher - // and will be returned later in Pop() - entryChs[i].Push(entries[i]) - } - - return lentry, lexicallySortedEntryCount, isTruncated -} - -func (s *erasureSets) startMergeWalksVersions(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}) []FileInfoVersionsCh { - return s.startMergeWalksVersionsN(ctx, bucket, prefix, marker, recursive, endWalkCh, -1) -} - -// Starts a walk versions channel across N number of disks and returns a slice. -// FileInfoCh which can be read from. -func (s *erasureSets) startMergeWalksVersionsN(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}, ndisks int) []FileInfoVersionsCh { - var entryChs []FileInfoVersionsCh - var wg sync.WaitGroup - var mutex sync.Mutex - for _, set := range s.sets { - // Reset for the next erasure set. - for _, disk := range set.getLoadBalancedNDisks(ndisks) { - wg.Add(1) - go func(disk StorageAPI) { - defer wg.Done() - - entryCh, err := disk.WalkVersions(GlobalContext, bucket, prefix, marker, recursive, endWalkCh) - if err != nil { - return - } - - mutex.Lock() - entryChs = append(entryChs, FileInfoVersionsCh{ - Ch: entryCh, - }) - mutex.Unlock() - }(disk) - } - } - wg.Wait() - return entryChs -} - func (s *erasureSets) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) { // In list multipart uploads we are going to treat input prefix as the object, // this means that we are not supporting directory navigation. diff --git a/cmd/global-heal.go b/cmd/global-heal.go index 49a877c86..d7d55625b 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -195,10 +195,12 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn if len(disks) == 0 { return errors.New("healErasureSet: No non-healing disks found") } + // Limit listing to 3 drives. 
if len(disks) > 3 { disks = disks[:3] } + healEntry := func(entry metaCacheEntry) { if entry.isDir() { return @@ -210,7 +212,8 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn } waitForLowHTTPReq(globalHealConfig.IOCount, globalHealConfig.Sleep) for _, version := range fivs.Versions { - if _, err := er.HealObject(ctx, bucket.Name, version.Name, version.VersionID, madmin.HealOpts{ScanMode: madmin.HealNormalScan, Remove: healDeleteDangling}); err != nil { + if _, err := er.HealObject(ctx, bucket.Name, version.Name, version.VersionID, madmin.HealOpts{ + ScanMode: madmin.HealNormalScan, Remove: healDeleteDangling}); err != nil { if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { // If not deleted, assume they failed. tracker.ObjectsFailed++ @@ -228,6 +231,14 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn logger.LogIf(ctx, tracker.update(ctx)) } } + + // How to resolve partial results. + resolver := metadataResolutionParams{ + dirQuorum: 1, + objQuorum: 1, + bucket: bucket.Name, + } + err := listPathRaw(ctx, listPathRawOptions{ disks: disks, bucket: bucket.Name, @@ -237,13 +248,14 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn reportNotFound: false, agreed: healEntry, partial: func(entries metaCacheEntries, nAgreed int, errs []error) { - entry, _ := entries.firstFound() - if entry != nil && !entry.isDir() { + entry, ok := entries.resolve(&resolver) + if ok { healEntry(*entry) } }, finished: nil, }) + select { // If context is canceled don't mark as done... case <-ctx.Done(): diff --git a/cmd/metacache-entries.go b/cmd/metacache-entries.go index cb57237ef..f19992c64 100644 --- a/cmd/metacache-entries.go +++ b/cmd/metacache-entries.go @@ -232,15 +232,16 @@ func (m metaCacheEntries) resolve(r *metadataResolutionParams) (selected *metaCa } } - // If directory, we need quorum. - if dirExists > 0 && dirExists < r.dirQuorum { + if selected == nil { return nil, false } - if objExists < r.objQuorum { + + if selected.isDir() && dirExists < r.dirQuorum { + return nil, false + } else if !selected.isDir() && objExists < r.objQuorum { return nil, false } - // Take the latest selected. - return selected, selected != nil + return selected, true } // firstFound returns the first found and the number of set entries. diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go index 6dc6497b1..8be141da5 100644 --- a/cmd/metacache-set.go +++ b/cmd/metacache-set.go @@ -812,10 +812,6 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) { return fmt.Errorf("listPathRaw: 0 drives provided") } - // Disconnect from call above, but cancel on exit. 
- ctx, cancel := context.WithCancel(GlobalContext) - defer cancel() - askDisks := len(disks) readers := make([]*metacacheReader, askDisks) for i := range disks { diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go index 487aeb37c..5c201fdab 100644 --- a/cmd/naughty-disk_test.go +++ b/cmd/naughty-disk_test.go @@ -156,13 +156,6 @@ func (d *naughtyDisk) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Wr return d.disk.WalkDir(ctx, opts, wr) } -func (d *naughtyDisk) WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfoVersions, error) { - if err := d.calcError(); err != nil { - return nil, err - } - return d.disk.WalkVersions(ctx, volume, dirPath, marker, recursive, endWalkCh) -} - func (d *naughtyDisk) ListDir(ctx context.Context, volume, dirPath string, count int) (entries []string, err error) { if err := d.calcError(); err != nil { return []string{}, err diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go index 220cf3d30..5ca2f84d7 100644 --- a/cmd/storage-interface.go +++ b/cmd/storage-interface.go @@ -51,9 +51,6 @@ type StorageAPI interface { // WalkDir will walk a directory on disk and return a metacache stream on wr. WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writer) error - // WalkVersions in sorted order directly on disk. - WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfoVersions, error) - // Metadata operations DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) error DeleteVersions(ctx context.Context, volume string, versions []FileInfo) []error diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 0d225bac5..a3a72f800 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -508,45 +508,6 @@ func (client *storageRESTClient) ReadFile(ctx context.Context, volume string, pa return int64(n), err } -func (client *storageRESTClient) WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfoVersions, error) { - values := make(url.Values) - values.Set(storageRESTVolume, volume) - values.Set(storageRESTDirPath, dirPath) - values.Set(storageRESTMarkerPath, marker) - values.Set(storageRESTRecursive, strconv.FormatBool(recursive)) - respBody, err := client.call(ctx, storageRESTMethodWalkVersions, values, nil, -1) - if err != nil { - return nil, err - } - - ch := make(chan FileInfoVersions) - go func() { - defer close(ch) - defer http.DrainBody(respBody) - - dec := msgpNewReader(respBody) - defer readMsgpReaderPool.Put(dec) - - for { - var fi FileInfoVersions - if gerr := fi.DecodeMsg(dec); gerr != nil { - // Upon error return - if msgp.Cause(gerr) != io.EOF { - logger.LogIf(GlobalContext, gerr) - } - return - } - select { - case ch <- fi: - case <-endWalkCh: - return - } - } - }() - - return ch, nil -} - // ListDir - lists a directory. 
func (client *storageRESTClient) ListDir(ctx context.Context, volume, dirPath string, count int) (entries []string, err error) { values := make(url.Values) diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go index 0f1de4579..7db5666fe 100644 --- a/cmd/storage-rest-common.go +++ b/cmd/storage-rest-common.go @@ -17,7 +17,7 @@ package cmd const ( - storageRESTVersion = "v28" // Renamed crawl -> scanner + storageRESTVersion = "v29" // Removed WalkVersions() storageRESTVersionPrefix = SlashSeparator + storageRESTVersion storageRESTPrefix = minioReservedBucketPath + "/storage" ) @@ -45,7 +45,6 @@ const ( storageRESTMethodReadFile = "/readfile" storageRESTMethodReadFileStream = "/readfilestream" storageRESTMethodListDir = "/listdir" - storageRESTMethodWalkVersions = "/walkversions" storageRESTMethodDeleteFile = "/deletefile" storageRESTMethodDeleteVersions = "/deleteverions" storageRESTMethodRenameFile = "/renamefile" @@ -70,7 +69,6 @@ const ( storageRESTOffset = "offset" storageRESTLength = "length" storageRESTCount = "count" - storageRESTMarkerPath = "marker" storageRESTPrefixFilter = "prefix" storageRESTForwardFilter = "forward" storageRESTRecursive = "recursive" diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index c06988a3f..baf3e9070 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -537,35 +537,6 @@ func (s *storageRESTServer) ReadFileStreamHandler(w http.ResponseWriter, r *http w.(http.Flusher).Flush() } -// WalkVersionsHandler - remote caller to start walking at a requested directory path. -func (s *storageRESTServer) WalkVersionsHandler(w http.ResponseWriter, r *http.Request) { - if !s.IsValid(w, r) { - return - } - vars := mux.Vars(r) - volume := vars[storageRESTVolume] - dirPath := vars[storageRESTDirPath] - markerPath := vars[storageRESTMarkerPath] - recursive, err := strconv.ParseBool(vars[storageRESTRecursive]) - if err != nil { - s.writeErrorResponse(w, err) - return - } - - setEventStreamHeaders(w) - - fch, err := s.storage.WalkVersions(r.Context(), volume, dirPath, markerPath, recursive, r.Context().Done()) - if err != nil { - s.writeErrorResponse(w, err) - return - } - encoder := msgp.NewWriter(w) - for fi := range fch { - logger.LogIf(r.Context(), fi.EncodeMsg(encoder)) - } - logger.LogIf(r.Context(), encoder.Flush()) -} - // ListDirHandler - list a directory. func (s *storageRESTServer) ListDirHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { @@ -1071,8 +1042,6 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTOffset, storageRESTLength)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodListDir).HandlerFunc(httpTraceHdrs(server.ListDirHandler)). Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTCount)...) - subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWalkVersions).HandlerFunc(httpTraceHdrs(server.WalkVersionsHandler)). - Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTMarkerPath, storageRESTRecursive)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersions).HandlerFunc(httpTraceHdrs(server.DeleteVersionsHandler)). Queries(restQueries(storageRESTVolume, storageRESTTotalVersions)...) 
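Note (not part of the patch): with the WalkVersions RPC removed above, both erasureServerPools.HealObjects and healErasureSet converge on the same metacache listing pattern -- stream entries with listPathRaw, resolve partial results with a quorum of one, and hand every object version to a heal callback. The sketch below condenses that shape; it would live inside package cmd and reuses the unexported identifiers visible in this diff (listPathRaw, listPathRawOptions, metaCacheEntry, metaCacheEntries, metadataResolutionParams, baseDirFromPrefix, StorageAPI), while the helper name walkForHeal and the simplified error handling are illustrative additions, not code from the change.

package cmd

import "context"

// walkForHeal is a hypothetical helper sketching the listing pattern this
// patch converges on for healing; it is not part of the change itself.
func walkForHeal(ctx context.Context, disks []StorageAPI, bucket, prefix string,
	heal func(bucket, object, versionID string) error) error {

	healEntry := func(entry metaCacheEntry) {
		if entry.isDir() {
			return // directories carry no versions to heal
		}
		fivs, err := entry.fileInfoVersions(bucket)
		if err != nil {
			return // unreadable metadata is skipped in this sketch
		}
		for _, version := range fivs.Versions {
			// Best effort: a failed heal of one version does not stop the walk
			// here, unlike HealObjects above, which reports the first error.
			_ = heal(bucket, version.Name, version.VersionID)
		}
	}

	// dirQuorum/objQuorum of 1: an entry reported by a single disk is enough
	// to attempt healing it.
	resolver := metadataResolutionParams{
		dirQuorum: 1,
		objQuorum: 1,
		bucket:    bucket,
	}

	return listPathRaw(ctx, listPathRawOptions{
		disks:          disks,
		bucket:         bucket,
		path:           baseDirFromPrefix(prefix),
		recursive:      true,
		minDisks:       1,
		reportNotFound: false,
		agreed:         healEntry, // all listed disks agree on the entry
		partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
			// Disks disagree: resolve to a best candidate and heal it.
			if entry, ok := entries.resolve(&resolver); ok {
				healEntry(*entry)
			}
		},
	})
}
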
diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go index 908e326e4..904c68050 100644 --- a/cmd/xl-storage-disk-id-check.go +++ b/cmd/xl-storage-disk-id-check.go @@ -192,19 +192,6 @@ func (p *xlStorageDiskIDCheck) DeleteVol(ctx context.Context, volume string, for return p.storage.DeleteVol(ctx, volume, forceDelete) } -func (p *xlStorageDiskIDCheck) WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfoVersions, error) { - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - } - - if err := p.checkDiskStale(); err != nil { - return nil, err - } - return p.storage.WalkVersions(ctx, volume, dirPath, marker, recursive, endWalkCh) -} - func (p *xlStorageDiskIDCheck) ListDir(ctx context.Context, volume, dirPath string, count int) ([]string, error) { select { case <-ctx.Done(): diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 3e13e65bd..126298cb4 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -791,92 +791,6 @@ func (s *xlStorage) isLeafDir(volume, leafPath string) bool { return isDirEmpty(pathJoin(volumeDir, leafPath)) } -// WalkVersions - is a sorted walker which returns file entries in lexically sorted order, -// additionally along with metadata version info about each of those entries. -func (s *xlStorage) WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (ch chan FileInfoVersions, err error) { - atomic.AddInt32(&s.activeIOCount, 1) - defer func() { - atomic.AddInt32(&s.activeIOCount, -1) - }() - - // Verify if volume is valid and it exists. - volumeDir, err := s.getVolDir(volume) - if err != nil { - return nil, err - } - - // Stat a volume entry. - _, err = os.Lstat(volumeDir) - if err != nil { - if osIsNotExist(err) { - return nil, errVolumeNotFound - } else if isSysErrIO(err) { - return nil, errFaultyDisk - } - return nil, err - } - - // Fast exit track to check if we are listing an object with - // a trailing slash, this will avoid to list the object content. 
- if HasSuffix(dirPath, SlashSeparator) { - if st, err := os.Lstat(pathJoin(volumeDir, dirPath, xlStorageFormatFile)); err == nil && st.Mode().IsRegular() { - return nil, errFileNotFound - } - } - - ch = make(chan FileInfoVersions) - go func() { - defer close(ch) - listDir := func(volume, dirPath, dirEntry string) (emptyDir bool, entries []string, delayIsLeaf bool) { - entries, err := s.ListDir(ctx, volume, dirPath, -1) - if err != nil { - return false, nil, false - } - if len(entries) == 0 { - return true, nil, false - } - entries, delayIsLeaf = filterListEntries(volume, dirPath, entries, dirEntry, s.isLeaf) - return false, entries, delayIsLeaf - } - - walkResultCh := startTreeWalk(GlobalContext, volume, dirPath, marker, recursive, listDir, s.isLeaf, s.isLeafDir, endWalkCh) - for walkResult := range walkResultCh { - var fiv FileInfoVersions - if HasSuffix(walkResult.entry, SlashSeparator) { - fiv = FileInfoVersions{ - Volume: volume, - Name: walkResult.entry, - IsEmptyDir: walkResult.isEmptyDir, - Versions: []FileInfo{ - { - Volume: volume, - Name: walkResult.entry, - Mode: uint32(os.ModeDir), - }, - }, - } - } else { - xlMetaBuf, err := xioutil.ReadFile(pathJoin(volumeDir, walkResult.entry, xlStorageFormatFile)) - if err != nil { - continue - } - - fiv, err = getFileInfoVersions(xlMetaBuf, volume, walkResult.entry) - if err != nil { - continue - } - } - select { - case ch <- fiv: - case <-endWalkCh: - return - } - } - }() - - return ch, nil -} - // ListDir - return all the entries at the given directory path. // If an entry is a directory it will be returned with a trailing SlashSeparator. func (s *xlStorage) ListDir(ctx context.Context, volume, dirPath string, count int) (entries []string, err error) {
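Note (not part of the patch): the new erasureServerPools.HealObjects fans out one walk per erasure set, reports the first failure on an unbuffered error channel, and cancels the shared context so sibling walks stop early. The standalone program below reproduces that control-flow shape with the standard library only; firstError and its workers are illustrative names, not MinIO APIs.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// firstError runs the workers concurrently and returns the first error any of
// them reports, or nil once all of them finish cleanly.
func firstError(ctx context.Context, workers []func(context.Context) error) error {
	errCh := make(chan error)
	ctx, cancel := context.WithCancel(ctx)

	go func() {
		defer close(errCh) // a closed channel yields nil to the reader: "no error"
		defer cancel()

		var wg sync.WaitGroup
		for _, work := range workers {
			work := work
			wg.Add(1)
			go func() {
				defer wg.Done()
				if err := work(ctx); err != nil {
					select {
					case errCh <- err: // first failure reaches the caller
					case <-ctx.Done(): // another worker already failed
					}
					cancel() // stop the remaining workers
				}
			}()
		}
		wg.Wait()
	}()

	return <-errCh
}

func main() {
	err := firstError(context.Background(), []func(context.Context) error{
		func(ctx context.Context) error { return nil },
		func(ctx context.Context) error { return errors.New("disk offline") },
	})
	fmt.Println(err) // "disk offline"; nil if every worker succeeds
}

In this sketch the send on errCh is guarded by ctx.Done() so that a worker failing after the caller has already returned cannot block; that guard is a choice of the sketch, not something the patch relies on.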