diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 46b3f12f4..9374557ff 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -473,24 +473,6 @@ func (er erasureObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObjec return partInfo, nil } -func undoRenamePart(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, errs []error) { - // Undo rename object on disks where RenameFile succeeded. - g := errgroup.WithNErrs(len(disks)) - for index, disk := range disks { - if disk == nil { - continue - } - index := index - g.Go(func() error { - if errs[index] == nil { - _ = disks[index].RenameFile(context.TODO(), dstBucket, dstEntry, srcBucket, srcEntry) - } - return nil - }, index) - } - g.Wait() -} - // renamePart - renames multipart part to its relevant location under uploadID. func renamePart(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, writeQuorum int) ([]StorageAPI, error) { g := errgroup.WithNErrs(len(disks)) @@ -509,15 +491,13 @@ func renamePart(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, ds // Wait for all renames to finish. errs := g.Wait() + // Do not need to undo partial successful operation since those will be cleaned up + // in 24hrs via multipart cleaner, never rename() back to `.minio.sys/tmp` as there + // is no way to clean them. + // We can safely allow RenameFile errors up to len(er.getDisks()) - writeQuorum // otherwise return failure. Cleanup successful renames. - err := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum) - if err == errErasureWriteQuorum { - // Undo all the partial rename operations. - undoRenamePart(disks, srcBucket, srcEntry, dstBucket, dstEntry, errs) - } - - return evalDisks(disks, errs), err + return evalDisks(disks, errs), reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum) } // writeAllDisks - writes 'b' to all provided disks. diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 808ddae56..f7be534b6 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -49,7 +49,6 @@ import ( "github.com/minio/minio/internal/crypto" "github.com/minio/minio/internal/etag" "github.com/minio/minio/internal/event" - "github.com/minio/minio/internal/fips" "github.com/minio/minio/internal/handlers" "github.com/minio/minio/internal/hash" xhttp "github.com/minio/minio/internal/http" @@ -60,7 +59,6 @@ import ( "github.com/minio/pkg/bucket/policy" iampolicy "github.com/minio/pkg/iam/policy" xnet "github.com/minio/pkg/net" - "github.com/minio/sio" ) // supportedHeadGetReqParams - supported request parameters for GET and HEAD presigned request. @@ -2293,344 +2291,6 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h writeSuccessResponseHeadersOnly(w) } -// CopyObjectPartHandler - uploads a part by copying data from an existing object as data source. -func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *http.Request) { - ctx := newContext(r, w, "CopyObjectPart") - - defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) - - objectAPI := api.ObjectAPI() - if objectAPI == nil { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL) - return - } - - if crypto.S3KMS.IsRequested(r.Header) { // SSE-KMS is not supported - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) - return - } - - if crypto.Requested(r.Header) && !objectAPI.IsEncryptionSupported() { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) - return - } - - vars := mux.Vars(r) - dstBucket := vars["bucket"] - dstObject, err := unescapePath(vars["object"]) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - if s3Error := checkRequestAuthType(ctx, r, policy.PutObjectAction, dstBucket, dstObject); s3Error != ErrNone { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) - return - } - - // Read escaped copy source path to check for parameters. - cpSrcPath := r.Header.Get(xhttp.AmzCopySource) - var vid string - if u, err := url.Parse(cpSrcPath); err == nil { - vid = strings.TrimSpace(u.Query().Get(xhttp.VersionID)) - // Note that url.Parse does the unescaping - cpSrcPath = u.Path - } - - srcBucket, srcObject := path2BucketObject(cpSrcPath) - // If source object is empty or bucket is empty, reply back invalid copy source. - if srcObject == "" || srcBucket == "" { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidCopySource), r.URL) - return - } - - if vid != "" && vid != nullVersionID { - _, err := uuid.Parse(vid) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, VersionNotFound{ - Bucket: srcBucket, - Object: srcObject, - VersionID: vid, - }), r.URL) - return - } - } - - if s3Error := checkRequestAuthType(ctx, r, policy.GetObjectAction, srcBucket, srcObject); s3Error != ErrNone { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) - return - } - - uploadID := r.Form.Get(xhttp.UploadID) - partIDString := r.Form.Get(xhttp.PartNumber) - - partID, err := strconv.Atoi(partIDString) - if err != nil { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidPart), r.URL) - return - } - - // check partID with maximum part ID for multipart objects - if isMaxPartID(partID) { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidMaxParts), r.URL) - return - } - - var srcOpts, dstOpts ObjectOptions - srcOpts, err = copySrcOpts(ctx, r, srcBucket, srcObject) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - srcOpts.VersionID = vid - - // convert copy src and dst encryption options for GET/PUT calls - getOpts := ObjectOptions{VersionID: srcOpts.VersionID} - if srcOpts.ServerSideEncryption != nil { - getOpts.ServerSideEncryption = encrypt.SSE(srcOpts.ServerSideEncryption) - } - - dstOpts, err = copyDstOpts(ctx, r, dstBucket, dstObject, nil) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - getObjectNInfo := objectAPI.GetObjectNInfo - if api.CacheAPI() != nil { - getObjectNInfo = api.CacheAPI().GetObjectNInfo - } - - // Get request range. - var rs *HTTPRangeSpec - var parseRangeErr error - if rangeHeader := r.Header.Get(xhttp.AmzCopySourceRange); rangeHeader != "" { - rs, parseRangeErr = parseCopyPartRangeSpec(rangeHeader) - } else { - // This check is to see if client specified a header but the value - // is empty for 'x-amz-copy-source-range' - _, ok := r.Header[xhttp.AmzCopySourceRange] - if ok { - parseRangeErr = errInvalidRange - } - } - - checkCopyPartPrecondFn := func(o ObjectInfo) bool { - if objectAPI.IsEncryptionSupported() { - if _, err := DecryptObjectInfo(&o, r); err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return true - } - } - if checkCopyObjectPartPreconditions(ctx, w, r, o) { - return true - } - if parseRangeErr != nil { - logger.LogIf(ctx, parseRangeErr) - writeCopyPartErr(ctx, w, parseRangeErr, r.URL) - // Range header mismatch is pre-condition like failure - // so return true to indicate Range precondition failed. - return true - } - return false - } - getOpts.CheckPrecondFn = checkCopyPartPrecondFn - gr, err := getObjectNInfo(ctx, srcBucket, srcObject, rs, r.Header, readLock, getOpts) - if err != nil { - if isErrPreconditionFailed(err) { - return - } - if globalBucketVersioningSys.PrefixEnabled(srcBucket, srcObject) && gr != nil { - // Versioning enabled quite possibly object is deleted might be delete-marker - // if present set the headers, no idea why AWS S3 sets these headers. - if gr.ObjInfo.VersionID != "" && gr.ObjInfo.DeleteMarker { - w.Header()[xhttp.AmzVersionID] = []string{gr.ObjInfo.VersionID} - w.Header()[xhttp.AmzDeleteMarker] = []string{strconv.FormatBool(gr.ObjInfo.DeleteMarker)} - } - } - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - defer gr.Close() - srcInfo := gr.ObjInfo - - actualPartSize, err := srcInfo.GetActualSize() - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - if err := enforceBucketQuotaHard(ctx, dstBucket, actualPartSize); err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - // Special care for CopyObjectPart - if partRangeErr := checkCopyPartRangeWithSize(rs, actualPartSize); partRangeErr != nil { - writeCopyPartErr(ctx, w, partRangeErr, r.URL) - return - } - - // Get the object offset & length - startOffset, length, err := rs.GetOffsetLength(actualPartSize) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - // maximum copy size for multipart objects in a single operation - if isMaxAllowedPartSize(length) { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrEntityTooLarge), r.URL) - return - } - - if isRemoteCopyRequired(ctx, srcBucket, dstBucket, objectAPI) { - var dstRecords []dns.SrvRecord - dstRecords, err = globalDNSConfig.Get(dstBucket) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - // Send PutObject request to appropriate instance (in federated deployment) - core, rerr := getRemoteInstanceClient(r, getHostFromSrv(dstRecords)) - if rerr != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, rerr), r.URL) - return - } - - partInfo, err := core.PutObjectPart(ctx, dstBucket, dstObject, uploadID, partID, gr, length, "", "", dstOpts.ServerSideEncryption) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - response := generateCopyObjectPartResponse(partInfo.ETag, partInfo.LastModified) - encodedSuccessResponse := encodeResponse(response) - - // Write success response. - writeSuccessResponseXML(w, encodedSuccessResponse) - return - } - - actualPartSize = length - var reader io.Reader = etag.NewReader(gr, nil) - - mi, err := objectAPI.GetMultipartInfo(ctx, dstBucket, dstObject, uploadID, dstOpts) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - // Read compression metadata preserved in the init multipart for the decision. - _, isCompressed := mi.UserDefined[ReservedMetadataPrefix+"compression"] - // Compress only if the compression is enabled during initial multipart. - var idxCb func() []byte - if isCompressed { - wantEncryption := objectAPI.IsEncryptionSupported() && crypto.Requested(r.Header) - s2c, cb := newS2CompressReader(reader, actualPartSize, wantEncryption) - idxCb = cb - defer s2c.Close() - reader = etag.Wrap(s2c, reader) - length = -1 - } - - srcInfo.Reader, err = hash.NewReader(reader, length, "", "", actualPartSize) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - dstOpts, err = copyDstOpts(ctx, r, dstBucket, dstObject, mi.UserDefined) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - dstOpts.IndexCB = idxCb - - rawReader := srcInfo.Reader - pReader := NewPutObjReader(rawReader) - - _, isEncrypted := crypto.IsEncrypted(mi.UserDefined) - var objectEncryptionKey crypto.ObjectKey - if objectAPI.IsEncryptionSupported() && isEncrypted { - if !crypto.SSEC.IsRequested(r.Header) && crypto.SSEC.IsEncrypted(mi.UserDefined) { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrSSEMultipartEncrypted), r.URL) - return - } - if crypto.S3.IsEncrypted(mi.UserDefined) && crypto.SSEC.IsRequested(r.Header) { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrSSEMultipartEncrypted), r.URL) - return - } - var key []byte - if crypto.SSEC.IsRequested(r.Header) { - key, err = ParseSSECustomerRequest(r) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - } - key, err = decryptObjectMeta(key, dstBucket, dstObject, mi.UserDefined) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - copy(objectEncryptionKey[:], key) - - partEncryptionKey := objectEncryptionKey.DerivePartKey(uint32(partID)) - encReader, err := sio.EncryptReader(reader, sio.Config{Key: partEncryptionKey[:], CipherSuites: fips.DARECiphers()}) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - reader = etag.Wrap(encReader, reader) - - wantSize := int64(-1) - if length >= 0 { - info := ObjectInfo{Size: length} - wantSize = info.EncryptedSize() - } - - srcInfo.Reader, err = hash.NewReader(reader, wantSize, "", "", actualPartSize) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - pReader, err = pReader.WithEncryption(srcInfo.Reader, &objectEncryptionKey) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - if dstOpts.IndexCB != nil { - dstOpts.IndexCB = compressionIndexEncrypter(objectEncryptionKey, dstOpts.IndexCB) - } - } - - srcInfo.PutObjReader = pReader - copyObjectPart := objectAPI.CopyObjectPart - if api.CacheAPI() != nil { - copyObjectPart = api.CacheAPI().CopyObjectPart - } - // Copy source object to destination, if source and destination - // object is same then only metadata is updated. - partInfo, err := copyObjectPart(ctx, srcBucket, srcObject, dstBucket, dstObject, uploadID, partID, - startOffset, length, srcInfo, srcOpts, dstOpts) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - - if isEncrypted { - partInfo.ETag = tryDecryptETag(objectEncryptionKey[:], partInfo.ETag, crypto.SSEC.IsRequested(r.Header)) - } - - response := generateCopyObjectPartResponse(partInfo.ETag, partInfo.LastModified) - encodedSuccessResponse := encodeResponse(response) - - // Write success response. - writeSuccessResponseXML(w, encodedSuccessResponse) -} - // Delete objectAPIHandlers // DeleteObjectHandler - delete an object diff --git a/cmd/object-multipart-handlers.go b/cmd/object-multipart-handlers.go index 773c8a2c9..b5cad8920 100644 --- a/cmd/object-multipart-handlers.go +++ b/cmd/object-multipart-handlers.go @@ -29,11 +29,14 @@ import ( "strings" "time" + "github.com/google/uuid" "github.com/gorilla/mux" + "github.com/minio/minio-go/v7/pkg/encrypt" "github.com/minio/minio-go/v7/pkg/tags" sse "github.com/minio/minio/internal/bucket/encryption" objectlock "github.com/minio/minio/internal/bucket/object/lock" "github.com/minio/minio/internal/bucket/replication" + "github.com/minio/minio/internal/config/dns" "github.com/minio/minio/internal/config/storageclass" "github.com/minio/minio/internal/crypto" "github.com/minio/minio/internal/etag" @@ -224,6 +227,344 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r writeSuccessResponseXML(w, encodedSuccessResponse) } +// CopyObjectPartHandler - uploads a part by copying data from an existing object as data source. +func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "CopyObjectPart") + + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL) + return + } + + if crypto.S3KMS.IsRequested(r.Header) { // SSE-KMS is not supported + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) + return + } + + if crypto.Requested(r.Header) && !objectAPI.IsEncryptionSupported() { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) + return + } + + vars := mux.Vars(r) + dstBucket := vars["bucket"] + dstObject, err := unescapePath(vars["object"]) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + if s3Error := checkRequestAuthType(ctx, r, policy.PutObjectAction, dstBucket, dstObject); s3Error != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) + return + } + + // Read escaped copy source path to check for parameters. + cpSrcPath := r.Header.Get(xhttp.AmzCopySource) + var vid string + if u, err := url.Parse(cpSrcPath); err == nil { + vid = strings.TrimSpace(u.Query().Get(xhttp.VersionID)) + // Note that url.Parse does the unescaping + cpSrcPath = u.Path + } + + srcBucket, srcObject := path2BucketObject(cpSrcPath) + // If source object is empty or bucket is empty, reply back invalid copy source. + if srcObject == "" || srcBucket == "" { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidCopySource), r.URL) + return + } + + if vid != "" && vid != nullVersionID { + _, err := uuid.Parse(vid) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, VersionNotFound{ + Bucket: srcBucket, + Object: srcObject, + VersionID: vid, + }), r.URL) + return + } + } + + if s3Error := checkRequestAuthType(ctx, r, policy.GetObjectAction, srcBucket, srcObject); s3Error != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) + return + } + + uploadID := r.Form.Get(xhttp.UploadID) + partIDString := r.Form.Get(xhttp.PartNumber) + + partID, err := strconv.Atoi(partIDString) + if err != nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidPart), r.URL) + return + } + + // check partID with maximum part ID for multipart objects + if isMaxPartID(partID) { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidMaxParts), r.URL) + return + } + + var srcOpts, dstOpts ObjectOptions + srcOpts, err = copySrcOpts(ctx, r, srcBucket, srcObject) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + srcOpts.VersionID = vid + + // convert copy src and dst encryption options for GET/PUT calls + getOpts := ObjectOptions{VersionID: srcOpts.VersionID} + if srcOpts.ServerSideEncryption != nil { + getOpts.ServerSideEncryption = encrypt.SSE(srcOpts.ServerSideEncryption) + } + + dstOpts, err = copyDstOpts(ctx, r, dstBucket, dstObject, nil) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + getObjectNInfo := objectAPI.GetObjectNInfo + if api.CacheAPI() != nil { + getObjectNInfo = api.CacheAPI().GetObjectNInfo + } + + // Get request range. + var rs *HTTPRangeSpec + var parseRangeErr error + if rangeHeader := r.Header.Get(xhttp.AmzCopySourceRange); rangeHeader != "" { + rs, parseRangeErr = parseCopyPartRangeSpec(rangeHeader) + } else { + // This check is to see if client specified a header but the value + // is empty for 'x-amz-copy-source-range' + _, ok := r.Header[xhttp.AmzCopySourceRange] + if ok { + parseRangeErr = errInvalidRange + } + } + + checkCopyPartPrecondFn := func(o ObjectInfo) bool { + if objectAPI.IsEncryptionSupported() { + if _, err := DecryptObjectInfo(&o, r); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return true + } + } + if checkCopyObjectPartPreconditions(ctx, w, r, o) { + return true + } + if parseRangeErr != nil { + logger.LogIf(ctx, parseRangeErr) + writeCopyPartErr(ctx, w, parseRangeErr, r.URL) + // Range header mismatch is pre-condition like failure + // so return true to indicate Range precondition failed. + return true + } + return false + } + getOpts.CheckPrecondFn = checkCopyPartPrecondFn + gr, err := getObjectNInfo(ctx, srcBucket, srcObject, rs, r.Header, readLock, getOpts) + if err != nil { + if isErrPreconditionFailed(err) { + return + } + if globalBucketVersioningSys.PrefixEnabled(srcBucket, srcObject) && gr != nil { + // Versioning enabled quite possibly object is deleted might be delete-marker + // if present set the headers, no idea why AWS S3 sets these headers. + if gr.ObjInfo.VersionID != "" && gr.ObjInfo.DeleteMarker { + w.Header()[xhttp.AmzVersionID] = []string{gr.ObjInfo.VersionID} + w.Header()[xhttp.AmzDeleteMarker] = []string{strconv.FormatBool(gr.ObjInfo.DeleteMarker)} + } + } + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + defer gr.Close() + srcInfo := gr.ObjInfo + + actualPartSize, err := srcInfo.GetActualSize() + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + if err := enforceBucketQuotaHard(ctx, dstBucket, actualPartSize); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + // Special care for CopyObjectPart + if partRangeErr := checkCopyPartRangeWithSize(rs, actualPartSize); partRangeErr != nil { + writeCopyPartErr(ctx, w, partRangeErr, r.URL) + return + } + + // Get the object offset & length + startOffset, length, err := rs.GetOffsetLength(actualPartSize) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + // maximum copy size for multipart objects in a single operation + if isMaxObjectSize(length) { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrEntityTooLarge), r.URL) + return + } + + if isRemoteCopyRequired(ctx, srcBucket, dstBucket, objectAPI) { + var dstRecords []dns.SrvRecord + dstRecords, err = globalDNSConfig.Get(dstBucket) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + // Send PutObject request to appropriate instance (in federated deployment) + core, rerr := getRemoteInstanceClient(r, getHostFromSrv(dstRecords)) + if rerr != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, rerr), r.URL) + return + } + + partInfo, err := core.PutObjectPart(ctx, dstBucket, dstObject, uploadID, partID, gr, length, "", "", dstOpts.ServerSideEncryption) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + response := generateCopyObjectPartResponse(partInfo.ETag, partInfo.LastModified) + encodedSuccessResponse := encodeResponse(response) + + // Write success response. + writeSuccessResponseXML(w, encodedSuccessResponse) + return + } + + actualPartSize = length + var reader io.Reader = etag.NewReader(gr, nil) + + mi, err := objectAPI.GetMultipartInfo(ctx, dstBucket, dstObject, uploadID, dstOpts) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + // Read compression metadata preserved in the init multipart for the decision. + _, isCompressed := mi.UserDefined[ReservedMetadataPrefix+"compression"] + // Compress only if the compression is enabled during initial multipart. + var idxCb func() []byte + if isCompressed { + wantEncryption := objectAPI.IsEncryptionSupported() && crypto.Requested(r.Header) + s2c, cb := newS2CompressReader(reader, actualPartSize, wantEncryption) + idxCb = cb + defer s2c.Close() + reader = etag.Wrap(s2c, reader) + length = -1 + } + + srcInfo.Reader, err = hash.NewReader(reader, length, "", "", actualPartSize) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + dstOpts, err = copyDstOpts(ctx, r, dstBucket, dstObject, mi.UserDefined) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + dstOpts.IndexCB = idxCb + + rawReader := srcInfo.Reader + pReader := NewPutObjReader(rawReader) + + _, isEncrypted := crypto.IsEncrypted(mi.UserDefined) + var objectEncryptionKey crypto.ObjectKey + if objectAPI.IsEncryptionSupported() && isEncrypted { + if !crypto.SSEC.IsRequested(r.Header) && crypto.SSEC.IsEncrypted(mi.UserDefined) { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrSSEMultipartEncrypted), r.URL) + return + } + if crypto.S3.IsEncrypted(mi.UserDefined) && crypto.SSEC.IsRequested(r.Header) { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrSSEMultipartEncrypted), r.URL) + return + } + var key []byte + if crypto.SSEC.IsRequested(r.Header) { + key, err = ParseSSECustomerRequest(r) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + } + key, err = decryptObjectMeta(key, dstBucket, dstObject, mi.UserDefined) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + copy(objectEncryptionKey[:], key) + + partEncryptionKey := objectEncryptionKey.DerivePartKey(uint32(partID)) + encReader, err := sio.EncryptReader(reader, sio.Config{Key: partEncryptionKey[:], CipherSuites: fips.DARECiphers()}) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + reader = etag.Wrap(encReader, reader) + + wantSize := int64(-1) + if length >= 0 { + info := ObjectInfo{Size: length} + wantSize = info.EncryptedSize() + } + + srcInfo.Reader, err = hash.NewReader(reader, wantSize, "", "", actualPartSize) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + pReader, err = pReader.WithEncryption(srcInfo.Reader, &objectEncryptionKey) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + if dstOpts.IndexCB != nil { + dstOpts.IndexCB = compressionIndexEncrypter(objectEncryptionKey, dstOpts.IndexCB) + } + } + + srcInfo.PutObjReader = pReader + copyObjectPart := objectAPI.CopyObjectPart + if api.CacheAPI() != nil { + copyObjectPart = api.CacheAPI().CopyObjectPart + } + // Copy source object to destination, if source and destination + // object is same then only metadata is updated. + partInfo, err := copyObjectPart(ctx, srcBucket, srcObject, dstBucket, dstObject, uploadID, partID, + startOffset, length, srcInfo, srcOpts, dstOpts) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + if isEncrypted { + partInfo.ETag = tryDecryptETag(objectEncryptionKey[:], partInfo.ETag, crypto.SSEC.IsRequested(r.Header)) + } + + response := generateCopyObjectPartResponse(partInfo.ETag, partInfo.LastModified) + encodedSuccessResponse := encodeResponse(response) + + // Write success response. + writeSuccessResponseXML(w, encodedSuccessResponse) +} + // PutObjectPartHandler - uploads an incoming part for an ongoing multipart operation. func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) { ctx := newContext(r, w, "PutObjectPart") @@ -286,12 +627,6 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http return } - // maximum Upload size for multipart objects in a single operation - if isMaxAllowedPartSize(size) { - writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrEntityTooLarge), r.URL) - return - } - uploadID := r.Form.Get(xhttp.UploadID) partIDString := r.Form.Get(xhttp.PartNumber) @@ -301,6 +636,12 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http return } + // maximum size for multipart objects in a single operation + if isMaxObjectSize(size) { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrEntityTooLarge), r.URL) + return + } + // check partID with maximum part ID for multipart objects if isMaxPartID(partID) { writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidMaxParts), r.URL) diff --git a/cmd/utils.go b/cmd/utils.go index 74086ea8d..1d94dfc8d 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -258,9 +258,6 @@ const ( // Minimum Part size for multipart upload is 5MiB globalMinPartSize = 5 * humanize.MiByte - // Maximum Part size for multipart upload is 5GiB - globalMaxPartSize = 5 * humanize.GiByte - // Maximum Part ID for multipart upload is 10000 // (Acceptable values range from 1 to 10000 inclusive) globalMaxPartID = 10000 @@ -271,11 +268,6 @@ func isMaxObjectSize(size int64) bool { return size > globalMaxObjectSize } -// // Check if part size is more than maximum allowed size. -func isMaxAllowedPartSize(size int64) bool { - return size > globalMaxPartSize -} - // Check if part size is more than or equal to minimum allowed size. func isMinAllowedPartSize(size int64) bool { return size >= globalMinPartSize