do not rename multipart failed transactions back to tmp (#16204)

This commit is contained in:
Harshavardhana 2022-12-12 01:40:29 -08:00 committed by GitHub
parent 20ef5e7a6a
commit 444ff20bc5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 352 additions and 379 deletions

View File

@ -473,24 +473,6 @@ func (er erasureObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObjec
return partInfo, nil return partInfo, nil
} }
func undoRenamePart(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, errs []error) {
// Undo rename object on disks where RenameFile succeeded.
g := errgroup.WithNErrs(len(disks))
for index, disk := range disks {
if disk == nil {
continue
}
index := index
g.Go(func() error {
if errs[index] == nil {
_ = disks[index].RenameFile(context.TODO(), dstBucket, dstEntry, srcBucket, srcEntry)
}
return nil
}, index)
}
g.Wait()
}
// renamePart - renames multipart part to its relevant location under uploadID. // renamePart - renames multipart part to its relevant location under uploadID.
func renamePart(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, writeQuorum int) ([]StorageAPI, error) { func renamePart(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, writeQuorum int) ([]StorageAPI, error) {
g := errgroup.WithNErrs(len(disks)) g := errgroup.WithNErrs(len(disks))
@ -509,15 +491,13 @@ func renamePart(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, ds
// Wait for all renames to finish. // Wait for all renames to finish.
errs := g.Wait() errs := g.Wait()
// Do not need to undo partial successful operation since those will be cleaned up
// in 24hrs via multipart cleaner, never rename() back to `.minio.sys/tmp` as there
// is no way to clean them.
// We can safely allow RenameFile errors up to len(er.getDisks()) - writeQuorum // We can safely allow RenameFile errors up to len(er.getDisks()) - writeQuorum
// otherwise return failure. Cleanup successful renames. // otherwise return failure. Cleanup successful renames.
err := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum) return evalDisks(disks, errs), reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
if err == errErasureWriteQuorum {
// Undo all the partial rename operations.
undoRenamePart(disks, srcBucket, srcEntry, dstBucket, dstEntry, errs)
}
return evalDisks(disks, errs), err
} }
// writeAllDisks - writes 'b' to all provided disks. // writeAllDisks - writes 'b' to all provided disks.

View File

