mirror of
https://github.com/minio/minio.git
synced 2025-11-07 12:52:58 -05:00
fix: multipart replication with single part objects (#20895)
x-amz-checksum-algorithm is not set, causing all multipart single-part objects to fail to replicate going via sftp/FTP uploads.
This commit is contained in:
@@ -773,7 +773,7 @@ func (m caseInsensitiveMap) Lookup(key string) (string, bool) {
|
||||
return "", false
|
||||
}
|
||||
|
||||
func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo, partNum int) (putOpts minio.PutObjectOptions, err error) {
|
||||
func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo) (putOpts minio.PutObjectOptions, isMP bool, err error) {
|
||||
meta := make(map[string]string)
|
||||
isSSEC := crypto.SSEC.IsEncrypted(objInfo.UserDefined)
|
||||
|
||||
@@ -794,23 +794,22 @@ func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo, part
|
||||
meta[k] = v
|
||||
}
|
||||
}
|
||||
isMP = objInfo.isMultipart()
|
||||
if len(objInfo.Checksum) > 0 {
|
||||
// Add encrypted CRC to metadata for SSE-C objects.
|
||||
if isSSEC {
|
||||
meta[ReplicationSsecChecksumHeader] = base64.StdEncoding.EncodeToString(objInfo.Checksum)
|
||||
} else {
|
||||
if objInfo.isMultipart() && partNum > 0 {
|
||||
for _, pi := range objInfo.Parts {
|
||||
if pi.Number == partNum {
|
||||
for k, v := range pi.Checksums { // for PutObjectPart
|
||||
meta[k] = v
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for k, v := range getCRCMeta(objInfo, 0, nil) { // for PutObject/NewMultipartUpload
|
||||
meta[k] = v
|
||||
}
|
||||
cs, mp := getCRCMeta(objInfo, 0, nil)
|
||||
// Set object checksum.
|
||||
for k, v := range cs {
|
||||
meta[k] = v
|
||||
}
|
||||
isMP = mp
|
||||
if !objInfo.isMultipart() && cs[xhttp.AmzChecksumType] == xhttp.AmzChecksumTypeFullObject {
|
||||
// For objects where checksum is full object, it will be the same.
|
||||
// Therefore, we use the cheaper PutObject replication.
|
||||
isMP = false
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -841,7 +840,7 @@ func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo, part
|
||||
if tagTmstampStr, ok := objInfo.UserDefined[ReservedMetadataPrefixLower+TaggingTimestamp]; ok {
|
||||
tagTimestamp, err = time.Parse(time.RFC3339Nano, tagTmstampStr)
|
||||
if err != nil {
|
||||
return putOpts, err
|
||||
return putOpts, false, err
|
||||
}
|
||||
}
|
||||
putOpts.Internal.TaggingTimestamp = tagTimestamp
|
||||
@@ -865,7 +864,7 @@ func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo, part
|
||||
if retainDateStr, ok := lkMap.Lookup(xhttp.AmzObjectLockRetainUntilDate); ok {
|
||||
rdate, err := amztime.ISO8601Parse(retainDateStr)
|
||||
if err != nil {
|
||||
return putOpts, err
|
||||
return putOpts, false, err
|
||||
}
|
||||
putOpts.RetainUntilDate = rdate
|
||||
// set retention timestamp in opts
|
||||
@@ -873,7 +872,7 @@ func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo, part
|
||||
if retainTmstampStr, ok := objInfo.UserDefined[ReservedMetadataPrefixLower+ObjectLockRetentionTimestamp]; ok {
|
||||
retTimestamp, err = time.Parse(time.RFC3339Nano, retainTmstampStr)
|
||||
if err != nil {
|
||||
return putOpts, err
|
||||
return putOpts, false, err
|
||||
}
|
||||
}
|
||||
putOpts.Internal.RetentionTimestamp = retTimestamp
|
||||
@@ -885,7 +884,7 @@ func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo, part
|
||||
if lholdTmstampStr, ok := objInfo.UserDefined[ReservedMetadataPrefixLower+ObjectLockLegalHoldTimestamp]; ok {
|
||||
lholdTimestamp, err = time.Parse(time.RFC3339Nano, lholdTmstampStr)
|
||||
if err != nil {
|
||||
return putOpts, err
|
||||
return putOpts, false, err
|
||||
}
|
||||
}
|
||||
putOpts.Internal.LegalholdTimestamp = lholdTimestamp
|
||||
@@ -897,7 +896,7 @@ func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo, part
|
||||
if crypto.S3KMS.IsEncrypted(objInfo.UserDefined) {
|
||||
sseEnc, err := encrypt.NewSSEKMS(objInfo.KMSKeyID(), nil)
|
||||
if err != nil {
|
||||
return putOpts, err
|
||||
return putOpts, false, err
|
||||
}
|
||||
putOpts.ServerSideEncryption = sseEnc
|
||||
}
|
||||
@@ -1290,7 +1289,7 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj
|
||||
// use core client to avoid doing multipart on PUT
|
||||
c := &minio.Core{Client: tgt.Client}
|
||||
|
||||
putOpts, err := putReplicationOpts(ctx, tgt.StorageClass, objInfo, 0)
|
||||
putOpts, isMP, err := putReplicationOpts(ctx, tgt.StorageClass, objInfo)
|
||||
if err != nil {
|
||||
replLogIf(ctx, fmt.Errorf("failure setting options for replication bucket:%s err:%w", bucket, err))
|
||||
sendEvent(eventArgs{
|
||||
@@ -1322,7 +1321,7 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj
|
||||
defer cancel()
|
||||
}
|
||||
r := bandwidth.NewMonitoredReader(newCtx, globalBucketMonitor, gr, opts)
|
||||
if objInfo.isMultipart() {
|
||||
if isMP {
|
||||
rinfo.Err = replicateObjectWithMultipart(ctx, c, tgt.Bucket, object, r, objInfo, putOpts)
|
||||
} else {
|
||||
_, rinfo.Err = c.PutObject(ctx, tgt.Bucket, object, r, size, "", "", putOpts)
|
||||
@@ -1577,8 +1576,7 @@ applyAction:
|
||||
replLogIf(ctx, fmt.Errorf("unable to replicate metadata for object %s/%s(%s) to target %s: %w", bucket, objInfo.Name, objInfo.VersionID, tgt.EndpointURL(), rinfo.Err))
|
||||
}
|
||||
} else {
|
||||
var putOpts minio.PutObjectOptions
|
||||
putOpts, err = putReplicationOpts(ctx, tgt.StorageClass, objInfo, 0)
|
||||
putOpts, isMP, err := putReplicationOpts(ctx, tgt.StorageClass, objInfo)
|
||||
if err != nil {
|
||||
replLogIf(ctx, fmt.Errorf("failed to set replicate options for object %s/%s(%s) (target %s) err:%w", bucket, objInfo.Name, objInfo.VersionID, tgt.EndpointURL(), err))
|
||||
sendEvent(eventArgs{
|
||||
@@ -1609,7 +1607,7 @@ applyAction:
|
||||
defer cancel()
|
||||
}
|
||||
r := bandwidth.NewMonitoredReader(newCtx, globalBucketMonitor, gr, opts)
|
||||
if objInfo.isMultipart() {
|
||||
if isMP {
|
||||
rinfo.Err = replicateObjectWithMultipart(ctx, c, tgt.Bucket, object, r, objInfo, putOpts)
|
||||
} else {
|
||||
_, rinfo.Err = c.PutObject(ctx, tgt.Bucket, object, r, size, "", "", putOpts)
|
||||
@@ -1687,7 +1685,8 @@ func replicateObjectWithMultipart(ctx context.Context, c *minio.Core, bucket, ob
|
||||
cHeader := http.Header{}
|
||||
cHeader.Add(xhttp.MinIOSourceReplicationRequest, "true")
|
||||
if !isSSEC {
|
||||
for k, v := range getCRCMeta(objInfo, partInfo.Number, nil) {
|
||||
cs, _ := getCRCMeta(objInfo, partInfo.Number, nil)
|
||||
for k, v := range cs {
|
||||
cHeader.Add(k, v)
|
||||
}
|
||||
}
|
||||
@@ -1732,8 +1731,9 @@ func replicateObjectWithMultipart(ctx context.Context, c *minio.Core, bucket, ob
|
||||
defer ccancel()
|
||||
|
||||
if len(objInfo.Checksum) > 0 {
|
||||
for k, v := range getCRCMeta(objInfo, 0, nil) {
|
||||
userMeta[k] = v
|
||||
cs, _ := getCRCMeta(objInfo, 0, nil)
|
||||
for k, v := range cs {
|
||||
userMeta[k] = strings.Split(v, "-")[0]
|
||||
}
|
||||
}
|
||||
_, err = c.CompleteMultipartUpload(cctx, bucket, object, uploadID, uploadedParts, minio.PutObjectOptions{
|
||||
@@ -3782,9 +3782,9 @@ type validateReplicationDestinationOptions struct {
|
||||
checkReadyErr sync.Map
|
||||
}
|
||||
|
||||
func getCRCMeta(oi ObjectInfo, partNum int, h http.Header) map[string]string {
|
||||
func getCRCMeta(oi ObjectInfo, partNum int, h http.Header) (cs map[string]string, isMP bool) {
|
||||
meta := make(map[string]string)
|
||||
cs := oi.decryptChecksums(partNum, h)
|
||||
cs, isMP = oi.decryptChecksums(partNum, h)
|
||||
for k, v := range cs {
|
||||
cksum := hash.NewChecksumString(k, v)
|
||||
if cksum == nil {
|
||||
@@ -3793,7 +3793,10 @@ func getCRCMeta(oi ObjectInfo, partNum int, h http.Header) map[string]string {
|
||||
if cksum.Valid() {
|
||||
meta[cksum.Type.Key()] = v
|
||||
}
|
||||
meta[xhttp.AmzChecksumType] = cksum.Type.ObjType()
|
||||
if isMP && partNum == 0 {
|
||||
meta[xhttp.AmzChecksumType] = cksum.Type.ObjType()
|
||||
meta[xhttp.AmzChecksumAlgo] = cksum.Type.String()
|
||||
}
|
||||
}
|
||||
return meta
|
||||
return meta, isMP
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user