fix: lookup metdata case insensitively (#11487)

while setting replication options
This commit is contained in:
Poorna Krishnamoorthy 2021-02-08 16:19:05 -08:00 committed by GitHub
parent 9b10118d34
commit f9c5636c2d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 47 additions and 24 deletions

View File

@ -245,16 +245,11 @@ func transitionSCInUse(ctx context.Context, lfc *lifecycle.Lifecycle, bucket, ar
} }
// set PutObjectOptions for PUT operation to transition data to target cluster // set PutObjectOptions for PUT operation to transition data to target cluster
func putTransitionOpts(objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions) { func putTransitionOpts(objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions, err error) {
meta := make(map[string]string) meta := make(map[string]string)
tag, err := tags.ParseObjectTags(objInfo.UserTags)
if err != nil {
return
}
putOpts = miniogo.PutObjectOptions{ putOpts = miniogo.PutObjectOptions{
UserMetadata: meta, UserMetadata: meta,
UserTags: tag.ToMap(),
ContentType: objInfo.ContentType, ContentType: objInfo.ContentType,
ContentEncoding: objInfo.ContentEncoding, ContentEncoding: objInfo.ContentEncoding,
StorageClass: objInfo.StorageClass, StorageClass: objInfo.StorageClass,
@ -264,22 +259,30 @@ func putTransitionOpts(objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions) {
SourceETag: objInfo.ETag, SourceETag: objInfo.ETag,
}, },
} }
if mode, ok := objInfo.UserDefined[xhttp.AmzObjectLockMode]; ok { if objInfo.UserTags != "" {
tag, err := tags.ParseObjectTags(objInfo.UserTags)
if err != nil {
return miniogo.PutObjectOptions{}, err
}
putOpts.UserTags = tag.ToMap()
}
if mode, ok := getMapValue(objInfo.UserDefined, xhttp.AmzObjectLockMode); ok {
rmode := miniogo.RetentionMode(mode) rmode := miniogo.RetentionMode(mode)
putOpts.Mode = rmode putOpts.Mode = rmode
} }
if retainDateStr, ok := objInfo.UserDefined[xhttp.AmzObjectLockRetainUntilDate]; ok { if retainDateStr, ok := getMapValue(objInfo.UserDefined, xhttp.AmzObjectLockRetainUntilDate); ok {
rdate, err := time.Parse(time.RFC3339, retainDateStr) rdate, err := time.Parse(time.RFC3339, retainDateStr)
if err != nil { if err != nil {
return return miniogo.PutObjectOptions{}, err
} }
putOpts.RetainUntilDate = rdate putOpts.RetainUntilDate = rdate
} }
if lhold, ok := objInfo.UserDefined[xhttp.AmzObjectLockLegalHold]; ok { if lhold, ok := getMapValue(objInfo.UserDefined, xhttp.AmzObjectLockLegalHold); ok {
putOpts.LegalHold = miniogo.LegalHoldStatus(lhold) putOpts.LegalHold = miniogo.LegalHoldStatus(lhold)
} }
return return putOpts, nil
} }
// handle deletes of transitioned objects or object versions when one of the following is true: // handle deletes of transitioned objects or object versions when one of the following is true:
@ -380,7 +383,12 @@ func transitionObject(ctx context.Context, objectAPI ObjectLayer, objInfo Object
return nil return nil
} }
putOpts := putTransitionOpts(oi) putOpts, err := putTransitionOpts(oi)
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to transition object %s/%s(%s): %w", oi.Bucket, oi.Name, oi.VersionID, err))
return err
}
if _, err = tgt.PutObject(ctx, arn.Bucket, oi.Name, gr, oi.Size, putOpts); err != nil { if _, err = tgt.PutObject(ctx, arn.Bucket, oi.Name, gr, oi.Size, putOpts); err != nil {
gr.Close() gr.Close()
return err return err

View File

@ -367,7 +367,19 @@ func getCopyObjMetadata(oi ObjectInfo, dest replication.Destination) map[string]
return meta return meta
} }
func putReplicationOpts(ctx context.Context, dest replication.Destination, objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions) { // lookup map entry case insensitively.
func getMapValue(m map[string]string, key string) (string, bool) {
if v, ok := m[key]; ok {
return v, ok
}
if v, ok := m[strings.ToLower(key)]; ok {
return v, ok
}
v, ok := m[http.CanonicalHeaderKey(key)]
return v, ok
}
func putReplicationOpts(ctx context.Context, dest replication.Destination, objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions, err error) {
meta := make(map[string]string) meta := make(map[string]string)
for k, v := range objInfo.UserDefined { for k, v := range objInfo.UserDefined {
if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) { if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
@ -401,33 +413,32 @@ func putReplicationOpts(ctx context.Context, dest replication.Destination, objIn
putOpts.UserTags = tag.ToMap() putOpts.UserTags = tag.ToMap()
} }
} }
if lang, ok := objInfo.UserDefined[xhttp.ContentLanguage]; ok { if lang, ok := getMapValue(objInfo.UserDefined, xhttp.ContentLanguage); ok {
putOpts.ContentLanguage = lang putOpts.ContentLanguage = lang
} }
if disp, ok := objInfo.UserDefined[xhttp.ContentDisposition]; ok { if disp, ok := getMapValue(objInfo.UserDefined, xhttp.ContentDisposition); ok {
putOpts.ContentDisposition = disp putOpts.ContentDisposition = disp
} }
if cc, ok := objInfo.UserDefined[xhttp.CacheControl]; ok { if cc, ok := getMapValue(objInfo.UserDefined, xhttp.CacheControl); ok {
putOpts.CacheControl = cc putOpts.CacheControl = cc
} }
if mode, ok := objInfo.UserDefined[xhttp.AmzObjectLockMode]; ok { if mode, ok := getMapValue(objInfo.UserDefined, xhttp.AmzObjectLockMode); ok {
rmode := miniogo.RetentionMode(mode) rmode := miniogo.RetentionMode(mode)
putOpts.Mode = rmode putOpts.Mode = rmode
} }
if retainDateStr, ok := objInfo.UserDefined[xhttp.AmzObjectLockRetainUntilDate]; ok { if retainDateStr, ok := getMapValue(objInfo.UserDefined, xhttp.AmzObjectLockRetainUntilDate); ok {
rdate, err := time.Parse(time.RFC3339Nano, retainDateStr) rdate, err := time.Parse(time.RFC3339Nano, retainDateStr)
if err != nil { if err != nil {
return return miniogo.PutObjectOptions{}, err
} }
putOpts.RetainUntilDate = rdate putOpts.RetainUntilDate = rdate
} }
if lhold, ok := objInfo.UserDefined[xhttp.AmzObjectLockLegalHold]; ok { if lhold, ok := getMapValue(objInfo.UserDefined, xhttp.AmzObjectLockLegalHold); ok {
putOpts.LegalHold = miniogo.LegalHoldStatus(lhold) putOpts.LegalHold = miniogo.LegalHoldStatus(lhold)
} }
if crypto.S3.IsEncrypted(objInfo.UserDefined) { if crypto.S3.IsEncrypted(objInfo.UserDefined) {
putOpts.ServerSideEncryption = encrypt.NewSSE() putOpts.ServerSideEncryption = encrypt.NewSSE()
} }
return return
} }
@ -627,7 +638,7 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
c := &miniogo.Core{Client: tgt.Client} c := &miniogo.Core{Client: tgt.Client}
if _, err = c.CopyObject(ctx, dest.Bucket, object, dest.Bucket, object, getCopyObjMetadata(objInfo, dest), dstOpts); err != nil { if _, err = c.CopyObject(ctx, dest.Bucket, object, dest.Bucket, object, getCopyObjMetadata(objInfo, dest), dstOpts); err != nil {
replicationStatus = replication.Failed replicationStatus = replication.Failed
logger.LogIf(ctx, fmt.Errorf("Unable to replicate metadata for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err)) logger.LogIf(ctx, fmt.Errorf("Unable to replicate metadata for object %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
} }
} else { } else {
target, err := globalBucketMetadataSys.GetBucketTarget(bucket, cfg.RoleArn) target, err := globalBucketMetadataSys.GetBucketTarget(bucket, cfg.RoleArn)
@ -642,7 +653,11 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
return return
} }
putOpts := putReplicationOpts(ctx, dest, objInfo) putOpts, err := putReplicationOpts(ctx, dest, objInfo)
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to replicate object %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
return
}
// Setup bandwidth throttling // Setup bandwidth throttling
peers, _ := globalEndpoints.peers() peers, _ := globalEndpoints.peers()
totalNodesCount := len(peers) totalNodesCount := len(peers)

View File

@ -179,7 +179,7 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
} }
if objInfo.TransitionStatus == lifecycle.TransitionComplete { if objInfo.TransitionStatus == lifecycle.TransitionComplete {
// If transitioned, stream from transition tier unless object is restored locally or restore date is past. // If transitioned, stream from transition tier unless object is restored locally or restore date is past.
restoreHdr, ok := objInfo.UserDefined[xhttp.AmzRestore] restoreHdr, ok := getMapValue(objInfo.UserDefined, xhttp.AmzRestore)
if !ok || !strings.HasPrefix(restoreHdr, "ongoing-request=false") || (!objInfo.RestoreExpires.IsZero() && time.Now().After(objInfo.RestoreExpires)) { if !ok || !strings.HasPrefix(restoreHdr, "ongoing-request=false") || (!objInfo.RestoreExpires.IsZero() && time.Now().After(objInfo.RestoreExpires)) {
return getTransitionedObjectReader(ctx, bucket, object, rs, h, objInfo, opts) return getTransitionedObjectReader(ctx, bucket, object, rs, h, objInfo, opts)
} }