@ -49,7 +49,6 @@ import (
"github.com/minio/minio/internal/crypto" "github.com/minio/minio/internal/crypto"
"github.com/minio/minio/internal/etag" "github.com/minio/minio/internal/etag"
"github.com/minio/minio/internal/event" "github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/fips"
"github.com/minio/minio/internal/handlers" "github.com/minio/minio/internal/handlers"
"github.com/minio/minio/internal/hash" "github.com/minio/minio/internal/hash"
xhttp "github.com/minio/minio/internal/http" xhttp "github.com/minio/minio/internal/http"
@ -60,7 +59,6 @@ import (
"github.com/minio/pkg/bucket/policy" "github.com/minio/pkg/bucket/policy"
iampolicy "github.com/minio/pkg/iam/policy" iampolicy "github.com/minio/pkg/iam/policy"
xnet "github.com/minio/pkg/net" xnet "github.com/minio/pkg/net"
"github.com/minio/sio"
) )
// supportedHeadGetReqParams - supported request parameters for GET and HEAD presigned request. // supportedHeadGetReqParams - supported request parameters for GET and HEAD presigned request.
@ -2293,344 +2291,6 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h
writeSuccessResponseHeadersOnly(w) writeSuccessResponseHeadersOnly(w)
} }
// CopyObjectPartHandler - uploads a part by copying data from an existing object as data source.
func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "CopyObjectPart")
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
return
}
if crypto.S3KMS.IsRequested(r.Header) { // SSE-KMS is not supported
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
if crypto.Requested(r.Header) && !objectAPI.IsEncryptionSupported() {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
vars := mux.Vars(r)
dstBucket := vars["bucket"]
dstObject, err := unescapePath(vars["object"])
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
if s3Error := checkRequestAuthType(ctx, r, policy.PutObjectAction, dstBucket, dstObject); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
return
}
// Read escaped copy source path to check for parameters.
cpSrcPath := r.Header.Get(xhttp.AmzCopySource)
var vid string
if u, err := url.Parse(cpSrcPath); err == nil {
vid = strings.TrimSpace(u.Query().Get(xhttp.VersionID))
// Note that url.Parse does the unescaping
cpSrcPath = u.Path
}
srcBucket, srcObject := path2BucketObject(cpSrcPath)
// If source object is empty or bucket is empty, reply back invalid copy source.
if srcObject == "" || srcBucket == "" {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidCopySource), r.URL)
return
}
if vid != "" && vid != nullVersionID {
_, err := uuid.Parse(vid)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, VersionNotFound{
Bucket: srcBucket,
Object: srcObject,
VersionID: vid,
}), r.URL)
return
}
}
if s3Error := checkRequestAuthType(ctx, r, policy.GetObjectAction, srcBucket, srcObject); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
return
}
uploadID := r.Form.Get(xhttp.UploadID)
partIDString := r.Form.Get(xhttp.PartNumber)
partID, err := strconv.Atoi(partIDString)
if err != nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidPart), r.URL)
return
}
// check partID with maximum part ID for multipart objects
if isMaxPartID(partID) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidMaxParts), r.URL)
return
}
var srcOpts, dstOpts ObjectOptions
srcOpts, err = copySrcOpts(ctx, r, srcBucket, srcObject)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
srcOpts.VersionID = vid
// convert copy src and dst encryption options for GET/PUT calls
getOpts := ObjectOptions{VersionID: srcOpts.VersionID}
if srcOpts.ServerSideEncryption != nil {
getOpts.ServerSideEncryption = encrypt.SSE(srcOpts.ServerSideEncryption)
}
dstOpts, err = copyDstOpts(ctx, r, dstBucket, dstObject, nil)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
getObjectNInfo := objectAPI.GetObjectNInfo
if api.CacheAPI() != nil {
getObjectNInfo = api.CacheAPI().GetObjectNInfo
}
// Get request range.
var rs *HTTPRangeSpec
var parseRangeErr error
if rangeHeader := r.Header.Get(xhttp.AmzCopySourceRange); rangeHeader != "" {
rs, parseRangeErr = parseCopyPartRangeSpec(rangeHeader)
} else {
// This check is to see if client specified a header but the value
// is empty for 'x-amz-copy-source-range'
_, ok := r.Header[xhttp.AmzCopySourceRange]
if ok {
parseRangeErr = errInvalidRange
}
}
checkCopyPartPrecondFn := func(o ObjectInfo) bool {
if objectAPI.IsEncryptionSupported() {
if _, err := DecryptObjectInfo(&o, r); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return true
}
}
if checkCopyObjectPartPreconditions(ctx, w, r, o) {
return true
}
if parseRangeErr != nil {
logger.LogIf(ctx, parseRangeErr)
writeCopyPartErr(ctx, w, parseRangeErr, r.URL)
// Range header mismatch is pre-condition like failure
// so return true to indicate Range precondition failed.
return true
}
return false
}
getOpts.CheckPrecondFn = checkCopyPartPrecondFn
gr, err := getObjectNInfo(ctx, srcBucket, srcObject, rs, r.Header, readLock, getOpts)
if err != nil {
if isErrPreconditionFailed(err) {
return
}
if globalBucketVersioningSys.PrefixEnabled(srcBucket, srcObject) && gr != nil {
// Versioning enabled quite possibly object is deleted might be delete-marker
// if present set the headers, no idea why AWS S3 sets these headers.
if gr.ObjInfo.VersionID != "" && gr.ObjInfo.DeleteMarker {
w.Header()[xhttp.AmzVersionID] = []string{gr.ObjInfo.VersionID}
w.Header()[xhttp.AmzDeleteMarker] = []string{strconv.FormatBool(gr.ObjInfo.DeleteMarker)}
}
}
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
defer gr.Close()
srcInfo := gr.ObjInfo
actualPartSize, err := srcInfo.GetActualSize()
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
if err := enforceBucketQuotaHard(ctx, dstBucket, actualPartSize); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
// Special care for CopyObjectPart
if partRangeErr := checkCopyPartRangeWithSize(rs, actualPartSize); partRangeErr != nil {
writeCopyPartErr(ctx, w, partRangeErr, r.URL)
return
}
// Get the object offset & length
startOffset, length, err := rs.GetOffsetLength(actualPartSize)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
// maximum copy size for multipart objects in a single operation
if isMaxAllowedPartSize(length) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrEntityTooLarge), r.URL)
return
}
if isRemoteCopyRequired(ctx, srcBucket, dstBucket, objectAPI) {
var dstRecords []dns.SrvRecord
dstRecords, err = globalDNSConfig.Get(dstBucket)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
// Send PutObject request to appropriate instance (in federated deployment)
core, rerr := getRemoteInstanceClient(r, getHostFromSrv(dstRecords))
if rerr != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, rerr), r.URL)
return
}
partInfo, err := core.PutObjectPart(ctx, dstBucket, dstObject, uploadID, partID, gr, length, "", "", dstOpts.ServerSideEncryption)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
response := generateCopyObjectPartResponse(partInfo.ETag, partInfo.LastModified)
encodedSuccessResponse := encodeResponse(response)
// Write success response.
writeSuccessResponseXML(w, encodedSuccessResponse)
return
}
actualPartSize = length
var reader io.Reader = etag.NewReader(gr, nil)
mi, err := objectAPI.GetMultipartInfo(ctx, dstBucket, dstObject, uploadID, dstOpts)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
// Read compression metadata preserved in the init multipart for the decision.
_, isCompressed := mi.UserDefined[ReservedMetadataPrefix+"compression"]
// Compress only if the compression is enabled during initial multipart.
var idxCb func() []byte
if isCompressed {
wantEncryption := objectAPI.IsEncryptionSupported() && crypto.Requested(r.Header)
s2c, cb := newS2CompressReader(reader, actualPartSize, wantEncryption)
idxCb = cb
defer s2c.Close()
reader = etag.Wrap(s2c, reader)
length = -1
}
srcInfo.Reader, err = hash.NewReader(reader, length, "", "", actualPartSize)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
dstOpts, err = copyDstOpts(ctx, r, dstBucket, dstObject, mi.UserDefined)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
dstOpts.IndexCB = idxCb
rawReader := srcInfo.Reader
pReader := NewPutObjReader(rawReader)
_, isEncrypted := crypto.IsEncrypted(mi.UserDefined)
var objectEncryptionKey crypto.ObjectKey
if objectAPI.IsEncryptionSupported() && isEncrypted {
if !crypto.SSEC.IsRequested(r.Header) && crypto.SSEC.IsEncrypted(mi.UserDefined) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrSSEMultipartEncrypted), r.URL)
return
}
if crypto.S3.IsEncrypted(mi.UserDefined) && crypto.SSEC.IsRequested(r.Header) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrSSEMultipartEncrypted), r.URL)
return
}
var key []byte
if crypto.SSEC.IsRequested(r.Header) {
key, err = ParseSSECustomerRequest(r)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
}
key, err = decryptObjectMeta(key, dstBucket, dstObject, mi.UserDefined)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
copy(objectEncryptionKey[:], key)
partEncryptionKey := objectEncryptionKey.DerivePartKey(uint32(partID))
encReader, err := sio.EncryptReader(reader, sio.Config{Key: partEncryptionKey[:], CipherSuites: fips.DARECiphers()})
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
reader = etag.Wrap(encReader, reader)
wantSize := int64(-1)
if length >= 0 {
info := ObjectInfo{Size: length}
wantSize = info.EncryptedSize()
}
srcInfo.Reader, err = hash.NewReader(reader, wantSize, "", "", actualPartSize)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
pReader, err = pReader.WithEncryption(srcInfo.Reader, &objectEncryptionKey)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
if dstOpts.IndexCB != nil {
dstOpts.IndexCB = compressionIndexEncrypter(objectEncryptionKey, dstOpts.IndexCB)
}
}
srcInfo.PutObjReader = pReader
copyObjectPart := objectAPI.CopyObjectPart
if api.CacheAPI() != nil {
copyObjectPart = api.CacheAPI().CopyObjectPart
}
// Copy source object to destination, if source and destination
// object is same then only metadata is updated.
partInfo, err := copyObjectPart(ctx, srcBucket, srcObject, dstBucket, dstObject, uploadID, partID,
startOffset, length, srcInfo, srcOpts, dstOpts)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
if isEncrypted {
partInfo.ETag = tryDecryptETag(objectEncryptionKey[:], partInfo.ETag, crypto.SSEC.IsRequested(r.Header))
}
response := generateCopyObjectPartResponse(partInfo.ETag, partInfo.LastModified)
encodedSuccessResponse := encodeResponse(response)
// Write success response.
writeSuccessResponseXML(w, encodedSuccessResponse)
}
// Delete objectAPIHandlers // Delete objectAPIHandlers
// DeleteObjectHandler - delete an object // DeleteObjectHandler - delete an object

