preserve replicated ETag properly on target (#16129)

This commit is contained in:
Poorna 2022-11-26 14:43:32 -08:00 committed by GitHub
parent ce53d7f6c2
commit 63fc6ba2cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 14 additions and 5 deletions

View File

@ -350,7 +350,7 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string,
rctx := lkctx.Context()
obj, err := er.getObjectInfo(rctx, bucket, object, opts)
lk.RUnlock(lkctx.Cancel)
if err != nil {
if err != nil && !isErrVersionNotFound(err) {
return nil, err
}
if opts.CheckPrecondFn(obj) {
@ -359,7 +359,9 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string,
}
userDefined := cloneMSS(opts.UserDefined)
if opts.PreserveETag != "" {
userDefined["etag"] = opts.PreserveETag
}
onlineDisks := er.getDisks()
parityDrives := globalStorageClass.GetParityForSC(userDefined[xhttp.AmzStorageClass])
if parityDrives < 0 {
@ -1151,9 +1153,13 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
}
// Save successfully calculated md5sum.
fi.Metadata["etag"] = opts.UserDefined["etag"]
// for replica, newMultipartUpload would have already sent the replication ETag
if fi.Metadata["etag"] == "" {
fi.Metadata["etag"] = getCompleteMultipartMD5(parts)
if opts.UserDefined["etag"] != "" {
fi.Metadata["etag"] = opts.UserDefined["etag"]
} else { // fallback if not already calculated in handler.
fi.Metadata["etag"] = getCompleteMultipartMD5(parts)
}
}
// Save the consolidated actual size.

View File

@ -943,7 +943,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
if opts.CheckPrecondFn != nil {
obj, err := er.getObjectInfo(ctx, bucket, object, opts)
if err != nil {
if err != nil && !isErrVersionNotFound(err) {
return objInfo, err
}
if opts.CheckPrecondFn(obj) {

View File

@ -293,6 +293,7 @@ func putOpts(ctx context.Context, r *http.Request, bucket, object string, metada
Err: fmt.Errorf("invalid/unknown checksum sent: %v", err),
}
}
etag := strings.TrimSpace(r.Header.Get(xhttp.MinIOSourceETag))
if crypto.S3KMS.IsRequested(r.Header) {
keyID, context, err := crypto.S3KMS.ParseHTTP(r.Header)
@ -311,6 +312,7 @@ func putOpts(ctx context.Context, r *http.Request, bucket, object string, metada
VersionSuspended: versionSuspended,
MTime: mtime,
WantChecksum: wantCRC,
PreserveETag: etag,
}, nil
}
// default case of passing encryption headers and UserDefined metadata to backend
@ -325,6 +327,7 @@ func putOpts(ctx context.Context, r *http.Request, bucket, object string, metada
opts.ReplicationSourceLegalholdTimestamp = lholdtimestmp
opts.ReplicationSourceRetentionTimestamp = retaintimestmp
opts.ReplicationSourceTaggingTimestamp = taggingtimestmp
opts.PreserveETag = etag
opts.WantChecksum = wantCRC
return opts, nil