mirror of
https://github.com/minio/minio.git
synced 2024-12-24 06:05:55 -05:00
fix: multipart replication and encrypted etag for sse-s3 (#13171)
Replication was not working properly for encrypted objects in single PUT object for preserving etag, We need to make sure to preserve etag such that replication works properly and not gets into infinite loops of copying due to ETag mismatches.
This commit is contained in:
parent
9af4e7b1da
commit
0892f1e406
@ -869,7 +869,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
}
|
}
|
||||||
r := bandwidth.NewMonitoredReader(newCtx, globalBucketMonitor, gr, opts)
|
r := bandwidth.NewMonitoredReader(newCtx, globalBucketMonitor, gr, opts)
|
||||||
if objInfo.Multipart {
|
if objInfo.isMultipart() {
|
||||||
if err := replicateObjectWithMultipart(ctx, c, dest.Bucket, object,
|
if err := replicateObjectWithMultipart(ctx, c, dest.Bucket, object,
|
||||||
r, objInfo, putOpts); err != nil {
|
r, objInfo, putOpts); err != nil {
|
||||||
replicationStatus = replication.Failed
|
replicationStatus = replication.Failed
|
||||||
|
@ -68,10 +68,10 @@ const (
|
|||||||
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// isEncryptedMultipart returns true if the current object is
|
// isMultipart returns true if the current object is
|
||||||
// uploaded by the user using multipart mechanism:
|
// uploaded by the user using multipart mechanism:
|
||||||
// initiate new multipart, upload part, complete upload
|
// initiate new multipart, upload part, complete upload
|
||||||
func (o *ObjectInfo) isEncryptedMultipart() bool {
|
func (o *ObjectInfo) isMultipart() bool {
|
||||||
if len(o.Parts) == 0 {
|
if len(o.Parts) == 0 {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@ -427,7 +427,7 @@ func DecryptBlocksRequestR(inputReader io.Reader, h http.Header, seqNumber uint3
|
|||||||
|
|
||||||
bucket, object := oi.Bucket, oi.Name
|
bucket, object := oi.Bucket, oi.Name
|
||||||
// Single part case
|
// Single part case
|
||||||
if !oi.isEncryptedMultipart() {
|
if !oi.isMultipart() {
|
||||||
var reader io.Reader
|
var reader io.Reader
|
||||||
var err error
|
var err error
|
||||||
if copySource {
|
if copySource {
|
||||||
@ -589,7 +589,7 @@ func (o *ObjectInfo) DecryptedSize() (int64, error) {
|
|||||||
if _, ok := crypto.IsEncrypted(o.UserDefined); !ok {
|
if _, ok := crypto.IsEncrypted(o.UserDefined); !ok {
|
||||||
return 0, errors.New("Cannot compute decrypted size of an unencrypted object")
|
return 0, errors.New("Cannot compute decrypted size of an unencrypted object")
|
||||||
}
|
}
|
||||||
if !o.isEncryptedMultipart() {
|
if !o.isMultipart() {
|
||||||
size, err := sio.DecryptedSize(uint64(o.Size))
|
size, err := sio.DecryptedSize(uint64(o.Size))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = errObjectTampered // assign correct error type
|
err = errObjectTampered // assign correct error type
|
||||||
@ -732,7 +732,7 @@ func (o *ObjectInfo) GetDecryptedRange(rs *HTTPRangeSpec) (encOff, encLength, sk
|
|||||||
// Assemble slice of (decrypted) part sizes in `sizes`
|
// Assemble slice of (decrypted) part sizes in `sizes`
|
||||||
var sizes []int64
|
var sizes []int64
|
||||||
var decObjSize int64 // decrypted total object size
|
var decObjSize int64 // decrypted total object size
|
||||||
if o.isEncryptedMultipart() {
|
if o.isMultipart() {
|
||||||
sizes = make([]int64, len(o.Parts))
|
sizes = make([]int64, len(o.Parts))
|
||||||
for i, part := range o.Parts {
|
for i, part := range o.Parts {
|
||||||
var partSize uint64
|
var partSize uint64
|
||||||
|
@ -27,8 +27,6 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/minio/minio/internal/bucket/replication"
|
"github.com/minio/minio/internal/bucket/replication"
|
||||||
"github.com/minio/minio/internal/crypto"
|
|
||||||
"github.com/minio/minio/internal/etag"
|
|
||||||
xhttp "github.com/minio/minio/internal/http"
|
xhttp "github.com/minio/minio/internal/http"
|
||||||
"github.com/minio/minio/internal/logger"
|
"github.com/minio/minio/internal/logger"
|
||||||
"github.com/minio/minio/internal/sync/errgroup"
|
"github.com/minio/minio/internal/sync/errgroup"
|
||||||
@ -143,13 +141,6 @@ func (fi FileInfo) ToObjectInfo(bucket, object string) ObjectInfo {
|
|||||||
// Extract etag from metadata.
|
// Extract etag from metadata.
|
||||||
objInfo.ETag = extractETag(fi.Metadata)
|
objInfo.ETag = extractETag(fi.Metadata)
|
||||||
|
|
||||||
// Verify if Etag is parseable, if yes
|
|
||||||
// then check if its multipart etag.
|
|
||||||
et, e := etag.Parse(objInfo.ETag)
|
|
||||||
if e == nil {
|
|
||||||
objInfo.Multipart = et.IsMultipart()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add user tags to the object info
|
// Add user tags to the object info
|
||||||
tags := fi.Metadata[xhttp.AmzObjectTagging]
|
tags := fi.Metadata[xhttp.AmzObjectTagging]
|
||||||
if len(tags) != 0 {
|
if len(tags) != 0 {
|
||||||
@ -176,14 +167,6 @@ func (fi FileInfo) ToObjectInfo(bucket, object string) ObjectInfo {
|
|||||||
// Tags have also been extracted, we remove that as well.
|
// Tags have also been extracted, we remove that as well.
|
||||||
objInfo.UserDefined = cleanMetadata(fi.Metadata)
|
objInfo.UserDefined = cleanMetadata(fi.Metadata)
|
||||||
|
|
||||||
// Set multipart for encryption properly if
|
|
||||||
// not set already.
|
|
||||||
if !objInfo.Multipart {
|
|
||||||
if _, ok := crypto.IsEncrypted(objInfo.UserDefined); ok {
|
|
||||||
objInfo.Multipart = crypto.IsMultiPart(objInfo.UserDefined)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// All the parts per object.
|
// All the parts per object.
|
||||||
objInfo.Parts = fi.Parts
|
objInfo.Parts = fi.Parts
|
||||||
|
|
||||||
|
@ -783,9 +783,6 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
|
|||||||
return oi, toObjectErr(err, bucket, object, uploadID)
|
return oi, toObjectErr(err, bucket, object, uploadID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Calculate s3 compatible md5sum for complete multipart.
|
|
||||||
s3MD5 := getCompleteMultipartMD5(parts)
|
|
||||||
|
|
||||||
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
|
uploadIDPath := er.getUploadIDDir(bucket, object, uploadID)
|
||||||
|
|
||||||
storageDisks := er.getDisks()
|
storageDisks := er.getDisks()
|
||||||
@ -882,9 +879,9 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Save successfully calculated md5sum.
|
// Save successfully calculated md5sum.
|
||||||
fi.Metadata["etag"] = s3MD5
|
fi.Metadata["etag"] = opts.UserDefined["etag"]
|
||||||
if opts.UserDefined["etag"] != "" { // preserve ETag if set
|
if fi.Metadata["etag"] == "" {
|
||||||
fi.Metadata["etag"] = opts.UserDefined["etag"]
|
fi.Metadata["etag"] = getCompleteMultipartMD5(parts)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save the consolidated actual size.
|
// Save the consolidated actual size.
|
||||||
|
@ -29,8 +29,6 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
jsoniter "github.com/json-iterator/go"
|
jsoniter "github.com/json-iterator/go"
|
||||||
"github.com/minio/minio/internal/crypto"
|
|
||||||
"github.com/minio/minio/internal/etag"
|
|
||||||
xhttp "github.com/minio/minio/internal/http"
|
xhttp "github.com/minio/minio/internal/http"
|
||||||
"github.com/minio/minio/internal/lock"
|
"github.com/minio/minio/internal/lock"
|
||||||
"github.com/minio/minio/internal/logger"
|
"github.com/minio/minio/internal/logger"
|
||||||
@ -170,11 +168,6 @@ func (m fsMetaV1) ToObjectInfo(bucket, object string, fi os.FileInfo) ObjectInfo
|
|||||||
|
|
||||||
objInfo.ETag = extractETag(m.Meta)
|
objInfo.ETag = extractETag(m.Meta)
|
||||||
|
|
||||||
et, e := etag.Parse(objInfo.ETag)
|
|
||||||
if e == nil {
|
|
||||||
objInfo.Multipart = et.IsMultipart()
|
|
||||||
}
|
|
||||||
|
|
||||||
objInfo.ContentType = m.Meta["content-type"]
|
objInfo.ContentType = m.Meta["content-type"]
|
||||||
objInfo.ContentEncoding = m.Meta["content-encoding"]
|
objInfo.ContentEncoding = m.Meta["content-encoding"]
|
||||||
if storageClass, ok := m.Meta[xhttp.AmzStorageClass]; ok {
|
if storageClass, ok := m.Meta[xhttp.AmzStorageClass]; ok {
|
||||||
@ -184,6 +177,7 @@ func (m fsMetaV1) ToObjectInfo(bucket, object string, fi os.FileInfo) ObjectInfo
|
|||||||
}
|
}
|
||||||
var (
|
var (
|
||||||
t time.Time
|
t time.Time
|
||||||
|
e error
|
||||||
)
|
)
|
||||||
if exp, ok := m.Meta["expires"]; ok {
|
if exp, ok := m.Meta["expires"]; ok {
|
||||||
if t, e = time.Parse(http.TimeFormat, exp); e == nil {
|
if t, e = time.Parse(http.TimeFormat, exp); e == nil {
|
||||||
@ -200,14 +194,6 @@ func (m fsMetaV1) ToObjectInfo(bucket, object string, fi os.FileInfo) ObjectInfo
|
|||||||
// Tags have also been extracted, we remove that as well.
|
// Tags have also been extracted, we remove that as well.
|
||||||
objInfo.UserDefined = cleanMetadata(m.Meta)
|
objInfo.UserDefined = cleanMetadata(m.Meta)
|
||||||
|
|
||||||
// Set multipart for encryption properly if
|
|
||||||
// not set already.
|
|
||||||
if !objInfo.Multipart {
|
|
||||||
if _, ok := crypto.IsEncrypted(objInfo.UserDefined); ok {
|
|
||||||
objInfo.Multipart = crypto.IsMultiPart(objInfo.UserDefined)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// All the parts per object.
|
// All the parts per object.
|
||||||
objInfo.Parts = m.Parts
|
objInfo.Parts = m.Parts
|
||||||
|
|
||||||
|
@ -564,9 +564,6 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
|
|||||||
return oi, toObjectErr(err, bucket, object)
|
return oi, toObjectErr(err, bucket, object)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Calculate s3 compatible md5sum for complete multipart.
|
|
||||||
s3MD5 := getCompleteMultipartMD5(parts)
|
|
||||||
|
|
||||||
// ensure that part ETag is canonicalized to strip off extraneous quotes
|
// ensure that part ETag is canonicalized to strip off extraneous quotes
|
||||||
for i := range parts {
|
for i := range parts {
|
||||||
parts[i].ETag = canonicalizeETag(parts[i].ETag)
|
parts[i].ETag = canonicalizeETag(parts[i].ETag)
|
||||||
@ -755,7 +752,12 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
|
|||||||
if fsMeta.Meta == nil {
|
if fsMeta.Meta == nil {
|
||||||
fsMeta.Meta = make(map[string]string)
|
fsMeta.Meta = make(map[string]string)
|
||||||
}
|
}
|
||||||
fsMeta.Meta["etag"] = s3MD5
|
|
||||||
|
fsMeta.Meta["etag"] = opts.UserDefined["etag"]
|
||||||
|
if fsMeta.Meta["etag"] == "" {
|
||||||
|
fsMeta.Meta["etag"] = getCompleteMultipartMD5(parts)
|
||||||
|
}
|
||||||
|
|
||||||
// Save consolidated actual size.
|
// Save consolidated actual size.
|
||||||
fsMeta.Meta[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(objectActualSize, 10)
|
fsMeta.Meta[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(objectActualSize, 10)
|
||||||
if _, err = fsMeta.WriteTo(metaFile); err != nil {
|
if _, err = fsMeta.WriteTo(metaFile); err != nil {
|
||||||
|
@ -165,8 +165,6 @@ type ObjectInfo struct {
|
|||||||
|
|
||||||
Legacy bool // indicates object on disk is in legacy data format
|
Legacy bool // indicates object on disk is in legacy data format
|
||||||
|
|
||||||
Multipart bool // indicates if object is multipart object.
|
|
||||||
|
|
||||||
// backendType indicates which backend filled this structure
|
// backendType indicates which backend filled this structure
|
||||||
backendType BackendType
|
backendType BackendType
|
||||||
|
|
||||||
@ -187,7 +185,6 @@ func (o ObjectInfo) Clone() (cinfo ObjectInfo) {
|
|||||||
Size: o.Size,
|
Size: o.Size,
|
||||||
IsDir: o.IsDir,
|
IsDir: o.IsDir,
|
||||||
ETag: o.ETag,
|
ETag: o.ETag,
|
||||||
Multipart: o.Multipart,
|
|
||||||
InnerETag: o.InnerETag,
|
InnerETag: o.InnerETag,
|
||||||
VersionID: o.VersionID,
|
VersionID: o.VersionID,
|
||||||
IsLatest: o.IsLatest,
|
IsLatest: o.IsLatest,
|
||||||
|
@ -357,5 +357,6 @@ func completeMultipartOpts(ctx context.Context, r *http.Request, bucket, object
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
opts.MTime = mtime
|
opts.MTime = mtime
|
||||||
|
opts.UserDefined = make(map[string]string)
|
||||||
return opts, nil
|
return opts, nil
|
||||||
}
|
}
|
||||||
|
@ -290,10 +290,10 @@ func cleanMetadataKeys(metadata map[string]string, keyNames ...string) map[strin
|
|||||||
|
|
||||||
// Extracts etag value from the metadata.
|
// Extracts etag value from the metadata.
|
||||||
func extractETag(metadata map[string]string) string {
|
func extractETag(metadata map[string]string) string {
|
||||||
// md5Sum tag is kept for backward compatibility.
|
etag, ok := metadata["etag"]
|
||||||
etag, ok := metadata["md5Sum"]
|
|
||||||
if !ok {
|
if !ok {
|
||||||
etag = metadata["etag"]
|
// md5Sum tag is kept for backward compatibility.
|
||||||
|
etag = metadata["md5Sum"]
|
||||||
}
|
}
|
||||||
// Success.
|
// Success.
|
||||||
return etag
|
return etag
|
||||||
|
@ -3046,8 +3046,10 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
|
|||||||
|
|
||||||
// Complete parts.
|
// Complete parts.
|
||||||
completeParts := make([]CompletePart, 0, len(complMultipartUpload.Parts))
|
completeParts := make([]CompletePart, 0, len(complMultipartUpload.Parts))
|
||||||
|
originalCompleteParts := make([]CompletePart, 0, len(complMultipartUpload.Parts))
|
||||||
for _, part := range complMultipartUpload.Parts {
|
for _, part := range complMultipartUpload.Parts {
|
||||||
part.ETag = canonicalizeETag(part.ETag)
|
part.ETag = canonicalizeETag(part.ETag)
|
||||||
|
originalCompleteParts = append(originalCompleteParts, part)
|
||||||
if isEncrypted {
|
if isEncrypted {
|
||||||
// ETag is stored in the backend in encrypted form. Validate client sent ETag with
|
// ETag is stored in the backend in encrypted form. Validate client sent ETag with
|
||||||
// decrypted ETag.
|
// decrypted ETag.
|
||||||
@ -3063,6 +3065,9 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
|
|||||||
completeParts = append(completeParts, part)
|
completeParts = append(completeParts, part)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Calculate s3 compatible md5sum for complete multipart.
|
||||||
|
s3MD5 := getCompleteMultipartMD5(originalCompleteParts)
|
||||||
|
|
||||||
completeMultiPartUpload := objectAPI.CompleteMultipartUpload
|
completeMultiPartUpload := objectAPI.CompleteMultipartUpload
|
||||||
|
|
||||||
// This code is specifically to handle the requirements for slow
|
// This code is specifically to handle the requirements for slow
|
||||||
@ -3104,6 +3109,11 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// preserve ETag if set, or set from parts.
|
||||||
|
if _, ok := opts.UserDefined["etag"]; !ok {
|
||||||
|
opts.UserDefined["etag"] = s3MD5
|
||||||
|
}
|
||||||
|
|
||||||
w = &whiteSpaceWriter{ResponseWriter: w, Flusher: w.(http.Flusher)}
|
w = &whiteSpaceWriter{ResponseWriter: w, Flusher: w.(http.Flusher)}
|
||||||
completeDoneCh := sendWhiteSpace(w)
|
completeDoneCh := sendWhiteSpace(w)
|
||||||
objInfo, err := completeMultiPartUpload(ctx, bucket, object, uploadID, completeParts, opts)
|
objInfo, err := completeMultiPartUpload(ctx, bucket, object, uploadID, completeParts, opts)
|
||||||
|
Loading…
Reference in New Issue
Block a user