View File

@ -29,11 +29,14 @@ import (
"strings" "strings"
"time" "time"
"github.com/google/uuid"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/minio/minio-go/v7/pkg/encrypt"
"github.com/minio/minio-go/v7/pkg/tags" "github.com/minio/minio-go/v7/pkg/tags"
sse "github.com/minio/minio/internal/bucket/encryption" sse "github.com/minio/minio/internal/bucket/encryption"
objectlock "github.com/minio/minio/internal/bucket/object/lock" objectlock "github.com/minio/minio/internal/bucket/object/lock"
"github.com/minio/minio/internal/bucket/replication" "github.com/minio/minio/internal/bucket/replication"
"github.com/minio/minio/internal/config/dns"
"github.com/minio/minio/internal/config/storageclass" "github.com/minio/minio/internal/config/storageclass"
"github.com/minio/minio/internal/crypto" "github.com/minio/minio/internal/crypto"
"github.com/minio/minio/internal/etag" "github.com/minio/minio/internal/etag"
@ -224,6 +227,344 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r
writeSuccessResponseXML(w, encodedSuccessResponse) writeSuccessResponseXML(w, encodedSuccessResponse)
} }
// CopyObjectPartHandler - uploads a part by copying data from an existing object as data source.
func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "CopyObjectPart")
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
return
}
if crypto.S3KMS.IsRequested(r.Header) { // SSE-KMS is not supported
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
if crypto.Requested(r.Header) && !objectAPI.IsEncryptionSupported() {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
vars := mux.Vars(r)
dstBucket := vars["bucket"]
dstObject, err := unescapePath(vars["object"])
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
if s3Error := checkRequestAuthType(ctx, r, policy.PutObjectAction, dstBucket, dstObject); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
return
}
// Read escaped copy source path to check for parameters.
cpSrcPath := r.Header.Get(xhttp.AmzCopySource)
var vid string
if u, err := url.Parse(cpSrcPath); err == nil {
vid = strings.TrimSpace(u.Query().Get(xhttp.VersionID))
// Note that url.Parse does the unescaping
cpSrcPath = u.Path
}
srcBucket, srcObject := path2BucketObject(cpSrcPath)
// If source object is empty or bucket is empty, reply back invalid copy source.
if srcObject == "" || srcBucket == "" {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidCopySource), r.URL)
return
}
if vid != "" && vid != nullVersionID {
_, err := uuid.Parse(vid)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, VersionNotFound{
Bucket: srcBucket,
Object: srcObject,
VersionID: vid,
}), r.URL)
return
}
}
if s3Error := checkRequestAuthType(ctx, r, policy.GetObjectAction, srcBucket, srcObject); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
return
}
uploadID := r.Form.Get(xhttp.UploadID)
partIDString := r.Form.Get(xhttp.PartNumber)
partID, err := strconv.Atoi(partIDString)
if err != nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidPart), r.URL)
return
}
// check partID with maximum part ID for multipart objects
if isMaxPartID(partID) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidMaxParts), r.URL)
return
}
var srcOpts, dstOpts ObjectOptions
srcOpts, err = copySrcOpts(ctx, r, srcBucket, srcObject)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
srcOpts.VersionID = vid
// convert copy src and dst encryption options for GET/PUT calls
getOpts := ObjectOptions{VersionID: srcOpts.VersionID}
if srcOpts.ServerSideEncryption != nil {
getOpts.ServerSideEncryption = encrypt.SSE(srcOpts.ServerSideEncryption)
}
dstOpts, err = copyDstOpts(ctx, r, dstBucket, dstObject, nil)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
getObjectNInfo := objectAPI.GetObjectNInfo
if api.CacheAPI() != nil {
getObjectNInfo = api.CacheAPI().GetObjectNInfo
}
// Get request range.
var rs *HTTPRangeSpec
var parseRangeErr error
if rangeHeader := r.Header.Get(xhttp.AmzCopySourceRange); rangeHeader != "" {
rs, parseRangeErr = parseCopyPartRangeSpec(rangeHeader)
} else {
// This check is to see if client specified a header but the value
// is empty for 'x-amz-copy-source-range'
_, ok := r.Header[xhttp.AmzCopySourceRange]
if ok {
parseRangeErr = errInvalidRange
}
}
checkCopyPartPrecondFn := func(o ObjectInfo) bool {
if objectAPI.IsEncryptionSupported() {
if _, err := DecryptObjectInfo(&o, r); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return true
}
}
if checkCopyObjectPartPreconditions(ctx, w, r, o) {
return true
}
if parseRangeErr != nil {
logger.LogIf(ctx, parseRangeErr)
writeCopyPartErr(ctx, w, parseRangeErr, r.URL)
// Range header mismatch is pre-condition like failure
// so return true to indicate Range precondition failed.
return true
}
return false
}
getOpts.CheckPrecondFn = checkCopyPartPrecondFn
gr, err := getObjectNInfo(ctx, srcBucket, srcObject, rs, r.Header, readLock, getOpts)
if err != nil {
if isErrPreconditionFailed(err) {
return
}
if globalBucketVersioningSys.PrefixEnabled(srcBucket, srcObject) && gr != nil {
// Versioning enabled quite possibly object is deleted might be delete-marker
// if present set the headers, no idea why AWS S3 sets these headers.
if gr.ObjInfo.VersionID != "" && gr.ObjInfo.DeleteMarker {
w.Header()[xhttp.AmzVersionID] = []string{gr.ObjInfo.VersionID}
w.Header()[xhttp.AmzDeleteMarker] = []string{strconv.FormatBool(gr.ObjInfo.DeleteMarker)}
}
}
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
defer gr.Close()
srcInfo := gr.ObjInfo
actualPartSize, err := srcInfo.GetActualSize()
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
if err := enforceBucketQuotaHard(ctx, dstBucket, actualPartSize); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
// Special care for CopyObjectPart
if partRangeErr := checkCopyPartRangeWithSize(rs, actualPartSize); partRangeErr != nil {
writeCopyPartErr(ctx, w, partRangeErr, r.URL)
return
}
// Get the object offset & length
startOffset, length, err := rs.GetOffsetLength(actualPartSize)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
// maximum copy size for multipart objects in a single operation
if isMaxObjectSize(length) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrEntityTooLarge), r.URL)
return
}
if isRemoteCopyRequired(ctx, srcBucket, dstBucket, objectAPI) {
var dstRecords []dns.SrvRecord
dstRecords, err = globalDNSConfig.Get(dstBucket)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
// Send PutObject request to appropriate instance (in federated deployment)
core, rerr := getRemoteInstanceClient(r, getHostFromSrv(dstRecords))
if rerr != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, rerr), r.URL)
return
}
partInfo, err := core.PutObjectPart(ctx, dstBucket, dstObject, uploadID, partID, gr, length, "", "", dstOpts.ServerSideEncryption)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
response := generateCopyObjectPartResponse(partInfo.ETag, partInfo.LastModified)
encodedSuccessResponse := encodeResponse(response)
// Write success response.
writeSuccessResponseXML(w, encodedSuccessResponse)
return
}
actualPartSize = length
var reader io.Reader = etag.NewReader(gr, nil)
mi, err := objectAPI.GetMultipartInfo(ctx, dstBucket, dstObject, uploadID, dstOpts)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
// Read compression metadata preserved in the init multipart for the decision.
_, isCompressed := mi.UserDefined[ReservedMetadataPrefix+"compression"]
// Compress only if the compression is enabled during initial multipart.
var idxCb func() []byte
if isCompressed {
wantEncryption := objectAPI.IsEncryptionSupported() && crypto.Requested(r.Header)
s2c, cb := newS2CompressReader(reader, actualPartSize, wantEncryption)
idxCb = cb
defer s2c.Close()
reader = etag.Wrap(s2c, reader)
length = -1
}
srcInfo.Reader, err = hash.NewReader(reader, length, "", "", actualPartSize)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
dstOpts, err = copyDstOpts(ctx, r, dstBucket, dstObject, mi.UserDefined)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
dstOpts.IndexCB = idxCb
rawReader := srcInfo.Reader
pReader := NewPutObjReader(rawReader)
_, isEncrypted := crypto.IsEncrypted(mi.UserDefined)
var objectEncryptionKey crypto.ObjectKey
if objectAPI.IsEncryptionSupported() && isEncrypted {
if !crypto.SSEC.IsRequested(r.Header) && crypto.SSEC.IsEncrypted(mi.UserDefined) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrSSEMultipartEncrypted), r.URL)
return
}
if crypto.S3.IsEncrypted(mi.UserDefined) && crypto.SSEC.IsRequested(r.Header) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrSSEMultipartEncrypted), r.URL)
return
}
var key []byte
if crypto.SSEC.IsRequested(r.Header) {
key, err = ParseSSECustomerRequest(r)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
}
key, err = decryptObjectMeta(key, dstBucket, dstObject, mi.UserDefined)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
copy(objectEncryptionKey[:], key)
partEncryptionKey := objectEncryptionKey.DerivePartKey(uint32(partID))
encReader, err := sio.EncryptReader(reader, sio.Config{Key: partEncryptionKey[:], CipherSuites: fips.DARECiphers()})
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
reader = etag.Wrap(encReader, reader)
wantSize := int64(-1)
if length >= 0 {
info := ObjectInfo{Size: length}
wantSize = info.EncryptedSize()
}
srcInfo.Reader, err = hash.NewReader(reader, wantSize, "", "", actualPartSize)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
pReader, err = pReader.WithEncryption(srcInfo.Reader, &objectEncryptionKey)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
if dstOpts.IndexCB != nil {
dstOpts.IndexCB = compressionIndexEncrypter(objectEncryptionKey, dstOpts.IndexCB)
}
}
srcInfo.PutObjReader = pReader
copyObjectPart := objectAPI.CopyObjectPart
if api.CacheAPI() != nil {
copyObjectPart = api.CacheAPI().CopyObjectPart
}
// Copy source object to destination, if source and destination
// object is same then only metadata is updated.
partInfo, err := copyObjectPart(ctx, srcBucket, srcObject, dstBucket, dstObject, uploadID, partID,
startOffset, length, srcInfo, srcOpts, dstOpts)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
if isEncrypted {
partInfo.ETag = tryDecryptETag(objectEncryptionKey[:], partInfo.ETag, crypto.SSEC.IsRequested(r.Header))
}
response := generateCopyObjectPartResponse(partInfo.ETag, partInfo.LastModified)
encodedSuccessResponse := encodeResponse(response)
// Write success response.
writeSuccessResponseXML(w, encodedSuccessResponse)
}
// PutObjectPartHandler - uploads an incoming part for an ongoing multipart operation. // PutObjectPartHandler - uploads an incoming part for an ongoing multipart operation.
func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) { func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "PutObjectPart") ctx := newContext(r, w, "PutObjectPart")
@ -286,12 +627,6 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
return return
} }
// maximum Upload size for multipart objects in a single operation
if isMaxAllowedPartSize(size) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrEntityTooLarge), r.URL)
return
}
uploadID := r.Form.Get(xhttp.UploadID) uploadID := r.Form.Get(xhttp.UploadID)
partIDString := r.Form.Get(xhttp.PartNumber) partIDString := r.Form.Get(xhttp.PartNumber)
@ -301,6 +636,12 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
return return
} }
// maximum size for multipart objects in a single operation
if isMaxObjectSize(size) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrEntityTooLarge), r.URL)
return
}
// check partID with maximum part ID for multipart objects // check partID with maximum part ID for multipart objects
if isMaxPartID(partID) { if isMaxPartID(partID) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidMaxParts), r.URL) writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidMaxParts), r.URL)

View File

@ -258,9 +258,6 @@ const (
// Minimum Part size for multipart upload is 5MiB // Minimum Part size for multipart upload is 5MiB
globalMinPartSize = 5 * humanize.MiByte globalMinPartSize = 5 * humanize.MiByte
// Maximum Part size for multipart upload is 5GiB
globalMaxPartSize = 5 * humanize.GiByte
// Maximum Part ID for multipart upload is 10000 // Maximum Part ID for multipart upload is 10000
// (Acceptable values range from 1 to 10000 inclusive) // (Acceptable values range from 1 to 10000 inclusive)
globalMaxPartID = 10000 globalMaxPartID = 10000
@ -271,11 +268,6 @@ func isMaxObjectSize(size int64) bool {
return size > globalMaxObjectSize return size > globalMaxObjectSize
} }
// // Check if part size is more than maximum allowed size.
func isMaxAllowedPartSize(size int64) bool {
return size > globalMaxPartSize
}
// Check if part size is more than or equal to minimum allowed size. // Check if part size is more than or equal to minimum allowed size.
func isMinAllowedPartSize(size int64) bool { func isMinAllowedPartSize(size int64) bool {
return size >= globalMinPartSize return size >= globalMinPartSize