diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go index 39c525db4..b0e532d4d 100644 --- a/cmd/batch-handlers.go +++ b/cmd/batch-handlers.go @@ -37,6 +37,7 @@ import ( "github.com/dustin/go-humanize" "github.com/lithammer/shortuuid/v4" "github.com/minio/madmin-go/v3" + "github.com/minio/minio-go/v7" miniogo "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "github.com/minio/minio-go/v7/pkg/encrypt" @@ -269,6 +270,10 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay } rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + isTags := len(r.Flags.Filter.Tags) != 0 + isMetadata := len(r.Flags.Filter.Metadata) != 0 + isStorageClassOnly := len(r.Flags.Filter.Metadata) == 1 && strings.EqualFold(r.Flags.Filter.Metadata[0].Key, xhttp.AmzStorageClass) + skip := func(oi ObjectInfo) (ok bool) { if r.Flags.Filter.OlderThan > 0 && time.Since(oi.ModTime) < r.Flags.Filter.OlderThan { // skip all objects that are newer than specified older duration @@ -289,7 +294,8 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay // skip all objects that are created after the specified time. return true } - if len(r.Flags.Filter.Tags) > 0 { + + if isTags { // Only parse object tags if tags filter is specified. tagMap := map[string]string{} tagStr := oi.UserTags @@ -312,23 +318,19 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay return false } - if len(r.Flags.Filter.Metadata) > 0 { - for _, kv := range r.Flags.Filter.Metadata { - for k, v := range oi.UserDefined { - if !stringsHasPrefixFold(k, "x-amz-meta-") && !isStandardHeader(k) { - continue - } - // We only need to match x-amz-meta or standardHeaders - if kv.Match(BatchJobKV{Key: k, Value: v}) { - return true - } + for _, kv := range r.Flags.Filter.Metadata { + for k, v := range oi.UserDefined { + if !stringsHasPrefixFold(k, "x-amz-meta-") && !isStandardHeader(k) { + continue + } + // We only need to match x-amz-meta or standardHeaders + if kv.Match(BatchJobKV{Key: k, Value: v}) { + return true } } - - // None of the provided metadata filters match skip the object. - return false } + // None of the provided filters match return false } @@ -383,17 +385,32 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay for obj := range objInfoCh { oi := toObjectInfo(r.Source.Bucket, obj.Key, obj) if !minioSrc { - oi2, err := c.StatObject(ctx, r.Source.Bucket, obj.Key, miniogo.StatObjectOptions{}) - if err == nil { - oi = toObjectInfo(r.Source.Bucket, obj.Key, oi2) - } else { - if isErrMethodNotAllowed(ErrorRespToObjectError(err, r.Source.Bucket, obj.Key)) || - isErrObjectNotFound(ErrorRespToObjectError(err, r.Source.Bucket, obj.Key)) { + // Check if metadata filter was requested and it is expected to have + // all user metadata or just storageClass. If its only storageClass + // List() already returns relevant information for filter to be applied. + if isMetadata && !isStorageClassOnly { + oi2, err := c.StatObject(ctx, r.Source.Bucket, obj.Key, miniogo.StatObjectOptions{}) + if err == nil { + oi = toObjectInfo(r.Source.Bucket, obj.Key, oi2) + } else { + if !isErrMethodNotAllowed(ErrorRespToObjectError(err, r.Source.Bucket, obj.Key)) && + !isErrObjectNotFound(ErrorRespToObjectError(err, r.Source.Bucket, obj.Key)) { + logger.LogIf(ctx, err) + } + continue + } + } + if isTags { + tags, err := c.GetObjectTagging(ctx, r.Source.Bucket, obj.Key, minio.GetObjectTaggingOptions{}) + if err == nil { + oi.UserTags = tags.String() + } else { + if !isErrMethodNotAllowed(ErrorRespToObjectError(err, r.Source.Bucket, obj.Key)) && + !isErrObjectNotFound(ErrorRespToObjectError(err, r.Source.Bucket, obj.Key)) { + logger.LogIf(ctx, err) + } continue } - logger.LogIf(ctx, err) - cancel() - return err } } if skip(oi) { @@ -481,10 +498,12 @@ func toObjectInfo(bucket, object string, objInfo miniogo.ObjectInfo) ObjectInfo ReplicationStatusInternal: objInfo.ReplicationStatus, UserTags: tags.String(), } + oi.UserDefined = make(map[string]string, len(objInfo.Metadata)) for k, v := range objInfo.Metadata { oi.UserDefined[k] = v[0] } + ce, ok := oi.UserDefined[xhttp.ContentEncoding] if !ok { ce, ok = oi.UserDefined[strings.ToLower(xhttp.ContentEncoding)] @@ -492,6 +511,12 @@ func toObjectInfo(bucket, object string, objInfo miniogo.ObjectInfo) ObjectInfo if ok { oi.ContentEncoding = ce } + + _, ok = oi.UserDefined[xhttp.AmzStorageClass] + if !ok { + oi.UserDefined[xhttp.AmzStorageClass] = objInfo.StorageClass + } + return oi } @@ -821,7 +846,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba } rnd := rand.New(rand.NewSource(time.Now().UnixNano())) - skip := func(info FileInfo) (ok bool) { + selectObj := func(info FileInfo) (ok bool) { if r.Flags.Filter.OlderThan > 0 && time.Since(info.ModTime) < r.Flags.Filter.OlderThan { // skip all objects that are newer than specified older duration return false @@ -931,7 +956,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba results := make(chan ObjectInfo, 100) if err := api.Walk(ctx, r.Source.Bucket, r.Source.Prefix, results, ObjectOptions{ WalkMarker: lastObject, - WalkFilter: skip, + WalkFilter: selectObj, }); err != nil { cancel() // Do not need to retry if we can't list objects on source.