minio/cmd/object-handlers.go
Klaus Post f1302c40fe
Fix uninitialized replication stats (#20260)
Services are unfrozen before `initBackgroundReplication` is finished. This means that 
the globalReplicationStats write is racy. Switch to an atomic pointer.

Provide the `ReplicationPool` with the stats, so it doesn't have to be grabbed 
from the atomic pointer on every use.

All other loads are nil-checked, and calls return empty values when the stats
have not been initialized yet.
2024-08-15 05:04:40 -07:00

3766 lines
124 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"archive/tar"
"bytes"
"context"
"encoding/hex"
"encoding/xml"
"errors"
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/textproto"
"net/url"
"os"
"sort"
"strconv"
"strings"
"sync/atomic"
"time"
"unicode"
"github.com/google/uuid"
"github.com/klauspost/compress/gzhttp"
miniogo "github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/minio/minio-go/v7/pkg/encrypt"
"github.com/minio/minio-go/v7/pkg/tags"
"github.com/minio/minio/internal/amztime"
"github.com/minio/minio/internal/auth"
sse "github.com/minio/minio/internal/bucket/encryption"
"github.com/minio/minio/internal/bucket/lifecycle"
objectlock "github.com/minio/minio/internal/bucket/object/lock"
"github.com/minio/minio/internal/bucket/replication"
"github.com/minio/minio/internal/config/cache"
"github.com/minio/minio/internal/config/dns"
"github.com/minio/minio/internal/config/storageclass"
"github.com/minio/minio/internal/crypto"
"github.com/minio/minio/internal/etag"
"github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/handlers"
"github.com/minio/minio/internal/hash"
xhttp "github.com/minio/minio/internal/http"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/kms"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/s3select"
"github.com/minio/mux"
"github.com/minio/pkg/v3/policy"
"github.com/valyala/bytebufferpool"
)
// supportedHeadGetReqParams - supported request parameters for GET and HEAD presigned request.
// Each key is the lower-case query parameter name; the value is the response
// header that setHeadGetRespHeaders sets from it.
var supportedHeadGetReqParams = map[string]string{
"response-expires": xhttp.Expires,
"response-content-type": xhttp.ContentType,
"response-cache-control": xhttp.CacheControl,
"response-content-encoding": xhttp.ContentEncoding,
"response-content-language": xhttp.ContentLanguage,
"response-content-disposition": xhttp.ContentDisposition,
}
const (
// compressionAlgorithmV1 and compressionAlgorithmV2 name the two compression
// schemes known to this package.
// NOTE(review): presumably these strings are persisted with the object to
// identify how it was compressed - confirm against the callers.
compressionAlgorithmV1 = "golang/snappy/LZ77"
compressionAlgorithmV2 = "klauspost/compress/s2"
// encryptBufferThreshold is the upload size (1 MiB) above which an input
// buffer is added.
encryptBufferThreshold = 1 << 20
// encryptBufferSize is the size (1 MiB) of the input buffer added for
// uploads exceeding encryptBufferThreshold.
encryptBufferSize = 1 << 20
// minCompressibleSize is the minimum size at which we enable compression.
minCompressibleSize = 4096
)
// setHeadGetRespHeaders copies every supported "response-*" request
// parameter onto the response as the header it overrides. Parameter names
// are matched case-insensitively, multiple values for one parameter are
// joined with a comma, and unsupported parameters are ignored.
func setHeadGetRespHeaders(w http.ResponseWriter, reqParams url.Values) {
	for param, values := range reqParams {
		headerName, supported := supportedHeadGetReqParams[strings.ToLower(param)]
		if !supported {
			continue
		}
		joined := strings.Join(values, ",")
		w.Header()[headerName] = []string{joined}
	}
}
// SelectObjectContentHandler - GET Object?select
// ----------
// This implementation of the GET operation retrieves object content based
// on an SQL expression. In the request, along with the SQL expression, you must
// also specify a data serialization format (JSON, CSV) of the object.
//
// The handler rejects requests that carry SSE-S3/SSE-KMS headers, a Range
// header or an empty body. It takes a read lock on the object before reading
// it and releases the lock when the handler returns. An ObjectAccessedGet
// event is sent after the select request has been evaluated.
func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "SelectObject")
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
// Fetch the object layer; without it the server cannot serve requests yet.
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
return
}
if crypto.S3.IsRequested(r.Header) || crypto.S3KMS.IsRequested(r.Header) { // If SSE-S3 or SSE-KMS present -> AWS fails with undefined error
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrBadRequest), r.URL)
return
}
vars := mux.Vars(r)
bucket := vars["bucket"]
object, err := unescapePath(vars["object"])
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
opts, err := getOpts(ctx, r, bucket, object)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
getObjectInfo := objectAPI.GetObjectInfo
// Check the request auth type so that the S3 compatible error
// (NoSuchKey vs AccessDenied) is returned.
if s3Error := checkRequestAuthType(ctx, r, policy.GetObjectAction, bucket, object); s3Error != ErrNone {
if getRequestAuthType(r) == authTypeAnonymous {
// As per "Permission" section in
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html
// If the object you request does not exist,
// the error Amazon S3 returns depends on
// whether you also have the s3:ListBucket
// permission.
// * If you have the s3:ListBucket permission
// on the bucket, Amazon S3 will return an
// HTTP status code 404 ("no such key")
// error.
// * If you do not have the s3:ListBucket
// permission, Amazon S3 will return an HTTP
// status code 403 ("access denied") error.
if globalPolicySys.IsAllowed(policy.BucketPolicyArgs{
Action: policy.ListBucketAction,
BucketName: bucket,
ConditionValues: getConditionValues(r, "", auth.AnonymousCredentials),
IsOwner: false,
}) {
// Anonymous callers allowed to list the bucket may learn that the key is missing.
_, err = getObjectInfo(ctx, bucket, object, opts)
if toAPIError(ctx, err).Code == "NoSuchKey" {
s3Error = ErrNoSuchKey
}
}
}
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
return
}
// Range requests are not supported for select; reject any Range header.
rangeHeader := r.Header.Get(xhttp.Range)
if rangeHeader != "" {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrUnsupportedRangeHeader), r.URL)
return
}
// The select request is read from the body, so an empty body is an error.
if r.ContentLength <= 0 {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrEmptyRequestBody), r.URL)
return
}
// Take read lock on object, here so subsequent lower-level
// calls do not need to.
lock := objectAPI.NewNSLock(bucket, object)
lkctx, err := lock.GetRLock(ctx, globalOperationTimeout)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
ctx = lkctx.Context()
defer lock.RUnlock(lkctx)
getObjectNInfo := objectAPI.GetObjectNInfo
gopts := opts
gopts.NoLock = true // We already have a lock, we can live with it.
objInfo, err := getObjectInfo(ctx, bucket, object, gopts)
if err != nil {
// With versioning enabled the object may have been deleted and be a
// delete marker; if so set the version headers (it is unclear why
// AWS S3 sets these headers).
if objInfo.VersionID != "" && objInfo.DeleteMarker {
w.Header()[xhttp.AmzVersionID] = []string{objInfo.VersionID}
w.Header()[xhttp.AmzDeleteMarker] = []string{strconv.FormatBool(objInfo.DeleteMarker)}
}
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
// Check whether the caller may read retention and legal hold settings.
getRetPerms := checkRequestAuthType(ctx, r, policy.GetObjectRetentionAction, bucket, object)
legalHoldPerms := checkRequestAuthType(ctx, r, policy.GetObjectLegalHoldAction, bucket, object)
// filter object lock metadata if permission does not permit
objInfo.UserDefined = objectlock.FilterObjectLockMetadata(objInfo.UserDefined, getRetPerms != ErrNone, legalHoldPerms != ErrNone)
if _, err = DecryptObjectInfo(&objInfo, r); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
actualSize, err := objInfo.GetActualSize()
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
// objectRSC obtains a reader for the object starting at the requested offset
// each time its callback is invoked.
// NOTE(review): End: -1 presumably means "until the end of the object" - confirm.
objectRSC := s3select.NewObjectReadSeekCloser(
func(offset int64) (io.ReadCloser, error) {
rs := &HTTPRangeSpec{
IsSuffixLength: false,
Start: offset,
End: -1,
}
// The read lock is already held by this handler.
opts.NoLock = true
return getObjectNInfo(ctx, bucket, object, rs, r.Header, opts)
},
actualSize,
)
defer objectRSC.Close()
// Parse the select request from the body. A SelectError carries its own
// error code, message and HTTP status, so it is encoded directly.
s3Select, err := s3select.NewS3Select(r.Body)
if err != nil {
if serr, ok := err.(s3select.SelectError); ok {
encodedErrorResponse := encodeResponse(APIErrorResponse{
Code: serr.ErrorCode(),
Message: serr.ErrorMessage(),
BucketName: bucket,
Key: object,
Resource: r.URL.Path,
RequestID: w.Header().Get(xhttp.AmzRequestID),
HostID: globalDeploymentID(),
})
writeResponse(w, serr.HTTPStatusCode(), encodedErrorResponse, mimeXML)
} else {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
}
return
}
defer s3Select.Close()
// Open the object for the select request; errors are reported the same
// way as parse errors above.
if err = s3Select.Open(objectRSC); err != nil {
if serr, ok := err.(s3select.SelectError); ok {
encodedErrorResponse := encodeResponse(APIErrorResponse{
Code: serr.ErrorCode(),
Message: serr.ErrorMessage(),
BucketName: bucket,
Key: object,
Resource: r.URL.Path,
RequestID: w.Header().Get(xhttp.AmzRequestID),
HostID: globalDeploymentID(),
})
writeResponse(w, serr.HTTPStatusCode(), encodedErrorResponse, mimeXML)
} else {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
}
return
}
// Set encryption response headers
switch kind, _ := crypto.IsEncrypted(objInfo.UserDefined); kind {
case crypto.S3:
w.Header().Set(xhttp.AmzServerSideEncryption, xhttp.AmzEncryptionAES)
case crypto.S3KMS:
w.Header().Set(xhttp.AmzServerSideEncryption, xhttp.AmzEncryptionKMS)
w.Header().Set(xhttp.AmzServerSideEncryptionKmsID, objInfo.KMSKeyID())
if kmsCtx, ok := objInfo.UserDefined[crypto.MetaContext]; ok {
w.Header().Set(xhttp.AmzServerSideEncryptionKmsContext, kmsCtx)
}
case crypto.SSEC:
// Validate the SSE-C Key set in the header.
if _, err = crypto.SSEC.UnsealObjectKey(r.Header, objInfo.UserDefined, bucket, object); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
// Echo the customer algorithm and key MD5 from the request headers.
w.Header().Set(xhttp.AmzServerSideEncryptionCustomerAlgorithm, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerAlgorithm))
w.Header().Set(xhttp.AmzServerSideEncryptionCustomerKeyMD5, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerKeyMD5))
}
// Evaluate the select request against the opened object, with w as the output.
s3Select.Evaluate(w)
// Notify object accessed via a GET request.
sendEvent(eventArgs{
EventName: event.ObjectAccessedGet,
BucketName: bucket,
Object: objInfo,
ReqParams: extractReqParams(r),
RespElements: extractRespElements(w),
UserAgent: r.UserAgent(),
Host: handlers.GetSourceIP(r),
})
}
// getObjectHandler serves a GET Object request for bucket/object.
//
// It authenticates the request, optionally answers from the object cache,
// and otherwise reads the object through objectAPI.GetObjectNInfo. When the
// object is missing locally and proxy targets exist, the read is proxied to
// the replication targets. For local reads it applies lifecycle expiry and
// queues a replication heal. It then sets the response headers, streams the
// body to w (teeing it into the cache when applicable) and sends an
// ObjectAccessedGet event.
func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI ObjectLayer, bucket, object string, w http.ResponseWriter, r *http.Request) {
if crypto.S3.IsRequested(r.Header) || crypto.S3KMS.IsRequested(r.Header) { // If SSE-S3 or SSE-KMS present -> AWS fails with undefined error
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrBadRequest), r.URL)
return
}
opts, err := getOpts(ctx, r, bucket, object)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
// Check the request auth type so that the S3 compatible error
// (NoSuchKey vs AccessDenied) is returned.
if s3Error := authenticateRequest(ctx, r, policy.GetObjectAction); s3Error != ErrNone {
if getRequestAuthType(r) == authTypeAnonymous {
// As per "Permission" section in
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html
// If the object you request does not exist,
// the error Amazon S3 returns depends on
// whether you also have the s3:ListBucket
// permission.
// * If you have the s3:ListBucket permission
// on the bucket, Amazon S3 will return an
// HTTP status code 404 ("no such key")
// error.
// * If you do not have the s3:ListBucket
// permission, Amazon S3 will return an HTTP
// status code 403 ("access denied") error.
if globalPolicySys.IsAllowed(policy.BucketPolicyArgs{
Action: policy.ListBucketAction,
BucketName: bucket,
ConditionValues: getConditionValues(r, "", auth.AnonymousCredentials),
IsOwner: false,
}) {
getObjectInfo := objectAPI.GetObjectInfo
_, err = getObjectInfo(ctx, bucket, object, opts)
if toAPIError(ctx, err).Code == "NoSuchKey" {
s3Error = ErrNoSuchKey
}
}
}
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
return
}
getObjectNInfo := objectAPI.GetObjectNInfo
// Get request range.
var rs *HTTPRangeSpec
var rangeErr error
rangeHeader := r.Header.Get(xhttp.Range)
if rangeHeader != "" {
// Both 'Range' and 'partNumber' cannot be specified at the same time
if opts.PartNumber > 0 {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidRangePartNumber), r.URL)
return
}
rs, rangeErr = parseRequestRangeSpec(rangeHeader)
// Handle only errInvalidRange. Ignore other
// parse error and treat it as regular Get
// request like Amazon S3.
if errors.Is(rangeErr, errInvalidRange) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidRange), r.URL)
return
}
}
// The cache is only consulted when it is enabled and the request does not
// name a specific version.
cachedResult := globalCacheConfig.Enabled() && opts.VersionID == ""
if _, ok := crypto.IsRequested(r.Header); ok {
// No need to check cache for encrypted objects.
cachedResult = false
}
// update records that the cache should be populated from this response.
var update bool
if cachedResult {
rc := &cache.CondCheck{}
h := r.Header.Clone()
if opts.PartNumber > 0 {
h.Set(xhttp.PartNumber, strconv.Itoa(opts.PartNumber))
}
rc.Init(bucket, object, h)
ci, err := globalCacheConfig.Get(rc)
if ci != nil {
tgs, ok := ci.Metadata[xhttp.AmzObjectTagging]
if ok {
// Set this such that authorization policies can be applied on the object tags.
r.Header.Set(xhttp.AmzObjectTagging, tgs)
}
if s3Error := authorizeRequest(ctx, r, policy.GetObjectAction); s3Error != ErrNone {
writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(s3Error))
return
}
// Only these cached status codes can be answered from the cache entry.
okSt := (ci.StatusCode == http.StatusOK || ci.StatusCode == http.StatusPartialContent ||
ci.StatusCode == http.StatusPreconditionFailed || ci.StatusCode == http.StatusNotModified)
if okSt {
ci.WriteHeaders(w, func() {
// set common headers
setCommonHeaders(w)
}, func() {
// Serve the body from the cache only when the entry carries data.
okSt := (ci.StatusCode == http.StatusOK || ci.StatusCode == http.StatusPartialContent)
if okSt && len(ci.Data) > 0 {
for k, v := range ci.Metadata {
w.Header().Set(k, v)
}
// For multipart ETags (containing "-"), strip everything up to the
// first numeric rune and report the remainder as the parts count.
if opts.PartNumber > 0 && strings.Contains(ci.ETag, "-") {
w.Header()[xhttp.AmzMpPartsCount] = []string{
strings.TrimLeftFunc(ci.ETag, func(r rune) bool {
return !unicode.IsNumber(r)
}),
}
}
// For providing ranged content
start, rangeLen, err := rs.GetOffsetLength(ci.Size)
if err != nil {
// Fall back to the whole cached object when the range cannot be resolved.
start, rangeLen = 0, ci.Size
}
// Set content length.
w.Header().Set(xhttp.ContentLength, strconv.FormatInt(rangeLen, 10))
if rs != nil {
contentRange := fmt.Sprintf("bytes %d-%d/%d", start, start+rangeLen-1, ci.Size)
w.Header().Set(xhttp.ContentRange, contentRange)
}
io.Copy(w, bytes.NewReader(ci.Data))
return
}
if ci.StatusCode == http.StatusPreconditionFailed {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrPreconditionFailed), r.URL)
return
} else if ci.StatusCode == http.StatusNotModified {
w.WriteHeader(ci.StatusCode)
return
}
// We did not satisfy any requirement from the cache, update the cache.
// this basically means that we do not have the Data for the object
// cached yet
update = true
})
if !update {
// No update is needed means we have written already to the client just return here.
return
}
}
}
// A missing cache key means the cache should be filled from this response.
if errors.Is(err, cache.ErrKeyMissing) {
update = true
}
}
// Validate pre-conditions if any.
// The callback returns true when a response has already been written
// (decryption failure, authorization failure or a failed precondition).
opts.CheckPrecondFn = func(oi ObjectInfo) bool {
if _, err := DecryptObjectInfo(&oi, r); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return true
}
if oi.UserTags != "" {
// Expose the object tags to the authorization check below.
r.Header.Set(xhttp.AmzObjectTagging, oi.UserTags)
}
if s3Error := authorizeRequest(ctx, r, policy.GetObjectAction); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
return true
}
return checkPreconditions(ctx, w, r, oi, opts)
}
opts.FastGetObjInfo = true
var proxy proxyResult
gr, err := getObjectNInfo(ctx, bucket, object, rs, r.Header, opts)
if err != nil {
var (
reader *GetObjectReader
perr error
)
// Try the proxy targets when the object is missing locally (not found,
// version not found or read quorum error) and is not a delete marker.
if (isErrObjectNotFound(err) || isErrVersionNotFound(err) || isErrReadQuorum(err)) && !(gr != nil && gr.ObjInfo.DeleteMarker) {
proxytgts := getProxyTargets(ctx, bucket, object, opts)
if !proxytgts.Empty() {
globalReplicationStats.Load().incProxy(bucket, getObjectAPI, false)
// proxy to replication target if active-active replication is in place.
reader, proxy, perr = proxyGetToReplicationTarget(ctx, bucket, object, rs, r.Header, opts, proxytgts)
if perr != nil {
globalReplicationStats.Load().incProxy(bucket, getObjectAPI, true)
proxyGetErr := ErrorRespToObjectError(perr, bucket, object)
// Log only proxy failures other than not-found, precondition and range errors.
if !isErrBucketNotFound(proxyGetErr) && !isErrObjectNotFound(proxyGetErr) && !isErrVersionNotFound(proxyGetErr) &&
!isErrPreconditionFailed(proxyGetErr) && !isErrInvalidRange(proxyGetErr) {
replLogIf(ctx, fmt.Errorf("Proxying request (replication) failed for %s/%s(%s) - %w", bucket, object, opts.VersionID, perr))
}
}
if reader != nil && proxy.Proxy && perr == nil {
gr = reader
}
}
}
if reader == nil || !proxy.Proxy {
// validate if the request indeed was authorized, if it wasn't we need to return "ErrAccessDenied"
// instead of any namespace related error.
if s3Error := authorizeRequest(ctx, r, policy.GetObjectAction); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
return
}
// Nothing is written here for a failed precondition.
// NOTE(review): presumably the CheckPrecondFn callback already wrote the response - confirm.
if isErrPreconditionFailed(err) {
return
}
if proxy.Err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, proxy.Err), r.URL)
return
}
if gr != nil {
if !gr.ObjInfo.VersionPurgeStatus.Empty() {
// Shows the replication status of a permanent delete of a version
w.Header()[xhttp.MinIODeleteReplicationStatus] = []string{string(gr.ObjInfo.VersionPurgeStatus)}
}
if !gr.ObjInfo.ReplicationStatus.Empty() && gr.ObjInfo.DeleteMarker {
w.Header()[xhttp.MinIODeleteMarkerReplicationStatus] = []string{string(gr.ObjInfo.ReplicationStatus)}
}
// With versioning enabled the object may have been deleted and be a
// delete marker; if so set the version headers (it is unclear why
// AWS S3 sets these headers).
if gr.ObjInfo.VersionID != "" && gr.ObjInfo.DeleteMarker {
w.Header()[xhttp.AmzVersionID] = []string{gr.ObjInfo.VersionID}
w.Header()[xhttp.AmzDeleteMarker] = []string{strconv.FormatBool(gr.ObjInfo.DeleteMarker)}
}
QueueReplicationHeal(ctx, bucket, gr.ObjInfo, 0)
}
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
}
defer gr.Close()
objInfo := gr.ObjInfo
if !proxy.Proxy { // apply lifecycle rules only for local requests
// Automatically remove the object/version if an expiry lifecycle rule can be applied
if lc, err := globalLifecycleSys.Get(bucket); err == nil {
rcfg, err := globalBucketObjectLockSys.Get(bucket)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
replcfg, err := getReplicationConfig(ctx, bucket)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
event := evalActionFromLifecycle(ctx, *lc, rcfg, replcfg, objInfo)
if event.Action.Delete() {
// apply whatever the expiry rule is.
applyExpiryRule(event, lcEventSrc_s3GetObject, objInfo)
if !event.Action.DeleteRestored() {
// If the ILM action is not on restored object return error.
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNoSuchKey), r.URL)
return
}
}
}
QueueReplicationHeal(ctx, bucket, gr.ObjInfo, 0)
}
// Check whether the caller may read retention and legal hold settings.
getRetPerms := checkRequestAuthType(ctx, r, policy.GetObjectRetentionAction, bucket, object)
legalHoldPerms := checkRequestAuthType(ctx, r, policy.GetObjectLegalHoldAction, bucket, object)
// filter object lock metadata if permission does not permit
objInfo.UserDefined = objectlock.FilterObjectLockMetadata(objInfo.UserDefined, getRetPerms != ErrNone, legalHoldPerms != ErrNone)
// Set encryption response headers
if kind, isEncrypted := crypto.IsEncrypted(objInfo.UserDefined); isEncrypted {
switch kind {
case crypto.S3:
w.Header().Set(xhttp.AmzServerSideEncryption, xhttp.AmzEncryptionAES)
case crypto.S3KMS:
w.Header().Set(xhttp.AmzServerSideEncryption, xhttp.AmzEncryptionKMS)
w.Header().Set(xhttp.AmzServerSideEncryptionKmsID, objInfo.KMSKeyID())
if kmsCtx, ok := objInfo.UserDefined[crypto.MetaContext]; ok {
w.Header().Set(xhttp.AmzServerSideEncryptionKmsContext, kmsCtx)
}
case crypto.SSEC:
// Echo the customer algorithm and key MD5 from the request headers.
w.Header().Set(xhttp.AmzServerSideEncryptionCustomerAlgorithm, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerAlgorithm))
w.Header().Set(xhttp.AmzServerSideEncryptionCustomerKeyMD5, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerKeyMD5))
}
objInfo.ETag = getDecryptedETag(r.Header, objInfo, false)
}
if r.Header.Get(xhttp.AmzChecksumMode) == "ENABLED" && rs == nil {
// AWS S3 silently drops checksums on range requests.
hash.AddChecksumHeader(w, objInfo.decryptChecksums(opts.PartNumber, r.Header))
}
if err = setObjectHeaders(ctx, w, objInfo, rs, opts); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
// Set Parts Count Header
if opts.PartNumber > 0 && len(objInfo.Parts) > 0 {
setPartsCountHeaders(w, objInfo)
}
setHeadGetRespHeaders(w, r.Form)
// When the cache should be updated and the object size is accepted by the
// cache configuration, tee the response body into a pooled buffer.
var buf *bytebufferpool.ByteBuffer
if update && globalCacheConfig.MatchesSize(objInfo.Size) {
buf = bytebufferpool.Get()
defer bytebufferpool.Put(buf)
}
var iw io.Writer
iw = w
if buf != nil {
iw = io.MultiWriter(w, buf)
}
statusCodeWritten := false
httpWriter := xioutil.WriteOnClose(iw)
// Range and part-number responses are sent with 206 Partial Content.
if rs != nil || opts.PartNumber > 0 {
statusCodeWritten = true
w.WriteHeader(http.StatusPartialContent)
}
// Write object content to response body
if _, err = xioutil.Copy(httpWriter, gr); err != nil {
if !httpWriter.HasWritten() && !statusCodeWritten {
// write error response only if no data or headers has been written to client yet
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
return
}
if err = httpWriter.Close(); err != nil {
if !httpWriter.HasWritten() && !statusCodeWritten { // write error response only if no data or headers has been written to client yet
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
return
}
// Store the buffered body in the cache when the handler returns; nothing is
// stored when no data was buffered. This defer is registered after the
// buffer's Put above, so it runs before the buffer goes back to the pool.
defer func() {
var data []byte
if buf != nil {
data = buf.Bytes()
}
if len(data) == 0 {
return
}
globalCacheConfig.Set(&cache.ObjectInfo{
Key: objInfo.Name,
Bucket: objInfo.Bucket,
ETag: objInfo.ETag,
ModTime: objInfo.ModTime,
Expires: objInfo.ExpiresStr(),
CacheControl: objInfo.CacheControl,
Metadata: cleanReservedKeys(objInfo.UserDefined),
Range: rangeHeader,
PartNumber: opts.PartNumber,
Size: int64(len(data)),
Data: data,
})
}()
// Notify object accessed via a GET request.
sendEvent(eventArgs{
EventName: event.ObjectAccessedGet,
BucketName: bucket,
Object: objInfo,
ReqParams: extractReqParams(r),
RespElements: extractRespElements(w),
UserAgent: r.UserAgent(),
Host: handlers.GetSourceIP(r),
})
}
// getObjectAttributesHandler serves the GetObjectAttributes request for
// bucket/object and writes the requested attributes as an XML response.
//
// The caller must be allowed both the attributes action and the get action
// (their version variants when a version ID is requested). When the object
// lookup fails, the underlying error is only reported to callers that are
// allowed to list the bucket; everyone else receives AccessDenied. An
// ObjectAccessedAttributes event is sent after the response is written.
func (api objectAPIHandlers) getObjectAttributesHandler(ctx context.Context, objectAPI ObjectLayer, bucket, object string, w http.ResponseWriter, r *http.Request) {
	opts, optsValid := getAndValidateAttributesOpts(ctx, w, r, bucket, object)
	if !optsValid {
		return
	}

	// Both the attributes permission and the plain read permission are required.
	var authErr APIErrorCode
	switch opts.VersionID {
	case "":
		authErr = checkRequestAuthType(ctx, r, policy.GetObjectAttributesAction, bucket, object)
		if authErr == ErrNone {
			authErr = checkRequestAuthType(ctx, r, policy.GetObjectAction, bucket, object)
		}
	default:
		authErr = checkRequestAuthType(ctx, r, policy.GetObjectVersionAttributesAction, bucket, object)
		if authErr == ErrNone {
			authErr = checkRequestAuthType(ctx, r, policy.GetObjectVersionAction, bucket, object)
		}
	}
	if authErr != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(authErr), r.URL)
		return
	}

	objInfo, err := objectAPI.GetObjectInfo(ctx, bucket, object, opts)
	if err != nil {
		// Only callers that may list the bucket get to see the real lookup error.
		if checkRequestAuthType(ctx, r, policy.ListBucketAction, bucket, object) != ErrNone {
			writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrAccessDenied), r.URL)
			return
		}
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	if _, err = DecryptObjectInfo(&objInfo, r); err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	if checkPreconditions(ctx, w, r, objInfo, opts) {
		return
	}

	resp := &getObjectAttributesResponse{}

	respHeader := w.Header()
	if opts.Versioned {
		respHeader.Set(xhttp.AmzVersionID, objInfo.VersionID)
	}
	respHeader.Set(xhttp.LastModified, objInfo.ModTime.UTC().Format(http.TimeFormat))
	respHeader.Del(xhttp.ContentType)

	if _, wanted := opts.ObjectAttributes[xhttp.Checksum]; wanted {
		sums := objInfo.decryptChecksums(0, r.Header)
		// AWS does not appear to append part number on this API call.
		if len(sums) > 0 {
			// Keep only the text before the first "-" of each checksum.
			beforeDash := func(sum string) string {
				head, _, _ := strings.Cut(sum, "-")
				return head
			}
			resp.Checksum = &objectAttributesChecksum{
				ChecksumCRC32:  beforeDash(sums["CRC32"]),
				ChecksumCRC32C: beforeDash(sums["CRC32C"]),
				ChecksumSHA1:   beforeDash(sums["SHA1"]),
				ChecksumSHA256: beforeDash(sums["SHA256"]),
			}
		}
	}

	if _, wanted := opts.ObjectAttributes[xhttp.ETag]; wanted {
		resp.ETag = objInfo.ETag
	}

	if _, wanted := opts.ObjectAttributes[xhttp.ObjectSize]; wanted {
		resp.ObjectSize, _ = objInfo.GetActualSize()
	}

	if _, wanted := opts.ObjectAttributes[xhttp.StorageClass]; wanted {
		resp.StorageClass = filterStorageClass(ctx, objInfo.StorageClass)
	}

	objInfo.decryptPartsChecksums(r.Header)

	if _, wanted := opts.ObjectAttributes[xhttp.ObjectParts]; wanted {
		totalParts := len(objInfo.Parts)
		listing := &objectAttributesParts{
			PartNumberMarker: opts.PartNumberMarker,
			MaxParts:         opts.MaxParts,
			PartsCount:       totalParts,
		}
		resp.ObjectParts = listing

		if opts.MaxParts > -1 {
			for _, part := range objInfo.Parts {
				// Skip parts at or before the marker.
				if part.Number <= opts.PartNumberMarker {
					continue
				}
				// Stop once the requested number of parts has been collected.
				if len(listing.Parts) == opts.MaxParts {
					break
				}
				listing.NextPartNumberMarker = part.Number
				listing.Parts = append(listing.Parts, &objectAttributesPart{
					ChecksumSHA1:   part.Checksums["SHA1"],
					ChecksumSHA256: part.Checksums["SHA256"],
					ChecksumCRC32:  part.Checksums["CRC32"],
					ChecksumCRC32C: part.Checksums["CRC32C"],
					PartNumber:     part.Number,
					Size:           part.Size,
				})
			}
		}

		// The listing is flagged truncated whenever the last listed part
		// number differs from the total number of parts.
		listing.IsTruncated = listing.NextPartNumberMarker != totalParts
	}

	writeSuccessResponseXML(w, encodeResponse(resp))

	sendEvent(eventArgs{
		EventName:    event.ObjectAccessedAttributes,
		BucketName:   bucket,
		Object:       objInfo,
		ReqParams:    extractReqParams(r),
		RespElements: extractRespElements(w),
		UserAgent:    r.UserAgent(),
		Host:         handlers.GetSourceIP(r),
	})
}
// GetObjectHandler - GET Object
// ----------
// Entry point for the GET Object operation; the caller must have READ
// access to the object. The request is handed to the archive handler when
// the extract header is "true" and the object name contains the archive
// pattern, and to the regular object handler otherwise.
func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "GetObject")

	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))

	objectAPI := api.ObjectAPI()
	if objectAPI == nil {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
		return
	}

	pathVars := mux.Vars(r)
	bucket := pathVars["bucket"]
	object, err := unescapePath(pathVars["object"])
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// Mark the response as not to be compressed when gzip of objects is disabled.
	if !globalAPIConfig.shouldGzipObjects() {
		w.Header().Set(gzhttp.HeaderNoCompression, "true")
	}

	if r.Header.Get(xMinIOExtract) == "true" && strings.Contains(object, archivePattern) {
		api.getObjectInArchiveFileHandler(ctx, objectAPI, bucket, object, w, r)
		return
	}
	api.getObjectHandler(ctx, objectAPI, bucket, object, w, r)
}
func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI ObjectLayer, bucket, object string, w http.ResponseWriter, r *http.Request) {
if crypto.S3.IsRequested(r.Header) || crypto.S3KMS.IsRequested(r.Header) { // If SSE-S3 or SSE-KMS present -> AWS fails with undefined error
writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrBadRequest))
return
}
getObjectInfo := objectAPI.GetObjectInfo
opts, err := getOpts(ctx, r, bucket, object)
if err != nil {
writeErrorResponseHeadersOnly(w, toAPIError(ctx, err))
return
}
// Check for auth type to return S3 compatible error.
// type to return the correct error (NoSuchKey vs AccessDenied)
if s3Error := authenticateRequest(ctx, r, policy.GetObjectAction); s3Error != ErrNone {
if getRequestAuthType(r) == authTypeAnonymous {
// As per "Permission" section in
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectHEAD.html
// If the object you request does not exist,
// the error Amazon S3 returns depends on
// whether you also have the s3:ListBucket
// permission.
// * If you have the s3:ListBucket permission
// on the bucket, Amazon S3 will return an
// HTTP status code 404 ("no such key")
// error.
// * if you dont have the s3:ListBucket
// permission, Amazon S3 will return an HTTP
// status code 403 ("access denied") error.`
if globalPolicySys.IsAllowed(policy.BucketPolicyArgs{
Action: policy.ListBucketAction,
BucketName: bucket,
ConditionValues: getConditionValues(r, "", auth.AnonymousCredentials),
IsOwner: false,
}) {
getObjectInfo := objectAPI.GetObjectInfo
_, err = getObjectInfo(ctx, bucket, object, opts)
if toAPIError(ctx, err).Code == "NoSuchKey" {
s3Error = ErrNoSuchKey
}
}
}
writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(s3Error))
return
}
// Get request range.
var rs *HTTPRangeSpec
rangeHeader := r.Header.Get(xhttp.Range)
if rangeHeader != "" {
rs, _ = parseRequestRangeSpec(rangeHeader)
}
if rangeHeader != "" {
// Both 'Range' and 'partNumber' cannot be specified at the same time
if opts.PartNumber > 0 {
writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrInvalidRangePartNumber))
return
}
if rs, err = parseRequestRangeSpec(rangeHeader); err != nil {
// Handle only errInvalidRange. Ignore other
// parse error and treat it as regular Get
// request like Amazon S3.
if errors.Is(err, errInvalidRange) {
writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrInvalidRange))
return
}
}
}
cachedResult := globalCacheConfig.Enabled() && opts.VersionID == ""
if _, ok := crypto.IsRequested(r.Header); ok {
// No need to check cache for encrypted objects.
cachedResult = false
}
if cachedResult {
rc := &cache.CondCheck{}
h := r.Header.Clone()
if opts.PartNumber > 0 {
h.Set(xhttp.PartNumber, strconv.Itoa(opts.PartNumber))
}
rc.Init(bucket, object, h)
ci, _ := globalCacheConfig.Get(rc)
if ci != nil {
tgs, ok := ci.Metadata[xhttp.AmzObjectTagging]
if ok {
// Set this such that authorization policies can be applied on the object tags.
r.Header.Set(xhttp.AmzObjectTagging, tgs)
}
if s3Error := authorizeRequest(ctx, r, policy.GetObjectAction); s3Error != ErrNone {
writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(s3Error))
return
}
okSt := (ci.StatusCode == http.StatusOK || ci.StatusCode == http.StatusPartialContent ||
ci.StatusCode == http.StatusPreconditionFailed || ci.StatusCode == http.StatusNotModified)
if okSt {
ci.WriteHeaders(w, func() {
// set common headers
setCommonHeaders(w)
}, func() {
okSt := (ci.StatusCode == http.StatusOK || ci.StatusCode == http.StatusPartialContent)
if okSt {
for k, v := range ci.Metadata {
w.Header().Set(k, v)
}
// For providing ranged content
start, rangeLen, err := rs.GetOffsetLength(ci.Size)
if err != nil {
start, rangeLen = 0, ci.Size
}
if opts.PartNumber > 0 && strings.Contains(ci.ETag, "-") {
w.Header()[xhttp.AmzMpPartsCount] = []string{
strings.TrimLeftFunc(ci.ETag, func(r rune) bool {
return !unicode.IsNumber(r)
}),
}
}
// Set content length for the range.
w.Header().Set(xhttp.ContentLength, strconv.FormatInt(rangeLen, 10))
if rs != nil {
contentRange := fmt.Sprintf("bytes %d-%d/%d", start, start+rangeLen-1, ci.Size)
w.Header().Set(xhttp.ContentRange, contentRange)
}
return
}
if ci.StatusCode == http.StatusPreconditionFailed {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrPreconditionFailed), r.URL)
return
}
w.WriteHeader(ci.StatusCode)
})
return
}
}
}
opts.FastGetObjInfo = true
objInfo, err := getObjectInfo(ctx, bucket, object, opts)
var proxy proxyResult
if err != nil && !objInfo.DeleteMarker && (isErrObjectNotFound(err) || isErrVersionNotFound(err) || isErrReadQuorum(err)) {
// proxy HEAD to replication target if active-active replication configured on bucket
proxytgts := getProxyTargets(ctx, bucket, object, opts)
if !proxytgts.Empty() {
globalReplicationStats.Load().incProxy(bucket, headObjectAPI, false)
var oi ObjectInfo
oi, proxy = proxyHeadToReplicationTarget(ctx, bucket, object, rs, opts, proxytgts)
if proxy.Proxy {
objInfo = oi
}
if proxy.Err != nil {
globalReplicationStats.Load().incProxy(bucket, headObjectAPI, true)
writeErrorResponseHeadersOnly(w, toAPIError(ctx, proxy.Err))
return
}
}
}
if objInfo.UserTags != "" {
// Set this such that authorization policies can be applied on the object tags.
r.Header.Set(xhttp.AmzObjectTagging, objInfo.UserTags)
}
if s3Error := authorizeRequest(ctx, r, policy.GetObjectAction); s3Error != ErrNone {
writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(s3Error))
return
}
if err != nil && !proxy.Proxy {
switch {
case !objInfo.VersionPurgeStatus.Empty():
w.Header()[xhttp.MinIODeleteReplicationStatus] = []string{string(objInfo.VersionPurgeStatus)}
case !objInfo.ReplicationStatus.Empty() && objInfo.DeleteMarker:
w.Header()[xhttp.MinIODeleteMarkerReplicationStatus] = []string{string(objInfo.ReplicationStatus)}
}
// Versioning enabled quite possibly object is deleted might be delete-marker
// if present set the headers, no idea why AWS S3 sets these headers.
if objInfo.VersionID != "" && objInfo.DeleteMarker {
w.Header()[xhttp.AmzVersionID] = []string{objInfo.VersionID}
w.Header()[xhttp.AmzDeleteMarker] = []string{strconv.FormatBool(objInfo.DeleteMarker)}
}
QueueReplicationHeal(ctx, bucket, objInfo, 0)
// do an additional verification whether object exists when object is deletemarker and request
// is from replication
if opts.CheckDMReplicationReady {
topts := opts
topts.VersionID = ""
goi, gerr := getObjectInfo(ctx, bucket, object, topts)
if gerr == nil || goi.VersionID != "" { // object layer returned more info because object is deleted
w.Header().Set(xhttp.MinIOTargetReplicationReady, "true")
}
}
writeErrorResponseHeadersOnly(w, toAPIError(ctx, err))
return
}
if !proxy.Proxy { // apply lifecycle rules only locally not for proxied requests
// Automatically remove the object/version if an expiry lifecycle rule can be applied
if lc, err := globalLifecycleSys.Get(bucket); err == nil {
rcfg, err := globalBucketObjectLockSys.Get(bucket)
if err != nil {
writeErrorResponseHeadersOnly(w, toAPIError(ctx, err))
return
}
replcfg, err := getReplicationConfig(ctx, bucket)
if err != nil {
writeErrorResponseHeadersOnly(w, toAPIError(ctx, err))
return
}
event := evalActionFromLifecycle(ctx, *lc, rcfg, replcfg, objInfo)
if event.Action.Delete() {
// apply whatever the expiry rule is.
applyExpiryRule(event, lcEventSrc_s3HeadObject, objInfo)
if !event.Action.DeleteRestored() {
// If the ILM action is not on restored object return error.
writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrNoSuchKey))
return
}
}
}
QueueReplicationHeal(ctx, bucket, objInfo, 0)
}
// filter object lock metadata if permission does not permit
getRetPerms := checkRequestAuthType(ctx, r, policy.GetObjectRetentionAction, bucket, object)
legalHoldPerms := checkRequestAuthType(ctx, r, policy.GetObjectLegalHoldAction, bucket, object)
// filter object lock metadata if permission does not permit
objInfo.UserDefined = objectlock.FilterObjectLockMetadata(objInfo.UserDefined, getRetPerms != ErrNone, legalHoldPerms != ErrNone)
if _, err = DecryptObjectInfo(&objInfo, r); err != nil {
writeErrorResponseHeadersOnly(w, toAPIError(ctx, err))
return
}
// Validate pre-conditions if any.
if checkPreconditions(ctx, w, r, objInfo, opts) {
return
}
// Set encryption response headers
switch kind, _ := crypto.IsEncrypted(objInfo.UserDefined); kind {
case crypto.S3:
w.Header().Set(xhttp.AmzServerSideEncryption, xhttp.AmzEncryptionAES)
case crypto.S3KMS:
w.Header().Set(xhttp.AmzServerSideEncryption, xhttp.AmzEncryptionKMS)
w.Header().Set(xhttp.AmzServerSideEncryptionKmsID, objInfo.KMSKeyID())
if kmsCtx, ok := objInfo.UserDefined[crypto.MetaContext]; ok {
w.Header().Set(xhttp.AmzServerSideEncryptionKmsContext, kmsCtx)
}
case crypto.SSEC:
// Validate the SSE-C Key set in the header.
if _, err = crypto.SSEC.UnsealObjectKey(r.Header, objInfo.UserDefined, bucket, object); err != nil {
writeErrorResponseHeadersOnly(w, toAPIError(ctx, err))
return
}
w.Header().Set(xhttp.AmzServerSideEncryptionCustomerAlgorithm, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerAlgorithm))
w.Header().Set(xhttp.AmzServerSideEncryptionCustomerKeyMD5, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerKeyMD5))
}
if r.Header.Get(xhttp.AmzChecksumMode) == "ENABLED" && rs == nil {
// AWS S3 silently drops checksums on range requests.
hash.AddChecksumHeader(w, objInfo.decryptChecksums(opts.PartNumber, r.Header))
}
// Set standard object headers.
if err = setObjectHeaders(ctx, w, objInfo, rs, opts); err != nil {
writeErrorResponseHeadersOnly(w, toAPIError(ctx, err))
return
}
// Set Parts Count Header
if opts.PartNumber > 0 && len(objInfo.Parts) > 0 {
setPartsCountHeaders(w, objInfo)
}
// Set any additional requested response headers.
setHeadGetRespHeaders(w, r.Form)
// Successful response.
if rs != nil || opts.PartNumber > 0 {
w.WriteHeader(http.StatusPartialContent)
} else {
w.WriteHeader(http.StatusOK)
}
// Notify object accessed via a HEAD request.
sendEvent(eventArgs{
EventName: event.ObjectAccessedHead,
BucketName: bucket,
Object: objInfo,
ReqParams: extractReqParams(r),
RespElements: extractRespElements(w),
UserAgent: r.UserAgent(),
Host: handlers.GetSourceIP(r),
})
}
// GetObjectAttributesHandler - GET Object Attributes
// -----------
// Entry point for the operation that retrieves metadata and part metadata
// from an object without returning the object itself.
//
// It replies with ErrServerNotInitialized (headers only) when no object layer
// is available, unescapes the object name taken from the route variables and
// then hands the request over to api.getObjectAttributesHandler.
func (api objectAPIHandlers) GetObjectAttributesHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "GetObjectAttributes")

	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))

	layer := api.ObjectAPI()
	if layer == nil {
		writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrServerNotInitialized))
		return
	}

	routeVars := mux.Vars(r)
	bucketName := routeVars["bucket"]

	// The object name arrives path-escaped in the route.
	objectName, unescapeErr := unescapePath(routeVars["object"])
	if unescapeErr != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, unescapeErr), r.URL)
		return
	}

	api.getObjectAttributesHandler(ctx, layer, bucketName, objectName, w, r)
}
// HeadObjectHandler - HEAD Object
// -----------
// The HEAD operation retrieves metadata from an object without returning the object itself.
//
// Requests carrying the xMinIOExtract header set to "true" whose object name
// contains archivePattern are routed to headObjectInArchiveFileHandler; every
// other request goes to headObjectHandler.
func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "HeadObject")

	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))

	layer := api.ObjectAPI()
	if layer == nil {
		writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrServerNotInitialized))
		return
	}

	routeVars := mux.Vars(r)
	bucketName := routeVars["bucket"]

	// The object name arrives path-escaped in the route.
	objectName, unescapeErr := unescapePath(routeVars["object"])
	if unescapeErr != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, unescapeErr), r.URL)
		return
	}

	wantsArchiveEntry := r.Header.Get(xMinIOExtract) == "true" && strings.Contains(objectName, archivePattern)
	if wantsArchiveEntry {
		api.headObjectInArchiveFileHandler(ctx, layer, bucketName, objectName, w, r)
		return
	}

	api.headObjectHandler(ctx, layer, bucketName, objectName, w, r)
}
// getCpObjMetadataFromHeader builds the metadata for the destination of a
// CopyObject operation according to the X-Amz-Metadata-Directive header.
//
// When the directive is REPLACE the metadata is extracted from the request
// itself; otherwise a filtered copy of userMeta is returned. In both cases a
// storage class supplied with the request (header first, form value as the
// fallback) is written into the result. The userMeta map passed in is only
// read, never assigned to, by this function.
func getCpObjMetadataFromHeader(ctx context.Context, r *http.Request, userMeta map[string]string) (map[string]string, error) {
	// Work on a private copy of the source metadata, leaving out the
	// tiering entries of the source object.
	copied := make(map[string]string, len(userMeta))
	for key, val := range userMeta {
		if key == metaTierName || key == metaTierStatus || key == metaTierObjName || key == metaTierVersionID {
			continue
		}
		copied[key] = val
	}

	// Drop the SSE headers inherited from the source info.
	crypto.RemoveSSEHeaders(copied)

	// The storage class is special: it is applied to the destination
	// metadata regardless of the metadata directive.
	storageClass := r.Header.Get(xhttp.AmzStorageClass)
	if storageClass == "" {
		storageClass = r.Form.Get(xhttp.AmzStorageClass)
	}

	directive := r.Header.Get(xhttp.AmzMetadataDirective)

	if !isDirectiveReplace(directive) {
		if storageClass != "" {
			copied[xhttp.AmzStorageClass] = storageClass
		}

		// An explicit COPY directive returns the copied metadata ...
		if isDirectiveCopy(directive) {
			return copied, nil
		}

		// ... and so does a request without any directive, since copying
		// is the default behavior.
		return copied, nil
	}

	// REPLACE: the metadata comes from the input headers.
	replaced, err := extractMetadataFromReq(ctx, r)
	if err != nil {
		return nil, err
	}
	if storageClass != "" {
		replaced[xhttp.AmzStorageClass] = storageClass
	}

	return replaced, nil
}
// remoteTransportBox gives every value stored in remoteInstanceTransport the
// same concrete type. atomic.Value panics when Store is called with nil or
// with a value whose concrete type differs from the first stored one, which
// would otherwise happen as soon as a nil or a differently-typed
// http.RoundTripper is passed to setRemoteInstanceTransport.
type remoteTransportBox struct {
	rt http.RoundTripper
}

