metacache: Make very small requests transient (#11109)

This commit is contained in:
Klaus Post 2020-12-15 11:25:36 -08:00 committed by GitHub
parent 5df61ab96b
commit e7d3b49a20
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 25 additions and 7 deletions

View File

@ -707,7 +707,7 @@ func (z *erasureServerPools) ListObjectVersions(ctx context.Context, bucket, pre
// more objects matching the prefix.
ri := logger.GetReqInfo(ctx)
if ri != nil && strings.Contains(ri.UserAgent, `1.0 Veeam/1.0 Backup`) && strings.HasSuffix(prefix, ".blk") {
opts.singleObject = true
opts.discardResult = true
opts.Transient = true
}

View File

@ -84,11 +84,21 @@ func (z *erasureServerPools) listPath(ctx context.Context, o listPathOptions) (e
o.ID = mustGetUUID()
}
o.BaseDir = baseDirFromPrefix(o.Prefix)
if o.singleObject {
if o.discardResult {
// Override for single object.
o.BaseDir = o.Prefix
}
// For very small recursive listings, don't same cache.
// Attempts to avoid expensive listings to run for a long
// while when clients aren't interested in results.
// If the client DOES resume the listing a full cache
// will be generated due to the marker without ID and this check failing.
if o.Limit < 10 && o.Marker == "" && o.Create && o.Recursive {
o.discardResult = true
o.Transient = true
}
var cache metacache
// If we don't have a list id we must ask the server if it has a cache or create a new.
if o.Create {
@ -206,7 +216,9 @@ func (z *erasureServerPools) listPath(ctx context.Context, o listPathOptions) (e
}
truncated := entries.len() > o.Limit || !allAtEOF
entries.truncate(o.Limit)
if !o.discardResult {
entries.listID = o.ID
}
if !truncated {
return entries, io.EOF
}

View File

@ -95,8 +95,9 @@ type listPathOptions struct {
// A transient result will never be returned from the cache so knowing the list id is required.
Transient bool
// singleObject will assume that prefix refers to an exact single object.
singleObject bool
// discardResult will not persist the cache to storage.
// When the initial results are returned listing will be canceled.
discardResult bool
}
func init() {
@ -545,7 +546,7 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
o.debugf("listPath with options: %#v\n", o)
// See if we have the listing stored.
if !o.Create && !o.singleObject {
if !o.Create && !o.discardResult {
entries, err := er.streamMetadataParts(ctx, o)
if IsErr(err, []error{
nil,
@ -627,6 +628,11 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
close(filterCh)
}
// Cancel listing on return if non-saved list.
if o.discardResult {
defer cancel()
}
go func() {
defer cancel()
// Save continuous updates
@ -658,7 +664,7 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
// Write results to disk.
bw := newMetacacheBlockWriter(cacheCh, func(b *metacacheBlock) error {
if o.singleObject {
if o.discardResult {
// Don't save single object listings.
return nil
}