Fix recursive deep scan of buckets (#8900)

This commit is contained in:
Klaus Post 2020-01-30 12:50:07 +01:00 committed by GitHub
parent 881e983ed9
commit 9990464cd5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 17 additions and 16 deletions

View File

@ -355,7 +355,7 @@ func listObjects(ctx context.Context, obj ObjectLayer, bucket, prefix, marker, d
recursive = false
}
walkResultCh, endWalkCh := tpool.Release(listParams{bucket, recursive, marker, prefix, false})
walkResultCh, endWalkCh := tpool.Release(listParams{bucket, recursive, marker, prefix})
if walkResultCh == nil {
endWalkCh = make(chan struct{})
walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, endWalkCh)
@ -416,7 +416,7 @@ func listObjects(ctx context.Context, obj ObjectLayer, bucket, prefix, marker, d
}
// Save list routine for the next marker if we haven't reached EOF.
params := listParams{bucket, recursive, nextMarker, prefix, false}
params := listParams{bucket, recursive, nextMarker, prefix}
if !eof {
tpool.Set(params, walkResultCh, endWalkCh)
}

View File

@ -99,7 +99,7 @@ type ObjectLayer interface {
HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error)
HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (madmin.HealResultItem, error)
HealObject(ctx context.Context, bucket, object string, dryRun, remove bool, scanMode madmin.HealScanMode) (madmin.HealResultItem, error)
HealObjects(ctx context.Context, bucket, prefix string, healObject healObjectFn) error
HealObjects(ctx context.Context, bucket, prefix string, fn healObjectFn) error
ListBucketsHeal(ctx context.Context) (buckets []BucketInfo, err error)

View File

@ -34,7 +34,6 @@ type listParams struct {
recursive bool
marker string
prefix string
heal bool
}
// errWalkAbort - returned by doTreeWalk() if it returns prematurely.

View File

@ -857,7 +857,8 @@ func leastEntry(entryChs []FileInfoCh, entries []FileInfo, entriesValid []bool)
}
// mergeEntriesCh - merges FileInfo channel to entries upto maxKeys.
func mergeEntriesCh(entryChs []FileInfoCh, maxKeys int, totalDrives int, heal bool) (entries FilesInfo) {
// If partialQuorumOnly is set only objects that does not have full quorum is returned.
func mergeEntriesCh(entryChs []FileInfoCh, maxKeys int, totalDrives int, partialQuorumOnly bool) (entries FilesInfo) {
var i = 0
entriesInfos := make([]FileInfo, len(entryChs))
entriesValid := make([]bool, len(entryChs))
@ -875,7 +876,7 @@ func mergeEntriesCh(entryChs []FileInfoCh, maxKeys int, totalDrives int, heal bo
rquorum = totalDrives / 2
}
if heal {
if partialQuorumOnly {
// When healing is enabled, we should
// list only objects which need healing.
if quorumCount == totalDrives {
@ -1045,10 +1046,11 @@ func (s *xlSets) listObjectsNonSlash(ctx context.Context, bucket, prefix, marker
return result, nil
}
// ListObjects - implements listing of objects across disks, each disk is indepenently
// ListObjects - implements listing of objects across disks, each disk is independently
// walked and merged at this layer. Resulting value through the merge process sends
// the data in lexically sorted order.
func (s *xlSets) listObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int, heal bool) (loi ListObjectsInfo, err error) {
// If partialQuorumOnly is set only objects that does not have full quorum is returned.
func (s *xlSets) listObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int, partialQuorumOnly bool) (loi ListObjectsInfo, err error) {
if err = checkListObjsArgs(ctx, bucket, prefix, marker, delimiter, s); err != nil {
return loi, err
}
@ -1091,13 +1093,13 @@ func (s *xlSets) listObjects(ctx context.Context, bucket, prefix, marker, delimi
recursive = false
}
entryChs, endWalkCh := s.pool.Release(listParams{bucket, recursive, marker, prefix, heal})
entryChs, endWalkCh := s.pool.Release(listParams{bucket: bucket, recursive: recursive, marker: marker, prefix: prefix})
if entryChs == nil {
endWalkCh = make(chan struct{})
entryChs = s.startMergeWalks(context.Background(), bucket, prefix, marker, recursive, endWalkCh)
}
entries := mergeEntriesCh(entryChs, maxKeys, s.drivesPerSet, heal)
entries := mergeEntriesCh(entryChs, maxKeys, s.drivesPerSet, partialQuorumOnly)
if len(entries.Files) == 0 {
return loi, nil
}
@ -1151,7 +1153,7 @@ func (s *xlSets) listObjects(ctx context.Context, bucket, prefix, marker, delimi
loi.Objects = append(loi.Objects, objInfo)
}
if loi.IsTruncated {
s.pool.Set(listParams{bucket, recursive, loi.NextMarker, prefix, heal}, entryChs, endWalkCh)
s.pool.Set(listParams{bucket, recursive, loi.NextMarker, prefix}, entryChs, endWalkCh)
}
return loi, nil
}

View File

@ -24,6 +24,6 @@ func (xl xlObjects) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) {
}
// This is not implemented/needed anymore, look for xl-sets.HealObjects()
func (xl xlObjects) HealObjects(ctx context.Context, bucket, prefix string, fn healObjectFn) error {
func (xl xlObjects) HealObjects(ctx context.Context, bucket, prefix string, fn healObjectFn) (e error) {
return nil
}

View File

@ -67,7 +67,7 @@ func (xl xlObjects) listObjects(ctx context.Context, bucket, prefix, marker, del
recursive = false
}
walkResultCh, endWalkCh := xl.listPool.Release(listParams{bucket, recursive, marker, prefix, false})
walkResultCh, endWalkCh := xl.listPool.Release(listParams{bucket, recursive, marker, prefix})
if walkResultCh == nil {
endWalkCh = make(chan struct{})
listDir := listDirFactory(ctx, xl.getLoadBalancedDisks()...)
@ -118,7 +118,7 @@ func (xl xlObjects) listObjects(ctx context.Context, bucket, prefix, marker, del
}
}
params := listParams{bucket, recursive, nextMarker, prefix, false}
params := listParams{bucket, recursive, nextMarker, prefix}
if !eof {
xl.listPool.Set(params, walkResultCh, endWalkCh)
}

View File

@ -698,7 +698,7 @@ func (z *xlZones) listObjects(ctx context.Context, bucket, prefix, marker, delim
var zonesEndWalkCh []chan struct{}
for _, zone := range z.zones {
entryChs, endWalkCh := zone.pool.Release(listParams{bucket, recursive, marker, prefix, heal})
entryChs, endWalkCh := zone.pool.Release(listParams{bucket, recursive, marker, prefix})
if entryChs == nil {
endWalkCh = make(chan struct{})
entryChs = zone.startMergeWalks(ctx, bucket, prefix, marker, recursive, endWalkCh)
@ -767,7 +767,7 @@ func (z *xlZones) listObjects(ctx context.Context, bucket, prefix, marker, delim
}
if loi.IsTruncated {
for i, zone := range z.zones {
zone.pool.Set(listParams{bucket, recursive, loi.NextMarker, prefix, heal}, zonesEntryChs[i],
zone.pool.Set(listParams{bucket, recursive, loi.NextMarker, prefix}, zonesEntryChs[i],
zonesEndWalkCh[i])
}
}