// remoteInstanceTransport holds the round tripper used for external (not peers) servers.
// It always contains a remoteTransportBox once set.
var remoteInstanceTransport atomic.Value

// setRemoteInstanceTransport atomically replaces the round tripper returned by
// getRemoteInstanceTransport. Passing nil is allowed and clears the transport.
func setRemoteInstanceTransport(tr http.RoundTripper) {
	remoteInstanceTransport.Store(remoteTransportBox{rt: tr})
}

// getRemoteInstanceTransport returns the round tripper last stored with
// setRemoteInstanceTransport, or nil when none (or a nil one) has been stored.
func getRemoteInstanceTransport() http.RoundTripper {
	box, ok := remoteInstanceTransport.Load().(remoteTransportBox)
	if !ok {
		return nil
	}
	return box.rt
}
// getRemoteInstanceClient returns a minio-go Core client configured to access
// the remote instance reachable at host, signing with the access credentials
// of the incoming request r.
// Applicable only in a federated deployment.
var getRemoteInstanceClient = func(r *http.Request, host string) (*miniogo.Core, error) {
	reqCred := getReqAccessCred(r, globalSite.Region())

	// In a federated deployment, all the instances share config files
	// and hence are expected to have the same credentials.
	clientOpts := &miniogo.Options{
		Creds:     credentials.NewStaticV4(reqCred.AccessKey, reqCred.SecretKey, ""),
		Secure:    globalIsTLS,
		Transport: getRemoteInstanceTransport(),
	}

	client, err := miniogo.NewCore(host, clientOpts)
	if err != nil {
		return nil, err
	}

	client.SetAppInfo("minio-federated", ReleaseTag)

	return client, nil
}
// isRemoteCopyRequired reports whether the destination bucket of a COPY object
// request lives on a remote site. This code only gets executed when federation
// is enabled, ie when globalDNSConfig is non 'nil'.
//
// It is the COPY flavour of isRemoteCallRequired: when source and destination
// bucket are the same there is no need to check whether the destination
// bucket exists locally, so the answer is always false.
func isRemoteCopyRequired(ctx context.Context, srcBucket, dstBucket string, objAPI ObjectLayer) bool {
	return srcBucket != dstBucket && isRemoteCallRequired(ctx, dstBucket, objAPI)
}
// isRemoteCallRequired reports whether bucket is on a remote site. The lookup
// is only performed when globalDNSConfig is set and globalBucketFederation is
// enabled; the bucket counts as remote when the local GetBucketInfo call fails
// with the volume-not-found object error for that bucket.
func isRemoteCallRequired(ctx context.Context, bucket string, objAPI ObjectLayer) bool {
	if globalDNSConfig == nil || !globalBucketFederation {
		return false
	}

	_, lookupErr := objAPI.GetBucketInfo(ctx, bucket, BucketOptions{})

	return lookupErr == toObjectErr(errVolumeNotFound, bucket)
}
// CopyObjectHandler - Copy Object
// ----------
// This implementation of the PUT operation adds an object to a bucket
// while reading the object from another source.
// Notice: The S3 client can send secret keys in headers for encryption related jobs,
// the handler should ensure to remove these keys before sending them to the object layer.
// Currently these keys are:
//   - X-Amz-Server-Side-Encryption-Customer-Key
//   - X-Amz-Copy-Source-Server-Side-Encryption-Customer-Key
//
// The handler proceeds in these stages:
//  1. authorize the PUT on the destination and the GET on the copy source,
//     and validate the copy source, directives and storage class;
//  2. open the source object (evaluating the copy preconditions);
//  3. set up compression and encryption (or key rotation) for the target;
//  4. assemble the destination metadata: user metadata, tags, object lock
//     retention/legal hold and replication markers;
//  5. perform the copy locally or, in a federated deployment, as a PutObject
//     against the remote instance owning the destination bucket;
//  6. write the response, then schedule replication, event notification and
//     tier transition/sweep.
func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "CopyObject")

	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))

	objectAPI := api.ObjectAPI()
	if objectAPI == nil {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
		return
	}

	vars := mux.Vars(r)
	dstBucket := vars["bucket"]
	dstObject, err := unescapePath(vars["object"])
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	if s3Error := checkRequestAuthType(ctx, r, policy.PutObjectAction, dstBucket, dstObject); s3Error != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
		return
	}

	// Read escaped copy source path to check for parameters.
	cpSrcPath := r.Header.Get(xhttp.AmzCopySource)
	var vid string
	if u, err := url.Parse(cpSrcPath); err == nil {
		vid = strings.TrimSpace(u.Query().Get(xhttp.VersionID))
		// Note that url.Parse does the unescaping
		cpSrcPath = u.Path
	}

	srcBucket, srcObject := path2BucketObject(cpSrcPath)
	// If source object is empty or bucket is empty, reply back invalid copy source.
	if srcObject == "" || srcBucket == "" {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidCopySource), r.URL)
		return
	}

	// A version id other than the null version must be a well-formed UUID.
	if vid != "" && vid != nullVersionID {
		_, err := uuid.Parse(vid)
		if err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, VersionNotFound{
				Bucket:    srcBucket,
				Object:    srcObject,
				VersionID: vid,
			}), r.URL)
			return
		}
	}

	if s3Error := checkRequestAuthType(ctx, r, policy.GetObjectAction, srcBucket, srcObject); s3Error != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
		return
	}

	// Check if metadata directive is valid.
	if !isDirectiveValid(r.Header.Get(xhttp.AmzMetadataDirective)) {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidMetadataDirective), r.URL)
		return
	}

	// check if tag directive is valid
	if !isDirectiveValid(r.Header.Get(xhttp.AmzTagDirective)) {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidTagDirective), r.URL)
		return
	}

	// Validate storage class metadata if present
	dstSc := r.Header.Get(xhttp.AmzStorageClass)
	if dstSc != "" && !storageclass.IsValid(dstSc) {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidStorageClass), r.URL)
		return
	}

	// Check if bucket encryption is enabled
	sseConfig, _ := globalBucketSSEConfigSys.Get(dstBucket)
	sseConfig.Apply(r.Header, sse.ApplyOptions{
		AutoEncrypt: globalAutoEncryption,
	})

	var srcOpts, dstOpts ObjectOptions
	srcOpts, err = copySrcOpts(ctx, r, srcBucket, srcObject)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}
	srcOpts.VersionID = vid

	// convert copy src encryption options for GET calls
	getOpts := ObjectOptions{
		VersionID:          srcOpts.VersionID,
		Versioned:          srcOpts.Versioned,
		VersionSuspended:   srcOpts.VersionSuspended,
		ReplicationRequest: r.Header.Get(xhttp.MinIOSourceReplicationRequest) == "true",
	}
	getSSE := encrypt.SSE(srcOpts.ServerSideEncryption)
	if getSSE != srcOpts.ServerSideEncryption {
		getOpts.ServerSideEncryption = getSSE
	}

	dstOpts, err = copyDstOpts(ctx, r, dstBucket, dstObject, nil)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}
	cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject))

	getObjectNInfo := objectAPI.GetObjectNInfo

	// The precondition callback writes the error response itself; a true
	// return value means the response has already been sent.
	checkCopyPrecondFn := func(o ObjectInfo) bool {
		if _, err := DecryptObjectInfo(&o, r); err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return true
		}
		return checkCopyObjectPreconditions(ctx, w, r, o)
	}
	getOpts.CheckPrecondFn = checkCopyPrecondFn
	if cpSrcDstSame {
		getOpts.NoLock = true
	}

	var rs *HTTPRangeSpec
	gr, err := getObjectNInfo(ctx, srcBucket, srcObject, rs, r.Header, getOpts)
	if err != nil {
		// The precondition callback above has already written the response.
		if isErrPreconditionFailed(err) {
			return
		}

		// Versioning enabled quite possibly object is deleted might be delete-marker
		// if present set the headers, no idea why AWS S3 sets these headers.
		if gr != nil && gr.ObjInfo.VersionID != "" && gr.ObjInfo.DeleteMarker {
			w.Header()[xhttp.AmzVersionID] = []string{gr.ObjInfo.VersionID}
			w.Header()[xhttp.AmzDeleteMarker] = []string{strconv.FormatBool(gr.ObjInfo.DeleteMarker)}
		}

		// Update context bucket & object names for correct S3 XML error response
		reqInfo := logger.GetReqInfo(ctx)
		reqInfo.BucketName = srcBucket
		reqInfo.ObjectName = srcObject
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}
	defer gr.Close()
	srcInfo := gr.ObjInfo

	// maximum Upload size for object in a single CopyObject operation.
	if isMaxObjectSize(srcInfo.Size) {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrEntityTooLarge), r.URL)
		return
	}

	// We have to copy metadata only if source and destination are same.
	// this changes for encryption which can be observed below.
	if cpSrcDstSame {
		srcInfo.metadataOnly = true
	}

	var chStorageClass bool
	if dstSc != "" && dstSc != srcInfo.StorageClass {
		chStorageClass = true
		srcInfo.metadataOnly = false
	} // no changes in storage-class expected so its a metadataonly operation.

	var reader io.Reader = gr

	// Set the actual size to the compressed/decrypted size if encrypted.
	actualSize, err := srcInfo.GetActualSize()
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}
	length := actualSize

	if !cpSrcDstSame {
		if err := enforceBucketQuotaHard(ctx, dstBucket, actualSize); err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}
	}

	// Check if either the source is encrypted or the destination will be encrypted.
	objectEncryption := crypto.Requested(r.Header)
	objectEncryption = objectEncryption || crypto.IsSourceEncrypted(srcInfo.UserDefined)

	var compressMetadata map[string]string
	// No need to compress for remote etcd calls
	// Pass the decompressed stream to such calls.
	isDstCompressed := isCompressible(r.Header, dstObject) &&
		length > minCompressibleSize &&
		!isRemoteCopyRequired(ctx, srcBucket, dstBucket, objectAPI) && !objectEncryption
	if isDstCompressed {
		compressMetadata = make(map[string]string, 2)
		// Preserving the compression metadata.
		compressMetadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV2
		compressMetadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(actualSize, 10)

		reader = etag.NewReader(ctx, reader, nil, nil)
		wantEncryption := crypto.Requested(r.Header)
		s2c, cb := newS2CompressReader(reader, actualSize, wantEncryption)
		dstOpts.IndexCB = cb
		defer s2c.Close()
		reader = etag.Wrap(s2c, reader)
		// The compressed size is not known up front.
		length = -1
	} else {
		delete(srcInfo.UserDefined, ReservedMetadataPrefix+"compression")
		delete(srcInfo.UserDefined, ReservedMetadataPrefix+"actual-size")
		reader = gr
	}

	srcInfo.Reader, err = hash.NewReader(ctx, reader, length, "", "", actualSize)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	pReader := NewPutObjReader(srcInfo.Reader)

	// Handle encryption
	encMetadata := make(map[string]string)

	// Encryption parameters not applicable for this object.
	if _, ok := crypto.IsEncrypted(srcInfo.UserDefined); !ok && crypto.SSECopy.IsRequested(r.Header) {
		writeErrorResponse(ctx, w, toAPIError(ctx, errInvalidEncryptionParameters), r.URL)
		return
	}

	// Encryption parameters not present for this object.
	if crypto.SSEC.IsEncrypted(srcInfo.UserDefined) && !crypto.SSECopy.IsRequested(r.Header) && r.Header.Get(xhttp.MinIOSourceReplicationRequest) != "true" {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidSSECustomerAlgorithm), r.URL)
		return
	}

	var oldKey, newKey []byte
	var newKeyID string
	var kmsCtx kms.Context
	var objEncKey crypto.ObjectKey
	sseCopyKMS := crypto.S3KMS.IsEncrypted(srcInfo.UserDefined)
	sseCopyS3 := crypto.S3.IsEncrypted(srcInfo.UserDefined)
	sseCopyC := crypto.SSEC.IsEncrypted(srcInfo.UserDefined) && crypto.SSECopy.IsRequested(r.Header)
	sseC := crypto.SSEC.IsRequested(r.Header)
	sseS3 := crypto.S3.IsRequested(r.Header)
	sseKMS := crypto.S3KMS.IsRequested(r.Header)

	isSourceEncrypted := sseCopyC || sseCopyS3 || sseCopyKMS
	isTargetEncrypted := sseC || sseS3 || sseKMS

	if sseC {
		newKey, err = ParseSSECustomerRequest(r)
		if err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}
	}
	if crypto.S3KMS.IsRequested(r.Header) {
		newKeyID, kmsCtx, err = crypto.S3KMS.ParseHTTP(r.Header)
		if err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}
	}

	// If src == dst and either
	// - the object is encrypted using SSE-C and two different SSE-C keys are present
	// - the object is encrypted using SSE-S3 and the SSE-S3 header is present
	// - the object storage class is not changing
	// then execute a key rotation.
	if cpSrcDstSame && (sseCopyC && sseC) && !chStorageClass {
		oldKey, err = ParseSSECopyCustomerRequest(r.Header, srcInfo.UserDefined)
		if err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}

		// Carry over the reserved (internal) metadata so the key can be rotated in place.
		for k, v := range srcInfo.UserDefined {
			if stringsHasPrefixFold(k, ReservedMetadataPrefixLower) {
				encMetadata[k] = v
			}
		}

		if err = rotateKey(ctx, oldKey, newKeyID, newKey, srcBucket, srcObject, encMetadata, kmsCtx); err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}

		// Since we are rotating the keys, make sure to update the metadata.
		srcInfo.metadataOnly = true
		srcInfo.keyRotation = true
	} else {
		if isSourceEncrypted || isTargetEncrypted {
			// We are not only copying just metadata instead
			// we are creating a new object at this point, even
			// if source and destination are same objects.
			if !srcInfo.keyRotation {
				srcInfo.metadataOnly = false
			}
		}

		// Calculate the size of the target object
		var targetSize int64

		switch {
		case isDstCompressed:
			targetSize = -1
		case !isSourceEncrypted && !isTargetEncrypted:
			targetSize, _ = srcInfo.GetActualSize()
		case isSourceEncrypted && isTargetEncrypted:
			objInfo := ObjectInfo{Size: actualSize}
			targetSize = objInfo.EncryptedSize()
		case !isSourceEncrypted && isTargetEncrypted:
			targetSize = srcInfo.EncryptedSize()
		case isSourceEncrypted && !isTargetEncrypted:
			targetSize, _ = srcInfo.DecryptedSize()
		}

		if isTargetEncrypted {
			var encReader io.Reader
			kind, _ := crypto.IsRequested(r.Header)
			encReader, objEncKey, err = newEncryptReader(ctx, srcInfo.Reader, kind, newKeyID, newKey, dstBucket, dstObject, encMetadata, kmsCtx)
			if err != nil {
				writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
				return
			}
			reader = etag.Wrap(encReader, srcInfo.Reader)
		}

		if isSourceEncrypted {
			// Remove all source encrypted related metadata to
			// avoid copying them in target object.
			crypto.RemoveInternalEntries(srcInfo.UserDefined)
		}

		// do not try to verify encrypted content
		srcInfo.Reader, err = hash.NewReader(ctx, reader, targetSize, "", "", actualSize)
		if err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}

		if isTargetEncrypted {
			pReader, err = pReader.WithEncryption(srcInfo.Reader, &objEncKey)
			if err != nil {
				writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
				return
			}
			if dstOpts.IndexCB != nil {
				dstOpts.IndexCB = compressionIndexEncrypter(objEncKey, dstOpts.IndexCB)
			}
		}
	}

	srcInfo.PutObjReader = pReader

	srcInfo.UserDefined, err = getCpObjMetadataFromHeader(ctx, r, srcInfo.UserDefined)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	objTags := srcInfo.UserTags
	// If x-amz-tagging-directive header is REPLACE, get passed tags.
	if isDirectiveReplace(r.Header.Get(xhttp.AmzTagDirective)) {
		objTags = r.Header.Get(xhttp.AmzObjectTagging)
		if _, err := tags.ParseObjectTags(objTags); err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}
	}

	if objTags != "" {
		lastTaggingTimestamp := srcInfo.UserDefined[ReservedMetadataPrefixLower+TaggingTimestamp]
		if dstOpts.ReplicationRequest {
			srcTimestamp := dstOpts.ReplicationSourceTaggingTimestamp
			if !srcTimestamp.IsZero() {
				ondiskTimestamp, err := time.Parse(time.RFC3339Nano, lastTaggingTimestamp)
				// update tagging metadata only if replica timestamp is newer than what's on disk
				if err != nil || (err == nil && ondiskTimestamp.Before(srcTimestamp)) {
					srcInfo.UserDefined[ReservedMetadataPrefixLower+TaggingTimestamp] = srcTimestamp.UTC().Format(time.RFC3339Nano)
					srcInfo.UserDefined[xhttp.AmzObjectTagging] = objTags
				}
			}
		} else {
			srcInfo.UserDefined[xhttp.AmzObjectTagging] = objTags
			srcInfo.UserDefined[ReservedMetadataPrefixLower+TaggingTimestamp] = UTCNow().Format(time.RFC3339Nano)
		}
	}

	srcInfo.UserDefined = filterReplicationStatusMetadata(srcInfo.UserDefined)
	srcInfo.UserDefined = objectlock.FilterObjectLockMetadata(srcInfo.UserDefined, true, true)
	retPerms := isPutActionAllowed(ctx, getRequestAuthType(r), dstBucket, dstObject, r, policy.PutObjectRetentionAction)
	holdPerms := isPutActionAllowed(ctx, getRequestAuthType(r), dstBucket, dstObject, r, policy.PutObjectLegalHoldAction)
	getObjectInfo := objectAPI.GetObjectInfo

	// apply default bucket configuration/governance headers for dest side.
	retentionMode, retentionDate, legalHold, s3Err := checkPutObjectLockAllowed(ctx, r, dstBucket, dstObject, getObjectInfo, retPerms, holdPerms)
	if s3Err == ErrNone && retentionMode.Valid() {
		lastretentionTimestamp := srcInfo.UserDefined[ReservedMetadataPrefixLower+ObjectLockRetentionTimestamp]
		if dstOpts.ReplicationRequest {
			srcTimestamp := dstOpts.ReplicationSourceRetentionTimestamp
			if !srcTimestamp.IsZero() {
				ondiskTimestamp, err := time.Parse(time.RFC3339Nano, lastretentionTimestamp)
				// update retention metadata only if replica timestamp is newer than what's on disk
				if err != nil || (err == nil && ondiskTimestamp.Before(srcTimestamp)) {
					srcInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockMode)] = string(retentionMode)
					srcInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = amztime.ISO8601Format(retentionDate.UTC())
					srcInfo.UserDefined[ReservedMetadataPrefixLower+ObjectLockRetentionTimestamp] = srcTimestamp.UTC().Format(time.RFC3339Nano)
				}
			}
		} else {
			srcInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockMode)] = string(retentionMode)
			srcInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = amztime.ISO8601Format(retentionDate.UTC())
			srcInfo.UserDefined[ReservedMetadataPrefixLower+ObjectLockRetentionTimestamp] = UTCNow().Format(time.RFC3339Nano)
		}
	}

	if s3Err == ErrNone && legalHold.Status.Valid() {
		lastLegalHoldTimestamp := srcInfo.UserDefined[ReservedMetadataPrefixLower+ObjectLockLegalHoldTimestamp]
		if dstOpts.ReplicationRequest {
			srcTimestamp := dstOpts.ReplicationSourceLegalholdTimestamp
			if !srcTimestamp.IsZero() {
				ondiskTimestamp, err := time.Parse(time.RFC3339Nano, lastLegalHoldTimestamp)
				// update legalhold metadata only if replica timestamp is newer than what's on disk
				if err != nil || (err == nil && ondiskTimestamp.Before(srcTimestamp)) {
					srcInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = string(legalHold.Status)
					// Record the timestamp under the legal-hold key, the same key it is
					// read back from above; writing it under the retention key would
					// clobber the retention timestamp and never advance this one.
					srcInfo.UserDefined[ReservedMetadataPrefixLower+ObjectLockLegalHoldTimestamp] = srcTimestamp.UTC().Format(time.RFC3339Nano)
				}
			}
		} else {
			srcInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = string(legalHold.Status)
		}
	}

	if s3Err != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
		return
	}

	// Mark the object as a replica when the request carries a replication status.
	if rs := r.Header.Get(xhttp.AmzBucketReplicationStatus); rs != "" {
		srcInfo.UserDefined[ReservedMetadataPrefixLower+ReplicaStatus] = replication.Replica.String()
		srcInfo.UserDefined[ReservedMetadataPrefixLower+ReplicaTimestamp] = UTCNow().Format(time.RFC3339Nano)
		srcInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = rs
	}

	op := replication.ObjectReplicationType
	if srcInfo.metadataOnly {
		op = replication.MetadataReplicationType
	}
	if dsc := mustReplicate(ctx, dstBucket, dstObject, srcInfo.getMustReplicateOptions(op, dstOpts)); dsc.ReplicateAny() {
		srcInfo.UserDefined[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus()
		srcInfo.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano)
	}

	// Store the preserved compression metadata.
	for k, v := range compressMetadata {
		srcInfo.UserDefined[k] = v
	}

	// We need to preserve the encryption headers set in EncryptRequest,
	// so we do not want to override them, copy them instead.
	for k, v := range encMetadata {
		srcInfo.UserDefined[k] = v
	}

	// Ensure that metadata does not contain sensitive information
	crypto.RemoveSensitiveEntries(srcInfo.UserDefined)

	// If we see legacy source, metadataOnly we have to overwrite the content.
	if srcInfo.Legacy {
		srcInfo.metadataOnly = false
	}

	// Check if x-amz-metadata-directive or x-amz-tagging-directive was not set to REPLACE and source,
	// destination are same objects. Apply this restriction also when
	// metadataOnly is true indicating that we are not overwriting the object.
	// if encryption is enabled we do not need explicit "REPLACE" metadata to
	// be enabled as well - this is to allow for key-rotation.
	if !isDirectiveReplace(r.Header.Get(xhttp.AmzMetadataDirective)) && !isDirectiveReplace(r.Header.Get(xhttp.AmzTagDirective)) &&
		srcInfo.metadataOnly && srcOpts.VersionID == "" && !objectEncryption {
		// If x-amz-metadata-directive is not set to REPLACE then we need
		// to error out if source and destination are same.
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidCopyDest), r.URL)
		return
	}

	remoteCallRequired := isRemoteCopyRequired(ctx, srcBucket, dstBucket, objectAPI)

	var objInfo ObjectInfo
	var os *objSweeper
	if remoteCallRequired {
		var dstRecords []dns.SrvRecord
		dstRecords, err = globalDNSConfig.Get(dstBucket)
		if err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}

		// Send PutObject request to appropriate instance (in federated deployment)
		core, rerr := getRemoteInstanceClient(r, getHostFromSrv(dstRecords))
		if rerr != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, rerr), r.URL)
			return
		}
		tag, err := tags.ParseObjectTags(objTags)
		if err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}
		// Remove the metadata for remote calls.
		delete(srcInfo.UserDefined, ReservedMetadataPrefix+"compression")
		delete(srcInfo.UserDefined, ReservedMetadataPrefix+"actual-size")
		opts := miniogo.PutObjectOptions{
			UserMetadata:         srcInfo.UserDefined,
			ServerSideEncryption: dstOpts.ServerSideEncryption,
			UserTags:             tag.ToMap(),
		}
		remoteObjInfo, rerr := core.PutObject(ctx, dstBucket, dstObject, srcInfo.Reader,
			srcInfo.Size, "", "", opts)
		if rerr != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, rerr), r.URL)
			return
		}
		objInfo.UserDefined = cloneMSS(opts.UserMetadata)
		objInfo.ETag = remoteObjInfo.ETag
		objInfo.ModTime = remoteObjInfo.LastModified
	} else {
		os = newObjSweeper(dstBucket, dstObject).WithVersioning(dstOpts.Versioned, dstOpts.VersionSuspended)
		// Get appropriate object info to identify the remote object to delete
		if !srcInfo.metadataOnly {
			goiOpts := os.GetOpts()
			if !globalTierConfigMgr.Empty() {
				if goi, gerr := getObjectInfo(ctx, dstBucket, dstObject, goiOpts); gerr == nil {
					os.SetTransitionState(goi.TransitionedObject)
				}
			}
		}

		copyObjectFn := objectAPI.CopyObject

		// Copy source object to destination, if source and destination
		// object is same then only metadata is updated.
		objInfo, err = copyObjectFn(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts)
		if err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}
	}

	// Keep the stored ETag around; the response carries the decrypted one.
	origETag := objInfo.ETag
	objInfo.ETag = getDecryptedETag(r.Header, objInfo, false)
	response := generateCopyObjectResponse(objInfo.ETag, objInfo.ModTime)
	encodedSuccessResponse := encodeResponse(response)

	if dsc := mustReplicate(ctx, dstBucket, dstObject, objInfo.getMustReplicateOptions(replication.ObjectReplicationType, dstOpts)); dsc.ReplicateAny() {
		scheduleReplication(ctx, objInfo, objectAPI, dsc, replication.ObjectReplicationType)
	}

	setPutObjHeaders(w, objInfo, false, r.Header)
	// We must not use the http.Header().Set method here because some (broken)
	// clients expect the x-amz-copy-source-version-id header key to be literally
	// "x-amz-copy-source-version-id"- not in canonicalized form, preserve it.
	if srcOpts.VersionID != "" {
		w.Header()[strings.ToLower(xhttp.AmzCopySourceVersionID)] = []string{srcOpts.VersionID}
	}

	// Write success response.
	writeSuccessResponseXML(w, encodedSuccessResponse)

	// Notify object created event.
	sendEvent(eventArgs{
		EventName:    event.ObjectCreatedCopy,
		BucketName:   dstBucket,
		Object:       objInfo,
		ReqParams:    extractReqParams(r),
		RespElements: extractRespElements(w),
		UserAgent:    r.UserAgent(),
		Host:         handlers.GetSourceIP(r),
	})

	if !remoteCallRequired && !globalTierConfigMgr.Empty() {
		// Schedule object for immediate transition if eligible.
		objInfo.ETag = origETag
		enqueueTransitionImmediate(objInfo, lcEventSrc_s3CopyObject)
		// Remove the transitioned object whose object version is being overwritten.
		os.Sweep()
	}
}
// PutObjectHandler - PUT Object
// ----------
// This implementation of the PUT operation adds an object to a bucket.
//
// The handler validates the request (copy-source header, storage class,
// Content-MD5, content length, object tags), authorizes and verifies the
// request signature, enforces the hard bucket quota, and then builds the
// reader chain (optional cache tee, optional compression, hashing, optional
// encryption) that is handed to the object layer's PutObject. On success it
// sets the response headers, schedules replication when required, sends the
// object-created event and writes a headers-only success response.
//
// Notice: The S3 client can send secret keys in headers for encryption related jobs,
// the handler should ensure to remove these keys before sending them to the object layer.
// Currently these keys are:
// - X-Amz-Server-Side-Encryption-Customer-Key
// - X-Amz-Copy-Source-Server-Side-Encryption-Customer-Key
func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "PutObject")
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
return
}
vars := mux.Vars(r)
bucket := vars["bucket"]
object, err := unescapePath(vars["object"])
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
// X-Amz-Copy-Source shouldn't be set for this call.
if _, ok := r.Header[xhttp.AmzCopySource]; ok {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidCopySource), r.URL)
return
}
// Validate storage class metadata if present
if sc := r.Header.Get(xhttp.AmzStorageClass); sc != "" {
if !storageclass.IsValid(sc) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidStorageClass), r.URL)
return
}
}
// The client supplied Content-MD5 (if any) is later handed to the hash reader as md5hex.
clientETag, err := etag.FromContentMD5(r.Header)
if err != nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidDigest), r.URL)
return
}
// if Content-Length is unknown/missing, deny the request
size := r.ContentLength
rAuthType := getRequestAuthType(r)
switch rAuthType {
// Check signature types that must have content length
// For these streaming types the size is taken from the decoded content
// length header when that header is present.
case authTypeStreamingSigned, authTypeStreamingSignedTrailer, authTypeStreamingUnsignedTrailer:
if sizeStr, ok := r.Header[xhttp.AmzDecodedContentLength]; ok {
if sizeStr[0] == "" {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMissingContentLength), r.URL)
return
}
size, err = strconv.ParseInt(sizeStr[0], 10, 64)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
}
}
if size == -1 {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMissingContentLength), r.URL)
return
}
// maximum Upload size for objects in a single operation
if isMaxObjectSize(size) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrEntityTooLarge), r.URL)
return
}
metadata, err := extractMetadataFromReq(ctx, r)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
// Object tags sent in the header are validated here and stored verbatim in the metadata.
if objTags := r.Header.Get(xhttp.AmzObjectTagging); objTags != "" {
if _, err := tags.ParseObjectTags(objTags); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
metadata[xhttp.AmzObjectTagging] = objTags
}
var (
md5hex = clientETag.String()
sha256hex = ""
rd io.Reader = r.Body
s3Err APIErrorCode
putObject = objectAPI.PutObject
)
// Check if put is allowed
if s3Err = isPutActionAllowed(ctx, rAuthType, bucket, object, r, policy.PutObjectAction); s3Err != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
return
}
// Verify the request per auth type. Streaming types replace rd with a
// chunked reader; for signed V4 requests the content SHA256 is captured
// unless skipContentSha256Cksum reports it should be skipped.
switch rAuthType {
case authTypeStreamingSigned, authTypeStreamingSignedTrailer:
// Initialize stream signature verifier.
rd, s3Err = newSignV4ChunkedReader(r, rAuthType == authTypeStreamingSignedTrailer)
if s3Err != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
return
}
case authTypeStreamingUnsignedTrailer:
// Initialize stream chunked reader with optional trailers.
rd, s3Err = newUnsignedV4ChunkedReader(r, true)
if s3Err != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
return
}
case authTypeSignedV2, authTypePresignedV2:
s3Err = isReqAuthenticatedV2(r)
if s3Err != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
return
}
case authTypePresigned, authTypeSigned:
if s3Err = reqSignatureV4Verify(r, globalSite.Region(), serviceS3); s3Err != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
return
}
if !skipContentSha256Cksum(r) {
sha256hex = getContentSha256Cksum(r, serviceS3)
}
}
if _, ok := r.Header[xhttp.MinIOSourceReplicationCheck]; ok {
// requests to just validate replication settings and permissions are not allowed to write data
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrReplicationPermissionCheckError), r.URL)
return
}
if err := enforceBucketQuotaHard(ctx, bucket, size); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
// A request flagged as a replica additionally needs the ReplicateObject
// permission; the metadata is marked as replica and the replica stat is
// updated when the handler returns (also on later error returns, since it
// is deferred).
if r.Header.Get(xhttp.AmzBucketReplicationStatus) == replication.Replica.String() {
if s3Err = isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, policy.ReplicateObjectAction); s3Err != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
return
}
metadata[ReservedMetadataPrefixLower+ReplicaStatus] = replication.Replica.String()
metadata[ReservedMetadataPrefixLower+ReplicaTimestamp] = UTCNow().Format(time.RFC3339Nano)
defer globalReplicationStats.Load().UpdateReplicaStat(bucket, size)
}
// Check if bucket encryption is enabled
// Note: Apply is given r.Header, so encryption headers may be added to the
// request itself; later crypto.Requested(r.Header) checks observe them.
sseConfig, _ := globalBucketSSEConfigSys.Get(bucket)
sseConfig.Apply(r.Header, sse.ApplyOptions{
AutoEncrypt: globalAutoEncryption,
})
var buf *bytebufferpool.ByteBuffer
// Disable cache for encrypted objects - headers are applied with sseConfig.Apply if auto encrypted.
if globalCacheConfig.MatchesSize(size) && !crypto.Requested(r.Header) {
buf = bytebufferpool.Get()
defer bytebufferpool.Put(buf)
}
// When caching applies, everything read from rd is also copied into buf so
// the deferred cache update further below can store the payload.
var reader io.Reader
reader = rd
if buf != nil {
reader = io.TeeReader(rd, buf)
}
// actualSize keeps the uncompressed size; size is set to -1 below when compressing.
actualSize := size
var idxCb func() []byte
if isCompressible(r.Header, object) && size > minCompressibleSize {
// Storing the compression metadata.
metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV2
metadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(size, 10)
// The client digests are verified against the uncompressed stream by actualReader.
actualReader, err := hash.NewReader(ctx, reader, size, md5hex, sha256hex, actualSize)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
if err = actualReader.AddChecksum(r, false); err != nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidChecksum), r.URL)
return
}
// Set compression metrics.
var s2c io.ReadCloser
wantEncryption := crypto.Requested(r.Header)
s2c, idxCb = newS2CompressReader(actualReader, actualSize, wantEncryption)
defer s2c.Close()
reader = etag.Wrap(s2c, actualReader)
size = -1 // Since compressed size is un-predictable.
md5hex = "" // Do not try to verify the content.
sha256hex = ""
}
var forceMD5 []byte
// Optimization: If SSE-KMS and SSE-C did not request Content-Md5. Use uuid as etag. Optionally enable this also
// for server that is started with `--no-compat`.
if !etag.ContentMD5Requested(r.Header) && (crypto.S3KMS.IsRequested(r.Header) || crypto.SSEC.IsRequested(r.Header) || !globalServerCtxt.StrictS3Compat) {
forceMD5 = mustGetUUIDBytes()
}
hashReader, err := hash.NewReaderWithOpts(ctx, reader, hash.Options{
Size: size,
MD5Hex: md5hex,
SHA256Hex: sha256hex,
ActualSize: actualSize,
DisableMD5: false,
ForceMD5: forceMD5,
})
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
// The second argument is true only when size < 0, i.e. on the compressed path above.
if err := hashReader.AddChecksum(r, size < 0); err != nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidChecksum), r.URL)
return
}
rawReader := hashReader
pReader := NewPutObjReader(rawReader)
var opts ObjectOptions
opts, err = putOptsFromReq(ctx, r, bucket, object, metadata)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
opts.IndexCB = idxCb
// Install a precondition callback only when ETag preservation or an
// If-Match / If-None-Match header was requested. The callback writes the
// error response itself when decrypting the object info fails.
if opts.PreserveETag != "" ||
r.Header.Get(xhttp.IfMatch) != "" ||
r.Header.Get(xhttp.IfNoneMatch) != "" {
opts.CheckPrecondFn = func(oi ObjectInfo) bool {
if _, err := DecryptObjectInfo(&oi, r); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return true
}
return checkPreconditionsPUT(ctx, w, r, oi, opts)
}
}
// Evaluate retention and legal hold permissions and record any valid
// object lock settings in the metadata; a non-ErrNone result aborts the request.
retPerms := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, policy.PutObjectRetentionAction)
holdPerms := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, policy.PutObjectLegalHoldAction)
getObjectInfo := objectAPI.GetObjectInfo
retentionMode, retentionDate, legalHold, s3Err := checkPutObjectLockAllowed(ctx, r, bucket, object, getObjectInfo, retPerms, holdPerms)
if s3Err == ErrNone && retentionMode.Valid() {
metadata[strings.ToLower(xhttp.AmzObjectLockMode)] = string(retentionMode)
metadata[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = amztime.ISO8601Format(retentionDate.UTC())
}
if s3Err == ErrNone && legalHold.Status.Valid() {
metadata[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = string(legalHold.Status)
}
if s3Err != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
return
}
// When the object must be replicated, stamp the metadata with the pending
// replication status before the object is written.
if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(metadata, "", "", replication.ObjectReplicationType, opts)); dsc.ReplicateAny() {
metadata[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano)
metadata[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus()
}
var objectEncryptionKey crypto.ObjectKey
if crypto.Requested(r.Header) {
// SSE-C copy headers and SSE-C combined with SSE-S3 or SSE-KMS are rejected.
if crypto.SSECopy.IsRequested(r.Header) {
writeErrorResponse(ctx, w, toAPIError(ctx, errInvalidEncryptionParameters), r.URL)
return
}
if crypto.SSEC.IsRequested(r.Header) && crypto.S3.IsRequested(r.Header) {
writeErrorResponse(ctx, w, toAPIError(ctx, crypto.ErrIncompatibleEncryptionMethod), r.URL)
return
}
if crypto.SSEC.IsRequested(r.Header) && crypto.S3KMS.IsRequested(r.Header) {
writeErrorResponse(ctx, w, toAPIError(ctx, crypto.ErrIncompatibleEncryptionMethod), r.URL)
return
}
reader, objectEncryptionKey, err = EncryptRequest(hashReader, r, bucket, object, metadata)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
// The encrypted size is only computed when the plaintext size is known (size >= 0).
wantSize := int64(-1)
if size >= 0 {
info := ObjectInfo{Size: size}
wantSize = info.EncryptedSize()
}
// do not try to verify encrypted content
hashReader, err = hash.NewReader(ctx, etag.Wrap(reader, hashReader), wantSize, "", "", actualSize)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
pReader, err = pReader.WithEncryption(hashReader, &objectEncryptionKey)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
// The compression index callback, if any, is wrapped with the object key as well.
if opts.IndexCB != nil {
opts.IndexCB = compressionIndexEncrypter(objectEncryptionKey, opts.IndexCB)
}
opts.EncryptFn = metadataEncrypter(objectEncryptionKey)
}
// Ensure that metadata does not contain sensitive information
crypto.RemoveSensitiveEntries(metadata)
// NOTE: from here on the local variable os shadows the imported os package.
os := newObjSweeper(bucket, object).WithVersioning(opts.Versioned, opts.VersionSuspended)
if !globalTierConfigMgr.Empty() {
// Get appropriate object info to identify the remote object to delete
goiOpts := os.GetOpts()
if goi, gerr := getObjectInfo(ctx, bucket, object, goiOpts); gerr == nil {
os.SetTransitionState(goi.TransitionedObject)
}
}
// Create the object..
objInfo, err := putObject(ctx, bucket, object, pReader, opts)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
// When the extract header is "true" and the object name has the archive
// extension, updateObjectMetadataWithZipInfo is called for the version just written.
if r.Header.Get(xMinIOExtract) == "true" && HasSuffix(object, archiveExt) {
opts := ObjectOptions{VersionID: objInfo.VersionID, MTime: objInfo.ModTime}
if _, err := updateObjectMetadataWithZipInfo(ctx, objectAPI, bucket, object, opts); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
}
// Remember the stored ETag; objInfo.ETag may be rewritten below for the
// response and is restored before the object is enqueued for transition.
origETag := objInfo.ETag
if kind, encrypted := crypto.IsEncrypted(objInfo.UserDefined); encrypted {
switch kind {
case crypto.S3:
w.Header().Set(xhttp.AmzServerSideEncryption, xhttp.AmzEncryptionAES)
objInfo.ETag, _ = DecryptETag(objectEncryptionKey, ObjectInfo{ETag: objInfo.ETag})
case crypto.S3KMS:
w.Header().Set(xhttp.AmzServerSideEncryption, xhttp.AmzEncryptionKMS)
w.Header().Set(xhttp.AmzServerSideEncryptionKmsID, objInfo.KMSKeyID())
if kmsCtx, ok := objInfo.UserDefined[crypto.MetaContext]; ok {
w.Header().Set(xhttp.AmzServerSideEncryptionKmsContext, kmsCtx)
}
// Keep only the trailing 32 characters unless the ETag contains exactly one "-".
if len(objInfo.ETag) >= 32 && strings.Count(objInfo.ETag, "-") != 1 {
objInfo.ETag = objInfo.ETag[len(objInfo.ETag)-32:]
}
case crypto.SSEC:
w.Header().Set(xhttp.AmzServerSideEncryptionCustomerAlgorithm, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerAlgorithm))
w.Header().Set(xhttp.AmzServerSideEncryptionCustomerKeyMD5, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerKeyMD5))
// Same ETag trimming as for SSE-KMS above.
if len(objInfo.ETag) >= 32 && strings.Count(objInfo.ETag, "-") != 1 {
objInfo.ETag = objInfo.ETag[len(objInfo.ETag)-32:]
}
}
}
// Now that the object exists, schedule its replication if required.
if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(metadata, "", "", replication.ObjectReplicationType, opts)); dsc.ReplicateAny() {
scheduleReplication(ctx, objInfo, objectAPI, dsc, replication.ObjectReplicationType)
}
setPutObjHeaders(w, objInfo, false, r.Header)
// On return, store the teed payload in the cache. Nothing is stored when
// caching was not enabled for this request or no bytes were captured.
defer func() {
var data []byte
if buf != nil {
data = buf.Bytes()
}
if len(data) == 0 {
return
}
globalCacheConfig.Set(&cache.ObjectInfo{
Key: objInfo.Name,
Bucket: objInfo.Bucket,
ETag: objInfo.ETag,
ModTime: objInfo.ModTime,
Expires: objInfo.ExpiresStr(),
CacheControl: objInfo.CacheControl,
Size: int64(len(data)),
Metadata: cleanReservedKeys(objInfo.UserDefined),
Data: data,
})
}()
// Notify object created event.
evt := eventArgs{
EventName: event.ObjectCreatedPut,
BucketName: bucket,
Object: objInfo,
ReqParams: extractReqParams(r),
RespElements: extractRespElements(w),
UserAgent: r.UserAgent(),
Host: handlers.GetSourceIP(r),
}
sendEvent(evt)
// When the version count exceeds scannerExcessObjectVersions, also send a
// "many versions" event and write an internal audit log entry.
if objInfo.NumVersions > int(scannerExcessObjectVersions.Load()) {
evt.EventName = event.ObjectManyVersions
sendEvent(evt)
auditLogInternal(context.Background(), AuditLogOptions{
Event: "scanner:manyversions",
APIName: "PutObject",
Bucket: objInfo.Bucket,
Object: objInfo.Name,
VersionID: objInfo.VersionID,
Status: http.StatusText(http.StatusOK),
})
}
// Do not send checksums in events to avoid leaks.
hash.TransferChecksumHeader(w, r)
writeSuccessResponseHeadersOnly(w)
// Remove the transitioned object whose object version is being overwritten.
if !globalTierConfigMgr.Empty() {
// Schedule object for immediate transition if eligible.
objInfo.ETag = origETag
enqueueTransitionImmediate(objInfo, lcEventSrc_s3PutObject)
os.Sweep()
}
}
// PutObjectExtractHandler - PUT Object extract is an extended API modeled
// after the AWS Snowball auto-extract feature. The request body is a tar
// stream that is handed to untar; every entry is stored as an individual
// object through the putObjectTar callback, so the folder structure of the
// archive is rebuilt inside the bucket.
//
// Request level validation (copy-source header, storage class, Content-MD5,
// content length, signature, bucket quota) happens once for the whole
// stream. Per-entry concerns (compression, replication, object lock,
// encryption, tier sweeping and event notification) are handled inside the
// callback. SSE-KMS is not supported by this API.
//
// On success the response carries the MD5 of the uploaded stream as ETag.
func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "PutObjectExtract")
	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))

	objectAPI := api.ObjectAPI()
	if objectAPI == nil {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
		return
	}

	if crypto.S3KMS.IsRequested(r.Header) { // SSE-KMS is not supported
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
		return
	}

	vars := mux.Vars(r)
	bucket := vars["bucket"]
	object, err := unescapePath(vars["object"])
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// X-Amz-Copy-Source shouldn't be set for this call.
	if _, ok := r.Header[xhttp.AmzCopySource]; ok {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidCopySource), r.URL)
		return
	}

	// Validate storage class metadata if present
	sc := r.Header.Get(xhttp.AmzStorageClass)
	if sc != "" {
		if !storageclass.IsValid(sc) {
			writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidStorageClass), r.URL)
			return
		}
	}

	clientETag, err := etag.FromContentMD5(r.Header)
	if err != nil {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidDigest), r.URL)
		return
	}

	// if Content-Length is unknown/missing, deny the request
	size := r.ContentLength
	rAuthType := getRequestAuthType(r)
	if rAuthType == authTypeStreamingSigned || rAuthType == authTypeStreamingSignedTrailer {
		// For streaming signatures the payload size comes from the decoded
		// content length header when it is present.
		if sizeStr, ok := r.Header[xhttp.AmzDecodedContentLength]; ok {
			if sizeStr[0] == "" {
				writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMissingContentLength), r.URL)
				return
			}
			size, err = strconv.ParseInt(sizeStr[0], 10, 64)
			if err != nil {
				writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
				return
			}
		}
	}

	if size == -1 {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMissingContentLength), r.URL)
		return
	}

	// maximum Upload size for objects in a single operation
	if isMaxObjectSize(size) {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrEntityTooLarge), r.URL)
		return
	}

	var (
		md5hex              = clientETag.String()
		sha256hex           = ""
		reader    io.Reader = r.Body
		s3Err     APIErrorCode
		putObject = objectAPI.PutObject
	)

	// Check if put is allowed
	if s3Err = isPutActionAllowed(ctx, rAuthType, bucket, object, r, policy.PutObjectAction); s3Err != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
		return
	}

	switch rAuthType {
	case authTypeStreamingSigned, authTypeStreamingSignedTrailer:
		// Initialize stream signature verifier.
		reader, s3Err = newSignV4ChunkedReader(r, rAuthType == authTypeStreamingSignedTrailer)
		if s3Err != ErrNone {
			writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
			return
		}
	case authTypeSignedV2, authTypePresignedV2:
		s3Err = isReqAuthenticatedV2(r)
		if s3Err != ErrNone {
			writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
			return
		}
	case authTypePresigned, authTypeSigned:
		if s3Err = reqSignatureV4Verify(r, globalSite.Region(), serviceS3); s3Err != ErrNone {
			writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
			return
		}
		if !skipContentSha256Cksum(r) {
			sha256hex = getContentSha256Cksum(r, serviceS3)
		}
	}

	// hreader verifies the digests of the whole uploaded stream.
	hreader, err := hash.NewReader(ctx, reader, size, md5hex, sha256hex, size)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}
	if err = hreader.AddChecksum(r, false); err != nil {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidChecksum), r.URL)
		return
	}

	if err := enforceBucketQuotaHard(ctx, bucket, size); err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// Check if bucket encryption is enabled
	sseConfig, _ := globalBucketSSEConfigSys.Get(bucket)
	sseConfig.Apply(r.Header, sse.ApplyOptions{
		AutoEncrypt: globalAutoEncryption,
	})

	retPerms := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, policy.PutObjectRetentionAction)
	holdPerms := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, policy.PutObjectLegalHoldAction)

	getObjectInfo := objectAPI.GetObjectInfo

	// These are static for all objects extracted.
	reqParams := extractReqParams(r)
	respElements := map[string]string{
		"requestId": w.Header().Get(xhttp.AmzRequestID),
		"nodeId":    w.Header().Get(xhttp.AmzRequestHostID),
	}
	if sc == "" {
		sc = storageclass.STANDARD
	}

	// putObjectTar stores a single tar entry as an object. The object
	// parameter shadows the request's object name and is the name under
	// which the entry is written. A non-nil return value reports the entry
	// as failed to untar; s3Err is set when a specific API error code
	// should be reported to the client.
	putObjectTar := func(reader io.Reader, info os.FileInfo, object string) error {
		size := info.Size()
		metadata := map[string]string{
			xhttp.AmzStorageClass: sc, // save same storage-class as incoming stream.
		}

		actualSize := size
		var idxCb func() []byte
		if isCompressible(r.Header, object) && size > minCompressibleSize {
			// Storing the compression metadata.
			metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV2
			metadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(size, 10)

			actualReader, err := hash.NewReader(ctx, reader, size, "", "", actualSize)
			if err != nil {
				return err
			}

			// Set compression metrics.
			wantEncryption := crypto.Requested(r.Header)
			s2c, cb := newS2CompressReader(actualReader, actualSize, wantEncryption)
			defer s2c.Close()
			idxCb = cb
			reader = etag.Wrap(s2c, actualReader)
			size = -1 // Since compressed size is un-predictable.
		}

		hashReader, err := hash.NewReader(ctx, reader, size, "", "", actualSize)
		if err != nil {
			return err
		}

		rawReader := hashReader
		pReader := NewPutObjReader(rawReader)

		if r.Header.Get(xhttp.AmzBucketReplicationStatus) == replication.Replica.String() {
			if s3Err = isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, policy.ReplicateObjectAction); s3Err != ErrNone {
				// err is nil at this point, so returning it would report
				// the entry as stored although the write was denied.
				// Return a real error; the response code is derived from
				// s3Err by the caller.
				return errors.New("replica write denied: ReplicateObject action is not allowed")
			}
			metadata[ReservedMetadataPrefixLower+ReplicaStatus] = replication.Replica.String()
			metadata[ReservedMetadataPrefixLower+ReplicaTimestamp] = UTCNow().Format(time.RFC3339Nano)
		}

		// Entries carrying PAX records take their version ID and metadata
		// from the "minio.versionId" / "minio.metadata.*" records; all
		// other entries fall back to the request's form value and headers.
		var (
			versionID string
			hdrs      http.Header
		)
		if tarHdrs, ok := info.Sys().(*tar.Header); ok && len(tarHdrs.PAXRecords) > 0 {
			versionID = tarHdrs.PAXRecords["minio.versionId"]
			hdrs = make(http.Header)
			for k, v := range tarHdrs.PAXRecords {
				if k == "minio.versionId" {
					continue
				}
				if strings.HasPrefix(k, "minio.metadata.") {
					k = strings.TrimPrefix(k, "minio.metadata.")
					hdrs.Set(k, v)
				}
			}
			m, err := extractMetadata(ctx, textproto.MIMEHeader(hdrs))
			if err != nil {
				return err
			}
			for k, v := range m {
				metadata[k] = v
			}
		} else {
			versionID = r.Form.Get(xhttp.VersionID)
			hdrs = r.Header
		}
		opts, err := putOpts(ctx, bucket, object, versionID, hdrs, metadata)
		if err != nil {
			return err
		}
		// Use the entry's modification time, or the current time when the
		// entry has none (Unix time <= 0).
		opts.MTime = info.ModTime()
		if opts.MTime.Unix() <= 0 {
			opts.MTime = UTCNow()
		}
		opts.IndexCB = idxCb

		retentionMode, retentionDate, legalHold, s3err := checkPutObjectLockAllowed(ctx, r, bucket, object, getObjectInfo, retPerms, holdPerms)
		if s3err == ErrNone && retentionMode.Valid() {
			metadata[strings.ToLower(xhttp.AmzObjectLockMode)] = string(retentionMode)
			metadata[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = amztime.ISO8601Format(retentionDate.UTC())
		}

		if s3err == ErrNone && legalHold.Status.Valid() {
			metadata[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = string(legalHold.Status)
		}

		if s3err != ErrNone {
			s3Err = s3err
			return ObjectLocked{}
		}

		if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(metadata, "", "", replication.ObjectReplicationType, opts)); dsc.ReplicateAny() {
			metadata[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano)
			metadata[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus()
		}

		var objectEncryptionKey crypto.ObjectKey
		if crypto.Requested(r.Header) {
			if crypto.SSECopy.IsRequested(r.Header) {
				return errInvalidEncryptionParameters
			}

			reader, objectEncryptionKey, err = EncryptRequest(hashReader, r, bucket, object, metadata)
			if err != nil {
				return err
			}

			wantSize := int64(-1)
			if size >= 0 {
				info := ObjectInfo{Size: size}
				wantSize = info.EncryptedSize()
			}

			// do not try to verify encrypted content
			hashReader, err = hash.NewReader(ctx, etag.Wrap(reader, hashReader), wantSize, "", "", actualSize)
			if err != nil {
				return err
			}

			pReader, err = pReader.WithEncryption(hashReader, &objectEncryptionKey)
			if err != nil {
				return err
			}

			// Only wrap the compression index when the object is actually
			// encrypted, matching PutObjectHandler; otherwise the index
			// would be wrapped with the zero-value key.
			if opts.IndexCB != nil {
				opts.IndexCB = compressionIndexEncrypter(objectEncryptionKey, opts.IndexCB)
			}
		}

		// Ensure that metadata does not contain sensitive information
		crypto.RemoveSensitiveEntries(metadata)

		sweeper := newObjSweeper(bucket, object).WithVersioning(opts.Versioned, opts.VersionSuspended)
		if !globalTierConfigMgr.Empty() {
			// Get appropriate object info to identify the remote object to delete
			goiOpts := sweeper.GetOpts()
			if goi, gerr := getObjectInfo(ctx, bucket, object, goiOpts); gerr == nil {
				sweeper.SetTransitionState(goi.TransitionedObject)
			}
		}

		// Create the object..
		objInfo, err := putObject(ctx, bucket, object, pReader, opts)
		if err != nil {
			return err
		}

		// Keep the stored ETag; the decrypted one is used for replication
		// and events, the stored one is restored before transition.
		origETag := objInfo.ETag
		objInfo.ETag = getDecryptedETag(r.Header, objInfo, false)

		if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(metadata, "", "", replication.ObjectReplicationType, opts)); dsc.ReplicateAny() {
			scheduleReplication(ctx, objInfo, objectAPI, dsc, replication.ObjectReplicationType)
		}

		// Notify object created event.
		evt := eventArgs{
			EventName:    event.ObjectCreatedPut,
			BucketName:   bucket,
			Object:       objInfo,
			ReqParams:    reqParams,
			RespElements: respElements,
			UserAgent:    r.UserAgent(),
			Host:         handlers.GetSourceIP(r),
		}
		sendEvent(evt)

		// Remove the transitioned object whose object version is being overwritten.
		if !globalTierConfigMgr.Empty() {
			objInfo.ETag = origETag
			// Schedule object for immediate transition if eligible.
			enqueueTransitionImmediate(objInfo, lcEventSrc_s3PutObject)
			sweeper.Sweep()
		}
		return nil
	}

	// Extraction options are controlled through request headers.
	var opts untarOptions
	opts.ignoreDirs = strings.EqualFold(r.Header.Get(xhttp.MinIOSnowballIgnoreDirs), "true")
	opts.ignoreErrs = strings.EqualFold(r.Header.Get(xhttp.MinIOSnowballIgnoreErrors), "true")
	opts.prefixAll = r.Header.Get(xhttp.MinIOSnowballPrefix)
	if opts.prefixAll != "" {
		opts.prefixAll = trimLeadingSlash(pathJoin(opts.prefixAll, slashSeparator))
	}

	if err = untar(ctx, hreader, putObjectTar, opts); err != nil {
		apiErr := errorCodes.ToAPIErr(s3Err)
		// If not set, convert or use BadRequest
		if s3Err == ErrNone {
			apiErr = toAPIError(ctx, err)
			if apiErr.Code == "InternalError" {
				// Convert generic internal errors to bad requests.
				apiErr = APIError{
					Code:           "BadRequest",
					Description:    err.Error(),
					HTTPStatusCode: http.StatusBadRequest,
				}
			}
		}
		writeErrorResponse(ctx, w, apiErr, r.URL)
		return
	}

	// The ETag is set by direct map assignment rather than Header().Set.
	w.Header()[xhttp.ETag] = []string{`"` + hex.EncodeToString(hreader.MD5Current()) + `"`}
	hash.TransferChecksumHeader(w, r)
	writeSuccessResponseHeadersOnly(w)
}
// Delete objectAPIHandlers

