allow specifying lower disks for Walk() (#17829)

useful when you may want Walk() with
reduced quorum requirements.
This commit is contained in:
Harshavardhana 2023-08-14 21:32:39 -07:00 committed by GitHub
parent 875f4076ec
commit 64aa7feabd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 75 additions and 34 deletions

View File

@ -1835,11 +1835,56 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
return
}
send := func(objInfo ObjectInfo) bool {
select {
case <-ctx.Done():
return false
case results <- objInfo:
return true
}
}
askDisks := getListQuorum(opts.WalkAskDisks, set.setDriveCount)
var fallbackDisks []StorageAPI
// Special case: ask all disks if the drive count is 4
if set.setDriveCount == 4 || askDisks > len(disks) {
askDisks = len(disks) // use all available drives
}
if askDisks > 0 && len(disks) > askDisks {
fallbackDisks = disks[askDisks:]
disks = disks[:askDisks]
}
requestedVersions := 0
if opts.WalkLatestOnly {
requestedVersions = 1
}
loadEntry := func(entry metaCacheEntry) {
if entry.isDir() {
return
}
if opts.WalkLatestOnly {
fi, err := entry.fileInfo(bucket)
if err != nil {
cancel()
return
}
if opts.WalkFilter != nil {
if opts.WalkFilter(fi) {
if !send(fi.ToObjectInfo(bucket, fi.Name, vcfg != nil && vcfg.Versioned(fi.Name))) {
return
}
}
} else {
if !send(fi.ToObjectInfo(bucket, fi.Name, vcfg != nil && vcfg.Versioned(fi.Name))) {
return
}
}
} else {
fivs, err := entry.fileInfoVersions(bucket)
if err != nil {
cancel()
@ -1849,22 +1894,17 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
versionsSorter(fivs.Versions).reverse()
for _, version := range fivs.Versions {
send := true
if opts.WalkFilter != nil && !opts.WalkFilter(version) {
send = false
}
if !send {
continue
}
versioned := vcfg != nil && vcfg.Versioned(version.Name)
objInfo := version.ToObjectInfo(bucket, version.Name, versioned)
select {
case <-ctx.Done():
if opts.WalkFilter != nil {
if opts.WalkFilter(version) {
if !send(version.ToObjectInfo(bucket, version.Name, vcfg != nil && vcfg.Versioned(version.Name))) {
return
case results <- objInfo:
}
}
} else {
if !send(version.ToObjectInfo(bucket, version.Name, vcfg != nil && vcfg.Versioned(version.Name))) {
return
}
}
}
}
}
@ -1874,6 +1914,7 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
dirQuorum: 1,
objQuorum: 1,
bucket: bucket,
requestedVersions: requestedVersions,
}
path := baseDirFromPrefix(prefix)
@ -1884,6 +1925,7 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
lopts := listPathRawOptions{
disks: disks,
fallbackDisks: fallbackDisks,
bucket: bucket,
path: path,
filterPrefix: filterPrefix,

View File

@ -594,14 +594,11 @@ func getListQuorum(quorum string, driveCount int) int {
return 1
case "reduced":
return 2
case "strict":
return driveCount
}
// Defaults to (driveCount+1)/2 drives per set, defaults to "optimal" value
if driveCount > 0 {
case "optimal":
return (driveCount + 1) / 2
} // "3" otherwise.
return 3
}
// defaults to 'strict'
return driveCount
}
// Will return io.EOF if continuing would not yield more results.

View File

@ -97,6 +97,8 @@ type ObjectOptions struct {
WalkFilter func(info FileInfo) bool // return WalkFilter returns 'true/false'
WalkMarker string // set to skip until this object
WalkLatestOnly bool // returns only latest versions for all matching objects
WalkAskDisks string // dictates how many disks are being listed
PrefixEnabledFn func(prefix string) bool // function which returns true if versioning is enabled on prefix
// IndexCB will return any index created but the compression.