only look for metadata if batch replication asks for metadata filters (#18082)

This PR changes the StatObject() to be must have for non-minio source
to being a conditional API call.

- Calls StatObject() when needed
- Calls GetObjectTagging() when needed

These calls if we do without these conditionals can cause a lot of
delays, so we avoid them if not needed in more common scenario.
This commit is contained in:
Harshavardhana 2023-09-22 11:11:50 -07:00
parent 4eeb48f8e0
commit 3a90fb108c

View File

@ -37,6 +37,7 @@ import (
"github.com/dustin/go-humanize" "github.com/dustin/go-humanize"
"github.com/lithammer/shortuuid/v4" "github.com/lithammer/shortuuid/v4"
"github.com/minio/madmin-go/v3" "github.com/minio/madmin-go/v3"
"github.com/minio/minio-go/v7"
miniogo "github.com/minio/minio-go/v7" miniogo "github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials" "github.com/minio/minio-go/v7/pkg/credentials"
"github.com/minio/minio-go/v7/pkg/encrypt" "github.com/minio/minio-go/v7/pkg/encrypt"
@ -269,6 +270,10 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay
} }
rnd := rand.New(rand.NewSource(time.Now().UnixNano())) rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
isTags := len(r.Flags.Filter.Tags) != 0
isMetadata := len(r.Flags.Filter.Metadata) != 0
isStorageClassOnly := len(r.Flags.Filter.Metadata) == 1 && strings.EqualFold(r.Flags.Filter.Metadata[0].Key, xhttp.AmzStorageClass)
skip := func(oi ObjectInfo) (ok bool) { skip := func(oi ObjectInfo) (ok bool) {
if r.Flags.Filter.OlderThan > 0 && time.Since(oi.ModTime) < r.Flags.Filter.OlderThan { if r.Flags.Filter.OlderThan > 0 && time.Since(oi.ModTime) < r.Flags.Filter.OlderThan {
// skip all objects that are newer than specified older duration // skip all objects that are newer than specified older duration
@ -289,7 +294,8 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay
// skip all objects that are created after the specified time. // skip all objects that are created after the specified time.
return true return true
} }
if len(r.Flags.Filter.Tags) > 0 {
if isTags {
// Only parse object tags if tags filter is specified. // Only parse object tags if tags filter is specified.
tagMap := map[string]string{} tagMap := map[string]string{}
tagStr := oi.UserTags tagStr := oi.UserTags
@ -312,23 +318,19 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay
return false return false
} }
if len(r.Flags.Filter.Metadata) > 0 { for _, kv := range r.Flags.Filter.Metadata {
for _, kv := range r.Flags.Filter.Metadata { for k, v := range oi.UserDefined {
for k, v := range oi.UserDefined { if !stringsHasPrefixFold(k, "x-amz-meta-") && !isStandardHeader(k) {
if !stringsHasPrefixFold(k, "x-amz-meta-") && !isStandardHeader(k) { continue
continue }
} // We only need to match x-amz-meta or standardHeaders
// We only need to match x-amz-meta or standardHeaders if kv.Match(BatchJobKV{Key: k, Value: v}) {
if kv.Match(BatchJobKV{Key: k, Value: v}) { return true
return true
}
} }
} }
// None of the provided metadata filters match skip the object.
return false
} }
// None of the provided filters match
return false return false
} }
@ -383,17 +385,32 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay
for obj := range objInfoCh { for obj := range objInfoCh {
oi := toObjectInfo(r.Source.Bucket, obj.Key, obj) oi := toObjectInfo(r.Source.Bucket, obj.Key, obj)
if !minioSrc { if !minioSrc {
oi2, err := c.StatObject(ctx, r.Source.Bucket, obj.Key, miniogo.StatObjectOptions{}) // Check if metadata filter was requested and it is expected to have
if err == nil { // all user metadata or just storageClass. If its only storageClass
oi = toObjectInfo(r.Source.Bucket, obj.Key, oi2) // List() already returns relevant information for filter to be applied.
} else { if isMetadata && !isStorageClassOnly {
if isErrMethodNotAllowed(ErrorRespToObjectError(err, r.Source.Bucket, obj.Key)) || oi2, err := c.StatObject(ctx, r.Source.Bucket, obj.Key, miniogo.StatObjectOptions{})
isErrObjectNotFound(ErrorRespToObjectError(err, r.Source.Bucket, obj.Key)) { if err == nil {
oi = toObjectInfo(r.Source.Bucket, obj.Key, oi2)
} else {
if !isErrMethodNotAllowed(ErrorRespToObjectError(err, r.Source.Bucket, obj.Key)) &&
!isErrObjectNotFound(ErrorRespToObjectError(err, r.Source.Bucket, obj.Key)) {
logger.LogIf(ctx, err)
}
continue
}
}
if isTags {
tags, err := c.GetObjectTagging(ctx, r.Source.Bucket, obj.Key, minio.GetObjectTaggingOptions{})
if err == nil {
oi.UserTags = tags.String()
} else {
if !isErrMethodNotAllowed(ErrorRespToObjectError(err, r.Source.Bucket, obj.Key)) &&
!isErrObjectNotFound(ErrorRespToObjectError(err, r.Source.Bucket, obj.Key)) {
logger.LogIf(ctx, err)
}
continue continue
} }
logger.LogIf(ctx, err)
cancel()
return err
} }
} }
if skip(oi) { if skip(oi) {
@ -481,10 +498,12 @@ func toObjectInfo(bucket, object string, objInfo miniogo.ObjectInfo) ObjectInfo
ReplicationStatusInternal: objInfo.ReplicationStatus, ReplicationStatusInternal: objInfo.ReplicationStatus,
UserTags: tags.String(), UserTags: tags.String(),
} }
oi.UserDefined = make(map[string]string, len(objInfo.Metadata)) oi.UserDefined = make(map[string]string, len(objInfo.Metadata))
for k, v := range objInfo.Metadata { for k, v := range objInfo.Metadata {
oi.UserDefined[k] = v[0] oi.UserDefined[k] = v[0]
} }
ce, ok := oi.UserDefined[xhttp.ContentEncoding] ce, ok := oi.UserDefined[xhttp.ContentEncoding]
if !ok { if !ok {
ce, ok = oi.UserDefined[strings.ToLower(xhttp.ContentEncoding)] ce, ok = oi.UserDefined[strings.ToLower(xhttp.ContentEncoding)]
@ -492,6 +511,12 @@ func toObjectInfo(bucket, object string, objInfo miniogo.ObjectInfo) ObjectInfo
if ok { if ok {
oi.ContentEncoding = ce oi.ContentEncoding = ce
} }
_, ok = oi.UserDefined[xhttp.AmzStorageClass]
if !ok {
oi.UserDefined[xhttp.AmzStorageClass] = objInfo.StorageClass
}
return oi return oi
} }
@ -821,7 +846,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
} }
rnd := rand.New(rand.NewSource(time.Now().UnixNano())) rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
skip := func(info FileInfo) (ok bool) { selectObj := func(info FileInfo) (ok bool) {
if r.Flags.Filter.OlderThan > 0 && time.Since(info.ModTime) < r.Flags.Filter.OlderThan { if r.Flags.Filter.OlderThan > 0 && time.Since(info.ModTime) < r.Flags.Filter.OlderThan {
// skip all objects that are newer than specified older duration // skip all objects that are newer than specified older duration
return false return false
@ -931,7 +956,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
results := make(chan ObjectInfo, 100) results := make(chan ObjectInfo, 100)
if err := api.Walk(ctx, r.Source.Bucket, r.Source.Prefix, results, ObjectOptions{ if err := api.Walk(ctx, r.Source.Bucket, r.Source.Prefix, results, ObjectOptions{
WalkMarker: lastObject, WalkMarker: lastObject,
WalkFilter: skip, WalkFilter: selectObj,
}); err != nil { }); err != nil {
cancel() cancel()
// Do not need to retry if we can't list objects on source. // Do not need to retry if we can't list objects on source.