avoid Walk() API listing objects without quorum (#18535)

This allows batch replication to basically do not
attempt to copy objects that do not have read quorum.

This PR also allows walk() to provide custom
values for quorum under batch replication, and
key rotation.
This commit is contained in:
Harshavardhana 2023-11-27 17:20:04 -08:00 committed by GitHub
parent 8d9e83fd99
commit bd0819330d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 50 additions and 34 deletions

View File

@ -1074,6 +1074,11 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
return err return err
} }
walkQuorum := env.Get("_MINIO_BATCH_REPLICATION_WALK_QUORUM", "strict")
if walkQuorum == "" {
walkQuorum = "strict"
}
retryAttempts := ri.RetryAttempts retryAttempts := ri.RetryAttempts
retry := false retry := false
for attempts := 1; attempts <= retryAttempts; attempts++ { for attempts := 1; attempts <= retryAttempts; attempts++ {
@ -1083,9 +1088,11 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
// one of source/target is s3, skip delete marker and all versions under the same object name. // one of source/target is s3, skip delete marker and all versions under the same object name.
s3Type := r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3 s3Type := r.Target.Type == BatchJobReplicateResourceS3 || r.Source.Type == BatchJobReplicateResourceS3
if err := api.Walk(ctx, r.Source.Bucket, r.Source.Prefix, walkCh, ObjectOptions{ results := make(chan ObjectInfo, 100)
WalkMarker: lastObject, if err := api.Walk(ctx, r.Source.Bucket, r.Source.Prefix, results, WalkOptions{
WalkFilter: selectObj, Marker: lastObject,
Filter: selectObj,
AskDisks: walkQuorum,
}); err != nil { }); err != nil {
cancel() cancel()
// Do not need to retry if we can't list objects on source. // Do not need to retry if we can't list objects on source.
@ -1429,7 +1436,7 @@ func (a adminAPIHandlers) ListBatchJobs(w http.ResponseWriter, r *http.Request)
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
if err := objectAPI.Walk(ctx, minioMetaBucket, batchJobPrefix, resultCh, ObjectOptions{}); err != nil { if err := objectAPI.Walk(ctx, minioMetaBucket, batchJobPrefix, resultCh, WalkOptions{}); err != nil {
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
return return
} }
@ -1646,7 +1653,7 @@ func (j *BatchJobPool) resume() {
results := make(chan ObjectInfo, 100) results := make(chan ObjectInfo, 100)
ctx, cancel := context.WithCancel(j.ctx) ctx, cancel := context.WithCancel(j.ctx)
defer cancel() defer cancel()
if err := j.objLayer.Walk(ctx, minioMetaBucket, batchJobPrefix, results, ObjectOptions{}); err != nil { if err := j.objLayer.Walk(ctx, minioMetaBucket, batchJobPrefix, results, WalkOptions{}); err != nil {
logger.LogIf(j.ctx, err) logger.LogIf(j.ctx, err)
return return
} }

View File

@ -359,9 +359,9 @@ func (r *BatchJobKeyRotateV1) Start(ctx context.Context, api ObjectLayer, job Ba
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
results := make(chan ObjectInfo, 100) results := make(chan ObjectInfo, 100)
if err := api.Walk(ctx, r.Bucket, r.Prefix, results, ObjectOptions{ if err := api.Walk(ctx, r.Bucket, r.Prefix, results, WalkOptions{
WalkMarker: lastObject, Marker: lastObject,
WalkFilter: skip, Filter: skip,
}); err != nil { }); err != nil {
cancel() cancel()
// Do not need to retry if we can't list objects on source. // Do not need to retry if we can't list objects on source.

View File

@ -2579,7 +2579,7 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object
// Walk through all object versions - Walk() is always in ascending order needed to ensure // Walk through all object versions - Walk() is always in ascending order needed to ensure
// delete marker replicated to target after object version is first created. // delete marker replicated to target after object version is first created.
if err := objectAPI.Walk(ctx, opts.bucket, "", objInfoCh, ObjectOptions{}); err != nil { if err := objectAPI.Walk(ctx, opts.bucket, "", objInfoCh, WalkOptions{}); err != nil {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
return return
} }
@ -2952,7 +2952,7 @@ func getReplicationDiff(ctx context.Context, objAPI ObjectLayer, bucket string,
} }
objInfoCh := make(chan ObjectInfo, 10) objInfoCh := make(chan ObjectInfo, 10)
if err := objAPI.Walk(ctx, bucket, opts.Prefix, objInfoCh, ObjectOptions{}); err != nil { if err := objAPI.Walk(ctx, bucket, opts.Prefix, objInfoCh, WalkOptions{}); err != nil {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
return nil, err return nil, err
} }

View File

@ -1945,7 +1945,7 @@ func (z *erasureServerPools) HealBucket(ctx context.Context, bucket string, opts
// to allocate a receive channel for ObjectInfo, upon any unhandled // to allocate a receive channel for ObjectInfo, upon any unhandled
// error walker returns error. Optionally if context.Done() is received // error walker returns error. Optionally if context.Done() is received
// then Walk() stops the walker. // then Walk() stops the walker.
func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts ObjectOptions) error { func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts WalkOptions) error {
if err := checkListObjsArgs(ctx, bucket, prefix, "", z); err != nil { if err := checkListObjsArgs(ctx, bucket, prefix, "", z); err != nil {
// Upon error close the channel. // Upon error close the channel.
close(results) close(results)
@ -1982,21 +1982,27 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
} }
} }
askDisks := getListQuorum(opts.WalkAskDisks, set.setDriveCount) askDisks := getListQuorum(opts.AskDisks, set.setDriveCount)
var fallbackDisks []StorageAPI if askDisks == -1 {
askDisks = getListQuorum("strict", set.setDriveCount)
}
// Special case: ask all disks if the drive count is 4 // Special case: ask all disks if the drive count is 4
if set.setDriveCount == 4 || askDisks > len(disks) { if set.setDriveCount == 4 || askDisks > len(disks) {
askDisks = len(disks) // use all available drives askDisks = len(disks) // use all available drives
} }
var fallbackDisks []StorageAPI
if askDisks > 0 && len(disks) > askDisks { if askDisks > 0 && len(disks) > askDisks {
rand.Shuffle(len(disks), func(i, j int) {
disks[i], disks[j] = disks[j], disks[i]
})
fallbackDisks = disks[askDisks:] fallbackDisks = disks[askDisks:]
disks = disks[:askDisks] disks = disks[:askDisks]
} }
requestedVersions := 0 requestedVersions := 0
if opts.WalkLatestOnly { if opts.LatestOnly {
requestedVersions = 1 requestedVersions = 1
} }
loadEntry := func(entry metaCacheEntry) { loadEntry := func(entry metaCacheEntry) {
@ -2004,14 +2010,14 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
return return
} }
if opts.WalkLatestOnly { if opts.LatestOnly {
fi, err := entry.fileInfo(bucket) fi, err := entry.fileInfo(bucket)
if err != nil { if err != nil {
cancel() cancel()
return return
} }
if opts.WalkFilter != nil { if opts.Filter != nil {
if opts.WalkFilter(fi) { if opts.Filter(fi) {
if !send(fi.ToObjectInfo(bucket, fi.Name, vcfg != nil && vcfg.Versioned(fi.Name))) { if !send(fi.ToObjectInfo(bucket, fi.Name, vcfg != nil && vcfg.Versioned(fi.Name))) {
return return
} }
@ -2032,8 +2038,8 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
versionsSorter(fivs.Versions).reverse() versionsSorter(fivs.Versions).reverse()
for _, version := range fivs.Versions { for _, version := range fivs.Versions {
if opts.WalkFilter != nil { if opts.Filter != nil {
if opts.WalkFilter(version) { if opts.Filter(version) {
if !send(version.ToObjectInfo(bucket, version.Name, vcfg != nil && vcfg.Versioned(version.Name))) { if !send(version.ToObjectInfo(bucket, version.Name, vcfg != nil && vcfg.Versioned(version.Name))) {
return return
} }
@ -2047,10 +2053,13 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
} }
} }
// However many we ask, versions must exist on ~50%
listingQuorum := (askDisks + 1) / 2
// How to resolve partial results. // How to resolve partial results.
resolver := metadataResolutionParams{ resolver := metadataResolutionParams{
dirQuorum: 1, dirQuorum: listingQuorum,
objQuorum: 1, objQuorum: listingQuorum,
bucket: bucket, bucket: bucket,
requestedVersions: requestedVersions, requestedVersions: requestedVersions,
} }
@ -2068,19 +2077,15 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
path: path, path: path,
filterPrefix: filterPrefix, filterPrefix: filterPrefix,
recursive: true, recursive: true,
forwardTo: opts.WalkMarker, forwardTo: opts.Marker,
minDisks: 1, minDisks: 1,
reportNotFound: false, reportNotFound: false,
agreed: loadEntry, agreed: loadEntry,
partial: func(entries metaCacheEntries, _ []error) { partial: func(entries metaCacheEntries, _ []error) {
entry, ok := entries.resolve(&resolver) entry, ok := entries.resolve(&resolver)
if !ok { if ok {
// check if we can get one entry atleast
// proceed to heal nonetheless.
entry, _ = entries.firstFound()
}
loadEntry(*entry) loadEntry(*entry)
}
}, },
finished: nil, finished: nil,
} }

View File

@ -580,7 +580,7 @@ func listIAMConfigItems(ctx context.Context, objAPI ObjectLayer, pathPrefix stri
// Allocate new results channel to receive ObjectInfo. // Allocate new results channel to receive ObjectInfo.
objInfoCh := make(chan ObjectInfo) objInfoCh := make(chan ObjectInfo)
if err := objAPI.Walk(ctx, minioMetaBucket, pathPrefix, objInfoCh, ObjectOptions{}); err != nil { if err := objAPI.Walk(ctx, minioMetaBucket, pathPrefix, objInfoCh, WalkOptions{}); err != nil {
select { select {
case ch <- itemOrErr{Err: err}: case ch <- itemOrErr{Err: err}:
case <-ctx.Done(): case <-ctx.Done():

View File

@ -95,10 +95,6 @@ type ObjectOptions struct {
// participating in a rebalance operation. Typically set for 'write' operations. // participating in a rebalance operation. Typically set for 'write' operations.
SkipRebalancing bool SkipRebalancing bool
WalkFilter func(info FileInfo) bool // return WalkFilter returns 'true/false'
WalkMarker string // set to skip until this object
WalkLatestOnly bool // returns only latest versions for all matching objects
WalkAskDisks string // dictates how many disks are being listed
PrefixEnabledFn func(prefix string) bool // function which returns true if versioning is enabled on prefix PrefixEnabledFn func(prefix string) bool // function which returns true if versioning is enabled on prefix
// IndexCB will return any index created but the compression. // IndexCB will return any index created but the compression.
@ -113,6 +109,14 @@ type ObjectOptions struct {
FastGetObjInfo bool // Only for S3 Head/Get Object calls for now FastGetObjInfo bool // Only for S3 Head/Get Object calls for now
} }
// WalkOptions provides filtering, marker and other Walk() specific options.
type WalkOptions struct {
Filter func(info FileInfo) bool // return WalkFilter returns 'true/false'
Marker string // set to skip until this object
LatestOnly bool // returns only latest versions for all matching objects
AskDisks string // dictates how many disks are being listed
}
// ExpirationOptions represents object options for object expiration at objectLayer. // ExpirationOptions represents object options for object expiration at objectLayer.
type ExpirationOptions struct { type ExpirationOptions struct {
Expire bool Expire bool
@ -224,7 +228,7 @@ type ObjectLayer interface {
ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error)
ListObjectVersions(ctx context.Context, bucket, prefix, marker, versionMarker, delimiter string, maxKeys int) (result ListObjectVersionsInfo, err error) ListObjectVersions(ctx context.Context, bucket, prefix, marker, versionMarker, delimiter string, maxKeys int) (result ListObjectVersionsInfo, err error)
// Walk lists all objects including versions, delete markers. // Walk lists all objects including versions, delete markers.
Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts ObjectOptions) error Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts WalkOptions) error
// Object operations. // Object operations.