mirror of
https://github.com/minio/minio.git
synced 2024-12-25 22:55:54 -05:00
331bdc2245
Sending whitespace characters with the CompleteMultipartUpload() 200 OK response was an AWS S3 compatible implementation detail, and the client SDK was expected to look for both success XML and error XML in a 200 OK response. This is no longer useful on MinIO, since we no longer have any large delayed coalescing of parts.
1159 lines
37 KiB
Go
1159 lines
37 KiB
Go
// Copyright (c) 2015-2023 MinIO, Inc.
|
|
//
|
|
// This file is part of MinIO Object Storage stack
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"bufio"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/minio/minio-go/v7"
|
|
"github.com/minio/minio-go/v7/pkg/encrypt"
|
|
"github.com/minio/minio-go/v7/pkg/tags"
|
|
"github.com/minio/minio/internal/amztime"
|
|
sse "github.com/minio/minio/internal/bucket/encryption"
|
|
objectlock "github.com/minio/minio/internal/bucket/object/lock"
|
|
"github.com/minio/minio/internal/bucket/replication"
|
|
"github.com/minio/minio/internal/config/dns"
|
|
"github.com/minio/minio/internal/config/storageclass"
|
|
"github.com/minio/minio/internal/crypto"
|
|
"github.com/minio/minio/internal/etag"
|
|
"github.com/minio/minio/internal/event"
|
|
"github.com/minio/minio/internal/fips"
|
|
"github.com/minio/minio/internal/handlers"
|
|
"github.com/minio/minio/internal/hash"
|
|
xhttp "github.com/minio/minio/internal/http"
|
|
"github.com/minio/minio/internal/logger"
|
|
"github.com/minio/mux"
|
|
"github.com/minio/pkg/bucket/policy"
|
|
iampolicy "github.com/minio/pkg/iam/policy"
|
|
"github.com/minio/sio"
|
|
)
|
|
|
|
// Multipart objectAPIHandlers
|
|
|
|
// NewMultipartUploadHandler - New multipart upload.
|
|
// Notice: The S3 client can send secret keys in headers for encryption related jobs,
|
|
// the handler should ensure to remove these keys before sending them to the object layer.
|
|
// Currently these keys are:
|
|
// - X-Amz-Server-Side-Encryption-Customer-Key
|
|
// - X-Amz-Copy-Source-Server-Side-Encryption-Customer-Key
|
|
func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
|
|
ctx := newContext(r, w, "NewMultipartUpload")
|
|
|
|
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
|
|
|
|
objectAPI := api.ObjectAPI()
|
|
if objectAPI == nil {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
|
|
return
|
|
}
|
|
|
|
vars := mux.Vars(r)
|
|
bucket := vars["bucket"]
|
|
object, err := unescapePath(vars["object"])
|
|
if err != nil {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
return
|
|
}
|
|
|
|
if s3Error := checkRequestAuthType(ctx, r, policy.PutObjectAction, bucket, object); s3Error != ErrNone {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
|
|
return
|
|
}
|
|
|
|
// Check if bucket encryption is enabled
|
|
sseConfig, _ := globalBucketSSEConfigSys.Get(bucket)
|
|
sseConfig.Apply(r.Header, sse.ApplyOptions{
|
|
AutoEncrypt: globalAutoEncryption,
|
|
})
|
|
|
|
// Validate storage class metadata if present
|
|
if sc := r.Header.Get(xhttp.AmzStorageClass); sc != "" {
|
|
if !storageclass.IsValid(sc) {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidStorageClass), r.URL)
|
|
return
|
|
}
|
|
}
|
|
|
|
encMetadata := map[string]string{}
|
|
|
|
if crypto.Requested(r.Header) {
|
|
if crypto.SSECopy.IsRequested(r.Header) {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, errInvalidEncryptionParameters), r.URL)
|
|
return
|
|
}
|
|
|
|
if crypto.SSEC.IsRequested(r.Header) && crypto.S3.IsRequested(r.Header) {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, crypto.ErrIncompatibleEncryptionMethod), r.URL)
|
|
return
|
|
}
|
|
|
|
if crypto.SSEC.IsRequested(r.Header) && crypto.S3KMS.IsRequested(r.Header) {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, crypto.ErrIncompatibleEncryptionMethod), r.URL)
|
|
return
|
|
}
|
|
|
|
if crypto.SSEC.IsRequested(r.Header) && isReplicationEnabled(ctx, bucket) {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, errInvalidEncryptionParametersSSEC), r.URL)
|
|
return
|
|
}
|
|
|
|
if err = setEncryptionMetadata(r, bucket, object, encMetadata); err != nil {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
return
|
|
}
|
|
// Set this for multipart only operations, we need to differentiate during
|
|
// decryption if the file was actually multipart or not.
|
|
encMetadata[ReservedMetadataPrefix+"Encrypted-Multipart"] = ""
|
|
}
|
|
|
|
// Extract metadata that needs to be saved.
|
|
metadata, err := extractMetadata(ctx, r)
|
|
if err != nil {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
return
|
|
}
|
|
|
|
if objTags := r.Header.Get(xhttp.AmzObjectTagging); objTags != "" {
|
|
if _, err := tags.ParseObjectTags(objTags); err != nil {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
return
|
|
}
|
|
|
|
metadata[xhttp.AmzObjectTagging] = objTags
|
|
}
|
|
|
|
retPerms := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, iampolicy.PutObjectRetentionAction)
|
|
holdPerms := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, iampolicy.PutObjectLegalHoldAction)
|
|
|
|
getObjectInfo := objectAPI.GetObjectInfo
|
|
if api.CacheAPI() != nil {
|
|
getObjectInfo = api.CacheAPI().GetObjectInfo
|
|
}
|
|
|
|
retentionMode, retentionDate, legalHold, s3Err := checkPutObjectLockAllowed(ctx, r, bucket, object, getObjectInfo, retPerms, holdPerms)
|
|
if s3Err == ErrNone && retentionMode.Valid() {
|
|
metadata[strings.ToLower(xhttp.AmzObjectLockMode)] = string(retentionMode)
|
|
metadata[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = amztime.ISO8601Format(retentionDate.UTC())
|
|
}
|
|
if s3Err == ErrNone && legalHold.Status.Valid() {
|
|
metadata[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = string(legalHold.Status)
|
|
}
|
|
if s3Err != ErrNone {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
|
|
return
|
|
}
|
|
if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(ObjectInfo{
|
|
UserDefined: metadata,
|
|
}, replication.ObjectReplicationType, ObjectOptions{})); dsc.ReplicateAny() {
|
|
metadata[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano)
|
|
metadata[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus()
|
|
}
|
|
|
|
// We need to preserve the encryption headers set in EncryptRequest,
|
|
// so we do not want to override them, copy them instead.
|
|
for k, v := range encMetadata {
|
|
metadata[k] = v
|
|
}
|
|
|
|
// Ensure that metadata does not contain sensitive information
|
|
crypto.RemoveSensitiveEntries(metadata)
|
|
|
|
if isCompressible(r.Header, object) {
|
|
// Storing the compression metadata.
|
|
metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV2
|
|
}
|
|
|
|
opts, err := putOpts(ctx, r, bucket, object, metadata)
|
|
if err != nil {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
return
|
|
}
|
|
|
|
if !opts.MTime.IsZero() && opts.PreserveETag != "" {
|
|
opts.CheckPrecondFn = func(oi ObjectInfo) bool {
|
|
if _, err := DecryptObjectInfo(&oi, r); err != nil {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
return true
|
|
}
|
|
return checkPreconditionsPUT(ctx, w, r, oi, opts)
|
|
}
|
|
}
|
|
|
|
checksumType := hash.NewChecksumType(r.Header.Get(xhttp.AmzChecksumAlgo))
|
|
if checksumType.Is(hash.ChecksumInvalid) {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidRequestParameter), r.URL)
|
|
return
|
|
} else if checksumType.IsSet() && !checksumType.Is(hash.ChecksumTrailing) {
|
|
opts.WantChecksum = &hash.Checksum{Type: checksumType}
|
|
}
|
|
|
|
newMultipartUpload := objectAPI.NewMultipartUpload
|
|
if api.CacheAPI() != nil {
|
|
newMultipartUpload = api.CacheAPI().NewMultipartUpload
|
|
}
|
|
|
|
res, err := newMultipartUpload(ctx, bucket, object, opts)
|
|
if err != nil {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
return
|
|
}
|
|
|
|
response := generateInitiateMultipartUploadResponse(bucket, object, res.UploadID)
|
|
if res.ChecksumAlgo != "" {
|
|
w.Header().Set(xhttp.AmzChecksumAlgo, res.ChecksumAlgo)
|
|
}
|
|
encodedSuccessResponse := encodeResponse(response)
|
|
|
|
// Write success response.
|
|
writeSuccessResponseXML(w, encodedSuccessResponse)
|
|
}
|
|
|
|
// CopyObjectPartHandler - uploads a part by copying data from an existing object as data source.
//
// The destination bucket and object are taken from the request path, the
// source from the xhttp.AmzCopySource header (which may carry a version ID as
// a query parameter) and the optional byte range from the
// xhttp.AmzCopySourceRange header. The upload ID and part number are read from
// the request form. On success a copy-part XML response carrying the part's
// ETag and last-modified time is written; every failure path writes an error
// response and returns.
func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "CopyObjectPart")

	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))

	objectAPI := api.ObjectAPI()
	if objectAPI == nil {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
		return
	}

	if crypto.S3KMS.IsRequested(r.Header) { // SSE-KMS is not supported
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
		return
	}

	vars := mux.Vars(r)
	dstBucket := vars["bucket"]
	dstObject, err := unescapePath(vars["object"])
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// The request must be authorized for PutObject on the destination.
	if s3Error := checkRequestAuthType(ctx, r, policy.PutObjectAction, dstBucket, dstObject); s3Error != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
		return
	}

	// Read escaped copy source path to check for parameters.
	cpSrcPath := r.Header.Get(xhttp.AmzCopySource)
	var vid string
	// If the header value does not parse as a URL it is used unmodified and
	// no version ID is extracted.
	if u, err := url.Parse(cpSrcPath); err == nil {
		vid = strings.TrimSpace(u.Query().Get(xhttp.VersionID))
		// Note that url.Parse does the unescaping
		cpSrcPath = u.Path
	}

	srcBucket, srcObject := path2BucketObject(cpSrcPath)
	// If source object is empty or bucket is empty, reply back invalid copy source.
	if srcObject == "" || srcBucket == "" {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidCopySource), r.URL)
		return
	}

	// Any version ID other than the null version must be a well-formed UUID,
	// otherwise the version is reported as not found.
	if vid != "" && vid != nullVersionID {
		_, err := uuid.Parse(vid)
		if err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, VersionNotFound{
				Bucket:    srcBucket,
				Object:    srcObject,
				VersionID: vid,
			}), r.URL)
			return
		}
	}

	// The request must also be authorized for GetObject on the source.
	if s3Error := checkRequestAuthType(ctx, r, policy.GetObjectAction, srcBucket, srcObject); s3Error != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
		return
	}

	uploadID := r.Form.Get(xhttp.UploadID)
	partIDString := r.Form.Get(xhttp.PartNumber)

	// The part number must be a positive integer.
	partID, err := strconv.Atoi(partIDString)
	if err != nil || partID <= 0 {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidPart), r.URL)
		return
	}

	// check partID with maximum part ID for multipart objects
	if isMaxPartID(partID) {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidMaxParts), r.URL)
		return
	}

	var srcOpts, dstOpts ObjectOptions
	srcOpts, err = copySrcOpts(ctx, r, srcBucket, srcObject)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}
	// The version ID parsed from the copy source header is used for the source lookup.
	srcOpts.VersionID = vid

	// convert copy src and dst encryption options for GET/PUT calls
	getOpts := ObjectOptions{VersionID: srcOpts.VersionID}
	if srcOpts.ServerSideEncryption != nil {
		getOpts.ServerSideEncryption = encrypt.SSE(srcOpts.ServerSideEncryption)
	}

	// Destination options without upload metadata; they are rebuilt further
	// below once the multipart upload's metadata has been fetched.
	dstOpts, err = copyDstOpts(ctx, r, dstBucket, dstObject, nil)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// Read the source through the cache layer when one is configured.
	getObjectNInfo := objectAPI.GetObjectNInfo
	if api.CacheAPI() != nil {
		getObjectNInfo = api.CacheAPI().GetObjectNInfo
	}

	// Get request range.
	var rs *HTTPRangeSpec
	var parseRangeErr error
	if rangeHeader := r.Header.Get(xhttp.AmzCopySourceRange); rangeHeader != "" {
		rs, parseRangeErr = parseCopyPartRangeSpec(rangeHeader)
	} else {
		// This check is to see if the client specified the
		// 'x-amz-copy-source-range' header but with an empty value.
		_, ok := r.Header[xhttp.AmzCopySourceRange]
		if ok {
			parseRangeErr = errInvalidRange
		}
	}

	// checkCopyPartPrecondFn is handed to the source read via getOpts. It
	// returns true when the copy must not proceed; in each such case the
	// response has already been written (directly here, or presumably by
	// checkCopyObjectPartPreconditions, which receives w — confirm).
	checkCopyPartPrecondFn := func(o ObjectInfo) bool {
		if _, err := DecryptObjectInfo(&o, r); err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return true
		}
		if checkCopyObjectPartPreconditions(ctx, w, r, o) {
			return true
		}
		// A range parse failure is only reported here, after the other
		// preconditions have been evaluated.
		if parseRangeErr != nil {
			logger.LogIf(ctx, parseRangeErr)
			writeCopyPartErr(ctx, w, parseRangeErr, r.URL)
			// Range header mismatch is pre-condition like failure
			// so return true to indicate Range precondition failed.
			return true
		}
		return false
	}
	getOpts.CheckPrecondFn = checkCopyPartPrecondFn
	gr, err := getObjectNInfo(ctx, srcBucket, srcObject, rs, r.Header, getOpts)
	if err != nil {
		// Nothing more is written for a failed precondition; the callback
		// above is expected to have produced the response already.
		if isErrPreconditionFailed(err) {
			return
		}
		if globalBucketVersioningSys.PrefixEnabled(srcBucket, srcObject) && gr != nil {
			// With versioning enabled the object may have been deleted and
			// the result may describe a delete marker; if so, set the
			// version and delete-marker headers (AWS S3 sets these headers).
			if gr.ObjInfo.VersionID != "" && gr.ObjInfo.DeleteMarker {
				w.Header()[xhttp.AmzVersionID] = []string{gr.ObjInfo.VersionID}
				w.Header()[xhttp.AmzDeleteMarker] = []string{strconv.FormatBool(gr.ObjInfo.DeleteMarker)}
			}
		}
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}
	defer gr.Close()
	srcInfo := gr.ObjInfo

	actualPartSize, err := srcInfo.GetActualSize()
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// Quota is enforced against the actual size of the whole source object.
	if err := enforceBucketQuotaHard(ctx, dstBucket, actualPartSize); err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// Special care for CopyObjectPart
	if partRangeErr := checkCopyPartRangeWithSize(rs, actualPartSize); partRangeErr != nil {
		writeCopyPartErr(ctx, w, partRangeErr, r.URL)
		return
	}

	// Get the object offset & length
	startOffset, length, err := rs.GetOffsetLength(actualPartSize)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// maximum copy size for multipart objects in a single operation
	if isMaxObjectSize(length) {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrEntityTooLarge), r.URL)
		return
	}

	// When the destination requires a remote copy, the source stream is
	// uploaded as a part through a client for the remote instance and the
	// handler returns without touching the local object layer.
	if isRemoteCopyRequired(ctx, srcBucket, dstBucket, objectAPI) {
		var dstRecords []dns.SrvRecord
		dstRecords, err = globalDNSConfig.Get(dstBucket)
		if err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}

		// Send PutObjectPart request to appropriate instance (in federated deployment)
		core, rerr := getRemoteInstanceClient(r, getHostFromSrv(dstRecords))
		if rerr != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, rerr), r.URL)
			return
		}

		popts := minio.PutObjectPartOptions{
			SSE: dstOpts.ServerSideEncryption,
		}

		partInfo, err := core.PutObjectPart(ctx, dstBucket, dstObject, uploadID, partID, gr, length, popts)
		if err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}

		response := generateCopyObjectPartResponse(partInfo.ETag, partInfo.LastModified)
		encodedSuccessResponse := encodeResponse(response)

		// Write success response.
		writeSuccessResponseXML(w, encodedSuccessResponse)
		return
	}

	// From here on the actual part size is the length of the requested range.
	actualPartSize = length
	var reader io.Reader = etag.NewReader(gr, nil)

	mi, err := objectAPI.GetMultipartInfo(ctx, dstBucket, dstObject, uploadID, dstOpts)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	_, isEncrypted := crypto.IsEncrypted(mi.UserDefined)

	// Read compression metadata preserved in the init multipart for the decision.
	_, isCompressed := mi.UserDefined[ReservedMetadataPrefix+"compression"]
	// Compress only if the compression is enabled during initial multipart.
	var idxCb func() []byte
	if isCompressed {
		wantEncryption := crypto.Requested(r.Header) || isEncrypted
		s2c, cb := newS2CompressReader(reader, actualPartSize, wantEncryption)
		idxCb = cb
		defer s2c.Close()
		reader = etag.Wrap(s2c, reader)
		// The compressed length is not known in advance.
		length = -1
	}

	srcInfo.Reader, err = hash.NewReader(reader, length, "", "", actualPartSize)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// Rebuild the destination options, this time with the upload's metadata.
	dstOpts, err = copyDstOpts(ctx, r, dstBucket, dstObject, mi.UserDefined)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}
	dstOpts.IndexCB = idxCb

	rawReader := srcInfo.Reader
	pReader := NewPutObjReader(rawReader)

	var objectEncryptionKey crypto.ObjectKey
	if isEncrypted {
		// An upload initiated with SSE-C needs the customer key on every part request.
		if !crypto.SSEC.IsRequested(r.Header) && crypto.SSEC.IsEncrypted(mi.UserDefined) {
			writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrSSEMultipartEncrypted), r.URL)
			return
		}
		// An upload initiated with SSE-S3 must not receive SSE-C part requests.
		if crypto.S3.IsEncrypted(mi.UserDefined) && crypto.SSEC.IsRequested(r.Header) {
			writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrSSEMultipartEncrypted), r.URL)
			return
		}
		var key []byte
		if crypto.SSEC.IsRequested(r.Header) {
			key, err = ParseSSECustomerRequest(r)
			if err != nil {
				writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
				return
			}
		}
		// Obtain the object encryption key from the upload's metadata.
		key, err = decryptObjectMeta(key, dstBucket, dstObject, mi.UserDefined)
		if err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}
		copy(objectEncryptionKey[:], key)

		// Each part is encrypted with a key derived from the object key and the part number.
		partEncryptionKey := objectEncryptionKey.DerivePartKey(uint32(partID))
		encReader, err := sio.EncryptReader(reader, sio.Config{Key: partEncryptionKey[:], CipherSuites: fips.DARECiphers()})
		if err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}
		reader = etag.Wrap(encReader, reader)

		// The expected encrypted size is only computed when the plain
		// length is known (it is -1 after compression).
		wantSize := int64(-1)
		if length >= 0 {
			info := ObjectInfo{Size: length}
			wantSize = info.EncryptedSize()
		}

		srcInfo.Reader, err = hash.NewReader(reader, wantSize, "", "", actualPartSize)
		if err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}
		pReader, err = pReader.WithEncryption(srcInfo.Reader, &objectEncryptionKey)
		if err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}
		// Wrap the compression index callback so the index is encrypted as well.
		if dstOpts.IndexCB != nil {
			dstOpts.IndexCB = compressionIndexEncrypter(objectEncryptionKey, dstOpts.IndexCB)
		}
	}

	srcInfo.PutObjReader = pReader
	// Write through the cache layer when one is configured.
	copyObjectPart := objectAPI.CopyObjectPart
	if api.CacheAPI() != nil {
		copyObjectPart = api.CacheAPI().CopyObjectPart
	}
	// Copy source object to destination, if source and destination
	// object is same then only metadata is updated.
	partInfo, err := copyObjectPart(ctx, srcBucket, srcObject, dstBucket, dstObject, uploadID, partID,
		startOffset, length, srcInfo, srcOpts, dstOpts)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// For encrypted uploads the stored ETag is passed through tryDecryptETag
	// before being returned to the client.
	if isEncrypted {
		sseS3 := crypto.S3.IsRequested(r.Header) || crypto.S3.IsEncrypted(mi.UserDefined)
		partInfo.ETag = tryDecryptETag(objectEncryptionKey[:], partInfo.ETag, sseS3)
	}

	response := generateCopyObjectPartResponse(partInfo.ETag, partInfo.LastModified)
	encodedSuccessResponse := encodeResponse(response)

	// Write success response.
	writeSuccessResponseXML(w, encodedSuccessResponse)
}
|
|
|
|
// PutObjectPartHandler - uploads an incoming part for an ongoing multipart operation.
|
|
func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) {
|
|
ctx := newContext(r, w, "PutObjectPart")
|
|
|
|
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
|
|
|
|
objectAPI := api.ObjectAPI()
|
|
if objectAPI == nil {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
|
|
return
|
|
}
|
|
|
|
vars := mux.Vars(r)
|
|
bucket := vars["bucket"]
|
|
object, err := unescapePath(vars["object"])
|
|
if err != nil {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
return
|
|
}
|
|
|
|
// X-Amz-Copy-Source shouldn't be set for this call.
|
|
if _, ok := r.Header[xhttp.AmzCopySource]; ok {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidCopySource), r.URL)
|
|
return
|
|
}
|
|
|
|
clientETag, err := etag.FromContentMD5(r.Header)
|
|
if err != nil {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidDigest), r.URL)
|
|
return
|
|
}
|
|
|
|
// if Content-Length is unknown/missing, throw away
|
|
size := r.ContentLength
|
|
|
|
rAuthType := getRequestAuthType(r)
|
|
// For auth type streaming signature, we need to gather a different content length.
|
|
switch rAuthType {
|
|
// Check signature types that must have content length
|
|
case authTypeStreamingSigned, authTypeStreamingSignedTrailer, authTypeStreamingUnsignedTrailer:
|
|
if sizeStr, ok := r.Header[xhttp.AmzDecodedContentLength]; ok {
|
|
if sizeStr[0] == "" {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMissingContentLength), r.URL)
|
|
return
|
|
}
|
|
size, err = strconv.ParseInt(sizeStr[0], 10, 64)
|
|
if err != nil {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
if size == -1 {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMissingContentLength), r.URL)
|
|
return
|
|
}
|
|
|
|
uploadID := r.Form.Get(xhttp.UploadID)
|
|
partIDString := r.Form.Get(xhttp.PartNumber)
|
|
|
|
partID, err := strconv.Atoi(partIDString)
|
|
if err != nil || partID <= 0 {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidPart), r.URL)
|
|
return
|
|
}
|
|
|
|
// maximum size for multipart objects in a single operation
|
|
if isMaxObjectSize(size) {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrEntityTooLarge), r.URL)
|
|
return
|
|
}
|
|
|
|
// check partID with maximum part ID for multipart objects
|
|
if isMaxPartID(partID) {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidMaxParts), r.URL)
|
|
return
|
|
}
|
|
|
|
var (
|
|
md5hex = clientETag.String()
|
|
sha256hex = ""
|
|
reader io.Reader = r.Body
|
|
s3Error APIErrorCode
|
|
)
|
|
if s3Error = isPutActionAllowed(ctx, rAuthType, bucket, object, r, iampolicy.PutObjectAction); s3Error != ErrNone {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
|
|
return
|
|
}
|
|
|
|
switch rAuthType {
|
|
case authTypeStreamingSigned, authTypeStreamingSignedTrailer:
|
|
// Initialize stream signature verifier.
|
|
reader, s3Error = newSignV4ChunkedReader(r, rAuthType == authTypeStreamingSignedTrailer)
|
|
if s3Error != ErrNone {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
|
|
return
|
|
}
|
|
case authTypeStreamingUnsignedTrailer:
|
|
// Initialize stream signature verifier.
|
|
reader, s3Error = newUnsignedV4ChunkedReader(r, true)
|
|
if s3Error != ErrNone {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
|
|
return
|
|
}
|
|
case authTypeSignedV2, authTypePresignedV2:
|
|
if s3Error = isReqAuthenticatedV2(r); s3Error != ErrNone {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
|
|
return
|
|
}
|
|
case authTypePresigned, authTypeSigned:
|
|
if s3Error = reqSignatureV4Verify(r, globalSite.Region, serviceS3); s3Error != ErrNone {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
|
|
return
|
|
}
|
|
|
|
if !skipContentSha256Cksum(r) {
|
|
sha256hex = getContentSha256Cksum(r, serviceS3)
|
|
}
|
|
}
|
|
|
|
if err := enforceBucketQuotaHard(ctx, bucket, size); err != nil {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
return
|
|
}
|
|
|
|
actualSize := size
|
|
|
|
// get encryption options
|
|
var opts ObjectOptions
|
|
if crypto.SSEC.IsRequested(r.Header) {
|
|
opts, err = getOpts(ctx, r, bucket, object)
|
|
if err != nil {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
return
|
|
}
|
|
}
|
|
|
|
mi, err := objectAPI.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
|
|
if err != nil {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
return
|
|
}
|
|
|
|
// Read compression metadata preserved in the init multipart for the decision.
|
|
_, isCompressed := mi.UserDefined[ReservedMetadataPrefix+"compression"]
|
|
var idxCb func() []byte
|
|
if isCompressed {
|
|
actualReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize)
|
|
if err != nil {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
return
|
|
}
|
|
if err = actualReader.AddChecksum(r, false); err != nil {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidChecksum), r.URL)
|
|
return
|
|
}
|
|
|
|
// Set compression metrics.
|
|
wantEncryption := crypto.Requested(r.Header)
|
|
s2c, cb := newS2CompressReader(actualReader, actualSize, wantEncryption)
|
|
idxCb = cb
|
|
defer s2c.Close()
|
|
reader = etag.Wrap(s2c, actualReader)
|
|
size = -1 // Since compressed size is un-predictable.
|
|
md5hex = "" // Do not try to verify the content.
|
|
sha256hex = ""
|
|
}
|
|
|
|
hashReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize)
|
|
if err != nil {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
return
|
|
}
|
|
|
|
if err := hashReader.AddChecksum(r, size < 0); err != nil {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidChecksum), r.URL)
|
|
return
|
|
}
|
|
|
|
pReader := NewPutObjReader(hashReader)
|
|
|
|
_, isEncrypted := crypto.IsEncrypted(mi.UserDefined)
|
|
var objectEncryptionKey crypto.ObjectKey
|
|
if isEncrypted {
|
|
if !crypto.SSEC.IsRequested(r.Header) && crypto.SSEC.IsEncrypted(mi.UserDefined) {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrSSEMultipartEncrypted), r.URL)
|
|
return
|
|
}
|
|
|
|
opts, err = putOpts(ctx, r, bucket, object, mi.UserDefined)
|
|
if err != nil {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
return
|
|
}
|
|
|
|
var key []byte
|
|
if crypto.SSEC.IsRequested(r.Header) {
|
|
key, err = ParseSSECustomerRequest(r)
|
|
if err != nil {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
return
|
|
}
|
|
}
|
|
|
|
// Calculating object encryption key
|
|
key, err = decryptObjectMeta(key, bucket, object, mi.UserDefined)
|
|
if err != nil {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
return
|
|
}
|
|
copy(objectEncryptionKey[:], key)
|
|
|
|
partEncryptionKey := objectEncryptionKey.DerivePartKey(uint32(partID))
|
|
in := io.Reader(hashReader)
|
|
if size > encryptBufferThreshold {
|
|
// The encryption reads in blocks of 64KB.
|
|
// We add a buffer on bigger files to reduce the number of syscalls upstream.
|
|
in = bufio.NewReaderSize(hashReader, encryptBufferSize)
|
|
}
|
|
reader, err = sio.EncryptReader(in, sio.Config{Key: partEncryptionKey[:], CipherSuites: fips.DARECiphers()})
|
|
if err != nil {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
return
|
|
}
|
|
wantSize := int64(-1)
|
|
if size >= 0 {
|
|
info := ObjectInfo{Size: size}
|
|
wantSize = info.EncryptedSize()
|
|
}
|
|
// do not try to verify encrypted content
|
|
hashReader, err = hash.NewReader(etag.Wrap(reader, hashReader), wantSize, "", "", actualSize)
|
|
if err != nil {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
return
|
|
}
|
|
if err := hashReader.AddChecksum(r, true); err != nil {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidChecksum), r.URL)
|
|
return
|
|
}
|
|
|
|
pReader, err = pReader.WithEncryption(hashReader, &objectEncryptionKey)
|
|
if err != nil {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
return
|
|
}
|
|
|
|
if idxCb != nil {
|
|
idxCb = compressionIndexEncrypter(objectEncryptionKey, idxCb)
|
|
}
|
|
opts.EncryptFn = metadataEncrypter(objectEncryptionKey)
|
|
}
|
|
opts.IndexCB = idxCb
|
|
|
|
putObjectPart := objectAPI.PutObjectPart
|
|
if api.CacheAPI() != nil {
|
|
putObjectPart = api.CacheAPI().PutObjectPart
|
|
}
|
|
|
|
partInfo, err := putObjectPart(ctx, bucket, object, uploadID, partID, pReader, opts)
|
|
if err != nil {
|
|
// Verify if the underlying error is signature mismatch.
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
return
|
|
}
|
|
|
|
etag := partInfo.ETag
|
|
if kind, encrypted := crypto.IsEncrypted(mi.UserDefined); encrypted {
|
|
switch kind {
|
|
case crypto.S3KMS:
|
|
w.Header().Set(xhttp.AmzServerSideEncryption, xhttp.AmzEncryptionKMS)
|
|
w.Header().Set(xhttp.AmzServerSideEncryptionKmsID, mi.KMSKeyID())
|
|
if kmsCtx, ok := mi.UserDefined[crypto.MetaContext]; ok {
|
|
w.Header().Set(xhttp.AmzServerSideEncryptionKmsContext, kmsCtx)
|
|
}
|
|
if len(etag) >= 32 && strings.Count(etag, "-") != 1 {
|
|
etag = etag[len(etag)-32:]
|
|
}
|
|
case crypto.S3:
|
|
w.Header().Set(xhttp.AmzServerSideEncryption, xhttp.AmzEncryptionAES)
|
|
etag, _ = DecryptETag(objectEncryptionKey, ObjectInfo{ETag: etag})
|
|
case crypto.SSEC:
|
|
w.Header().Set(xhttp.AmzServerSideEncryptionCustomerAlgorithm, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerAlgorithm))
|
|
w.Header().Set(xhttp.AmzServerSideEncryptionCustomerKeyMD5, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerKeyMD5))
|
|
|
|
if len(etag) >= 32 && strings.Count(etag, "-") != 1 {
|
|
etag = etag[len(etag)-32:]
|
|
}
|
|
}
|
|
}
|
|
|
|
// We must not use the http.Header().Set method here because some (broken)
|
|
// clients expect the ETag header key to be literally "ETag" - not "Etag" (case-sensitive).
|
|
// Therefore, we have to set the ETag directly as map entry.
|
|
w.Header()[xhttp.ETag] = []string{"\"" + etag + "\""}
|
|
hash.TransferChecksumHeader(w, r)
|
|
|
|
writeSuccessResponseHeadersOnly(w)
|
|
}
|
|
|
|
// CompleteMultipartUploadHandler - Complete multipart upload.
|
|
func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
|
|
ctx := newContext(r, w, "CompleteMultipartUpload")
|
|
|
|
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
|
|
|
|
vars := mux.Vars(r)
|
|
bucket := vars["bucket"]
|
|
object, err := unescapePath(vars["object"])
|
|
if err != nil {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
return
|
|
}
|
|
|
|
objectAPI := api.ObjectAPI()
|
|
if objectAPI == nil {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
|
|
return
|
|
}
|
|
|
|
if s3Error := checkRequestAuthType(ctx, r, policy.PutObjectAction, bucket, object); s3Error != ErrNone {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
|
|
return
|
|
}
|
|
|
|
// Get upload id.
|
|
uploadID, _, _, _, s3Error := getObjectResources(r.Form)
|
|
if s3Error != ErrNone {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
|
|
return
|
|
}
|
|
|
|
// Content-Length is required and should be non-zero
|
|
if r.ContentLength <= 0 {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMissingPart), r.URL)
|
|
return
|
|
}
|
|
|
|
complMultipartUpload := &CompleteMultipartUpload{}
|
|
if err = xmlDecoder(r.Body, complMultipartUpload, r.ContentLength); err != nil {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
return
|
|
}
|
|
if len(complMultipartUpload.Parts) == 0 {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMissingPart), r.URL)
|
|
return
|
|
}
|
|
|
|
if !sort.SliceIsSorted(complMultipartUpload.Parts, func(i, j int) bool {
|
|
return complMultipartUpload.Parts[i].PartNumber < complMultipartUpload.Parts[j].PartNumber
|
|
}) {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidPartOrder), r.URL)
|
|
return
|
|
}
|
|
|
|
// Reject retention or governance headers if set, CompleteMultipartUpload spec
|
|
// does not use these headers, and should not be passed down to checkPutObjectLockAllowed
|
|
if objectlock.IsObjectLockRequested(r.Header) || objectlock.IsObjectLockGovernanceBypassSet(r.Header) {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidRequest), r.URL)
|
|
return
|
|
}
|
|
|
|
if _, _, _, s3Err := checkPutObjectLockAllowed(ctx, r, bucket, object, objectAPI.GetObjectInfo, ErrNone, ErrNone); s3Err != ErrNone {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
|
|
return
|
|
}
|
|
|
|
completeMultiPartUpload := objectAPI.CompleteMultipartUpload
|
|
if api.CacheAPI() != nil {
|
|
completeMultiPartUpload = api.CacheAPI().CompleteMultipartUpload
|
|
}
|
|
|
|
versioned := globalBucketVersioningSys.PrefixEnabled(bucket, object)
|
|
suspended := globalBucketVersioningSys.PrefixSuspended(bucket, object)
|
|
os := newObjSweeper(bucket, object).WithVersioning(versioned, suspended)
|
|
if !globalTierConfigMgr.Empty() {
|
|
// Get appropriate object info to identify the remote object to delete
|
|
goiOpts := os.GetOpts()
|
|
if goi, gerr := objectAPI.GetObjectInfo(ctx, bucket, object, goiOpts); gerr == nil {
|
|
os.SetTransitionState(goi.TransitionedObject)
|
|
}
|
|
}
|
|
|
|
setEventStreamHeaders(w)
|
|
|
|
opts, err := completeMultipartOpts(ctx, r, bucket, object)
|
|
if err != nil {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
return
|
|
}
|
|
|
|
// First, we compute the ETag of the multipart object.
|
|
// The ETag of a multi-part object is always:
|
|
// ETag := MD5(ETag_p1, ETag_p2, ...)+"-N" (N being the number of parts)
|
|
//
|
|
// This is independent of encryption. An encrypted multipart
|
|
// object also has an ETag that is the MD5 of its part ETags.
|
|
// The fact the in case of encryption the ETag of a part is
|
|
// not the MD5 of the part content does not change that.
|
|
var completeETags []etag.ETag
|
|
for _, part := range complMultipartUpload.Parts {
|
|
ETag, err := etag.Parse(part.ETag)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
completeETags = append(completeETags, ETag)
|
|
}
|
|
multipartETag := etag.Multipart(completeETags...)
|
|
opts.UserDefined["etag"] = multipartETag.String()
|
|
|
|
objInfo, err := completeMultiPartUpload(ctx, bucket, object, uploadID, complMultipartUpload.Parts, opts)
|
|
if err != nil {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
return
|
|
}
|
|
|
|
opts.EncryptFn, err = objInfo.metadataEncryptFn(r.Header)
|
|
if err != nil {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
return
|
|
}
|
|
if r.Header.Get(xMinIOExtract) == "true" && HasSuffix(object, archiveExt) {
|
|
opts := ObjectOptions{VersionID: objInfo.VersionID, MTime: objInfo.ModTime}
|
|
if _, err := updateObjectMetadataWithZipInfo(ctx, objectAPI, bucket, object, opts); err != nil {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
return
|
|
}
|
|
}
|
|
|
|
setPutObjHeaders(w, objInfo, false)
|
|
if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(objInfo, replication.ObjectReplicationType, opts)); dsc.ReplicateAny() {
|
|
scheduleReplication(ctx, objInfo.Clone(), objectAPI, dsc, replication.ObjectReplicationType)
|
|
}
|
|
if _, ok := r.Header[xhttp.MinIOSourceReplicationRequest]; ok {
|
|
actualSize, _ := objInfo.GetActualSize()
|
|
defer globalReplicationStats.UpdateReplicaStat(bucket, actualSize)
|
|
}
|
|
|
|
// Get object location.
|
|
location := getObjectLocation(r, globalDomainNames, bucket, object)
|
|
// Generate complete multipart response.
|
|
response := generateCompleteMultpartUploadResponse(bucket, object, location, objInfo)
|
|
encodedSuccessResponse := encodeResponse(response)
|
|
|
|
// Write success response.
|
|
writeSuccessResponseXML(w, encodedSuccessResponse)
|
|
|
|
// Notify object created event.
|
|
evt := eventArgs{
|
|
EventName: event.ObjectCreatedCompleteMultipartUpload,
|
|
BucketName: bucket,
|
|
Object: objInfo,
|
|
ReqParams: extractReqParams(r),
|
|
RespElements: extractRespElements(w),
|
|
UserAgent: r.UserAgent(),
|
|
Host: handlers.GetSourceIP(r),
|
|
}
|
|
sendEvent(evt)
|
|
|
|
if objInfo.NumVersions > dataScannerExcessiveVersionsThreshold {
|
|
evt.EventName = event.ObjectManyVersions
|
|
sendEvent(evt)
|
|
}
|
|
|
|
// Remove the transitioned object whose object version is being overwritten.
|
|
if !globalTierConfigMgr.Empty() {
|
|
// Schedule object for immediate transition if eligible.
|
|
enqueueTransitionImmediate(objInfo, lcEventSrc_s3CompleteMultipartUpload)
|
|
logger.LogIf(ctx, os.Sweep())
|
|
}
|
|
}
|
|
|
|
// AbortMultipartUploadHandler - Abort multipart upload
|
|
func (api objectAPIHandlers) AbortMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
|
|
ctx := newContext(r, w, "AbortMultipartUpload")
|
|
|
|
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
|
|
|
|
vars := mux.Vars(r)
|
|
bucket := vars["bucket"]
|
|
object, err := unescapePath(vars["object"])
|
|
if err != nil {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
return
|
|
}
|
|
|
|
objectAPI := api.ObjectAPI()
|
|
if objectAPI == nil {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
|
|
return
|
|
}
|
|
abortMultipartUpload := objectAPI.AbortMultipartUpload
|
|
if api.CacheAPI() != nil {
|
|
abortMultipartUpload = api.CacheAPI().AbortMultipartUpload
|
|
}
|
|
|
|
if s3Error := checkRequestAuthType(ctx, r, policy.AbortMultipartUploadAction, bucket, object); s3Error != ErrNone {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
|
|
return
|
|
}
|
|
|
|
uploadID, _, _, _, s3Error := getObjectResources(r.Form)
|
|
if s3Error != ErrNone {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
|
|
return
|
|
}
|
|
opts := ObjectOptions{}
|
|
if err := abortMultipartUpload(ctx, bucket, object, uploadID, opts); err != nil {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
return
|
|
}
|
|
|
|
writeSuccessNoContent(w)
|
|
}
|
|
|
|
// ListObjectPartsHandler - List object parts
|
|
func (api objectAPIHandlers) ListObjectPartsHandler(w http.ResponseWriter, r *http.Request) {
|
|
ctx := newContext(r, w, "ListObjectParts")
|
|
|
|
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
|
|
|
|
vars := mux.Vars(r)
|
|
bucket := vars["bucket"]
|
|
object, err := unescapePath(vars["object"])
|
|
if err != nil {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
return
|
|
}
|
|
|
|
objectAPI := api.ObjectAPI()
|
|
if objectAPI == nil {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
|
|
return
|
|
}
|
|
|
|
if s3Error := checkRequestAuthType(ctx, r, policy.ListMultipartUploadPartsAction, bucket, object); s3Error != ErrNone {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
|
|
return
|
|
}
|
|
|
|
uploadID, partNumberMarker, maxParts, encodingType, s3Error := getObjectResources(r.Form)
|
|
if s3Error != ErrNone {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
|
|
return
|
|
}
|
|
if partNumberMarker < 0 {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidPartNumberMarker), r.URL)
|
|
return
|
|
}
|
|
if maxParts < 0 {
|
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidMaxParts), r.URL)
|
|
return
|
|
}
|
|
|
|
opts := ObjectOptions{}
|
|
listPartsInfo, err := objectAPI.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts)
|
|
if err != nil {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
return
|
|
}
|
|
|
|
// We have to adjust the size of encrypted parts since encrypted parts
|
|
// are slightly larger due to encryption overhead.
|
|
// Further, we have to adjust the ETags of parts when using SSE-S3.
|
|
// Due to AWS S3, SSE-S3 encrypted parts return the plaintext ETag
|
|
// being the content MD5 of that particular part. This is not the
|
|
// case for SSE-C and SSE-KMS objects.
|
|
if kind, ok := crypto.IsEncrypted(listPartsInfo.UserDefined); ok {
|
|
var objectEncryptionKey []byte
|
|
if kind == crypto.S3 {
|
|
objectEncryptionKey, err = decryptObjectMeta(nil, bucket, object, listPartsInfo.UserDefined)
|
|
if err != nil {
|
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
return
|
|
}
|
|
}
|
|
for i, p := range listPartsInfo.Parts {
|
|
listPartsInfo.Parts[i].ETag = tryDecryptETag(objectEncryptionKey, p.ETag, kind == crypto.S3)
|
|
listPartsInfo.Parts[i].Size = p.ActualSize
|
|
}
|
|
}
|
|
|
|
response := generateListPartsResponse(listPartsInfo, encodingType)
|
|
encodedSuccessResponse := encodeResponse(response)
|
|
|
|
// Write success response.
|
|
writeSuccessResponseXML(w, encodedSuccessResponse)
|
|
}
|