// DeleteObjectHandler - delete an object
// ----------
// Deletes the object (or the version selected by the request options) from
// the bucket. A missing object or version is not treated as an error: a
// no-op event is sent and the request still succeeds with "no content".
// After a successful delete the handler sends the removal event, schedules
// delete replication when it is pending, and sweeps the transitioned
// (tiered) copy if tiering is configured.
func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "DeleteObject")

	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))

	pathVars := mux.Vars(r)
	bucket := pathVars["bucket"]
	object, err := unescapePath(pathVars["object"])
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	objectAPI := api.ObjectAPI()
	if objectAPI == nil {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
		return
	}

	if authErr := checkRequestAuthType(ctx, r, policy.DeleteObjectAction, bucket, object); authErr != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(authErr), r.URL)
		return
	}

	// Requests that only validate replication settings and permissions
	// must never delete data.
	if _, checkOnly := r.Header[xhttp.MinIOSourceReplicationCheck]; checkOnly {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrReplicationPermissionCheckError), r.URL)
		return
	}

	// Deletes arriving from a replication source need the replicate-delete
	// permission on top of the regular delete permission.
	isReplica := r.Header.Get(xhttp.AmzBucketReplicationStatus) == replication.Replica.String()
	if isReplica {
		if authErr := checkRequestAuthType(ctx, r, policy.ReplicateDeleteAction, bucket, object); authErr != ErrNone {
			writeErrorResponse(ctx, w, errorCodes.ToAPIErr(authErr), r.URL)
			return
		}
	}

	if globalDNSConfig != nil {
		if _, dnsErr := globalDNSConfig.Get(bucket); dnsErr != nil && dnsErr != dns.ErrNotImplemented {
			writeErrorResponse(ctx, w, toAPIError(ctx, dnsErr), r.URL)
			return
		}
	}

	opts, err := delOpts(ctx, r, bucket, object)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// Force (prefix) deletes are refused on buckets with object locking.
	if lockCfg, _ := globalBucketObjectLockSys.Get(bucket); lockCfg.LockEnabled && opts.DeletePrefix {
		lockErr := toAPIError(ctx, errInvalidArgument)
		lockErr.Description = "force-delete is forbidden on Object Locking enabled buckets"
		writeErrorResponse(ctx, w, lockErr, r.URL)
		return
	}

	sweeper := newObjSweeper(bucket, object).WithVersion(opts.VersionID).WithVersioning(opts.Versioned, opts.VersionSuspended)

	opts.SetEvalMetadataFn(func(oi *ObjectInfo, gerr error) (ReplicateDecision, error) {
		var decision ReplicateDecision
		if isReplica {
			// no need to check replication on receiver
			return decision, nil
		}
		target := ObjectToDelete{
			ObjectV: ObjectV{
				ObjectName: object,
				VersionID:  opts.VersionID,
			},
		}
		decision = checkReplicateDelete(ctx, bucket, target, *oi, opts, gerr)
		// Mutations of objects on versioning suspended buckets
		// affect its null version. Through opts below we select
		// the null version's remote object to delete if
		// transitioned.
		if gerr == nil {
			sweeper.SetTransitionState(oi.TransitionedObject)
		}
		return decision, nil
	})

	// retentionVersionID is the version the retention bypass check runs
	// against; it stays empty when there is nothing on disk to check.
	retentionVersionID := opts.VersionID
	if isReplica {
		opts.SetReplicaStatus(replication.Replica)
		if opts.VersionPurgeStatus().Empty() {
			// opts.VersionID holds delete marker version ID to replicate and not yet present on disk
			retentionVersionID = ""
		}
	}

	opts.SetEvalRetentionBypassFn(func(goi ObjectInfo, gerr error) error {
		if retentionVersionID == "" {
			return nil
		}
		target := ObjectToDelete{
			ObjectV: ObjectV{
				ObjectName: object,
				VersionID:  retentionVersionID,
			},
		}
		bypassErr := enforceRetentionBypassForDelete(ctx, r, bucket, target, goi, gerr)
		if bypassErr != nil && !isErrObjectNotFound(bypassErr) {
			return bypassErr
		}
		return nil
	})

	// http://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectDELETE.html
	objInfo, err := objectAPI.DeleteObject(ctx, bucket, object, opts)
	if err != nil {
		// A missing bucket is always an error; a missing object or version
		// is reported as a successful no-op.
		_, bucketMissing := err.(BucketNotFound)
		if !bucketMissing && (isErrObjectNotFound(err) || isErrVersionNotFound(err)) {
			// Send an event when the object is not found
			objInfo.Name = object
			objInfo.VersionID = opts.VersionID
			sendEvent(eventArgs{
				EventName:    event.ObjectRemovedNoOP,
				BucketName:   bucket,
				Object:       objInfo,
				ReqParams:    extractReqParams(r),
				RespElements: extractRespElements(w),
				UserAgent:    r.UserAgent(),
				Host:         handlers.GetSourceIP(r),
			})
			writeSuccessNoContent(w)
			return
		}
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	if objInfo.Name == "" {
		writeSuccessNoContent(w)
		return
	}

	defer globalCacheConfig.Delete(bucket, object)

	setPutObjHeaders(w, objInfo, true, r.Header)
	writeSuccessNoContent(w)

	// Notify object deleted event.
	removedEvent := event.ObjectRemovedDelete
	if objInfo.DeleteMarker {
		removedEvent = event.ObjectRemovedDeleteMarkerCreated
	}
	sendEvent(eventArgs{
		EventName:    removedEvent,
		BucketName:   bucket,
		Object:       objInfo,
		ReqParams:    extractReqParams(r),
		RespElements: extractRespElements(w),
		UserAgent:    r.UserAgent(),
		Host:         handlers.GetSourceIP(r),
	})

	replicationPending := objInfo.ReplicationStatus == replication.Pending || objInfo.VersionPurgeStatus == Pending
	if replicationPending {
		deleted := DeletedObject{
			ObjectName:        object,
			DeleteMarkerMTime: DeleteMarkerMTime{objInfo.ModTime},
			DeleteMarker:      objInfo.DeleteMarker,
			ReplicationState:  objInfo.ReplicationState(),
		}
		// The version ID goes into exactly one of the two fields,
		// depending on whether a delete marker was created.
		if objInfo.DeleteMarker {
			deleted.DeleteMarkerVersionID = objInfo.VersionID
		} else {
			deleted.VersionID = objInfo.VersionID
		}
		scheduleReplicationDelete(ctx, DeletedObjectReplicationInfo{
			DeletedObject: deleted,
			Bucket:        bucket,
			EventType:     ReplicateIncomingDelete,
		}, objectAPI)
	}

	// Remove the transitioned object whose object version is being overwritten.
	if !globalTierConfigMgr.Empty() {
		sweeper.Sweep()
	}
}
// PutObjectLegalHoldHandler - set legal hold configuration to object.
// ----------
// Parses the legal hold document from the request body and stores its
// status in the object's metadata through PutObjectMetadata. The bucket
// must exist and have object locking enabled, and the request must carry a
// Content-MD5 header. Metadata replication is scheduled when required, and
// a legal hold event is sent after the success response is written.
func (api objectAPIHandlers) PutObjectLegalHoldHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "PutObjectLegalHold")

	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))

	// fail writes the given API error as the response.
	fail := func(apiErr APIError) {
		writeErrorResponse(ctx, w, apiErr, r.URL)
	}

	pathVars := mux.Vars(r)
	bucket := pathVars["bucket"]
	object, err := unescapePath(pathVars["object"])
	if err != nil {
		fail(toAPIError(ctx, err))
		return
	}

	objectAPI := api.ObjectAPI()
	if objectAPI == nil {
		fail(errorCodes.ToAPIErr(ErrServerNotInitialized))
		return
	}

	// Check permissions to perform this legal hold operation
	if authErr := checkRequestAuthType(ctx, r, policy.PutObjectLegalHoldAction, bucket, object); authErr != ErrNone {
		fail(errorCodes.ToAPIErr(authErr))
		return
	}

	if _, bucketErr := objectAPI.GetBucketInfo(ctx, bucket, BucketOptions{}); bucketErr != nil {
		fail(toAPIError(ctx, bucketErr))
		return
	}

	if !hasContentMD5(r.Header) {
		fail(errorCodes.ToAPIErr(ErrMissingContentMD5))
		return
	}

	if lockCfg, _ := globalBucketObjectLockSys.Get(bucket); !lockCfg.LockEnabled {
		fail(errorCodes.ToAPIErr(ErrInvalidBucketObjectLockConfiguration))
		return
	}

	// Read at most Content-Length bytes of the body.
	body := io.LimitReader(r.Body, r.ContentLength)
	legalHold, err := objectlock.ParseObjectLegalHold(body)
	if err != nil {
		malformed := errorCodes.ToAPIErr(ErrMalformedXML)
		malformed.Description = err.Error()
		fail(malformed)
		return
	}

	opts, err := getOpts(ctx, r, bucket, object)
	if err != nil {
		fail(toAPIError(ctx, err))
		return
	}

	// applyLegalHold records the new legal hold status and its timestamp
	// on the object's metadata and, when the object must be replicated,
	// marks the metadata with the pending replication status.
	applyLegalHold := func(oi *ObjectInfo, gerr error) (ReplicateDecision, error) {
		oi.UserDefined[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = strings.ToUpper(string(legalHold.Status))
		oi.UserDefined[ReservedMetadataPrefixLower+ObjectLockLegalHoldTimestamp] = UTCNow().Format(time.RFC3339Nano)

		decision := mustReplicate(ctx, bucket, object, oi.getMustReplicateOptions(replication.MetadataReplicationType, opts))
		if !decision.ReplicateAny() {
			return decision, nil
		}
		oi.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano)
		oi.UserDefined[ReservedMetadataPrefixLower+ReplicationStatus] = decision.PendingStatus()
		return decision, nil
	}

	objInfo, err := objectAPI.PutObjectMetadata(ctx, bucket, object, ObjectOptions{
		MTime:          opts.MTime,
		VersionID:      opts.VersionID,
		EvalMetadataFn: applyLegalHold,
	})
	if err != nil {
		fail(toAPIError(ctx, err))
		return
	}

	// Replicate the metadata change if the updated object requires it.
	if decision := mustReplicate(ctx, bucket, object, objInfo.getMustReplicateOptions(replication.MetadataReplicationType, opts)); decision.ReplicateAny() {
		scheduleReplication(ctx, objInfo, objectAPI, decision, replication.MetadataReplicationType)
	}

	writeSuccessResponseHeadersOnly(w)

	// Notify object event.
	sendEvent(eventArgs{
		EventName:    event.ObjectCreatedPutLegalHold,
		BucketName:   bucket,
		Object:       objInfo,
		ReqParams:    extractReqParams(r),
		RespElements: extractRespElements(w),
		UserAgent:    r.UserAgent(),
		Host:         handlers.GetSourceIP(r),
	})
}
// GetObjectLegalHoldHandler - returns the legal hold status stored on an object.
// Responds with ErrInvalidBucketObjectLockConfiguration when object locking is
// not enabled on the bucket and with ErrNoSuchObjectLockConfiguration when the
// object carries no legal hold metadata.
func (api objectAPIHandlers) GetObjectLegalHoldHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "GetObjectLegalHold")

	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))

	pathVars := mux.Vars(r)
	bucket := pathVars["bucket"]
	object, err := unescapePath(pathVars["object"])
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	objectAPI := api.ObjectAPI()
	if objectAPI == nil {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
		return
	}

	s3Error := checkRequestAuthType(ctx, r, policy.GetObjectLegalHoldAction, bucket, object)
	if s3Error != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
		return
	}

	// Legal hold can only be queried on buckets that have object locking enabled.
	lockCfg, _ := globalBucketObjectLockSys.Get(bucket)
	if !lockCfg.LockEnabled {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidBucketObjectLockConfiguration), r.URL)
		return
	}

	opts, err := getOpts(ctx, r, bucket, object)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	objInfo, err := objectAPI.GetObjectInfo(ctx, bucket, object, opts)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// The legal hold status is derived from the object's user-defined metadata.
	hold := objectlock.GetObjectLegalHoldMeta(objInfo.UserDefined)
	if hold.IsEmpty() {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNoSuchObjectLockConfiguration), r.URL)
		return
	}

	writeSuccessResponseXML(w, encodeResponse(hold))

	// Notify object legal hold accessed via a GET request.
	accessed := eventArgs{
		EventName:    event.ObjectAccessedGetLegalHold,
		BucketName:   bucket,
		Object:       objInfo,
		ReqParams:    extractReqParams(r),
		RespElements: extractRespElements(w),
		UserAgent:    r.UserAgent(),
		Host:         handlers.GetSourceIP(r),
	}
	sendEvent(accessed)
}
// PutObjectRetentionHandler - sets (or clears) the retention configuration of an object.
// The request signature is validated, the bucket must exist and have object
// locking enabled, and the request must carry a Content-MD5 header. The new
// retention values are written through PutObjectMetadata; a metadata
// replication is scheduled when the replication decision asks for it.
func (api objectAPIHandlers) PutObjectRetentionHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "PutObjectRetention")

	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))

	pathVars := mux.Vars(r)
	bucket := pathVars["bucket"]
	object, err := unescapePath(pathVars["object"])
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	objectAPI := api.ObjectAPI()
	if objectAPI == nil {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
		return
	}

	// The credential and owner flag are needed later for the bypass check.
	cred, owner, s3Err := validateSignature(getRequestAuthType(r), r)
	if s3Err != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
		return
	}

	_, err = objectAPI.GetBucketInfo(ctx, bucket, BucketOptions{})
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	if !hasContentMD5(r.Header) {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMissingContentMD5), r.URL)
		return
	}

	lockCfg, _ := globalBucketObjectLockSys.Get(bucket)
	if !lockCfg.LockEnabled {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidBucketObjectLockConfiguration), r.URL)
		return
	}

	objRetention, err := objectlock.ParseObjectRetention(r.Body)
	if err != nil {
		malformed := errorCodes.ToAPIErr(ErrMalformedXML)
		malformed.Description = err.Error()
		writeErrorResponse(ctx, w, malformed, r.URL)
		return
	}

	opts, err := getOpts(ctx, r, bucket, object)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// evalRetention is handed to PutObjectMetadata as EvalMetadataFn. It
	// enforces the retention bypass rules and then rewrites the object-lock
	// metadata on the supplied ObjectInfo.
	evalRetention := func(oi *ObjectInfo, _ error) (ReplicateDecision, error) {
		var dsc ReplicateDecision
		if berr := enforceRetentionBypassForPut(ctx, r, *oi, objRetention, cred, owner); berr != nil {
			return dsc, berr
		}

		// An invalid mode stores empty strings for both keys.
		lockMode, retainUntil := "", ""
		if objRetention.Mode.Valid() {
			lockMode = string(objRetention.Mode)
			retainUntil = amztime.ISO8601Format(objRetention.RetainUntilDate.UTC())
		}
		oi.UserDefined[strings.ToLower(xhttp.AmzObjectLockMode)] = lockMode
		oi.UserDefined[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = retainUntil
		oi.UserDefined[ReservedMetadataPrefixLower+ObjectLockRetentionTimestamp] = UTCNow().Format(time.RFC3339Nano)

		dsc = mustReplicate(ctx, bucket, object, oi.getMustReplicateOptions(replication.MetadataReplicationType, opts))
		if dsc.ReplicateAny() {
			oi.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano)
			oi.UserDefined[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus()
		}
		return dsc, nil
	}

	objInfo, err := objectAPI.PutObjectMetadata(ctx, bucket, object, ObjectOptions{
		MTime:          opts.MTime,
		VersionID:      opts.VersionID,
		EvalMetadataFn: evalRetention,
	})
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// Re-evaluate the replication decision against the stored object info.
	if dsc := mustReplicate(ctx, bucket, object, objInfo.getMustReplicateOptions(replication.MetadataReplicationType, opts)); dsc.ReplicateAny() {
		scheduleReplication(ctx, objInfo, objectAPI, dsc, replication.MetadataReplicationType)
	}

	writeSuccessResponseHeadersOnly(w)

	// Notify object event.
	created := eventArgs{
		EventName:    event.ObjectCreatedPutRetention,
		BucketName:   bucket,
		Object:       objInfo,
		ReqParams:    extractReqParams(r),
		RespElements: extractRespElements(w),
		UserAgent:    r.UserAgent(),
		Host:         handlers.GetSourceIP(r),
	}
	sendEvent(created)
}
// GetObjectRetentionHandler - returns the retention configuration stored on an object.
// Responds with ErrInvalidBucketObjectLockConfiguration when object locking is
// not enabled on the bucket and with ErrNoSuchObjectLockConfiguration when the
// object has no valid retention mode.
func (api objectAPIHandlers) GetObjectRetentionHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "GetObjectRetention")

	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))

	pathVars := mux.Vars(r)
	bucket := pathVars["bucket"]
	object, err := unescapePath(pathVars["object"])
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	objectAPI := api.ObjectAPI()
	if objectAPI == nil {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
		return
	}

	s3Error := checkRequestAuthType(ctx, r, policy.GetObjectRetentionAction, bucket, object)
	if s3Error != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
		return
	}

	// Retention can only be queried on buckets that have object locking enabled.
	lockCfg, _ := globalBucketObjectLockSys.Get(bucket)
	if !lockCfg.LockEnabled {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidBucketObjectLockConfiguration), r.URL)
		return
	}

	opts, err := getOpts(ctx, r, bucket, object)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	objInfo, err := objectAPI.GetObjectInfo(ctx, bucket, object, opts)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// The retention settings are derived from the object's user-defined metadata.
	objRetention := objectlock.GetObjectRetentionMeta(objInfo.UserDefined)
	if !objRetention.Mode.Valid() {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNoSuchObjectLockConfiguration), r.URL)
		return
	}

	writeSuccessResponseXML(w, encodeResponse(objRetention))

	// Notify object retention accessed via a GET request.
	accessed := eventArgs{
		EventName:    event.ObjectAccessedGetRetention,
		BucketName:   bucket,
		Object:       objInfo,
		ReqParams:    extractReqParams(r),
		RespElements: extractRespElements(w),
		UserAgent:    r.UserAgent(),
		Host:         handlers.GetSourceIP(r),
	}
	sendEvent(accessed)
}
// ObjectTagSet holds the key/value tags of an object. It is the value of the
// TagSet field of objectTagging; each entry is encoded as a <Tag> XML element.
type ObjectTagSet struct {
// Tags is the list of tags, one <Tag> element per entry.
Tags []tags.Tag `xml:"Tag"`
}
// objectTagging is the XML document whose root element is <Tagging>; it is
// the response body written by GetObjectTaggingHandler.
type objectTagging struct {
// XMLName fixes the root element name to "Tagging".
XMLName xml.Name `xml:"Tagging"`
// TagSet is encoded as the <TagSet> child element.
TagSet *ObjectTagSet `xml:"TagSet"`
}
// GetObjectTaggingHandler - GET object tagging
//
// The caller is authenticated up front; authorization is deferred until the
// object's tags are known, so that policies can be evaluated against them
// (the tags are exposed to the policy check through the request header).
// When the object/version is not found locally and proxy targets exist, the
// tags are fetched from the replication target and the response is marked
// with the MinIOTaggingProxied header. Tags are always returned sorted by key.
func (api objectAPIHandlers) GetObjectTaggingHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "GetObjectTagging")
	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))

	vars := mux.Vars(r)
	bucket := vars["bucket"]
	object, err := unescapePath(vars["object"])
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	objAPI := api.ObjectAPI()
	if objAPI == nil {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
		return
	}

	if s3Error := authenticateRequest(ctx, r, policy.GetObjectTaggingAction); s3Error != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
		return
	}

	opts, err := getOpts(ctx, r, bucket, object)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	ot, err := objAPI.GetObjectTags(ctx, bucket, object, opts)
	if err != nil {
		// Only a missing object/version is eligible for proxying.
		if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}

		// if object/version is not found locally, but exists on peer site - proxy
		// the tagging request to peer site. The response to client will
		// return tags from peer site.
		proxytgts := getProxyTargets(ctx, bucket, object, opts)
		if proxytgts.Empty() {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}

		globalReplicationStats.Load().incProxy(bucket, getObjectTaggingAPI, false)
		// proxy to replication target if site replication is in place.
		peerTags, gerr := proxyGetTaggingToRepTarget(ctx, bucket, object, opts, proxytgts)
		if gerr.Err != nil || peerTags == nil {
			globalReplicationStats.Load().incProxy(bucket, getObjectTaggingAPI, true)
			// The proxy may hand back no tags without reporting an error; in
			// that case report the local lookup failure instead of passing a
			// nil error to the error writer.
			proxyErr := gerr.Err
			if proxyErr == nil {
				proxyErr = err
			}
			writeErrorResponse(ctx, w, toAPIError(ctx, proxyErr), r.URL)
			return
		}

		// overlay tags from peer site.
		ot = peerTags
		w.Header()[xhttp.MinIOTaggingProxied] = []string{"true"} // indicate that the request was proxied.
	}

	// Set this such that authorization policies can be applied on the object tags.
	if tagsStr := ot.String(); tagsStr != "" {
		r.Header.Set(xhttp.AmzObjectTagging, tagsStr)
	}

	if s3Error := authorizeRequest(ctx, r, policy.GetObjectTaggingAction); s3Error != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
		return
	}

	if opts.VersionID != "" && opts.VersionID != nullVersionID {
		w.Header()[xhttp.AmzVersionID] = []string{opts.VersionID}
	}

	tagMap := ot.ToMap()
	list := make([]tags.Tag, 0, len(tagMap))
	for k, v := range tagMap {
		list = append(list, tags.Tag{
			Key:   k,
			Value: v,
		})
	}

	// Always return in sorted order for tags; map iteration order is random.
	sort.Slice(list, func(i, j int) bool {
		return list[i].Key < list[j].Key
	})

	otags := &objectTagging{
		TagSet: &ObjectTagSet{Tags: list},
	}

	writeSuccessResponseXML(w, encodeResponse(otags))
}
// PutObjectTaggingHandler - PUT object tagging
//
// Parses the tagging XML from the request body, exposes the tags to the
// policy check through the request header and stores them on the object.
// When the object/version is missing locally but proxy targets exist, the
// request is forwarded to the replication target and the response carries the
// MinIOTaggingProxied header.
func (api objectAPIHandlers) PutObjectTaggingHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "PutObjectTagging")

	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))

	pathVars := mux.Vars(r)
	bucket := pathVars["bucket"]
	object, err := unescapePath(pathVars["object"])
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	objAPI := api.ObjectAPI()
	if objAPI == nil {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
		return
	}

	// Tags XML will not be bigger than 1MiB in size, fail if its bigger.
	body := io.LimitReader(r.Body, 1<<20)
	objTags, err := tags.ParseObjectXML(body)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// Set this such that authorization policies can be applied on the object tags.
	r.Header.Set(xhttp.AmzObjectTagging, objTags.String())

	// Allow putObjectTagging if policy action is set
	s3Error := checkRequestAuthType(ctx, r, policy.PutObjectTaggingAction, bucket, object)
	if s3Error != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
		return
	}

	opts, err := getOpts(ctx, r, bucket, object)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	objInfo, err := objAPI.GetObjectInfo(ctx, bucket, object, opts)
	if err != nil {
		// if object is not found locally, but exists on peer site - proxy
		// the tagging request to peer site. The response to client will
		// be 200 with extra header indicating that the request was proxied.
		if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
			if targets := getProxyTargets(ctx, bucket, object, opts); !targets.Empty() {
				globalReplicationStats.Load().incProxy(bucket, putObjectTaggingAPI, false)

				// proxy to replication target if site replication is in place.
				proxied := proxyTaggingToRepTarget(ctx, bucket, object, objTags, opts, targets)
				if proxied.Err != nil {
					globalReplicationStats.Load().incProxy(bucket, putObjectTaggingAPI, true)
					writeErrorResponse(ctx, w, toAPIError(ctx, proxied.Err), r.URL)
					return
				}

				w.Header()[xhttp.MinIOTaggingProxied] = []string{"true"}
				writeSuccessResponseHeadersOnly(w)

				// when tagging is proxied, the object version is not available to return
				// as header in the response, or ObjectInfo in the notification event.
				sendEvent(eventArgs{
					EventName:    event.ObjectCreatedPutTagging,
					BucketName:   bucket,
					ReqParams:    extractReqParams(r),
					RespElements: extractRespElements(w),
					UserAgent:    r.UserAgent(),
					Host:         handlers.GetSourceIP(r),
				})
				return
			}
		}

		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	encodedTags := objTags.String()
	replOpts := getMustReplicateOptions(objInfo.UserDefined, encodedTags, objInfo.ReplicationStatus, replication.MetadataReplicationType, opts)
	dsc := mustReplicate(ctx, bucket, object, replOpts)
	needsReplication := dsc.ReplicateAny()
	if needsReplication {
		// Replication bookkeeping travels with the tag update as reserved metadata.
		opts.UserDefined = map[string]string{
			ReservedMetadataPrefixLower + ReplicationTimestamp: UTCNow().Format(time.RFC3339Nano),
			ReservedMetadataPrefixLower + ReplicationStatus:    dsc.PendingStatus(),
			ReservedMetadataPrefixLower + TaggingTimestamp:     UTCNow().Format(time.RFC3339Nano),
		}
	}

	// Put object tags
	objInfo, err = objAPI.PutObjectTags(ctx, bucket, object, encodedTags, opts)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	if dsc.ReplicateAny() {
		scheduleReplication(ctx, objInfo, objAPI, dsc, replication.MetadataReplicationType)
	}

	if versionID := objInfo.VersionID; versionID != "" && versionID != nullVersionID {
		w.Header()[xhttp.AmzVersionID] = []string{versionID}
	}

	writeSuccessResponseHeadersOnly(w)

	tagged := eventArgs{
		EventName:    event.ObjectCreatedPutTagging,
		BucketName:   bucket,
		Object:       objInfo,
		ReqParams:    extractReqParams(r),
		RespElements: extractRespElements(w),
		UserAgent:    r.UserAgent(),
		Host:         handlers.GetSourceIP(r),
	}
	sendEvent(tagged)
}
// DeleteObjectTaggingHandler - DELETE object tagging
//
// The object is looked up first so that its existing tags can be exposed to
// the policy check through the request header. When the object/version is
// missing locally but proxy targets exist, the request is forwarded to the
// replication target; the caller is checked against
// policy.DeleteObjectTaggingAction before anything is forwarded, so the
// proxied path is subject to the same access check as the local path.
func (api objectAPIHandlers) DeleteObjectTaggingHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "DeleteObjectTagging")
	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))

	objAPI := api.ObjectAPI()
	if objAPI == nil {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
		return
	}

	vars := mux.Vars(r)
	bucket := vars["bucket"]
	object, err := unescapePath(vars["object"])
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	opts, err := getOpts(ctx, r, bucket, object)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	oi, err := objAPI.GetObjectInfo(ctx, bucket, object, opts)
	if err != nil {
		// if object is not found locally, but exists on peer site - proxy
		// the tagging request to peer site. The response to client will
		// be 200 OK with extra header indicating that the request was proxied.
		if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
			proxytgts := getProxyTargets(ctx, bucket, object, opts)
			if !proxytgts.Empty() {
				// The proxied request mutates the peer site, so the caller must
				// pass the access check before anything is forwarded. The
				// object's tags are not known here, hence no tagging header is
				// set for the policy evaluation.
				if s3Error := checkRequestAuthType(ctx, r, policy.DeleteObjectTaggingAction, bucket, object); s3Error != ErrNone {
					writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
					return
				}

				globalReplicationStats.Load().incProxy(bucket, removeObjectTaggingAPI, false)
				// proxy to replication target if active-active replication is in place.
				perr := proxyTaggingToRepTarget(ctx, bucket, object, nil, opts, proxytgts)
				if perr.Err != nil {
					globalReplicationStats.Load().incProxy(bucket, removeObjectTaggingAPI, true)
					writeErrorResponse(ctx, w, toAPIError(ctx, perr.Err), r.URL)
					return
				}

				// when delete tagging is proxied, the object version/tags are not available to return
				// as header in the response, nor ObjectInfo in the notification event.
				w.Header()[xhttp.MinIOTaggingProxied] = []string{"true"}
				writeSuccessNoContent(w)
				sendEvent(eventArgs{
					EventName:    event.ObjectCreatedDeleteTagging,
					BucketName:   bucket,
					Object:       oi,
					ReqParams:    extractReqParams(r),
					RespElements: extractRespElements(w),
					UserAgent:    r.UserAgent(),
					Host:         handlers.GetSourceIP(r),
				})
				return
			}
		}

		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	if userTags := oi.UserTags; userTags != "" {
		// Set this such that authorization policies can be applied on the object tags.
		r.Header.Set(xhttp.AmzObjectTagging, userTags)
	}

	// Allow deleteObjectTagging if policy action is set
	if s3Error := checkRequestAuthType(ctx, r, policy.DeleteObjectTaggingAction, bucket, object); s3Error != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
		return
	}

	dsc := mustReplicate(ctx, bucket, object, oi.getMustReplicateOptions(replication.MetadataReplicationType, opts))
	if dsc.ReplicateAny() {
		// Replication bookkeeping travels with the tag removal as reserved metadata.
		opts.UserDefined = make(map[string]string)
		opts.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano)
		opts.UserDefined[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus()
	}

	oi, err = objAPI.DeleteObjectTags(ctx, bucket, object, opts)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	if dsc.ReplicateAny() {
		scheduleReplication(ctx, oi, objAPI, dsc, replication.MetadataReplicationType)
	}

	if oi.VersionID != "" && oi.VersionID != nullVersionID {
		w.Header()[xhttp.AmzVersionID] = []string{oi.VersionID}
	}
	writeSuccessNoContent(w)

	sendEvent(eventArgs{
		EventName:    event.ObjectCreatedDeleteTagging,
		BucketName:   bucket,
		Object:       oi,
		ReqParams:    extractReqParams(r),
		RespElements: extractRespElements(w),
		UserAgent:    r.UserAgent(),
		Host:         handlers.GetSourceIP(r),
	})
}
// PostRestoreObjectHandler - POST restore object handler.
// ----------
// Validates a restore request for a transitioned object, records the restore
// state in the object's metadata (for non-select requests) and replies to the
// client. The restore itself (or the select evaluation) then runs in a
// background goroutine after the response has been sent; failures in that
// goroutine are logged, since the response can no longer be changed.
func (api objectAPIHandlers) PostRestoreObjectHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "PostRestoreObject")
	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))

	vars := mux.Vars(r)
	bucket := vars["bucket"]
	object, err := unescapePath(vars["object"])
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// Fetch object stat info.
	objectAPI := api.ObjectAPI()
	if objectAPI == nil {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
		return
	}

	getObjectInfo := objectAPI.GetObjectInfo

	// Check for auth type to return S3 compatible error.
	if s3Error := checkRequestAuthType(ctx, r, policy.RestoreObjectAction, bucket, object); s3Error != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
		return
	}

	if r.ContentLength <= 0 {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrEmptyRequestBody), r.URL)
		return
	}

	opts, err := postRestoreOpts(ctx, r, bucket, object)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	objInfo, err := getObjectInfo(ctx, bucket, object, opts)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// Only objects whose transition has completed can be restored.
	if objInfo.TransitionedObject.Status != lifecycle.TransitionComplete {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidObjectState), r.URL)
		return
	}

	rreq, err := parseRestoreRequest(io.LimitReader(r.Body, r.ContentLength))
	if err != nil {
		apiErr := errorCodes.ToAPIErr(ErrMalformedXML)
		apiErr.Description = err.Error()
		writeErrorResponse(ctx, w, apiErr, r.URL)
		return
	}

	// validate the request
	if err := rreq.validate(ctx, objectAPI); err != nil {
		apiErr := errorCodes.ToAPIErr(ErrMalformedXML)
		apiErr.Description = err.Error()
		writeErrorResponse(ctx, w, apiErr, r.URL)
		return
	}

	statusCode := http.StatusOK
	alreadyRestored := false
	if objInfo.RestoreOngoing && rreq.Type != SelectRestoreRequest {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrObjectRestoreAlreadyInProgress), r.URL)
		return
	}
	if !objInfo.RestoreOngoing && !objInfo.RestoreExpires.IsZero() {
		statusCode = http.StatusAccepted
		alreadyRestored = true
	}

	// set or upgrade restore expiry
	restoreExpiry := lifecycle.ExpectedExpiryTime(time.Now().UTC(), rreq.Days)
	metadata := cloneMSS(objInfo.UserDefined)

	// update self with restore metadata
	if rreq.Type != SelectRestoreRequest {
		objInfo.metadataOnly = true // Perform only metadata updates.
		metadata[xhttp.AmzRestoreExpiryDays] = strconv.Itoa(rreq.Days)
		metadata[xhttp.AmzRestoreRequestDate] = time.Now().UTC().Format(http.TimeFormat)
		if alreadyRestored {
			metadata[xhttp.AmzRestore] = completedRestoreObj(restoreExpiry).String()
		} else {
			metadata[xhttp.AmzRestore] = ongoingRestoreObj().String()
		}
		objInfo.UserDefined = metadata
		if _, err := objectAPI.CopyObject(GlobalContext, bucket, object, bucket, object, objInfo, ObjectOptions{
			VersionID: objInfo.VersionID,
		}, ObjectOptions{
			VersionID: objInfo.VersionID,
			MTime:     objInfo.ModTime,
		}); err != nil {
			writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidObjectState), r.URL)
			return
		}
		// for previously restored object, just update the restore expiry
		if alreadyRestored {
			return
		}
	}

	restoreObject := mustGetUUID()
	if rreq.OutputLocation.S3.BucketName != "" {
		w.Header()[xhttp.AmzRestoreOutputPath] = []string{pathJoin(rreq.OutputLocation.S3.BucketName, rreq.OutputLocation.S3.Prefix, restoreObject)}
	}
	w.WriteHeader(statusCode)

	// Notify object restore started via a POST request.
	sendEvent(eventArgs{
		EventName:  event.ObjectRestorePost,
		BucketName: bucket,
		Object:     objInfo,
		ReqParams:  extractReqParams(r),
		UserAgent:  r.UserAgent(),
		Host:       handlers.GetSourceIP(r),
	})

	// now process the restore in background
	//
	// The status line has already been written above and this goroutine keeps
	// running after the handler returns, so the ResponseWriter must not be
	// touched here. Failures are logged instead of being written to w.
	go func() {
		rctx := GlobalContext
		if !rreq.SelectParameters.IsEmpty() {
			actualSize, err := objInfo.GetActualSize()
			if err != nil {
				s3LogIf(ctx, fmt.Errorf("Unable to get actual size of transitioned bucket/object %s/%s: %w", bucket, object, err))
				return
			}

			objectRSC := s3select.NewObjectReadSeekCloser(
				func(offset int64) (io.ReadCloser, error) {
					rs := &HTTPRangeSpec{
						IsSuffixLength: false,
						Start:          offset,
						End:            -1,
					}
					return getTransitionedObjectReader(rctx, bucket, object, rs, r.Header,
						objInfo, ObjectOptions{VersionID: objInfo.VersionID})
				},
				actualSize,
			)
			defer objectRSC.Close()

			if err = rreq.SelectParameters.Open(objectRSC); err != nil {
				s3LogIf(ctx, fmt.Errorf("Unable to open select parameters for transitioned bucket/object %s/%s: %w", bucket, object, err))
				return
			}

			// The select output is evaluated into a recorder, not the client
			// connection.
			nr := httptest.NewRecorder()
			rw := xhttp.NewResponseRecorder(nr)
			rw.LogErrBody = true
			rw.LogAllBody = true
			rreq.SelectParameters.Evaluate(rw)
			rreq.SelectParameters.Close()
			return
		}

		restoreOpts := ObjectOptions{
			Transition: TransitionOptions{
				RestoreRequest: rreq,
				RestoreExpiry:  restoreExpiry,
			},
			VersionID: objInfo.VersionID,
		}
		if err := objectAPI.RestoreTransitionedObject(rctx, bucket, object, restoreOpts); err != nil {
			s3LogIf(ctx, fmt.Errorf("Unable to restore transitioned bucket/object %s/%s: %w", bucket, object, err))
			return
		}

		// Notify object restore completed via a POST request.
		sendEvent(eventArgs{
			EventName:  event.ObjectRestoreCompleted,
			BucketName: bucket,
			Object:     objInfo,
			ReqParams:  extractReqParams(r),
			UserAgent:  r.UserAgent(),
			Host:       handlers.GetSourceIP(r),
		})
	}()
}