mirror of
https://github.com/minio/minio.git
synced 2025-02-03 01:46:00 -05:00
replication: pass checksum headers to replica (#19834)
This commit is contained in:
parent
7edc352d23
commit
5aaef9790f
@ -1476,7 +1476,7 @@ func (j *BatchJobRequest) load(ctx context.Context, api ObjectLayer, name string
|
|||||||
|
|
||||||
func batchReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions, err error) {
|
func batchReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions, err error) {
|
||||||
// TODO: support custom storage class for remote replication
|
// TODO: support custom storage class for remote replication
|
||||||
putOpts, err = putReplicationOpts(ctx, "", objInfo)
|
putOpts, err = putReplicationOpts(ctx, "", objInfo, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return putOpts, err
|
return putOpts, err
|
||||||
}
|
}
|
||||||
|
@ -763,12 +763,32 @@ func (m caseInsensitiveMap) Lookup(key string) (string, bool) {
|
|||||||
return "", false
|
return "", false
|
||||||
}
|
}
|
||||||
|
|
||||||
func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo) (putOpts minio.PutObjectOptions, err error) {
|
func getCRCMeta(oi ObjectInfo, partNum int) map[string]string {
|
||||||
|
meta := make(map[string]string)
|
||||||
|
cs := oi.decryptChecksums(partNum)
|
||||||
|
for k, v := range cs {
|
||||||
|
cksum := hash.NewChecksumString(k, v)
|
||||||
|
if cksum == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if cksum.Valid() {
|
||||||
|
meta[cksum.Type.Key()] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return meta
|
||||||
|
}
|
||||||
|
|
||||||
|
func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo, partNum int) (putOpts minio.PutObjectOptions, err error) {
|
||||||
meta := make(map[string]string)
|
meta := make(map[string]string)
|
||||||
for k, v := range objInfo.UserDefined {
|
for k, v := range objInfo.UserDefined {
|
||||||
// In case of SSE-C objects copy the allowed internal headers as well
|
// In case of SSE-C objects copy the allowed internal headers as well
|
||||||
if !crypto.SSEC.IsEncrypted(objInfo.UserDefined) || !slices.Contains(maps.Keys(validSSEReplicationHeaders), k) {
|
if !crypto.SSEC.IsEncrypted(objInfo.UserDefined) || !slices.Contains(maps.Keys(validSSEReplicationHeaders), k) {
|
||||||
if stringsHasPrefixFold(k, ReservedMetadataPrefixLower) {
|
if stringsHasPrefixFold(k, ReservedMetadataPrefixLower) {
|
||||||
|
if strings.EqualFold(k, ReservedMetadataPrefixLower+"crc") {
|
||||||
|
for k, v := range getCRCMeta(objInfo, partNum) {
|
||||||
|
meta[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if isStandardHeader(k) {
|
if isStandardHeader(k) {
|
||||||
@ -782,6 +802,12 @@ func putReplicationOpts(ctx context.Context, sc string, objInfo ObjectInfo) (put
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(objInfo.Checksum) > 0 {
|
||||||
|
for k, v := range getCRCMeta(objInfo, 0) {
|
||||||
|
meta[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if sc == "" && (objInfo.StorageClass == storageclass.STANDARD || objInfo.StorageClass == storageclass.RRS) {
|
if sc == "" && (objInfo.StorageClass == storageclass.STANDARD || objInfo.StorageClass == storageclass.RRS) {
|
||||||
sc = objInfo.StorageClass
|
sc = objInfo.StorageClass
|
||||||
}
|
}
|
||||||
@ -1239,7 +1265,7 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj
|
|||||||
// use core client to avoid doing multipart on PUT
|
// use core client to avoid doing multipart on PUT
|
||||||
c := &minio.Core{Client: tgt.Client}
|
c := &minio.Core{Client: tgt.Client}
|
||||||
|
|
||||||
putOpts, err := putReplicationOpts(ctx, tgt.StorageClass, objInfo)
|
putOpts, err := putReplicationOpts(ctx, tgt.StorageClass, objInfo, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
replLogIf(ctx, fmt.Errorf("failure setting options for replication bucket:%s err:%w", bucket, err))
|
replLogIf(ctx, fmt.Errorf("failure setting options for replication bucket:%s err:%w", bucket, err))
|
||||||
sendEvent(eventArgs{
|
sendEvent(eventArgs{
|
||||||
@ -1506,7 +1532,7 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
var putOpts minio.PutObjectOptions
|
var putOpts minio.PutObjectOptions
|
||||||
putOpts, err = putReplicationOpts(ctx, tgt.StorageClass, objInfo)
|
putOpts, err = putReplicationOpts(ctx, tgt.StorageClass, objInfo, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
replLogIf(ctx, fmt.Errorf("failed to set replicate options for object %s/%s(%s) (target %s) err:%w", bucket, objInfo.Name, objInfo.VersionID, tgt.EndpointURL(), err))
|
replLogIf(ctx, fmt.Errorf("failed to set replicate options for object %s/%s(%s) (target %s) err:%w", bucket, objInfo.Name, objInfo.VersionID, tgt.EndpointURL(), err))
|
||||||
sendEvent(eventArgs{
|
sendEvent(eventArgs{
|
||||||
@ -1614,6 +1640,10 @@ func replicateObjectWithMultipart(ctx context.Context, c *minio.Core, bucket, ob
|
|||||||
|
|
||||||
cHeader := http.Header{}
|
cHeader := http.Header{}
|
||||||
cHeader.Add(xhttp.MinIOSourceReplicationRequest, "true")
|
cHeader.Add(xhttp.MinIOSourceReplicationRequest, "true")
|
||||||
|
crc := getCRCMeta(objInfo, partInfo.Number)
|
||||||
|
for k, v := range crc {
|
||||||
|
cHeader.Add(k, v)
|
||||||
|
}
|
||||||
popts := minio.PutObjectPartOptions{
|
popts := minio.PutObjectPartOptions{
|
||||||
SSE: opts.ServerSideEncryption,
|
SSE: opts.ServerSideEncryption,
|
||||||
CustomHeader: cHeader,
|
CustomHeader: cHeader,
|
||||||
@ -1633,8 +1663,12 @@ func replicateObjectWithMultipart(ctx context.Context, c *minio.Core, bucket, ob
|
|||||||
return fmt.Errorf("Part size mismatch: got %d, want %d", pInfo.Size, partInfo.ActualSize)
|
return fmt.Errorf("Part size mismatch: got %d, want %d", pInfo.Size, partInfo.ActualSize)
|
||||||
}
|
}
|
||||||
uploadedParts = append(uploadedParts, minio.CompletePart{
|
uploadedParts = append(uploadedParts, minio.CompletePart{
|
||||||
PartNumber: pInfo.PartNumber,
|
PartNumber: pInfo.PartNumber,
|
||||||
ETag: pInfo.ETag,
|
ETag: pInfo.ETag,
|
||||||
|
ChecksumCRC32: pInfo.ChecksumCRC32,
|
||||||
|
ChecksumCRC32C: pInfo.ChecksumCRC32C,
|
||||||
|
ChecksumSHA1: pInfo.ChecksumSHA1,
|
||||||
|
ChecksumSHA256: pInfo.ChecksumSHA256,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1296,7 +1296,8 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
|
|||||||
|
|
||||||
if checksumType.IsSet() {
|
if checksumType.IsSet() {
|
||||||
checksumType |= hash.ChecksumMultipart | hash.ChecksumIncludesMultipart
|
checksumType |= hash.ChecksumMultipart | hash.ChecksumIncludesMultipart
|
||||||
cs := hash.NewChecksumFromData(checksumType, checksumCombined)
|
var cs *hash.Checksum
|
||||||
|
cs = hash.NewChecksumFromData(checksumType, checksumCombined)
|
||||||
fi.Checksum = cs.AppendTo(nil, checksumCombined)
|
fi.Checksum = cs.AppendTo(nil, checksumCombined)
|
||||||
if opts.EncryptFn != nil {
|
if opts.EncryptFn != nil {
|
||||||
fi.Checksum = opts.EncryptFn("object-checksum", fi.Checksum)
|
fi.Checksum = opts.EncryptFn("object-checksum", fi.Checksum)
|
||||||
|
@ -773,6 +773,7 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
|
|||||||
|
|
||||||
_, isEncrypted := crypto.IsEncrypted(mi.UserDefined)
|
_, isEncrypted := crypto.IsEncrypted(mi.UserDefined)
|
||||||
_, replicationStatus := mi.UserDefined[xhttp.AmzBucketReplicationStatus]
|
_, replicationStatus := mi.UserDefined[xhttp.AmzBucketReplicationStatus]
|
||||||
|
_, sourceReplReq := r.Header[xhttp.MinIOSourceReplicationRequest]
|
||||||
var objectEncryptionKey crypto.ObjectKey
|
var objectEncryptionKey crypto.ObjectKey
|
||||||
if isEncrypted {
|
if isEncrypted {
|
||||||
if !crypto.SSEC.IsRequested(r.Header) && crypto.SSEC.IsEncrypted(mi.UserDefined) && !replicationStatus {
|
if !crypto.SSEC.IsRequested(r.Header) && crypto.SSEC.IsEncrypted(mi.UserDefined) && !replicationStatus {
|
||||||
@ -795,7 +796,6 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_, sourceReplReq := r.Header[xhttp.MinIOSourceReplicationRequest]
|
|
||||||
if !(sourceReplReq && crypto.SSEC.IsEncrypted(mi.UserDefined)) {
|
if !(sourceReplReq && crypto.SSEC.IsEncrypted(mi.UserDefined)) {
|
||||||
// Calculating object encryption key
|
// Calculating object encryption key
|
||||||
key, err = decryptObjectMeta(key, bucket, object, mi.UserDefined)
|
key, err = decryptObjectMeta(key, bucket, object, mi.UserDefined)
|
||||||
@ -847,6 +847,7 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
|
|||||||
}
|
}
|
||||||
opts.IndexCB = idxCb
|
opts.IndexCB = idxCb
|
||||||
|
|
||||||
|
opts.ReplicationRequest = sourceReplReq
|
||||||
putObjectPart := objectAPI.PutObjectPart
|
putObjectPart := objectAPI.PutObjectPart
|
||||||
|
|
||||||
partInfo, err := putObjectPart(ctx, bucket, object, uploadID, partID, pReader, opts)
|
partInfo, err := putObjectPart(ctx, bucket, object, uploadID, partID, pReader, opts)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user