From c7df1ffc6fb0fef099b5d96fb3a961f89f735167 Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Thu, 5 May 2022 04:14:41 -0700
Subject: [PATCH] avoid concurrent reads and writes to opts.UserDefined (#14862)

Do not modify opts.UserDefined after the object handler has set all the
necessary values; any mutation needed should be made on a copy of this
map, not on the map itself. Other pieces of code read opts.UserDefined
concurrently, so mutating it in place leads to data races.

fixes #14856
---
 cmd/disk-cache-backend.go        |  2 ++
 cmd/disk-cache.go                | 25 ++++++++--------------
 cmd/erasure-multipart.go         | 16 +++++++-------
 cmd/erasure-object.go            | 19 +++++++----------
 cmd/gateway/s3/gateway-s3-sse.go |  7 ++++---
 cmd/gateway/s3/gateway-s3.go     | 36 ++++++++++++++++++--------------
 cmd/utils.go                     |  3 +++
 7 files changed, 53 insertions(+), 55 deletions(-)

diff --git a/cmd/disk-cache-backend.go b/cmd/disk-cache-backend.go
index 8d38a3d6f..f93f77c40 100644
--- a/cmd/disk-cache-backend.go
+++ b/cmd/disk-cache-backend.go
@@ -884,6 +884,7 @@ func (c *diskCache) put(ctx context.Context, bucket, object string, data io.Read
 	}
 
 	if err := os.MkdirAll(cachePath, 0o777); err != nil {
+		removeAll(cachePath)
 		return oi, err
 	}
 	metadata := cloneMSS(opts.UserDefined)
@@ -892,6 +893,7 @@ func (c *diskCache) put(ctx context.Context, bucket, object string, data io.Read
 	if globalCacheKMS != nil {
 		reader, err = newCacheEncryptReader(data, bucket, object, metadata)
 		if err != nil {
+			removeAll(cachePath)
 			return oi, err
 		}
 		actualSize, _ = sio.EncryptedSize(uint64(size))
diff --git a/cmd/disk-cache.go b/cmd/disk-cache.go
index 12c8112ec..095b6d406 100644
--- a/cmd/disk-cache.go
+++ b/cmd/disk-cache.go
@@ -33,7 +33,6 @@ import (
 	"github.com/minio/minio/internal/config/cache"
 	"github.com/minio/minio/internal/disk"
 	"github.com/minio/minio/internal/hash"
-	xhttp "github.com/minio/minio/internal/http"
 	"github.com/minio/minio/internal/logger"
 	"github.com/minio/minio/internal/sync/errgroup"
 	"github.com/minio/pkg/wildcard"
@@ -739,35 +738,30 @@ func (c *cacheObjects) PutObject(ctx context.Context, bucket, object string, r *
 	// Initialize pipe to stream data to cache
 	rPipe, wPipe := io.Pipe()
 	infoCh := make(chan ObjectInfo)
-	errorCh := make(chan error)
 	go func() {
+		defer close(infoCh)
 		info, err := putObjectFn(ctx, bucket, object, NewPutObjReader(hashReader), opts)
-		if err != nil {
-			close(infoCh)
-			pipeReader.CloseWithError(err)
-			rPipe.CloseWithError(err)
-			errorCh <- err
-			return
+		pipeReader.CloseWithError(err)
+		rPipe.CloseWithError(err)
+		if err == nil {
+			infoCh <- info
 		}
-		close(errorCh)
-		infoCh <- info
 	}()
 
 	go func() {
 		_, err := dcache.put(lkctx.Context(), bucket, object, rPipe, r.Size(), nil, opts, false, false)
 		if err != nil {
-			rPipe.CloseWithError(err)
-			return
+			logger.LogIf(lkctx.Context(), err)
 		}
+		// We do not care about errors to cached backend.
+		rPipe.Close()
 	}()
 
 	mwriter := cacheMultiWriter(pipeWriter, wPipe)
 	_, err = io.Copy(mwriter, r)
 	pipeWriter.Close()
 	wPipe.Close()
-
 	if err != nil {
-		err = <-errorCh
 		return ObjectInfo{}, err
 	}
 	info := <-infoCh
@@ -803,8 +797,7 @@ func (c *cacheObjects) uploadObject(ctx context.Context, oi ObjectInfo) {
 		return
 	}
 	var opts ObjectOptions
-	opts.UserDefined = make(map[string]string)
-	opts.UserDefined[xhttp.ContentMD5] = oi.UserDefined["content-md5"]
+	opts.UserDefined = cloneMSS(oi.UserDefined)
 	objInfo, err := c.InnerPutObjectFn(ctx, oi.Bucket, oi.Name, NewPutObjReader(hashReader), opts)
 	wbCommitStatus := CommitComplete
 	size := objInfo.Size
diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go
index 43ce216db..e2b104180 100644
--- a/cmd/erasure-multipart.go
+++ b/cmd/erasure-multipart.go
@@ -286,8 +286,10 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec
 // disks. `uploads.json` carries metadata regarding on-going multipart
 // operation(s) on the object.
 func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string, object string, opts ObjectOptions) (string, error) {
+	userDefined := cloneMSS(opts.UserDefined)
+
 	onlineDisks := er.getDisks()
-	parityDrives := globalStorageClass.GetParityForSC(opts.UserDefined[xhttp.AmzStorageClass])
+	parityDrives := globalStorageClass.GetParityForSC(userDefined[xhttp.AmzStorageClass])
 	if parityDrives <= 0 {
 		parityDrives = er.defaultParityCount
 	}
@@ -308,7 +310,7 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string,
 		}
 	}
 	if parityOrig != parityDrives {
-		opts.UserDefined[minIOErasureUpgraded] = strconv.Itoa(parityOrig) + "->" + strconv.Itoa(parityDrives)
+		userDefined[minIOErasureUpgraded] = strconv.Itoa(parityOrig) + "->" + strconv.Itoa(parityDrives)
 	}
 
 	dataDrives := len(onlineDisks) - parityDrives
@@ -336,8 +338,8 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string,
 	}
 
 	// Guess content-type from the extension if possible.
-	if opts.UserDefined["content-type"] == "" {
-		opts.UserDefined["content-type"] = mimedb.TypeByExtension(path.Ext(object))
+	if userDefined["content-type"] == "" {
+		userDefined["content-type"] = mimedb.TypeByExtension(path.Ext(object))
 	}
 
 	modTime := opts.MTime
@@ -352,7 +354,7 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string,
 	for index := range partsMetadata {
 		partsMetadata[index].Fresh = true
 		partsMetadata[index].ModTime = modTime
-		partsMetadata[index].Metadata = opts.UserDefined
+		partsMetadata[index].Metadata = userDefined
 	}
 
 	uploadID := mustGetUUID()
@@ -375,10 +377,6 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string,
 func (er erasureObjects) NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (string, error) {
 	auditObjectErasureSet(ctx, object, &er)
 
-	// No metadata is set, allocate a new one.
-	if opts.UserDefined == nil {
-		opts.UserDefined = make(map[string]string)
-	}
 	return er.newMultipartUpload(ctx, bucket, object, opts)
 }
 
diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go
index 77704dee7..7b5f6b51d 100644
--- a/cmd/erasure-object.go
+++ b/cmd/erasure-object.go
@@ -842,17 +842,14 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
 
 	data := r.Reader
 
-	// No metadata is set, allocate a new one.
-	if opts.UserDefined == nil {
-		opts.UserDefined = make(map[string]string)
-	}
+	userDefined := cloneMSS(opts.UserDefined)
 
 	storageDisks := er.getDisks()
 
 	parityDrives := len(storageDisks) / 2
 	if !opts.MaxParity {
 		// Get parity and data drive count based on storage class metadata
-		parityDrives = globalStorageClass.GetParityForSC(opts.UserDefined[xhttp.AmzStorageClass])
+		parityDrives = globalStorageClass.GetParityForSC(userDefined[xhttp.AmzStorageClass])
 		if parityDrives <= 0 {
 			parityDrives = er.defaultParityCount
 		}
@@ -890,7 +887,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
 			parityDrives = len(storageDisks) / 2
 		}
 		if parityOrig != parityDrives {
-			opts.UserDefined[minIOErasureUpgraded] = strconv.Itoa(parityOrig) + "->" + strconv.Itoa(parityDrives)
+			userDefined[minIOErasureUpgraded] = strconv.Itoa(parityOrig) + "->" + strconv.Itoa(parityDrives)
 		}
 	}
 	dataDrives := len(storageDisks) - parityDrives
@@ -1066,13 +1063,13 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
 			Hash:       bitrotWriterSum(w),
 		})
 	}
-	if opts.UserDefined["etag"] == "" {
-		opts.UserDefined["etag"] = r.MD5CurrentHexString()
+	if userDefined["etag"] == "" {
+		userDefined["etag"] = r.MD5CurrentHexString()
 	}
 
 	// Guess content-type from the extension if possible.
-	if opts.UserDefined["content-type"] == "" {
-		opts.UserDefined["content-type"] = mimedb.TypeByExtension(path.Ext(object))
+	if userDefined["content-type"] == "" {
+		userDefined["content-type"] = mimedb.TypeByExtension(path.Ext(object))
 	}
 
 	modTime := opts.MTime
@@ -1083,7 +1080,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
 	// Fill all the necessary metadata.
 	// Update `xl.meta` content on each disks.
 	for index := range partsMetadata {
-		partsMetadata[index].Metadata = opts.UserDefined
+		partsMetadata[index].Metadata = userDefined
 		partsMetadata[index].Size = n
 		partsMetadata[index].ModTime = modTime
 	}
diff --git a/cmd/gateway/s3/gateway-s3-sse.go b/cmd/gateway/s3/gateway-s3-sse.go
index 2b4dcba04..8d530ca89 100644
--- a/cmd/gateway/s3/gateway-s3-sse.go
+++ b/cmd/gateway/s3/gateway-s3-sse.go
@@ -364,6 +364,7 @@ func (l *s3EncObjects) GetObjectInfo(ctx context.Context, bucket string, object
 // CopyObject copies an object from source bucket to a destination bucket.
 func (l *s3EncObjects) CopyObject(ctx context.Context, srcBucket string, srcObject string, dstBucket string, dstObject string, srcInfo minio.ObjectInfo, s, d minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
 	cpSrcDstSame := path.Join(srcBucket, srcObject) == path.Join(dstBucket, dstObject)
+	userDefined := minio.CloneMSS(srcInfo.UserDefined)
 	if cpSrcDstSame {
 		var gwMeta gwMetaV1
 		if s.ServerSideEncryption != nil && d.ServerSideEncryption != nil &&
@@ -378,16 +379,16 @@ func (l *s3EncObjects) CopyObject(ctx context.Context, srcBucket string, srcObje
 			d.ServerSideEncryption.Marshal(header)
 		}
 		for k, v := range header {
-			srcInfo.UserDefined[k] = v[0]
+			userDefined[k] = v[0]
 		}
-		gwMeta.Meta = srcInfo.UserDefined
+		gwMeta.Meta = userDefined
 		if err = l.writeGWMetadata(ctx, dstBucket, getDareMetaPath(dstObject), gwMeta, minio.ObjectOptions{}); err != nil {
 			return objInfo, minio.ErrorRespToObjectError(err)
 		}
 		return gwMeta.ToObjectInfo(dstBucket, dstObject), nil
 	}
-	dstOpts := minio.ObjectOptions{ServerSideEncryption: d.ServerSideEncryption, UserDefined: srcInfo.UserDefined}
+	dstOpts := minio.ObjectOptions{ServerSideEncryption: d.ServerSideEncryption, UserDefined: userDefined}
 	return l.PutObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, dstOpts)
 }
 
diff --git a/cmd/gateway/s3/gateway-s3.go b/cmd/gateway/s3/gateway-s3.go
index 7951b0be1..40720fd31 100644
--- a/cmd/gateway/s3/gateway-s3.go
+++ b/cmd/gateway/s3/gateway-s3.go
@@ -502,17 +502,20 @@ func (l *s3Objects) GetObjectInfo(ctx context.Context, bucket string, object str
 // PutObject creates a new object with the incoming data,
 func (l *s3Objects) PutObject(ctx context.Context, bucket string, object string, r *minio.PutObjReader, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
 	data := r.Reader
+
+	userDefined := minio.CloneMSS(opts.UserDefined)
+
 	var tagMap map[string]string
-	if tagstr, ok := opts.UserDefined[xhttp.AmzObjectTagging]; ok && tagstr != "" {
+	if tagstr, ok := userDefined[xhttp.AmzObjectTagging]; ok && tagstr != "" {
 		tagObj, err := tags.ParseObjectTags(tagstr)
 		if err != nil {
 			return objInfo, minio.ErrorRespToObjectError(err, bucket, object)
 		}
 		tagMap = tagObj.ToMap()
-		delete(opts.UserDefined, xhttp.AmzObjectTagging)
+		delete(userDefined, xhttp.AmzObjectTagging)
 	}
 	putOpts := miniogo.PutObjectOptions{
-		UserMetadata:         opts.UserDefined,
+		UserMetadata:         userDefined,
 		ServerSideEncryption: opts.ServerSideEncryption,
 		UserTags:             tagMap,
 		// Content-Md5 is needed for buckets with object locking,
@@ -529,7 +532,7 @@ func (l *s3Objects) PutObject(ctx context.Context, bucket string, object string,
 		ETag:     ui.ETag,
 		Size:     ui.Size,
 		Key:      object,
-		Metadata: minio.ToMinioClientObjectInfoMetadata(opts.UserDefined),
+		Metadata: minio.ToMinioClientObjectInfoMetadata(userDefined),
 	}
 
 	return minio.FromMinioClientObjectInfo(bucket, oi), nil
@@ -544,8 +547,9 @@ func (l *s3Objects) CopyObject(ctx context.Context, srcBucket string, srcObject
 	// metadata input is already a trickled down value from interpreting x-amz-metadata-directive at
 	// handler layer. So what we have right now is supposed to be applied on the destination object anyways.
 	// So preserve it by adding "REPLACE" directive to save all the metadata set by CopyObject API.
-	srcInfo.UserDefined["x-amz-metadata-directive"] = "REPLACE"
-	srcInfo.UserDefined["x-amz-copy-source-if-match"] = srcInfo.ETag
+	userDefined := minio.CloneMSS(srcInfo.UserDefined)
+	userDefined["x-amz-metadata-directive"] = "REPLACE"
+	userDefined["x-amz-copy-source-if-match"] = srcInfo.ETag
 	header := make(http.Header)
 	if srcOpts.ServerSideEncryption != nil {
 		encrypt.SSECopy(srcOpts.ServerSideEncryption).Marshal(header)
@@ -556,10 +560,10 @@
 	}
 
 	for k, v := range header {
-		srcInfo.UserDefined[k] = v[0]
+		userDefined[k] = v[0]
 	}
 
-	if _, err = l.Client.CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo.UserDefined, miniogo.CopySrcOptions{}, miniogo.PutObjectOptions{}); err != nil {
+	if _, err = l.Client.CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, userDefined, miniogo.CopySrcOptions{}, miniogo.PutObjectOptions{}); err != nil {
 		return objInfo, minio.ErrorRespToObjectError(err, srcBucket, srcObject)
 	}
 	return l.GetObjectInfo(ctx, dstBucket, dstObject, dstOpts)
@@ -605,17 +609,18 @@ func (l *s3Objects) ListMultipartUploads(ctx context.Context, bucket string, pre
 // NewMultipartUpload upload object in multiple parts
 func (l *s3Objects) NewMultipartUpload(ctx context.Context, bucket string, object string, o minio.ObjectOptions) (uploadID string, err error) {
 	var tagMap map[string]string
-	if tagStr, ok := o.UserDefined[xhttp.AmzObjectTagging]; ok {
+	userDefined := minio.CloneMSS(o.UserDefined)
+	if tagStr, ok := userDefined[xhttp.AmzObjectTagging]; ok {
 		tagObj, err := tags.Parse(tagStr, true)
 		if err != nil {
 			return uploadID, minio.ErrorRespToObjectError(err, bucket, object)
 		}
 		tagMap = tagObj.ToMap()
-		delete(o.UserDefined, xhttp.AmzObjectTagging)
+		delete(userDefined, xhttp.AmzObjectTagging)
 	}
 	// Create PutObject options
 	opts := miniogo.PutObjectOptions{
-		UserMetadata:         o.UserDefined,
+		UserMetadata:         userDefined,
 		ServerSideEncryption: o.ServerSideEncryption,
 		UserTags:             tagMap,
 	}
@@ -645,9 +650,8 @@ func (l *s3Objects) CopyObjectPart(ctx context.Context, srcBucket, srcObject, de
 	if srcOpts.CheckPrecondFn != nil && srcOpts.CheckPrecondFn(srcInfo) {
 		return minio.PartInfo{}, minio.PreConditionFailed{}
 	}
-	srcInfo.UserDefined = map[string]string{
-		"x-amz-copy-source-if-match": srcInfo.ETag,
-	}
+	userDefined := minio.CloneMSS(srcInfo.UserDefined)
+	userDefined["x-amz-copy-source-if-match"] = srcInfo.ETag
 	header := make(http.Header)
 	if srcOpts.ServerSideEncryption != nil {
 		encrypt.SSECopy(srcOpts.ServerSideEncryption).Marshal(header)
@@ -657,11 +661,11 @@ func (l *s3Objects) CopyObjectPart(ctx context.Context, srcBucket, srcObject, de
 		dstOpts.ServerSideEncryption.Marshal(header)
 	}
 	for k, v := range header {
-		srcInfo.UserDefined[k] = v[0]
+		userDefined[k] = v[0]
 	}
 
 	completePart, err := l.Client.CopyObjectPart(ctx, srcBucket, srcObject, destBucket, destObject,
-		uploadID, partID, startOffset, length, srcInfo.UserDefined)
+		uploadID, partID, startOffset, length, userDefined)
 	if err != nil {
 		return p, minio.ErrorRespToObjectError(err, srcBucket, srcObject)
 	}
diff --git a/cmd/utils.go b/cmd/utils.go
index 02a1e3b46..5be2e14cb 100644
--- a/cmd/utils.go
+++ b/cmd/utils.go
@@ -130,6 +130,9 @@ func getWriteQuorum(drive int) int {
 	return quorum
 }
 
+// CloneMSS is an exposed function of cloneMSS for gateway usage.
+var CloneMSS = cloneMSS
+
 // cloneMSS will clone a map[string]string.
 // If input is nil an empty map is returned, not nil.
 func cloneMSS(v map[string]string) map[string]string {
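
The race being fixed is easier to see in isolation. Below is a minimal,
standalone sketch (not MinIO code: ObjectOptions is reduced to the one field
that matters here, and cloneMSS is paraphrased from cmd/utils.go) of a
goroutine reading a shared options map while the put path mutates it.
Mutating a private clone, as this patch does throughout, keeps the reader
safe; assigning into opts.UserDefined directly is the data race that
`go run -race` would report.

package main

import (
	"fmt"
	"sync"
)

// ObjectOptions is a hypothetical reduction of the real struct to the
// single field this patch is concerned with.
type ObjectOptions struct {
	UserDefined map[string]string
}

// cloneMSS paraphrases the helper from cmd/utils.go: clone a
// map[string]string, returning an empty (never nil) map for nil input.
func cloneMSS(v map[string]string) map[string]string {
	r := make(map[string]string, len(v))
	for k, val := range v {
		r[k] = val
	}
	return r
}

func main() {
	opts := ObjectOptions{UserDefined: map[string]string{
		"content-type": "application/octet-stream",
	}}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Concurrent reader: other code paths inspect opts.UserDefined
		// while a put is in flight.
		for i := 0; i < 1000; i++ {
			_ = opts.UserDefined["content-type"]
		}
	}()

	// Writer: mutate a private copy. Writing into opts.UserDefined here
	// instead would race with the reader above.
	userDefined := cloneMSS(opts.UserDefined)
	userDefined["etag"] = "d41d8cd98f00b204e9800998ecf8427e"

	wg.Wait()
	fmt.Println("etag set on the copy:", userDefined["etag"])
}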
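
The reworked cacheObjects.PutObject in cmd/disk-cache.go also simplifies the
streaming plumbing: one io.Pipe feeds the cache writer while a multi-writer
tees the request body to the backend and the pipe, and only the backend write
decides success. Here is a simplified, standalone sketch of that arrangement,
with the caveat that it uses the stock io.MultiWriter instead of MinIO's
cacheMultiWriter, so a cache-side failure here would abort the whole copy,
which the real code deliberately avoids; all names are illustrative.

package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// putWithCache tees src to a primary backend and a best-effort cache.
func putWithCache(src io.Reader, primary, cache io.Writer) error {
	rPipe, wPipe := io.Pipe()
	done := make(chan error, 1)

	go func() {
		// Drain the pipe into the cache; in the patch, cache errors are
		// only logged, never surfaced to the caller.
		_, err := io.Copy(cache, rPipe)
		rPipe.CloseWithError(err)
		done <- err
	}()

	mw := io.MultiWriter(primary, wPipe)
	_, err := io.Copy(mw, src)
	wPipe.Close() // signals EOF so the cache goroutine finishes
	<-done        // wait for the cache side before returning
	return err    // only the primary write decides success
}

func main() {
	var primary, cache bytes.Buffer
	if err := putWithCache(strings.NewReader("hello world"), &primary, &cache); err != nil {
		fmt.Println("put failed:", err)
		return
	}
	fmt.Printf("primary=%q cache=%q\n", primary.String(), cache.String())
}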