mirror of https://github.com/minio/minio.git
use listPathRaw for HealObjects() instead of expensive WalkVersions() (#11675)
This commit is contained in:
parent
509bcc01ad
commit
d971061305
|
@ -22,7 +22,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -883,11 +882,6 @@ func (h *healSequence) healMinioSysMeta(objAPI ObjectLayer, metaPrefix string) f
|
||||||
return errHealStopSignalled
|
return errHealStopSignalled
|
||||||
}
|
}
|
||||||
|
|
||||||
// Skip metacache entries healing
|
|
||||||
if strings.HasPrefix(object, "buckets/.minio.sys/.metacache/") {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
err := h.queueHealTask(healSource{
|
err := h.queueHealTask(healSource{
|
||||||
bucket: bucket,
|
bucket: bucket,
|
||||||
object: object,
|
object: object,
|
||||||
|
|
|
@ -544,6 +544,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
|
||||||
foundObjs := false
|
foundObjs := false
|
||||||
dangling := false
|
dangling := false
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
|
||||||
err := listPathRaw(ctx, listPathRawOptions{
|
err := listPathRaw(ctx, listPathRawOptions{
|
||||||
disks: f.disks,
|
disks: f.disks,
|
||||||
bucket: bucket,
|
bucket: bucket,
|
||||||
|
|
|
@ -1450,69 +1450,80 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
|
||||||
type HealObjectFn func(bucket, object, versionID string) error
|
type HealObjectFn func(bucket, object, versionID string) error
|
||||||
|
|
||||||
func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, healObject HealObjectFn) error {
|
func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, healObject HealObjectFn) error {
|
||||||
// If listing did not return any entries upon first attempt, we
|
errCh := make(chan error)
|
||||||
// return `ObjectNotFound`, to indicate the caller for any
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
// actions they may want to take as if `prefix` is missing.
|
go func() {
|
||||||
err := toObjectErr(errFileNotFound, bucket, prefix)
|
defer close(errCh)
|
||||||
for _, erasureSet := range z.serverPools {
|
defer cancel()
|
||||||
for _, set := range erasureSet.sets {
|
|
||||||
var entryChs []FileInfoVersionsCh
|
for _, erasureSet := range z.serverPools {
|
||||||
var mu sync.Mutex
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
for _, disk := range set.getOnlineDisks() {
|
for _, set := range erasureSet.sets {
|
||||||
disk := disk
|
set := set
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
entryCh, err := disk.WalkVersions(ctx, bucket, prefix, "", true, ctx.Done())
|
|
||||||
if err != nil {
|
disks, _ := set.getOnlineDisksWithHealing()
|
||||||
// Disk walk returned error, ignore it.
|
if len(disks) == 0 {
|
||||||
|
errCh <- errors.New("HealObjects: No non-healing disks found")
|
||||||
|
cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
healEntry := func(entry metaCacheEntry) {
|
||||||
|
if entry.isDir() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
fivs, err := entry.fileInfoVersions(bucket)
|
||||||
|
if err != nil {
|
||||||
|
errCh <- err
|
||||||
|
cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
waitForLowHTTPReq(globalHealConfig.IOCount, globalHealConfig.Sleep)
|
||||||
|
for _, version := range fivs.Versions {
|
||||||
|
if err := healObject(bucket, version.Name, version.VersionID); err != nil {
|
||||||
|
errCh <- err
|
||||||
|
cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// How to resolve partial results.
|
||||||
|
resolver := metadataResolutionParams{
|
||||||
|
dirQuorum: 1,
|
||||||
|
objQuorum: 1,
|
||||||
|
bucket: bucket,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := listPathRaw(ctx, listPathRawOptions{
|
||||||
|
disks: disks,
|
||||||
|
bucket: bucket,
|
||||||
|
path: baseDirFromPrefix(prefix),
|
||||||
|
recursive: true,
|
||||||
|
forwardTo: "",
|
||||||
|
minDisks: 1,
|
||||||
|
reportNotFound: false,
|
||||||
|
agreed: healEntry,
|
||||||
|
partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
|
||||||
|
entry, ok := entries.resolve(&resolver)
|
||||||
|
if ok {
|
||||||
|
healEntry(*entry)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
finished: nil,
|
||||||
|
}); err != nil {
|
||||||
|
cancel()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
mu.Lock()
|
|
||||||
entryChs = append(entryChs, FileInfoVersionsCh{
|
|
||||||
Ch: entryCh,
|
|
||||||
})
|
|
||||||
mu.Unlock()
|
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
entriesValid := make([]bool, len(entryChs))
|
|
||||||
entries := make([]FileInfoVersions, len(entryChs))
|
|
||||||
|
|
||||||
for {
|
|
||||||
entry, quorumCount, ok := lexicallySortedEntryVersions(entryChs, entries, entriesValid)
|
|
||||||
if !ok {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove empty directories if found - they have no meaning.
|
|
||||||
// Can be left over from highly concurrent put/remove.
|
|
||||||
if quorumCount > set.setDriveCount/2 && entry.IsEmptyDir {
|
|
||||||
if !opts.DryRun && opts.Remove {
|
|
||||||
set.deleteEmptyDir(ctx, bucket, entry.Name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Indicate that first attempt was a success and subsequent loop
|
|
||||||
// knows that its not our first attempt at 'prefix'
|
|
||||||
err = nil
|
|
||||||
|
|
||||||
if quorumCount == set.setDriveCount && opts.ScanMode == madmin.HealNormalScan {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, version := range entry.Versions {
|
|
||||||
if err := healObject(bucket, version.Name, version.VersionID); err != nil {
|
|
||||||
return toObjectErr(err, bucket, version.Name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}()
|
||||||
|
return <-errCh
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *erasureServerPools) HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
|
func (z *erasureServerPools) HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
|
||||||
|
|
|
@ -973,150 +973,6 @@ func (s *erasureSets) CopyObject(ctx context.Context, srcBucket, srcObject, dstB
|
||||||
return dstSet.putObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, putOpts)
|
return dstSet.putObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, putOpts)
|
||||||
}
|
}
|
||||||
|
|
||||||
// FileInfoVersionsCh - file info versions channel
|
|
||||||
type FileInfoVersionsCh struct {
|
|
||||||
Ch chan FileInfoVersions
|
|
||||||
Prev FileInfoVersions
|
|
||||||
Valid bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// Pop - pops a cached entry if any, or from the cached channel.
|
|
||||||
func (f *FileInfoVersionsCh) Pop() (fi FileInfoVersions, ok bool) {
|
|
||||||
if f.Valid {
|
|
||||||
f.Valid = false
|
|
||||||
return f.Prev, true
|
|
||||||
} // No cached entries found, read from channel
|
|
||||||
f.Prev, ok = <-f.Ch
|
|
||||||
return f.Prev, ok
|
|
||||||
}
|
|
||||||
|
|
||||||
// Push - cache an entry, for Pop() later.
|
|
||||||
func (f *FileInfoVersionsCh) Push(fi FileInfoVersions) {
|
|
||||||
f.Prev = fi
|
|
||||||
f.Valid = true
|
|
||||||
}
|
|
||||||
|
|
||||||
// FileInfoCh - file info channel
|
|
||||||
type FileInfoCh struct {
|
|
||||||
Ch chan FileInfo
|
|
||||||
Prev FileInfo
|
|
||||||
Valid bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// Pop - pops a cached entry if any, or from the cached channel.
|
|
||||||
func (f *FileInfoCh) Pop() (fi FileInfo, ok bool) {
|
|
||||||
if f.Valid {
|
|
||||||
f.Valid = false
|
|
||||||
return f.Prev, true
|
|
||||||
} // No cached entries found, read from channel
|
|
||||||
f.Prev, ok = <-f.Ch
|
|
||||||
return f.Prev, ok
|
|
||||||
}
|
|
||||||
|
|
||||||
// Push - cache an entry, for Pop() later.
|
|
||||||
func (f *FileInfoCh) Push(fi FileInfo) {
|
|
||||||
f.Prev = fi
|
|
||||||
f.Valid = true
|
|
||||||
}
|
|
||||||
|
|
||||||
// Calculate lexically least entry across multiple FileInfo channels,
|
|
||||||
// returns the lexically common entry and the total number of times
|
|
||||||
// we found this entry. Additionally also returns a boolean
|
|
||||||
// to indicate if the caller needs to call this function
|
|
||||||
// again to list the next entry. It is callers responsibility
|
|
||||||
// if the caller wishes to list N entries to call lexicallySortedEntry
|
|
||||||
// N times until this boolean is 'false'.
|
|
||||||
func lexicallySortedEntryVersions(entryChs []FileInfoVersionsCh, entries []FileInfoVersions, entriesValid []bool) (FileInfoVersions, int, bool) {
|
|
||||||
for j := range entryChs {
|
|
||||||
entries[j], entriesValid[j] = entryChs[j].Pop()
|
|
||||||
}
|
|
||||||
|
|
||||||
var isTruncated = false
|
|
||||||
for _, valid := range entriesValid {
|
|
||||||
if !valid {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
isTruncated = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
var lentry FileInfoVersions
|
|
||||||
var found bool
|
|
||||||
for i, valid := range entriesValid {
|
|
||||||
if !valid {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if !found {
|
|
||||||
lentry = entries[i]
|
|
||||||
found = true
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if entries[i].Name < lentry.Name {
|
|
||||||
lentry = entries[i]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// We haven't been able to find any lexically least entry,
|
|
||||||
// this would mean that we don't have valid entry.
|
|
||||||
if !found {
|
|
||||||
return lentry, 0, isTruncated
|
|
||||||
}
|
|
||||||
|
|
||||||
lexicallySortedEntryCount := 0
|
|
||||||
for i, valid := range entriesValid {
|
|
||||||
if !valid {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Entries are duplicated across disks,
|
|
||||||
// we should simply skip such entries.
|
|
||||||
if lentry.Name == entries[i].Name && lentry.LatestModTime.Equal(entries[i].LatestModTime) {
|
|
||||||
lexicallySortedEntryCount++
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Push all entries which are lexically higher
|
|
||||||
// and will be returned later in Pop()
|
|
||||||
entryChs[i].Push(entries[i])
|
|
||||||
}
|
|
||||||
|
|
||||||
return lentry, lexicallySortedEntryCount, isTruncated
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *erasureSets) startMergeWalksVersions(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}) []FileInfoVersionsCh {
|
|
||||||
return s.startMergeWalksVersionsN(ctx, bucket, prefix, marker, recursive, endWalkCh, -1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Starts a walk versions channel across N number of disks and returns a slice.
|
|
||||||
// FileInfoCh which can be read from.
|
|
||||||
func (s *erasureSets) startMergeWalksVersionsN(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}, ndisks int) []FileInfoVersionsCh {
|
|
||||||
var entryChs []FileInfoVersionsCh
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
var mutex sync.Mutex
|
|
||||||
for _, set := range s.sets {
|
|
||||||
// Reset for the next erasure set.
|
|
||||||
for _, disk := range set.getLoadBalancedNDisks(ndisks) {
|
|
||||||
wg.Add(1)
|
|
||||||
go func(disk StorageAPI) {
|
|
||||||
defer wg.Done()
|
|
||||||
|
|
||||||
entryCh, err := disk.WalkVersions(GlobalContext, bucket, prefix, marker, recursive, endWalkCh)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
mutex.Lock()
|
|
||||||
entryChs = append(entryChs, FileInfoVersionsCh{
|
|
||||||
Ch: entryCh,
|
|
||||||
})
|
|
||||||
mutex.Unlock()
|
|
||||||
}(disk)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
return entryChs
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *erasureSets) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) {
|
func (s *erasureSets) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) {
|
||||||
// In list multipart uploads we are going to treat input prefix as the object,
|
// In list multipart uploads we are going to treat input prefix as the object,
|
||||||
// this means that we are not supporting directory navigation.
|
// this means that we are not supporting directory navigation.
|
||||||
|
|
|
@ -195,10 +195,12 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn
|
||||||
if len(disks) == 0 {
|
if len(disks) == 0 {
|
||||||
return errors.New("healErasureSet: No non-healing disks found")
|
return errors.New("healErasureSet: No non-healing disks found")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Limit listing to 3 drives.
|
// Limit listing to 3 drives.
|
||||||
if len(disks) > 3 {
|
if len(disks) > 3 {
|
||||||
disks = disks[:3]
|
disks = disks[:3]
|
||||||
}
|
}
|
||||||
|
|
||||||
healEntry := func(entry metaCacheEntry) {
|
healEntry := func(entry metaCacheEntry) {
|
||||||
if entry.isDir() {
|
if entry.isDir() {
|
||||||
return
|
return
|
||||||
|
@ -210,7 +212,8 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn
|
||||||
}
|
}
|
||||||
waitForLowHTTPReq(globalHealConfig.IOCount, globalHealConfig.Sleep)
|
waitForLowHTTPReq(globalHealConfig.IOCount, globalHealConfig.Sleep)
|
||||||
for _, version := range fivs.Versions {
|
for _, version := range fivs.Versions {
|
||||||
if _, err := er.HealObject(ctx, bucket.Name, version.Name, version.VersionID, madmin.HealOpts{ScanMode: madmin.HealNormalScan, Remove: healDeleteDangling}); err != nil {
|
if _, err := er.HealObject(ctx, bucket.Name, version.Name, version.VersionID, madmin.HealOpts{
|
||||||
|
ScanMode: madmin.HealNormalScan, Remove: healDeleteDangling}); err != nil {
|
||||||
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
|
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
|
||||||
// If not deleted, assume they failed.
|
// If not deleted, assume they failed.
|
||||||
tracker.ObjectsFailed++
|
tracker.ObjectsFailed++
|
||||||
|
@ -228,6 +231,14 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn
|
||||||
logger.LogIf(ctx, tracker.update(ctx))
|
logger.LogIf(ctx, tracker.update(ctx))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// How to resolve partial results.
|
||||||
|
resolver := metadataResolutionParams{
|
||||||
|
dirQuorum: 1,
|
||||||
|
objQuorum: 1,
|
||||||
|
bucket: bucket.Name,
|
||||||
|
}
|
||||||
|
|
||||||
err := listPathRaw(ctx, listPathRawOptions{
|
err := listPathRaw(ctx, listPathRawOptions{
|
||||||
disks: disks,
|
disks: disks,
|
||||||
bucket: bucket.Name,
|
bucket: bucket.Name,
|
||||||
|
@ -237,13 +248,14 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn
|
||||||
reportNotFound: false,
|
reportNotFound: false,
|
||||||
agreed: healEntry,
|
agreed: healEntry,
|
||||||
partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
|
partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
|
||||||
entry, _ := entries.firstFound()
|
entry, ok := entries.resolve(&resolver)
|
||||||
if entry != nil && !entry.isDir() {
|
if ok {
|
||||||
healEntry(*entry)
|
healEntry(*entry)
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
finished: nil,
|
finished: nil,
|
||||||
})
|
})
|
||||||
|
|
||||||
select {
|
select {
|
||||||
// If context is canceled don't mark as done...
|
// If context is canceled don't mark as done...
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
|
|
@ -232,15 +232,16 @@ func (m metaCacheEntries) resolve(r *metadataResolutionParams) (selected *metaCa
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If directory, we need quorum.
|
if selected == nil {
|
||||||
if dirExists > 0 && dirExists < r.dirQuorum {
|
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
if objExists < r.objQuorum {
|
|
||||||
|
if selected.isDir() && dirExists < r.dirQuorum {
|
||||||
|
return nil, false
|
||||||
|
} else if !selected.isDir() && objExists < r.objQuorum {
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
// Take the latest selected.
|
return selected, true
|
||||||
return selected, selected != nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// firstFound returns the first found and the number of set entries.
|
// firstFound returns the first found and the number of set entries.
|
||||||
|
|
|
@ -812,10 +812,6 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
|
||||||
return fmt.Errorf("listPathRaw: 0 drives provided")
|
return fmt.Errorf("listPathRaw: 0 drives provided")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Disconnect from call above, but cancel on exit.
|
|
||||||
ctx, cancel := context.WithCancel(GlobalContext)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
askDisks := len(disks)
|
askDisks := len(disks)
|
||||||
readers := make([]*metacacheReader, askDisks)
|
readers := make([]*metacacheReader, askDisks)
|
||||||
for i := range disks {
|
for i := range disks {
|
||||||
|
|
|
@ -156,13 +156,6 @@ func (d *naughtyDisk) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Wr
|
||||||
return d.disk.WalkDir(ctx, opts, wr)
|
return d.disk.WalkDir(ctx, opts, wr)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *naughtyDisk) WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfoVersions, error) {
|
|
||||||
if err := d.calcError(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return d.disk.WalkVersions(ctx, volume, dirPath, marker, recursive, endWalkCh)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *naughtyDisk) ListDir(ctx context.Context, volume, dirPath string, count int) (entries []string, err error) {
|
func (d *naughtyDisk) ListDir(ctx context.Context, volume, dirPath string, count int) (entries []string, err error) {
|
||||||
if err := d.calcError(); err != nil {
|
if err := d.calcError(); err != nil {
|
||||||
return []string{}, err
|
return []string{}, err
|
||||||
|
|
|
@ -51,9 +51,6 @@ type StorageAPI interface {
|
||||||
// WalkDir will walk a directory on disk and return a metacache stream on wr.
|
// WalkDir will walk a directory on disk and return a metacache stream on wr.
|
||||||
WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writer) error
|
WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writer) error
|
||||||
|
|
||||||
// WalkVersions in sorted order directly on disk.
|
|
||||||
WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfoVersions, error)
|
|
||||||
|
|
||||||
// Metadata operations
|
// Metadata operations
|
||||||
DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) error
|
DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) error
|
||||||
DeleteVersions(ctx context.Context, volume string, versions []FileInfo) []error
|
DeleteVersions(ctx context.Context, volume string, versions []FileInfo) []error
|
||||||
|
|
|
@ -508,45 +508,6 @@ func (client *storageRESTClient) ReadFile(ctx context.Context, volume string, pa
|
||||||
return int64(n), err
|
return int64(n), err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (client *storageRESTClient) WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfoVersions, error) {
|
|
||||||
values := make(url.Values)
|
|
||||||
values.Set(storageRESTVolume, volume)
|
|
||||||
values.Set(storageRESTDirPath, dirPath)
|
|
||||||
values.Set(storageRESTMarkerPath, marker)
|
|
||||||
values.Set(storageRESTRecursive, strconv.FormatBool(recursive))
|
|
||||||
respBody, err := client.call(ctx, storageRESTMethodWalkVersions, values, nil, -1)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
ch := make(chan FileInfoVersions)
|
|
||||||
go func() {
|
|
||||||
defer close(ch)
|
|
||||||
defer http.DrainBody(respBody)
|
|
||||||
|
|
||||||
dec := msgpNewReader(respBody)
|
|
||||||
defer readMsgpReaderPool.Put(dec)
|
|
||||||
|
|
||||||
for {
|
|
||||||
var fi FileInfoVersions
|
|
||||||
if gerr := fi.DecodeMsg(dec); gerr != nil {
|
|
||||||
// Upon error return
|
|
||||||
if msgp.Cause(gerr) != io.EOF {
|
|
||||||
logger.LogIf(GlobalContext, gerr)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case ch <- fi:
|
|
||||||
case <-endWalkCh:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
return ch, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ListDir - lists a directory.
|
// ListDir - lists a directory.
|
||||||
func (client *storageRESTClient) ListDir(ctx context.Context, volume, dirPath string, count int) (entries []string, err error) {
|
func (client *storageRESTClient) ListDir(ctx context.Context, volume, dirPath string, count int) (entries []string, err error) {
|
||||||
values := make(url.Values)
|
values := make(url.Values)
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
const (
|
const (
|
||||||
storageRESTVersion = "v28" // Renamed crawl -> scanner
|
storageRESTVersion = "v29" // Removed WalkVersions()
|
||||||
storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
|
storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
|
||||||
storageRESTPrefix = minioReservedBucketPath + "/storage"
|
storageRESTPrefix = minioReservedBucketPath + "/storage"
|
||||||
)
|
)
|
||||||
|
@ -45,7 +45,6 @@ const (
|
||||||
storageRESTMethodReadFile = "/readfile"
|
storageRESTMethodReadFile = "/readfile"
|
||||||
storageRESTMethodReadFileStream = "/readfilestream"
|
storageRESTMethodReadFileStream = "/readfilestream"
|
||||||
storageRESTMethodListDir = "/listdir"
|
storageRESTMethodListDir = "/listdir"
|
||||||
storageRESTMethodWalkVersions = "/walkversions"
|
|
||||||
storageRESTMethodDeleteFile = "/deletefile"
|
storageRESTMethodDeleteFile = "/deletefile"
|
||||||
storageRESTMethodDeleteVersions = "/deleteverions"
|
storageRESTMethodDeleteVersions = "/deleteverions"
|
||||||
storageRESTMethodRenameFile = "/renamefile"
|
storageRESTMethodRenameFile = "/renamefile"
|
||||||
|
@ -70,7 +69,6 @@ const (
|
||||||
storageRESTOffset = "offset"
|
storageRESTOffset = "offset"
|
||||||
storageRESTLength = "length"
|
storageRESTLength = "length"
|
||||||
storageRESTCount = "count"
|
storageRESTCount = "count"
|
||||||
storageRESTMarkerPath = "marker"
|
|
||||||
storageRESTPrefixFilter = "prefix"
|
storageRESTPrefixFilter = "prefix"
|
||||||
storageRESTForwardFilter = "forward"
|
storageRESTForwardFilter = "forward"
|
||||||
storageRESTRecursive = "recursive"
|
storageRESTRecursive = "recursive"
|
||||||
|
|
|
@ -537,35 +537,6 @@ func (s *storageRESTServer) ReadFileStreamHandler(w http.ResponseWriter, r *http
|
||||||
w.(http.Flusher).Flush()
|
w.(http.Flusher).Flush()
|
||||||
}
|
}
|
||||||
|
|
||||||
// WalkVersionsHandler - remote caller to start walking at a requested directory path.
|
|
||||||
func (s *storageRESTServer) WalkVersionsHandler(w http.ResponseWriter, r *http.Request) {
|
|
||||||
if !s.IsValid(w, r) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
vars := mux.Vars(r)
|
|
||||||
volume := vars[storageRESTVolume]
|
|
||||||
dirPath := vars[storageRESTDirPath]
|
|
||||||
markerPath := vars[storageRESTMarkerPath]
|
|
||||||
recursive, err := strconv.ParseBool(vars[storageRESTRecursive])
|
|
||||||
if err != nil {
|
|
||||||
s.writeErrorResponse(w, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
setEventStreamHeaders(w)
|
|
||||||
|
|
||||||
fch, err := s.storage.WalkVersions(r.Context(), volume, dirPath, markerPath, recursive, r.Context().Done())
|
|
||||||
if err != nil {
|
|
||||||
s.writeErrorResponse(w, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
encoder := msgp.NewWriter(w)
|
|
||||||
for fi := range fch {
|
|
||||||
logger.LogIf(r.Context(), fi.EncodeMsg(encoder))
|
|
||||||
}
|
|
||||||
logger.LogIf(r.Context(), encoder.Flush())
|
|
||||||
}
|
|
||||||
|
|
||||||
// ListDirHandler - list a directory.
|
// ListDirHandler - list a directory.
|
||||||
func (s *storageRESTServer) ListDirHandler(w http.ResponseWriter, r *http.Request) {
|
func (s *storageRESTServer) ListDirHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
if !s.IsValid(w, r) {
|
if !s.IsValid(w, r) {
|
||||||
|
@ -1071,8 +1042,6 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin
|
||||||
Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTOffset, storageRESTLength)...)
|
Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTOffset, storageRESTLength)...)
|
||||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodListDir).HandlerFunc(httpTraceHdrs(server.ListDirHandler)).
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodListDir).HandlerFunc(httpTraceHdrs(server.ListDirHandler)).
|
||||||
Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTCount)...)
|
Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTCount)...)
|
||||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWalkVersions).HandlerFunc(httpTraceHdrs(server.WalkVersionsHandler)).
|
|
||||||
Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTMarkerPath, storageRESTRecursive)...)
|
|
||||||
|
|
||||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersions).HandlerFunc(httpTraceHdrs(server.DeleteVersionsHandler)).
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersions).HandlerFunc(httpTraceHdrs(server.DeleteVersionsHandler)).
|
||||||
Queries(restQueries(storageRESTVolume, storageRESTTotalVersions)...)
|
Queries(restQueries(storageRESTVolume, storageRESTTotalVersions)...)
|
||||||
|
|
|
@ -192,19 +192,6 @@ func (p *xlStorageDiskIDCheck) DeleteVol(ctx context.Context, volume string, for
|
||||||
return p.storage.DeleteVol(ctx, volume, forceDelete)
|
return p.storage.DeleteVol(ctx, volume, forceDelete)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *xlStorageDiskIDCheck) WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfoVersions, error) {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return nil, ctx.Err()
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := p.checkDiskStale(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return p.storage.WalkVersions(ctx, volume, dirPath, marker, recursive, endWalkCh)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *xlStorageDiskIDCheck) ListDir(ctx context.Context, volume, dirPath string, count int) ([]string, error) {
|
func (p *xlStorageDiskIDCheck) ListDir(ctx context.Context, volume, dirPath string, count int) ([]string, error) {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
|
|
@ -791,92 +791,6 @@ func (s *xlStorage) isLeafDir(volume, leafPath string) bool {
|
||||||
return isDirEmpty(pathJoin(volumeDir, leafPath))
|
return isDirEmpty(pathJoin(volumeDir, leafPath))
|
||||||
}
|
}
|
||||||
|
|
||||||
// WalkVersions - is a sorted walker which returns file entries in lexically sorted order,
|
|
||||||
// additionally along with metadata version info about each of those entries.
|
|
||||||
func (s *xlStorage) WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (ch chan FileInfoVersions, err error) {
|
|
||||||
atomic.AddInt32(&s.activeIOCount, 1)
|
|
||||||
defer func() {
|
|
||||||
atomic.AddInt32(&s.activeIOCount, -1)
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Verify if volume is valid and it exists.
|
|
||||||
volumeDir, err := s.getVolDir(volume)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stat a volume entry.
|
|
||||||
_, err = os.Lstat(volumeDir)
|
|
||||||
if err != nil {
|
|
||||||
if osIsNotExist(err) {
|
|
||||||
return nil, errVolumeNotFound
|
|
||||||
} else if isSysErrIO(err) {
|
|
||||||
return nil, errFaultyDisk
|
|
||||||
}
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fast exit track to check if we are listing an object with
|
|
||||||
// a trailing slash, this will avoid to list the object content.
|
|
||||||
if HasSuffix(dirPath, SlashSeparator) {
|
|
||||||
if st, err := os.Lstat(pathJoin(volumeDir, dirPath, xlStorageFormatFile)); err == nil && st.Mode().IsRegular() {
|
|
||||||
return nil, errFileNotFound
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ch = make(chan FileInfoVersions)
|
|
||||||
go func() {
|
|
||||||
defer close(ch)
|
|
||||||
listDir := func(volume, dirPath, dirEntry string) (emptyDir bool, entries []string, delayIsLeaf bool) {
|
|
||||||
entries, err := s.ListDir(ctx, volume, dirPath, -1)
|
|
||||||
if err != nil {
|
|
||||||
return false, nil, false
|
|
||||||
}
|
|
||||||
if len(entries) == 0 {
|
|
||||||
return true, nil, false
|
|
||||||
}
|
|
||||||
entries, delayIsLeaf = filterListEntries(volume, dirPath, entries, dirEntry, s.isLeaf)
|
|
||||||
return false, entries, delayIsLeaf
|
|
||||||
}
|
|
||||||
|
|
||||||
walkResultCh := startTreeWalk(GlobalContext, volume, dirPath, marker, recursive, listDir, s.isLeaf, s.isLeafDir, endWalkCh)
|
|
||||||
for walkResult := range walkResultCh {
|
|
||||||
var fiv FileInfoVersions
|
|
||||||
if HasSuffix(walkResult.entry, SlashSeparator) {
|
|
||||||
fiv = FileInfoVersions{
|
|
||||||
Volume: volume,
|
|
||||||
Name: walkResult.entry,
|
|
||||||
IsEmptyDir: walkResult.isEmptyDir,
|
|
||||||
Versions: []FileInfo{
|
|
||||||
{
|
|
||||||
Volume: volume,
|
|
||||||
Name: walkResult.entry,
|
|
||||||
Mode: uint32(os.ModeDir),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
xlMetaBuf, err := xioutil.ReadFile(pathJoin(volumeDir, walkResult.entry, xlStorageFormatFile))
|
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
fiv, err = getFileInfoVersions(xlMetaBuf, volume, walkResult.entry)
|
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case ch <- fiv:
|
|
||||||
case <-endWalkCh:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
return ch, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ListDir - return all the entries at the given directory path.
|
// ListDir - return all the entries at the given directory path.
|
||||||
// If an entry is a directory it will be returned with a trailing SlashSeparator.
|
// If an entry is a directory it will be returned with a trailing SlashSeparator.
|
||||||
func (s *xlStorage) ListDir(ctx context.Context, volume, dirPath string, count int) (entries []string, err error) {
|
func (s *xlStorage) ListDir(ctx context.Context, volume, dirPath string, count int) (entries []string, err error) {
|
||||||
|
|
Loading…
Reference in New Issue