// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.

package cmd

import (
	"archive/tar"
	"bytes"
	"context"
	"encoding/hex"
	"encoding/xml"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"net/textproto"
	"net/url"
	"os"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"
	"unicode"

	"github.com/google/uuid"
	"github.com/klauspost/compress/gzhttp"
	miniogo "github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	"github.com/minio/minio-go/v7/pkg/encrypt"
	"github.com/minio/minio-go/v7/pkg/tags"
	"github.com/minio/minio/internal/amztime"
	"github.com/minio/minio/internal/auth"
	sse "github.com/minio/minio/internal/bucket/encryption"
	"github.com/minio/minio/internal/bucket/lifecycle"
	objectlock "github.com/minio/minio/internal/bucket/object/lock"
	"github.com/minio/minio/internal/bucket/replication"
	"github.com/minio/minio/internal/config/cache"
	"github.com/minio/minio/internal/config/dns"
	"github.com/minio/minio/internal/config/storageclass"
	"github.com/minio/minio/internal/crypto"
	"github.com/minio/minio/internal/etag"
	"github.com/minio/minio/internal/event"
	"github.com/minio/minio/internal/handlers"
	"github.com/minio/minio/internal/hash"
	xhttp "github.com/minio/minio/internal/http"
	xioutil "github.com/minio/minio/internal/ioutil"
	"github.com/minio/minio/internal/kms"
	"github.com/minio/minio/internal/logger"
	"github.com/minio/minio/internal/s3select"
	"github.com/minio/mux"
	"github.com/minio/pkg/v2/policy"
	"github.com/valyala/bytebufferpool"
)

// supportedHeadGetReqParams lists the `response-*` request parameters that are
// honored on GET and HEAD (including presigned) requests, keyed by the
// lower-case parameter name and mapped to the response header each one sets.
var supportedHeadGetReqParams = map[string]string{
	"response-cache-control":       xhttp.CacheControl,
	"response-content-disposition": xhttp.ContentDisposition,
	"response-content-encoding":    xhttp.ContentEncoding,
	"response-content-language":    xhttp.ContentLanguage,
	"response-content-type":        xhttp.ContentType,
	"response-expires":             xhttp.Expires,
}

const (
	// Compression algorithm identifiers, version 1 and version 2.
	compressionAlgorithmV1 = "golang/snappy/LZ77"
	compressionAlgorithmV2 = "klauspost/compress/s2"

	// encryptBufferThreshold (1 MiB): when an upload exceeds this size ...
	encryptBufferThreshold = 1024 * 1024
	// ... an input buffer of encryptBufferSize (1 MiB) is added.
	encryptBufferSize = 1024 * 1024

	// minCompressibleSize (4 KiB) is the minimum size at which we enable compression.
	minCompressibleSize = 4 * 1024
)

// setHeadGetRespHeaders copies every supported request parameter found in
// reqParams onto its matching response header of w. Parameter names are
// matched case-insensitively; multiple values of one parameter are joined
// with a comma. Unsupported parameters are ignored.
func setHeadGetRespHeaders(w http.ResponseWriter, reqParams url.Values) {
	for param, values := range reqParams {
		headerName, supported := supportedHeadGetReqParams[strings.ToLower(param)]
		if !supported {
			continue
		}
		w.Header()[headerName] = []string{strings.Join(values, ",")}
	}
}

// SelectObjectContentHandler - GET Object?select
// ----------
// This implementation of the GET operation retrieves object content based
// on an SQL expression. In the request, along with the sql expression, you must
// also specify a data serialization format (JSON, CSV) of the object.
//
// The handler rejects requests that carry SSE-S3/SSE-KMS headers, a Range
// header, or an empty body. It holds a namespace read lock on the object for
// the whole evaluation, so the object-layer calls made under it pass NoLock.
// After the select expression has been evaluated into w, an ObjectAccessedGet
// event is sent.
func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "SelectObject")

	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))

	// Fetch object stat info.
	objectAPI := api.ObjectAPI()
	if objectAPI == nil {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
		return
	}

	if crypto.S3.IsRequested(r.Header) || crypto.S3KMS.IsRequested(r.Header) { // If SSE-S3 or SSE-KMS present -> AWS fails with undefined error
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrBadRequest), r.URL)
		return
	}

	vars := mux.Vars(r)
	bucket := vars["bucket"]
	object, err := unescapePath(vars["object"])
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	opts, err := getOpts(ctx, r, bucket, object)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	getObjectInfo := objectAPI.GetObjectInfo

	// Check the request's auth type and permissions so that an S3 compatible
	// error (NoSuchKey vs AccessDenied) can be returned.
	if s3Error := checkRequestAuthType(ctx, r, policy.GetObjectAction, bucket, object); s3Error != ErrNone {
		if getRequestAuthType(r) == authTypeAnonymous {
			// As per "Permission" section in
			// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html
			// If the object you request does not exist,
			// the error Amazon S3 returns depends on
			// whether you also have the s3:ListBucket
			// permission.
			// * If you have the s3:ListBucket permission
			//   on the bucket, Amazon S3 will return an
			//   HTTP status code 404 ("no such key")
			//   error.
			// * if you don’t have the s3:ListBucket
			//   permission, Amazon S3 will return an HTTP
			//   status code 403 ("access denied") error.
			if globalPolicySys.IsAllowed(policy.BucketPolicyArgs{
				Action:          policy.ListBucketAction,
				BucketName:      bucket,
				ConditionValues: getConditionValues(r, "", auth.AnonymousCredentials),
				IsOwner:         false,
			}) {
				// Anonymous callers allowed to list the bucket learn that the
				// key is missing instead of getting the original auth error.
				_, err = getObjectInfo(ctx, bucket, object, opts)
				if toAPIError(ctx, err).Code == "NoSuchKey" {
					s3Error = ErrNoSuchKey
				}
			}
		}
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
		return
	}

	// Get request range; select requests with a Range header are rejected.
	rangeHeader := r.Header.Get(xhttp.Range)
	if rangeHeader != "" {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrUnsupportedRangeHeader), r.URL)
		return
	}

	// The select request document is read from the body, so it must be present.
	if r.ContentLength <= 0 {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrEmptyRequestBody), r.URL)
		return
	}

	// Take read lock on object, here so subsequent lower-level
	// calls do not need to.
	lock := objectAPI.NewNSLock(bucket, object)
	lkctx, err := lock.GetRLock(ctx, globalOperationTimeout)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}
	// From here on use the context tied to the lock.
	ctx = lkctx.Context()
	defer lock.RUnlock(lkctx)

	getObjectNInfo := objectAPI.GetObjectNInfo

	// gopts is a copy of opts used only for the stat call below.
	gopts := opts
	gopts.NoLock = true // We already have a lock, we can live with it.
	objInfo, err := getObjectInfo(ctx, bucket, object, gopts)
	if err != nil {
		// Versioning enabled quite possibly object is deleted might be delete-marker
		// if present set the headers, no idea why AWS S3 sets these headers.
		if objInfo.VersionID != "" && objInfo.DeleteMarker {
			w.Header()[xhttp.AmzVersionID] = []string{objInfo.VersionID}
			w.Header()[xhttp.AmzDeleteMarker] = []string{strconv.FormatBool(objInfo.DeleteMarker)}
		}
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// Evaluate whether the caller may read retention and legal-hold metadata.
	getRetPerms := checkRequestAuthType(ctx, r, policy.GetObjectRetentionAction, bucket, object)
	legalHoldPerms := checkRequestAuthType(ctx, r, policy.GetObjectLegalHoldAction, bucket, object)

	// filter object lock metadata if permission does not permit
	objInfo.UserDefined = objectlock.FilterObjectLockMetadata(objInfo.UserDefined, getRetPerms != ErrNone, legalHoldPerms != ErrNone)

	if _, err = DecryptObjectInfo(&objInfo, r); err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	actualSize, err := objInfo.GetActualSize()
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// The select engine reads the object through this seekable wrapper; each
	// call of the callback opens the object from the given offset to its end.
	objectRSC := s3select.NewObjectReadSeekCloser(
		func(offset int64) (io.ReadCloser, error) {
			rs := &HTTPRangeSpec{
				IsSuffixLength: false,
				Start:          offset,
				End:            -1,
			}
			// The namespace read lock is already held by this handler.
			opts.NoLock = true
			// NOTE(review): the concrete reader is returned directly as an
			// io.ReadCloser, so a nil reader returned alongside an error becomes
			// a non-nil interface value - confirm callers check the error first.
			return getObjectNInfo(ctx, bucket, object, rs, r.Header, opts)
		},
		actualSize,
	)
	defer objectRSC.Close()
	// Parse the select request from the body.
	s3Select, err := s3select.NewS3Select(r.Body)
	if err != nil {
		// Select errors carry their own code, message and HTTP status.
		if serr, ok := err.(s3select.SelectError); ok {
			encodedErrorResponse := encodeResponse(APIErrorResponse{
				Code:       serr.ErrorCode(),
				Message:    serr.ErrorMessage(),
				BucketName: bucket,
				Key:        object,
				Resource:   r.URL.Path,
				RequestID:  w.Header().Get(xhttp.AmzRequestID),
				HostID:     globalDeploymentID(),
			})
			writeResponse(w, serr.HTTPStatusCode(), encodedErrorResponse, mimeXML)
		} else {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		}
		return
	}
	defer s3Select.Close()

	if err = s3Select.Open(objectRSC); err != nil {
		// Same error translation as for the parse failure above.
		if serr, ok := err.(s3select.SelectError); ok {
			encodedErrorResponse := encodeResponse(APIErrorResponse{
				Code:       serr.ErrorCode(),
				Message:    serr.ErrorMessage(),
				BucketName: bucket,
				Key:        object,
				Resource:   r.URL.Path,
				RequestID:  w.Header().Get(xhttp.AmzRequestID),
				HostID:     globalDeploymentID(),
			})
			writeResponse(w, serr.HTTPStatusCode(), encodedErrorResponse, mimeXML)
		} else {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		}
		return
	}

	// Set encryption response headers
	switch kind, _ := crypto.IsEncrypted(objInfo.UserDefined); kind {
	case crypto.S3:
		w.Header().Set(xhttp.AmzServerSideEncryption, xhttp.AmzEncryptionAES)
	case crypto.S3KMS:
		w.Header().Set(xhttp.AmzServerSideEncryption, xhttp.AmzEncryptionKMS)
		w.Header().Set(xhttp.AmzServerSideEncryptionKmsID, objInfo.KMSKeyID())
		if kmsCtx, ok := objInfo.UserDefined[crypto.MetaContext]; ok {
			w.Header().Set(xhttp.AmzServerSideEncryptionKmsContext, kmsCtx)
		}
	case crypto.SSEC:
		// Validate the SSE-C Key set in the header.
		if _, err = crypto.SSEC.UnsealObjectKey(r.Header, objInfo.UserDefined, bucket, object); err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}
		// Echo the customer algorithm and key MD5 sent with the request.
		w.Header().Set(xhttp.AmzServerSideEncryptionCustomerAlgorithm, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerAlgorithm))
		w.Header().Set(xhttp.AmzServerSideEncryptionCustomerKeyMD5, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerKeyMD5))
	}

	// Run the select expression and write its output to the response.
	s3Select.Evaluate(w)

	// Notify object accessed via a GET request.
	sendEvent(eventArgs{
		EventName:    event.ObjectAccessedGet,
		BucketName:   bucket,
		Object:       objInfo,
		ReqParams:    extractReqParams(r),
		RespElements: extractRespElements(w),
		UserAgent:    r.UserAgent(),
		Host:         handlers.GetSourceIP(r),
	})
}

// getObjectHandler serves a GET Object request for bucket/object.
//
// In order, it rejects requests carrying SSE-S3/SSE-KMS headers, authenticates
// the caller, parses the optional Range header, tries to answer from the
// global cache (only when the cache is enabled and no version is requested)
// and otherwise reads the object through objectAPI, falling back to the
// replication proxy targets when the local read fails. Authorization is
// evaluated once the object's tags are known. On success the object body is
// streamed to w and an ObjectAccessedGet event is sent.
//
// Every failure is answered with a full error document (writeErrorResponse);
// the headers-only form is what headObjectHandler uses for HEAD requests.
func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI ObjectLayer, bucket, object string, w http.ResponseWriter, r *http.Request) {
	if crypto.S3.IsRequested(r.Header) || crypto.S3KMS.IsRequested(r.Header) { // If SSE-S3 or SSE-KMS present -> AWS fails with undefined error
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrBadRequest), r.URL)
		return
	}

	opts, err := getOpts(ctx, r, bucket, object)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// Authenticate the request so that an S3 compatible error
	// (NoSuchKey vs AccessDenied) can be returned.
	if s3Error := authenticateRequest(ctx, r, policy.GetObjectAction); s3Error != ErrNone {
		if getRequestAuthType(r) == authTypeAnonymous {
			// As per "Permission" section in
			// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html
			// If the object you request does not exist,
			// the error Amazon S3 returns depends on
			// whether you also have the s3:ListBucket
			// permission.
			// * If you have the s3:ListBucket permission
			//   on the bucket, Amazon S3 will return an
			//   HTTP status code 404 ("no such key")
			//   error.
			// * if you don’t have the s3:ListBucket
			//   permission, Amazon S3 will return an HTTP
			//   status code 403 ("access denied") error.
			if globalPolicySys.IsAllowed(policy.BucketPolicyArgs{
				Action:          policy.ListBucketAction,
				BucketName:      bucket,
				ConditionValues: getConditionValues(r, "", auth.AnonymousCredentials),
				IsOwner:         false,
			}) {
				getObjectInfo := objectAPI.GetObjectInfo

				_, err = getObjectInfo(ctx, bucket, object, opts)
				if toAPIError(ctx, err).Code == "NoSuchKey" {
					s3Error = ErrNoSuchKey
				}
			}
		}
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
		return
	}

	getObjectNInfo := objectAPI.GetObjectNInfo

	// Get request range.
	var rs *HTTPRangeSpec
	var rangeErr error
	rangeHeader := r.Header.Get(xhttp.Range)
	if rangeHeader != "" {
		// Both 'Range' and 'partNumber' cannot be specified at the same time
		if opts.PartNumber > 0 {
			writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidRangePartNumber), r.URL)
			return
		}

		rs, rangeErr = parseRequestRangeSpec(rangeHeader)
		// Handle only errInvalidRange. Ignore other
		// parse error and treat it as regular Get
		// request like Amazon S3.
		if errors.Is(rangeErr, errInvalidRange) {
			writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidRange), r.URL)
			return
		}
	}

	// The cache is consulted only for requests that do not name a version.
	cachedResult := globalCacheConfig.Enabled() && opts.VersionID == ""

	// update records that the cache has to be populated from this response.
	var update bool
	if cachedResult {
		rc := &cache.CondCheck{}
		h := r.Header.Clone()
		if opts.PartNumber > 0 {
			h.Set(xhttp.PartNumber, strconv.Itoa(opts.PartNumber))
		}
		rc.Init(bucket, object, h)

		ci, err := globalCacheConfig.Get(rc)
		if ci != nil {
			tgs, ok := ci.Metadata[xhttp.AmzObjectTagging]
			if ok {
				// Set this such that authorization policies can be applied on the object tags.
				r.Header.Set(xhttp.AmzObjectTagging, tgs)
			}

			if s3Error := authorizeRequest(ctx, r, policy.GetObjectAction); s3Error != ErrNone {
				// This is a GET: reply with the error document, not headers only.
				writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
				return
			}

			okSt := (ci.StatusCode == http.StatusOK || ci.StatusCode == http.StatusPartialContent ||
				ci.StatusCode == http.StatusPreconditionFailed || ci.StatusCode == http.StatusNotModified)
			if okSt {
				ci.WriteHeaders(w, func() {
					// set common headers
					setCommonHeaders(w)
				}, func() {
					okSt := (ci.StatusCode == http.StatusOK || ci.StatusCode == http.StatusPartialContent)
					if okSt && len(ci.Data) > 0 {
						for k, v := range ci.Metadata {
							w.Header().Set(k, v)
						}

						// The parts count is taken from the text following the
						// first digit-free prefix of a multipart ("-") ETag.
						if opts.PartNumber > 0 && strings.Contains(ci.ETag, "-") {
							w.Header()[xhttp.AmzMpPartsCount] = []string{
								strings.TrimLeftFunc(ci.ETag, func(r rune) bool {
									return !unicode.IsNumber(r)
								}),
							}
						}

						// For providing ranged content
						start, rangeLen, err := rs.GetOffsetLength(ci.Size)
						if err != nil {
							start, rangeLen = 0, ci.Size
						}

						// Set content length.
						w.Header().Set(xhttp.ContentLength, strconv.FormatInt(rangeLen, 10))
						if rs != nil {
							contentRange := fmt.Sprintf("bytes %d-%d/%d", start, start+rangeLen-1, ci.Size)
							w.Header().Set(xhttp.ContentRange, contentRange)
						}

						io.Copy(w, bytes.NewReader(ci.Data))
						return
					}
					if ci.StatusCode == http.StatusPreconditionFailed {
						writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrPreconditionFailed), r.URL)
						return
					} else if ci.StatusCode == http.StatusNotModified {
						w.WriteHeader(ci.StatusCode)
						return
					}

					// We did not satisfy any requirement from the cache, update the cache.
					// this basically means that we do not have the Data for the object
					// cached yet
					update = true
				})
				if !update {
					// No update is needed means we have written already to the client just return here.
					return
				}
			}
		}

		if errors.Is(err, cache.ErrKeyMissing) {
			update = true
		}
	}

	// Validate pre-conditions if any.
	opts.CheckPrecondFn = func(oi ObjectInfo) bool {
		if _, err := DecryptObjectInfo(&oi, r); err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return true
		}

		return checkPreconditions(ctx, w, r, oi, opts)
	}

	opts.FastGetObjInfo = true

	var proxy proxyResult
	gr, err := getObjectNInfo(ctx, bucket, object, rs, r.Header, opts)
	if err != nil {
		var (
			reader *GetObjectReader
			perr   error
		)
		proxytgts := getProxyTargets(ctx, bucket, object, opts)
		if !proxytgts.Empty() {
			globalReplicationStats.incProxy(bucket, getObjectAPI, false)
			// proxy to replication target if active-active replication is in place.
			reader, proxy, perr = proxyGetToReplicationTarget(ctx, bucket, object, rs, r.Header, opts, proxytgts)
			if perr != nil {
				globalReplicationStats.incProxy(bucket, getObjectAPI, true)
				proxyGetErr := ErrorRespToObjectError(perr, bucket, object)
				// Not-found, precondition and range failures are not logged.
				if !isErrBucketNotFound(proxyGetErr) && !isErrObjectNotFound(proxyGetErr) && !isErrVersionNotFound(proxyGetErr) &&
					!isErrPreconditionFailed(proxyGetErr) && !isErrInvalidRange(proxyGetErr) {
					logger.LogIf(ctx, fmt.Errorf("Proxying request (replication) failed for %s/%s(%s) - %w", bucket, object, opts.VersionID, perr))
				}
			}
			if reader != nil && proxy.Proxy && perr == nil {
				gr = reader
			}
		}
		if reader == nil || !proxy.Proxy {
			// validate if the request indeed was authorized, if it wasn't we need to return "ErrAccessDenied"
			// instead of any namespace related error.
			if s3Error := authorizeRequest(ctx, r, policy.GetObjectAction); s3Error != ErrNone {
				writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
				return
			}
			// No response is written here for a failed precondition.
			// NOTE(review): presumably opts.CheckPrecondFn already replied - confirm.
			if isErrPreconditionFailed(err) {
				return
			}
			if proxy.Err != nil {
				writeErrorResponse(ctx, w, toAPIError(ctx, proxy.Err), r.URL)
				return
			}
			if gr != nil {
				if !gr.ObjInfo.VersionPurgeStatus.Empty() {
					// Shows the replication status of a permanent delete of a version
					w.Header()[xhttp.MinIODeleteReplicationStatus] = []string{string(gr.ObjInfo.VersionPurgeStatus)}
				}
				if !gr.ObjInfo.ReplicationStatus.Empty() && gr.ObjInfo.DeleteMarker {
					w.Header()[xhttp.MinIODeleteMarkerReplicationStatus] = []string{string(gr.ObjInfo.ReplicationStatus)}
				}

				// Versioning enabled quite possibly object is deleted might be delete-marker
				// if present set the headers, no idea why AWS S3 sets these headers.
				if gr.ObjInfo.VersionID != "" && gr.ObjInfo.DeleteMarker {
					w.Header()[xhttp.AmzVersionID] = []string{gr.ObjInfo.VersionID}
					w.Header()[xhttp.AmzDeleteMarker] = []string{strconv.FormatBool(gr.ObjInfo.DeleteMarker)}
				}
				QueueReplicationHeal(ctx, bucket, gr.ObjInfo, 0)
			}
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}
	}
	defer gr.Close()

	objInfo := gr.ObjInfo

	// Expose the object's tags on the request before authorization runs.
	if objInfo.UserTags != "" {
		r.Header.Set(xhttp.AmzObjectTagging, objInfo.UserTags)
	}

	if s3Error := authorizeRequest(ctx, r, policy.GetObjectAction); s3Error != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
		return
	}

	if !proxy.Proxy { // apply lifecycle rules only for local requests
		// Automatically remove the object/version if an expiry lifecycle rule can be applied
		if lc, err := globalLifecycleSys.Get(bucket); err == nil {
			rcfg, _ := globalBucketObjectLockSys.Get(bucket)
			replcfg, _ := getReplicationConfig(ctx, bucket)
			event := evalActionFromLifecycle(ctx, *lc, rcfg, replcfg, objInfo)
			if event.Action.Delete() {
				// apply whatever the expiry rule is.
				applyExpiryRule(event, lcEventSrc_s3GetObject, objInfo)
				if !event.Action.DeleteRestored() {
					// If the ILM action is not on restored object return error.
					// This is a GET: reply with the error document, not headers only.
					writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNoSuchKey), r.URL)
					return
				}
			}
		}

		QueueReplicationHeal(ctx, bucket, gr.ObjInfo, 0)
	}

	// Evaluate whether the caller may read retention and legal-hold metadata.
	getRetPerms := checkRequestAuthType(ctx, r, policy.GetObjectRetentionAction, bucket, object)
	legalHoldPerms := checkRequestAuthType(ctx, r, policy.GetObjectLegalHoldAction, bucket, object)

	// filter object lock metadata if permission does not permit
	objInfo.UserDefined = objectlock.FilterObjectLockMetadata(objInfo.UserDefined, getRetPerms != ErrNone, legalHoldPerms != ErrNone)

	// Set encryption response headers
	if kind, isEncrypted := crypto.IsEncrypted(objInfo.UserDefined); isEncrypted {
		switch kind {
		case crypto.S3:
			w.Header().Set(xhttp.AmzServerSideEncryption, xhttp.AmzEncryptionAES)
		case crypto.S3KMS:
			w.Header().Set(xhttp.AmzServerSideEncryption, xhttp.AmzEncryptionKMS)
			w.Header().Set(xhttp.AmzServerSideEncryptionKmsID, objInfo.KMSKeyID())
			if kmsCtx, ok := objInfo.UserDefined[crypto.MetaContext]; ok {
				w.Header().Set(xhttp.AmzServerSideEncryptionKmsContext, kmsCtx)
			}
		case crypto.SSEC:
			w.Header().Set(xhttp.AmzServerSideEncryptionCustomerAlgorithm, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerAlgorithm))
			w.Header().Set(xhttp.AmzServerSideEncryptionCustomerKeyMD5, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerKeyMD5))
		}
		objInfo.ETag = getDecryptedETag(r.Header, objInfo, false)
	}

	if r.Header.Get(xhttp.AmzChecksumMode) == "ENABLED" && rs == nil {
		// AWS S3 silently drops checksums on range requests.
		hash.AddChecksumHeader(w, objInfo.decryptChecksums(opts.PartNumber))
	}

	// When the cache has to be refreshed, the body is additionally captured in
	// buf (only if the cache accepts objects of this size) and stored on return.
	var buf *bytebufferpool.ByteBuffer
	if update {
		if globalCacheConfig.MatchesSize(objInfo.Size) {
			buf = bytebufferpool.Get()
			defer bytebufferpool.Put(buf)
		}
		// Deferred after the Put above, so it runs first and reads buf before
		// the buffer goes back to the pool.
		defer func() {
			var data []byte
			if buf != nil {
				data = buf.Bytes()
			}

			asize, err := objInfo.GetActualSize()
			if err != nil {
				asize = objInfo.Size
			}

			globalCacheConfig.Set(&cache.ObjectInfo{
				Key:          objInfo.Name,
				Bucket:       objInfo.Bucket,
				ETag:         objInfo.ETag,
				ModTime:      objInfo.ModTime,
				Expires:      objInfo.ExpiresStr(),
				CacheControl: objInfo.CacheControl,
				Metadata:     cleanReservedKeys(objInfo.UserDefined),
				Range:        rangeHeader,
				PartNumber:   opts.PartNumber,
				Size:         asize,
				Data:         data,
			})
		}()
	}

	if err = setObjectHeaders(w, objInfo, rs, opts); err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// Set Parts Count Header
	if opts.PartNumber > 0 && len(objInfo.Parts) > 0 {
		setPartsCountHeaders(w, objInfo)
	}

	setHeadGetRespHeaders(w, r.Form)

	// Tee the body into buf when it is being captured for the cache.
	var iw io.Writer
	iw = w
	if buf != nil {
		iw = io.MultiWriter(w, buf)
	}

	statusCodeWritten := false
	httpWriter := xioutil.WriteOnClose(iw)
	if rs != nil || opts.PartNumber > 0 {
		statusCodeWritten = true
		w.WriteHeader(http.StatusPartialContent)
	}

	// Write object content to response body
	if _, err = xioutil.Copy(httpWriter, gr); err != nil {
		if !httpWriter.HasWritten() && !statusCodeWritten {
			// write error response only if no data or headers has been written to client yet
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}
		return
	}

	if err = httpWriter.Close(); err != nil {
		if !httpWriter.HasWritten() && !statusCodeWritten { // write error response only if no data or headers has been written to client yet
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}
		return
	}

	// Notify object accessed via a GET request.
	sendEvent(eventArgs{
		EventName:    event.ObjectAccessedGet,
		BucketName:   bucket,
		Object:       objInfo,
		ReqParams:    extractReqParams(r),
		RespElements: extractRespElements(w),
		UserAgent:    r.UserAgent(),
		Host:         handlers.GetSourceIP(r),
	})
}

// getObjectAttributesHandler serves a GetObjectAttributes request for
// bucket/object. It validates the request options, authorizes the caller
// (the version-specific actions are checked when a version ID is given),
// loads the object info and writes an XML document that contains only the
// attributes selected in opts.ObjectAttributes. An ObjectAccessedAttributes
// event is sent after the response has been written.
func (api objectAPIHandlers) getObjectAttributesHandler(ctx context.Context, objectAPI ObjectLayer, bucket, object string, w http.ResponseWriter, r *http.Request) {
	opts, valid := getAndValidateAttributesOpts(ctx, w, r, bucket, object)
	if !valid {
		return
	}

	// Both the attributes action and the plain get action must be allowed.
	var s3Error APIErrorCode
	if opts.VersionID != "" {
		s3Error = checkRequestAuthType(ctx, r, policy.GetObjectVersionAttributesAction, bucket, object)
		if s3Error == ErrNone {
			s3Error = checkRequestAuthType(ctx, r, policy.GetObjectVersionAction, bucket, object)
		}
	} else {
		s3Error = checkRequestAuthType(ctx, r, policy.GetObjectAttributesAction, bucket, object)
		if s3Error == ErrNone {
			s3Error = checkRequestAuthType(ctx, r, policy.GetObjectAction, bucket, object)
		}
	}

	if s3Error != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
		return
	}

	objInfo, err := objectAPI.GetObjectInfo(ctx, bucket, object, opts)
	if err != nil {
		// Only callers allowed to list the bucket see the real lookup error;
		// everyone else gets AccessDenied.
		s3Error = checkRequestAuthType(ctx, r, policy.ListBucketAction, bucket, object)
		if s3Error == ErrNone {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrAccessDenied), r.URL)
		return
	}

	if _, err = DecryptObjectInfo(&objInfo, r); err != nil {
		writeErrorResponseHeadersOnly(w, toAPIError(ctx, err))
		return
	}

	if checkPreconditions(ctx, w, r, objInfo, opts) {
		return
	}

	OA := new(getObjectAttributesResponse)

	if opts.Versioned {
		w.Header().Set(xhttp.AmzVersionID, objInfo.VersionID)
	}

	lastModified := objInfo.ModTime.UTC().Format(http.TimeFormat)
	w.Header().Set(xhttp.LastModified, lastModified)
	w.Header().Del(xhttp.ContentType)

	if _, ok := opts.ObjectAttributes[xhttp.Checksum]; ok {
		chkSums := objInfo.decryptChecksums(0)
		// AWS does not appear to append part number on this API call,
		// so only the text before the first "-" is reported.
		// Each case must report the algorithm it tested for.
		switch {
		case chkSums["CRC32"] != "":
			OA.Checksum = new(objectAttributesChecksum)
			OA.Checksum.ChecksumCRC32 = strings.Split(chkSums["CRC32"], "-")[0]
		case chkSums["CRC32C"] != "":
			OA.Checksum = new(objectAttributesChecksum)
			OA.Checksum.ChecksumCRC32C = strings.Split(chkSums["CRC32C"], "-")[0]
		case chkSums["SHA1"] != "":
			OA.Checksum = new(objectAttributesChecksum)
			OA.Checksum.ChecksumSHA1 = strings.Split(chkSums["SHA1"], "-")[0]
		case chkSums["SHA256"] != "":
			OA.Checksum = new(objectAttributesChecksum)
			OA.Checksum.ChecksumSHA256 = strings.Split(chkSums["SHA256"], "-")[0]
		}
	}

	if _, ok := opts.ObjectAttributes[xhttp.ETag]; ok {
		OA.ETag = objInfo.ETag
	}

	if _, ok := opts.ObjectAttributes[xhttp.ObjectSize]; ok {
		// A size lookup failure is deliberately ignored; the returned size is reported as is.
		OA.ObjectSize, _ = objInfo.GetActualSize()
	}

	if _, ok := opts.ObjectAttributes[xhttp.StorageClass]; ok {
		OA.StorageClass = objInfo.StorageClass
	}

	objInfo.decryptPartsChecksums()

	if _, ok := opts.ObjectAttributes[xhttp.ObjectParts]; ok {
		OA.ObjectParts = new(objectAttributesParts)
		OA.ObjectParts.PartNumberMarker = opts.PartNumberMarker

		OA.ObjectParts.MaxParts = opts.MaxParts
		partsLength := len(objInfo.Parts)
		OA.ObjectParts.PartsCount = partsLength

		if opts.MaxParts > -1 {
			for _, v := range objInfo.Parts {
				// Skip the parts up to and including the marker.
				if v.Number <= opts.PartNumberMarker {
					continue
				}

				// Stop once the requested page size has been reached.
				if len(OA.ObjectParts.Parts) == opts.MaxParts {
					break
				}

				OA.ObjectParts.NextPartNumberMarker = v.Number
				OA.ObjectParts.Parts = append(OA.ObjectParts.Parts, &objectAttributesPart{
					ChecksumSHA1:   v.Checksums["SHA1"],
					ChecksumSHA256: v.Checksums["SHA256"],
					ChecksumCRC32:  v.Checksums["CRC32"],
					ChecksumCRC32C: v.Checksums["CRC32C"],
					PartNumber:     v.Number,
					Size:           v.Size,
				})
			}
		}

		// The listing is reported as truncated unless the last returned part
		// number equals the number of parts.
		if OA.ObjectParts.NextPartNumberMarker != partsLength {
			OA.ObjectParts.IsTruncated = true
		}
	}

	writeSuccessResponseXML(w, encodeResponse(OA))

	sendEvent(eventArgs{
		EventName:    event.ObjectAccessedAttributes,
		BucketName:   bucket,
		Object:       objInfo,
		ReqParams:    extractReqParams(r),
		RespElements: extractRespElements(w),
		UserAgent:    r.UserAgent(),
		Host:         handlers.GetSourceIP(r),
	})
}

// GetObjectHandler - GET Object
// ----------
// Entry point for retrieving an object; the caller must have READ access to
// it. The handler resolves bucket and object from the route, optionally
// disables gzip for the response, and then dispatches either to the archive
// extraction handler or to the regular object handler.
func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "GetObject")

	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))

	objectAPI := api.ObjectAPI()
	if objectAPI == nil {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
		return
	}

	routeVars := mux.Vars(r)
	bucket := routeVars["bucket"]
	object, err := unescapePath(routeVars["object"])
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}
	if !globalAPIConfig.shouldGzipObjects() {
		w.Header().Set(gzhttp.HeaderNoCompression, "true")
	}

	// Extraction is used only when the request asks for it and the object
	// name contains the archive pattern.
	wantsArchiveEntry := r.Header.Get(xMinIOExtract) == "true" && strings.Contains(object, archivePattern)
	if wantsArchiveEntry {
		api.getObjectInArchiveFileHandler(ctx, objectAPI, bucket, object, w, r)
		return
	}
	api.getObjectHandler(ctx, objectAPI, bucket, object, w, r)
}

// headObjectHandler serves a HEAD request for a single object and writes the
// object's metadata as response headers.
//
// In order, it rejects requests carrying SSE-S3/SSE-KMS headers, authenticates
// the request, validates the Range/partNumber combination, answers from the
// metadata cache when it is enabled and holds a usable entry, and otherwise
// reads the object info from the object layer (trying the replication proxy
// targets when that fails). It then authorizes the request, applies lifecycle
// expiry for local objects, filters object-lock metadata, evaluates the
// preconditions and finally sets the encryption, checksum and standard object
// headers before sending the ObjectAccessedHead event.
func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI ObjectLayer, bucket, object string, w http.ResponseWriter, r *http.Request) {
	if crypto.S3.IsRequested(r.Header) || crypto.S3KMS.IsRequested(r.Header) { // If SSE-S3 or SSE-KMS present -> AWS fails with undefined error
		writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrBadRequest))
		return
	}

	getObjectInfo := objectAPI.GetObjectInfo

	opts, err := getOpts(ctx, r, bucket, object)
	if err != nil {
		writeErrorResponseHeadersOnly(w, toAPIError(ctx, err))
		return
	}

	// Check the auth type to return an S3 compatible error, i.e. the
	// correct one of NoSuchKey vs AccessDenied.
	if s3Error := authenticateRequest(ctx, r, policy.GetObjectAction); s3Error != ErrNone {
		if getRequestAuthType(r) == authTypeAnonymous {
			// As per "Permission" section in
			// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectHEAD.html
			// If the object you request does not exist,
			// the error Amazon S3 returns depends on
			// whether you also have the s3:ListBucket
			// permission.
			// * If you have the s3:ListBucket permission
			//   on the bucket, Amazon S3 will return an
			//   HTTP status code 404 ("no such key")
			//   error.
			// * if you don't have the s3:ListBucket
			//   permission, Amazon S3 will return an HTTP
			//   status code 403 ("access denied") error.
			if globalPolicySys.IsAllowed(policy.BucketPolicyArgs{
				Action:          policy.ListBucketAction,
				BucketName:      bucket,
				ConditionValues: getConditionValues(r, "", auth.AnonymousCredentials),
				IsOwner:         false,
			}) {
				_, err = getObjectInfo(ctx, bucket, object, opts)
				if toAPIError(ctx, err).Code == "NoSuchKey" {
					s3Error = ErrNoSuchKey
				}
			}
		}
		writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(s3Error))
		return
	}

	// Get request range. The header is parsed exactly once; rs stays nil
	// when no Range header was sent.
	var rs *HTTPRangeSpec
	rangeHeader := r.Header.Get(xhttp.Range)
	if rangeHeader != "" {
		// Both 'Range' and 'partNumber' cannot be specified at the same time
		if opts.PartNumber > 0 {
			writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrInvalidRangePartNumber))
			return
		}

		if rs, err = parseRequestRangeSpec(rangeHeader); err != nil {
			// Handle only errInvalidRange. Ignore other
			// parse error and treat it as regular Get
			// request like Amazon S3.
			if errors.Is(err, errInvalidRange) {
				writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrInvalidRange))
				return
			}
		}
	}

	// The cache is consulted only for requests that do not name a version.
	cachedResult := globalCacheConfig.Enabled() && opts.VersionID == ""

	// update records that the cache had no entry for this key, so that one
	// is stored once the object info has been read from the object layer.
	var update bool
	if cachedResult {
		rc := &cache.CondCheck{}
		h := r.Header.Clone()
		if opts.PartNumber > 0 {
			h.Set(xhttp.PartNumber, strconv.Itoa(opts.PartNumber))
		}
		rc.Init(bucket, object, h)

		ci, err := globalCacheConfig.Get(rc)
		if ci != nil {
			tgs, ok := ci.Metadata[xhttp.AmzObjectTagging]
			if ok {
				// Set this such that authorization policies can be applied on the object tags.
				r.Header.Set(xhttp.AmzObjectTagging, tgs)
			}

			if s3Error := authorizeRequest(ctx, r, policy.GetObjectAction); s3Error != ErrNone {
				writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(s3Error))
				return
			}

			okSt := (ci.StatusCode == http.StatusOK || ci.StatusCode == http.StatusPartialContent ||
				ci.StatusCode == http.StatusPreconditionFailed || ci.StatusCode == http.StatusNotModified)
			if okSt {
				ci.WriteHeaders(w, func() {
					// set common headers
					setCommonHeaders(w)
				}, func() {
					okSt := (ci.StatusCode == http.StatusOK || ci.StatusCode == http.StatusPartialContent)
					if okSt {
						for k, v := range ci.Metadata {
							w.Header().Set(k, v)
						}

						// For providing ranged content
						start, rangeLen, err := rs.GetOffsetLength(ci.Size)
						if err != nil {
							start, rangeLen = 0, ci.Size
						}

						if opts.PartNumber > 0 && strings.Contains(ci.ETag, "-") {
							// A multipart ETag has the form "<hex>-<parts>", so
							// the parts count is whatever follows the last dash.
							// Trimming non-digits from the left of the whole
							// ETag would stop at the first digit of the hex
							// portion and return most of the ETag instead.
							partsCount := ci.ETag[strings.LastIndex(ci.ETag, "-")+1:]
							partsCount = strings.TrimFunc(partsCount, func(c rune) bool {
								return !unicode.IsNumber(c)
							})
							if partsCount != "" {
								w.Header()[xhttp.AmzMpPartsCount] = []string{partsCount}
							}
						}

						// Set content length for the range.
						w.Header().Set(xhttp.ContentLength, strconv.FormatInt(rangeLen, 10))
						if rs != nil {
							contentRange := fmt.Sprintf("bytes %d-%d/%d", start, start+rangeLen-1, ci.Size)
							w.Header().Set(xhttp.ContentRange, contentRange)
						}

						return
					}
					if ci.StatusCode == http.StatusPreconditionFailed {
						writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrPreconditionFailed), r.URL)
						return
					}

					w.WriteHeader(ci.StatusCode)
				})
				return
			}
		}
		if errors.Is(err, cache.ErrKeyMissing) {
			update = true
		}
	}

	opts.FastGetObjInfo = true

	objInfo, err := getObjectInfo(ctx, bucket, object, opts)
	var proxy proxyResult
	if err != nil {
		// proxy HEAD to replication target if active-active replication configured on bucket
		proxytgts := getProxyTargets(ctx, bucket, object, opts)
		if !proxytgts.Empty() {
			globalReplicationStats.incProxy(bucket, headObjectAPI, false)
			var oi ObjectInfo
			oi, proxy = proxyHeadToReplicationTarget(ctx, bucket, object, rs, opts, proxytgts)
			if proxy.Proxy {
				objInfo = oi
			}
			if proxy.Err != nil {
				globalReplicationStats.incProxy(bucket, headObjectAPI, true)
				writeErrorResponseHeadersOnly(w, toAPIError(ctx, proxy.Err))
				return
			}
		}
	}

	if objInfo.UserTags != "" {
		// Set this such that authorization policies can be applied on the object tags.
		r.Header.Set(xhttp.AmzObjectTagging, objInfo.UserTags)
	}

	if s3Error := authorizeRequest(ctx, r, policy.GetObjectAction); s3Error != ErrNone {
		writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(s3Error))
		return
	}

	// The local lookup failed and no proxy target answered: report the
	// error, adding whatever delete-marker/replication headers apply.
	if err != nil && !proxy.Proxy {
		switch {
		case !objInfo.VersionPurgeStatus.Empty():
			w.Header()[xhttp.MinIODeleteReplicationStatus] = []string{string(objInfo.VersionPurgeStatus)}
		case !objInfo.ReplicationStatus.Empty() && objInfo.DeleteMarker:
			w.Header()[xhttp.MinIODeleteMarkerReplicationStatus] = []string{string(objInfo.ReplicationStatus)}
		}
		// Versioning enabled quite possibly object is deleted might be delete-marker
		// if present set the headers, no idea why AWS S3 sets these headers.
		if objInfo.VersionID != "" && objInfo.DeleteMarker {
			w.Header()[xhttp.AmzVersionID] = []string{objInfo.VersionID}
			w.Header()[xhttp.AmzDeleteMarker] = []string{strconv.FormatBool(objInfo.DeleteMarker)}
		}

		QueueReplicationHeal(ctx, bucket, objInfo, 0)
		// do an additional verification whether object exists when object is deletemarker and request
		// is from replication
		if opts.CheckDMReplicationReady {
			topts := opts
			topts.VersionID = ""
			goi, gerr := getObjectInfo(ctx, bucket, object, topts)
			if gerr == nil || goi.VersionID != "" { // object layer returned more info because object is deleted
				w.Header().Set(xhttp.MinIOTargetReplicationReady, "true")
			}
		}

		writeErrorResponseHeadersOnly(w, toAPIError(ctx, err))
		return
	}

	if !proxy.Proxy { // apply lifecycle rules only locally not for proxied requests
		// Automatically remove the object/version if an expiry lifecycle rule can be applied
		if lc, err := globalLifecycleSys.Get(bucket); err == nil {
			rcfg, _ := globalBucketObjectLockSys.Get(bucket)
			replcfg, _ := getReplicationConfig(ctx, bucket)
			event := evalActionFromLifecycle(ctx, *lc, rcfg, replcfg, objInfo)
			if event.Action.Delete() {
				// apply whatever the expiry rule is.
				applyExpiryRule(event, lcEventSrc_s3HeadObject, objInfo)
				if !event.Action.DeleteRestored() {
					// If the ILM action is not on restored object return error.
					writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrNoSuchKey))
					return
				}
			}
		}
		QueueReplicationHeal(ctx, bucket, objInfo, 0)
	}

	// Determine whether the caller may read retention and legal hold settings.
	getRetPerms := checkRequestAuthType(ctx, r, policy.GetObjectRetentionAction, bucket, object)
	legalHoldPerms := checkRequestAuthType(ctx, r, policy.GetObjectLegalHoldAction, bucket, object)

	// filter object lock metadata if permission does not permit
	objInfo.UserDefined = objectlock.FilterObjectLockMetadata(objInfo.UserDefined, getRetPerms != ErrNone, legalHoldPerms != ErrNone)

	if _, err = DecryptObjectInfo(&objInfo, r); err != nil {
		writeErrorResponseHeadersOnly(w, toAPIError(ctx, err))
		return
	}

	if update {
		// Fall back to the stored size when the actual size is unavailable.
		asize, err := objInfo.GetActualSize()
		if err != nil {
			asize = objInfo.Size
		}

		// The entry is built now (defer evaluates its arguments here) and
		// stored when the handler returns.
		defer globalCacheConfig.Set(&cache.ObjectInfo{
			Key:          objInfo.Name,
			Bucket:       objInfo.Bucket,
			ETag:         objInfo.ETag,
			ModTime:      objInfo.ModTime,
			Expires:      objInfo.ExpiresStr(),
			CacheControl: objInfo.CacheControl,
			Size:         asize,
			Metadata:     cleanReservedKeys(objInfo.UserDefined),
		})
	}

	// Validate pre-conditions if any.
	if checkPreconditions(ctx, w, r, objInfo, opts) {
		return
	}

	// Set encryption response headers
	switch kind, _ := crypto.IsEncrypted(objInfo.UserDefined); kind {
	case crypto.S3:
		w.Header().Set(xhttp.AmzServerSideEncryption, xhttp.AmzEncryptionAES)
	case crypto.S3KMS:
		w.Header().Set(xhttp.AmzServerSideEncryption, xhttp.AmzEncryptionKMS)
		w.Header().Set(xhttp.AmzServerSideEncryptionKmsID, objInfo.KMSKeyID())
		if kmsCtx, ok := objInfo.UserDefined[crypto.MetaContext]; ok {
			w.Header().Set(xhttp.AmzServerSideEncryptionKmsContext, kmsCtx)
		}
	case crypto.SSEC:
		// Validate the SSE-C Key set in the header.
		if _, err = crypto.SSEC.UnsealObjectKey(r.Header, objInfo.UserDefined, bucket, object); err != nil {
			writeErrorResponseHeadersOnly(w, toAPIError(ctx, err))
			return
		}
		w.Header().Set(xhttp.AmzServerSideEncryptionCustomerAlgorithm, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerAlgorithm))
		w.Header().Set(xhttp.AmzServerSideEncryptionCustomerKeyMD5, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerKeyMD5))
	}

	if r.Header.Get(xhttp.AmzChecksumMode) == "ENABLED" && rs == nil {
		// AWS S3 silently drops checksums on range requests.
		hash.AddChecksumHeader(w, objInfo.decryptChecksums(opts.PartNumber))
	}

	// Set standard object headers.
	if err = setObjectHeaders(w, objInfo, rs, opts); err != nil {
		writeErrorResponseHeadersOnly(w, toAPIError(ctx, err))
		return
	}

	// Set Parts Count Header
	if opts.PartNumber > 0 && len(objInfo.Parts) > 0 {
		setPartsCountHeaders(w, objInfo)
	}

	// Set any additional requested response headers.
	setHeadGetRespHeaders(w, r.Form)

	// Successful response.
	if rs != nil || opts.PartNumber > 0 {
		w.WriteHeader(http.StatusPartialContent)
	} else {
		w.WriteHeader(http.StatusOK)
	}

	// Notify object accessed via a HEAD request.
	sendEvent(eventArgs{
		EventName:    event.ObjectAccessedHead,
		BucketName:   bucket,
		Object:       objInfo,
		ReqParams:    extractReqParams(r),
		RespElements: extractRespElements(w),
		UserAgent:    r.UserAgent(),
		Host:         handlers.GetSourceIP(r),
	})
}

// GetObjectAttributesHandler - GET Object Attributes
// -----------
// This operation retrieves metadata and part metadata from an object without returning the object itself.
func (api objectAPIHandlers) GetObjectAttributesHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "GetObjectAttributes")
	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))

	objectAPI := api.ObjectAPI()
	if objectAPI == nil {
		// This is a GET API, so the failure is reported with a regular
		// error response, the same way the path-unescape failure below is,
		// rather than with the headers-only writer used on the HEAD path.
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
		return
	}

	vars := mux.Vars(r)
	bucket := vars["bucket"]
	object, err := unescapePath(vars["object"])
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	api.getObjectAttributesHandler(ctx, objectAPI, bucket, object, w, r)
}

// HeadObjectHandler - HEAD Object
// -----------
// Serves the S3 HEAD Object operation, which retrieves metadata from an
// object without returning the object itself.
func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "HeadObject")
	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))

	objectAPI := api.ObjectAPI()
	if objectAPI == nil {
		writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrServerNotInitialized))
		return
	}

	pathVars := mux.Vars(r)
	bucket := pathVars["bucket"]
	object, err := unescapePath(pathVars["object"])
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// A request that asks for extraction and whose key contains the archive
	// pattern is handed to the in-archive handler; everything else is a
	// regular object HEAD.
	wantsExtract := r.Header.Get(xMinIOExtract) == "true"
	if wantsExtract && strings.Contains(object, archivePattern) {
		api.headObjectInArchiveFileHandler(ctx, objectAPI, bucket, object, w, r)
		return
	}
	api.headObjectHandler(ctx, objectAPI, bucket, object, w, r)
}

// Extract metadata relevant for an CopyObject operation based on conditional
// header values specified in X-Amz-Metadata-Directive.
func getCpObjMetadataFromHeader(ctx context.Context, r *http.Request, userMeta map[string]string) (map[string]string, error) {
	// Make a copy of the supplied metadata to avoid
	// to change the original one.
	defaultMeta := make(map[string]string, len(userMeta))
	for k, v := range userMeta {
		// skip tier metadata when copying metadata from source object
		switch k {
		case metaTierName, metaTierStatus, metaTierObjName, metaTierVersionID:
			continue
		}
		defaultMeta[k] = v
	}

	// remove SSE Headers from source info
	crypto.RemoveSSEHeaders(defaultMeta)

	// Storage class is special, it can be replaced regardless of the
	// metadata directive, if set should be preserved and replaced
	// to the destination metadata.
	sc := r.Header.Get(xhttp.AmzStorageClass)
	if sc == "" {
		sc = r.Form.Get(xhttp.AmzStorageClass)
	}

	// if x-amz-metadata-directive says REPLACE then
	// we extract metadata from the input headers.
	if isDirectiveReplace(r.Header.Get(xhttp.AmzMetadataDirective)) {
		emetadata, err := extractMetadataFromReq(ctx, r)
		if err != nil {
			return nil, err
		}
		if sc != "" {
			emetadata[xhttp.AmzStorageClass] = sc
		}
		return emetadata, nil
	}

	if sc != "" {
		defaultMeta[xhttp.AmzStorageClass] = sc
	}

	// if x-amz-metadata-directive says COPY then we
	// return the default metadata.
	if isDirectiveCopy(r.Header.Get(xhttp.AmzMetadataDirective)) {
		return defaultMeta, nil
	}

	// Copy is default behavior if not x-amz-metadata-directive is set.
	return defaultMeta, nil
}

// getRemoteInstanceTransport is the single HTTP transport handed to every
// client built by getRemoteInstanceClient.
// getRemoteInstanceTransportOnce presumably guards its one-time
// initialization; the initialization site is not in this part of the file.
var (
	getRemoteInstanceTransport     *http.Transport
	getRemoteInstanceTransportOnce sync.Once
)

// getRemoteInstanceClient returns a minio-go Core client configured to access
// the remote instance at host, using the access credentials of the incoming
// request r. Applicable only in a federated deployment.
//
// It is a variable rather than a plain function, so it can be reassigned.
var getRemoteInstanceClient = func(r *http.Request, host string) (*miniogo.Core, error) {
	cred := getReqAccessCred(r, globalSite.Region)
	// In a federated deployment, all the instances share config files
	// and hence expected to have same credentials.
	opts := &miniogo.Options{
		Creds:  credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, ""),
		Secure: globalIsTLS,
	}
	// Options.Transport is an interface. Assigning a nil *http.Transport to
	// it would produce a non-nil interface wrapping a nil pointer, which
	// would no longer compare equal to nil, so the field is only set when
	// the shared transport has actually been initialized.
	if getRemoteInstanceTransport != nil {
		opts.Transport = getRemoteInstanceTransport
	}

	core, err := miniogo.NewCore(host, opts)
	if err != nil {
		return nil, err
	}
	core.SetAppInfo("minio-federated", ReleaseTag)
	return core, nil
}

// isRemoteCopyRequired reports whether the destination bucket of a COPY
// object request is on a remote site. This code only gets executed when
// federation is enabled, ie when globalDNSConfig is non 'nil'.
//
// It is the COPY specific variant of isRemoteCallRequired: when source and
// destination buckets are the same there is no need to check whether the
// destination bucket exists locally, so the answer is always false.
func isRemoteCopyRequired(ctx context.Context, srcBucket, dstBucket string, objAPI ObjectLayer) bool {
	return srcBucket != dstBucket && isRemoteCallRequired(ctx, dstBucket, objAPI)
}

// isRemoteCallRequired reports whether bucket is on a remote site. It always
// returns false unless a DNS config is present and bucket federation is on;
// otherwise the bucket counts as remote exactly when the local bucket lookup
// fails with the object error for errVolumeNotFound.
func isRemoteCallRequired(ctx context.Context, bucket string, objAPI ObjectLayer) bool {
	if globalDNSConfig == nil || !globalBucketFederation {
		return false
	}
	_, lookupErr := objAPI.GetBucketInfo(ctx, bucket, BucketOptions{})
	return lookupErr == toObjectErr(errVolumeNotFound, bucket)
}

// CopyObjectHandler - Copy Object
// ----------
// This implementation of the PUT operation adds an object to a bucket
// while reading the object from another source.
// Notice: The S3 client can send secret keys in headers for encryption related jobs,
// the handler should ensure to remove these keys before sending them to the object layer.
// Currently these keys are:
//   - X-Amz-Server-Side-Encryption-Customer-Key
//   - X-Amz-Copy-Source-Server-Side-Encryption-Customer-Key
func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "CopyObject")

	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))

	objectAPI := api.ObjectAPI()
	if objectAPI == nil {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
		return
	}

	vars := mux.Vars(r)
	dstBucket := vars["bucket"]
	dstObject, err := unescapePath(vars["object"])
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	if s3Error := checkRequestAuthType(ctx, r, policy.PutObjectAction, dstBucket, dstObject); s3Error != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
		return
	}

	// Read escaped copy source path to check for parameters.
	cpSrcPath := r.Header.Get(xhttp.AmzCopySource)
	var vid string
	if u, err := url.Parse(cpSrcPath); err == nil {
		vid = strings.TrimSpace(u.Query().Get(xhttp.VersionID))
		// Note that url.Parse does the unescaping
		cpSrcPath = u.Path
	}

	srcBucket, srcObject := path2BucketObject(cpSrcPath)
	// If source object is empty or bucket is empty, reply back invalid copy source.
	if srcObject == "" || srcBucket == "" {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidCopySource), r.URL)
		return
	}

	if vid != "" && vid != nullVersionID {
		_, err := uuid.Parse(vid)
		if err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, VersionNotFound{
				Bucket:    srcBucket,
				Object:    srcObject,
				VersionID: vid,
			}), r.URL)
			return
		}
	}

	if s3Error := checkRequestAuthType(ctx, r, policy.GetObjectAction, srcBucket, srcObject); s3Error != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
		return
	}

	// Check if metadata directive is valid.
	if !isDirectiveValid(r.Header.Get(xhttp.AmzMetadataDirective)) {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidMetadataDirective), r.URL)
		return
	}

	// check if tag directive is valid
	if !isDirectiveValid(r.Header.Get(xhttp.AmzTagDirective)) {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidTagDirective), r.URL)
		return
	}

	// Validate storage class metadata if present
	dstSc := r.Header.Get(xhttp.AmzStorageClass)
	if dstSc != "" && !storageclass.IsValid(dstSc) {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidStorageClass), r.URL)
		return
	}

	// Check if bucket encryption is enabled
	sseConfig, _ := globalBucketSSEConfigSys.Get(dstBucket)
	sseConfig.Apply(r.Header, sse.ApplyOptions{
		AutoEncrypt: globalAutoEncryption,
	})

	var srcOpts, dstOpts ObjectOptions
	srcOpts, err = copySrcOpts(ctx, r, srcBucket, srcObject)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}
	srcOpts.VersionID = vid

	// convert copy src encryption options for GET calls
	getOpts := ObjectOptions{
		VersionID:        srcOpts.VersionID,
		Versioned:        srcOpts.Versioned,
		VersionSuspended: srcOpts.VersionSuspended,
	}
	getSSE := encrypt.SSE(srcOpts.ServerSideEncryption)
	if getSSE != srcOpts.ServerSideEncryption {
		getOpts.ServerSideEncryption = getSSE
	}

	dstOpts, err = copyDstOpts(ctx, r, dstBucket, dstObject, nil)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}
	cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject))

	getObjectNInfo := objectAPI.GetObjectNInfo

	checkCopyPrecondFn := func(o ObjectInfo) bool {
		if _, err := DecryptObjectInfo(&o, r); err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return true
		}
		return checkCopyObjectPreconditions(ctx, w, r, o)
	}
	getOpts.CheckPrecondFn = checkCopyPrecondFn
	if cpSrcDstSame {
		getOpts.NoLock = true
	}

	var rs *HTTPRangeSpec
	gr, err := getObjectNInfo(ctx, srcBucket, srcObject, rs, r.Header, getOpts)
	if err != nil {
		if isErrPreconditionFailed(err) {
			return
		}

		// Versioning enabled quite possibly object is deleted might be delete-marker
		// if present set the headers, no idea why AWS S3 sets these headers.
		if gr != nil && gr.ObjInfo.VersionID != "" && gr.ObjInfo.DeleteMarker {
			w.Header()[xhttp.AmzVersionID] = []string{gr.ObjInfo.VersionID}
			w.Header()[xhttp.AmzDeleteMarker] = []string{strconv.FormatBool(gr.ObjInfo.DeleteMarker)}
		}

		// Update context bucket & object names for correct S3 XML error response
		reqInfo := logger.GetReqInfo(ctx)
		reqInfo.BucketName = srcBucket
		reqInfo.ObjectName = srcObject
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}
	defer gr.Close()
	srcInfo := gr.ObjInfo

	// maximum Upload size for object in a single CopyObject operation.
	if isMaxObjectSize(srcInfo.Size) {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrEntityTooLarge), r.URL)
		return
	}

	// We have to copy metadata only if source and destination are same.
	// this changes for encryption which can be observed below.
	if cpSrcDstSame {
		srcInfo.metadataOnly = true
	}

	var chStorageClass bool
	if dstSc != "" && dstSc != srcInfo.StorageClass {
		chStorageClass = true
		srcInfo.metadataOnly = false
	} // no changes in storage-class expected so its a metadataonly operation.

	var reader io.Reader = gr

	// Set the actual size to the compressed/decrypted size if encrypted.
	actualSize, err := srcInfo.GetActualSize()
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}
	length := actualSize

	if !cpSrcDstSame {
		if err := enforceBucketQuotaHard(ctx, dstBucket, actualSize); err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}
	}

	// Check if either the source is encrypted or the destination will be encrypted.
	objectEncryption := crypto.Requested(r.Header)
	objectEncryption = objectEncryption || crypto.IsSourceEncrypted(srcInfo.UserDefined)

	var compressMetadata map[string]string
	// No need to compress for remote etcd calls
	// Pass the decompressed stream to such calls.
	isDstCompressed := isCompressible(r.Header, dstObject) &&
		length > minCompressibleSize &&
		!isRemoteCopyRequired(ctx, srcBucket, dstBucket, objectAPI) && !cpSrcDstSame && !objectEncryption
	if isDstCompressed {
		compressMetadata = make(map[string]string, 2)
		// Preserving the compression metadata.
		compressMetadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV2
		compressMetadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(actualSize, 10)

		reader = etag.NewReader(ctx, reader, nil, nil)
		wantEncryption := crypto.Requested(r.Header)
		s2c, cb := newS2CompressReader(reader, actualSize, wantEncryption)
		dstOpts.IndexCB = cb
		defer s2c.Close()
		reader = etag.Wrap(s2c, reader)
		length = -1
	} else {
		delete(srcInfo.UserDefined, ReservedMetadataPrefix+"compression")
		delete(srcInfo.UserDefined, ReservedMetadataPrefix+"actual-size")
		reader = gr
	}

	srcInfo.Reader, err = hash.NewReader(ctx, reader, length, "", "", actualSize)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	pReader := NewPutObjReader(srcInfo.Reader)

	// Handle encryption
	encMetadata := make(map[string]string)
	// Encryption parameters not applicable for this object.
	if _, ok := crypto.IsEncrypted(srcInfo.UserDefined); !ok && crypto.SSECopy.IsRequested(r.Header) {
		writeErrorResponse(ctx, w, toAPIError(ctx, errInvalidEncryptionParameters), r.URL)
		return
	}
	// Encryption parameters not present for this object.
	if crypto.SSEC.IsEncrypted(srcInfo.UserDefined) && !crypto.SSECopy.IsRequested(r.Header) {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidSSECustomerAlgorithm), r.URL)
		return
	}

	var oldKey, newKey []byte
	var newKeyID string
	var kmsCtx kms.Context
	var objEncKey crypto.ObjectKey
	sseCopyKMS := crypto.S3KMS.IsEncrypted(srcInfo.UserDefined)
	sseCopyS3 := crypto.S3.IsEncrypted(srcInfo.UserDefined)
	sseCopyC := crypto.SSEC.IsEncrypted(srcInfo.UserDefined) && crypto.SSECopy.IsRequested(r.Header)
	sseC := crypto.SSEC.IsRequested(r.Header)
	sseS3 := crypto.S3.IsRequested(r.Header)
	sseKMS := crypto.S3KMS.IsRequested(r.Header)

	isSourceEncrypted := sseCopyC || sseCopyS3 || sseCopyKMS
	isTargetEncrypted := sseC || sseS3 || sseKMS

	if sseC {
		newKey, err = ParseSSECustomerRequest(r)
		if err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}
	}
	if crypto.S3KMS.IsRequested(r.Header) {
		newKeyID, kmsCtx, err = crypto.S3KMS.ParseHTTP(r.Header)
		if err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}
	}

	// If src == dst and either
	// - the object is encrypted using SSE-C and two different SSE-C keys are present
	// - the object is encrypted using SSE-S3 and the SSE-S3 header is present
	// - the object storage class is not changing
	// then execute a key rotation.
	if cpSrcDstSame && (sseCopyC && sseC) && !chStorageClass {
		oldKey, err = ParseSSECopyCustomerRequest(r.Header, srcInfo.UserDefined)
		if err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}

		for k, v := range srcInfo.UserDefined {
			if stringsHasPrefixFold(k, ReservedMetadataPrefixLower) {
				encMetadata[k] = v
			}
		}

		if err = rotateKey(ctx, oldKey, newKeyID, newKey, srcBucket, srcObject, encMetadata, kmsCtx); err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}

		// Since we are rotating the keys, make sure to update the metadata.
		srcInfo.metadataOnly = true
		srcInfo.keyRotation = true
	} else {
		if isSourceEncrypted || isTargetEncrypted {
			// We are not only copying just metadata instead
			// we are creating a new object at this point, even
			// if source and destination are same objects.
			if !srcInfo.keyRotation {
				srcInfo.metadataOnly = false
			}
		}

		// Calculate the size of the target object
		var targetSize int64

		switch {
		case isDstCompressed:
			targetSize = -1
		case !isSourceEncrypted && !isTargetEncrypted:
			targetSize, _ = srcInfo.GetActualSize()
		case isSourceEncrypted && isTargetEncrypted:
			objInfo := ObjectInfo{Size: actualSize}
			targetSize = objInfo.EncryptedSize()
		case !isSourceEncrypted && isTargetEncrypted:
			targetSize = srcInfo.EncryptedSize()
		case isSourceEncrypted && !isTargetEncrypted:
			targetSize, _ = srcInfo.DecryptedSize()
		}

		if isTargetEncrypted {
			var encReader io.Reader
			kind, _ := crypto.IsRequested(r.Header)
			encReader, objEncKey, err = newEncryptReader(ctx, srcInfo.Reader, kind, newKeyID, newKey, dstBucket, dstObject, encMetadata, kmsCtx)
			if err != nil {
				writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
				return
			}
			reader = etag.Wrap(encReader, srcInfo.Reader)
		}

		if isSourceEncrypted {
			// Remove all source encrypted related metadata to
			// avoid copying them in target object.
			crypto.RemoveInternalEntries(srcInfo.UserDefined)
		}

		// do not try to verify encrypted content
		srcInfo.Reader, err = hash.NewReader(ctx, reader, targetSize, "", "", actualSize)
		if err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}

		if isTargetEncrypted {
			pReader, err = pReader.WithEncryption(srcInfo.Reader, &objEncKey)
			if err != nil {
				writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
				return
			}
			if dstOpts.IndexCB != nil {
				dstOpts.IndexCB = compressionIndexEncrypter(objEncKey, dstOpts.IndexCB)
			}
		}
	}

	srcInfo.PutObjReader = pReader

	srcInfo.UserDefined, err = getCpObjMetadataFromHeader(ctx, r, srcInfo.UserDefined)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	objTags := srcInfo.UserTags
	// If x-amz-tagging-directive header is REPLACE, get passed tags.
	if isDirectiveReplace(r.Header.Get(xhttp.AmzTagDirective)) {
		objTags = r.Header.Get(xhttp.AmzObjectTagging)
		if _, err := tags.ParseObjectTags(objTags); err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}
	}

	if objTags != "" {
		lastTaggingTimestamp := srcInfo.UserDefined[ReservedMetadataPrefixLower+TaggingTimestamp]
		if dstOpts.ReplicationRequest {
			srcTimestamp := dstOpts.ReplicationSourceTaggingTimestamp
			if !srcTimestamp.IsZero() {
				ondiskTimestamp, err := time.Parse(time.RFC3339Nano, lastTaggingTimestamp)
				// update tagging metadata only if replica  timestamp is newer than what's on disk
				if err != nil || (err == nil && ondiskTimestamp.Before(srcTimestamp)) {
					srcInfo.UserDefined[ReservedMetadataPrefixLower+TaggingTimestamp] = srcTimestamp.UTC().Format(time.RFC3339Nano)
					srcInfo.UserDefined[xhttp.AmzObjectTagging] = objTags
				}
			}
		} else {
			srcInfo.UserDefined[xhttp.AmzObjectTagging] = objTags
			srcInfo.UserDefined[ReservedMetadataPrefixLower+TaggingTimestamp] = UTCNow().Format(time.RFC3339Nano)
		}

	}

	srcInfo.UserDefined = filterReplicationStatusMetadata(srcInfo.UserDefined)
	srcInfo.UserDefined = objectlock.FilterObjectLockMetadata(srcInfo.UserDefined, true, true)
	retPerms := isPutActionAllowed(ctx, getRequestAuthType(r), dstBucket, dstObject, r, policy.PutObjectRetentionAction)
	holdPerms := isPutActionAllowed(ctx, getRequestAuthType(r), dstBucket, dstObject, r, policy.PutObjectLegalHoldAction)
	getObjectInfo := objectAPI.GetObjectInfo

	// apply default bucket configuration/governance headers for dest side.
	retentionMode, retentionDate, legalHold, s3Err := checkPutObjectLockAllowed(ctx, r, dstBucket, dstObject, getObjectInfo, retPerms, holdPerms)
	if s3Err == ErrNone && retentionMode.Valid() {
		lastretentionTimestamp := srcInfo.UserDefined[ReservedMetadataPrefixLower+ObjectLockRetentionTimestamp]
		if dstOpts.ReplicationRequest {
			srcTimestamp := dstOpts.ReplicationSourceRetentionTimestamp
			if !srcTimestamp.IsZero() {
				ondiskTimestamp, err := time.Parse(time.RFC3339Nano, lastretentionTimestamp)
				// update retention metadata only if replica  timestamp is newer than what's on disk
				if err != nil || (err == nil && ondiskTimestamp.Before(srcTimestamp)) {
					srcInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockMode)] = string(retentionMode)
					srcInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = amztime.ISO8601Format(retentionDate.UTC())
					srcInfo.UserDefined[ReservedMetadataPrefixLower+ObjectLockRetentionTimestamp] = srcTimestamp.UTC().Format(time.RFC3339Nano)
				}
			}
		} else {
			srcInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockMode)] = string(retentionMode)
			srcInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = amztime.ISO8601Format(retentionDate.UTC())
			srcInfo.UserDefined[ReservedMetadataPrefixLower+ObjectLockRetentionTimestamp] = UTCNow().Format(time.RFC3339Nano)
		}
	}

	if s3Err == ErrNone && legalHold.Status.Valid() {
		lastLegalHoldTimestamp := srcInfo.UserDefined[ReservedMetadataPrefixLower+ObjectLockLegalHoldTimestamp]
		if dstOpts.ReplicationRequest {
			srcTimestamp := dstOpts.ReplicationSourceLegalholdTimestamp
			if !srcTimestamp.IsZero() {
				ondiskTimestamp, err := time.Parse(time.RFC3339Nano, lastLegalHoldTimestamp)
				// update legalhold metadata only if replica timestamp is newer than what's on disk
				if err != nil || (err == nil && ondiskTimestamp.Before(srcTimestamp)) {
					srcInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = string(legalHold.Status)
					srcInfo.UserDefined[ReservedMetadataPrefixLower+ObjectLockRetentionTimestamp] = srcTimestamp.Format(time.RFC3339Nano)
				}
			}
		} else {
			srcInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = string(legalHold.Status)
		}
	}
	if s3Err != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
		return
	}
	if rs := r.Header.Get(xhttp.AmzBucketReplicationStatus); rs != "" {
		srcInfo.UserDefined[ReservedMetadataPrefixLower+ReplicaStatus] = replication.Replica.String()
		srcInfo.UserDefined[ReservedMetadataPrefixLower+ReplicaTimestamp] = UTCNow().Format(time.RFC3339Nano)
		srcInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = rs
	}

	op := replication.ObjectReplicationType
	if srcInfo.metadataOnly {
		op = replication.MetadataReplicationType
	}
	if dsc := mustReplicate(ctx, dstBucket, dstObject, srcInfo.getMustReplicateOptions(op, dstOpts)); dsc.ReplicateAny() {
		srcInfo.UserDefined[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus()
		srcInfo.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano)
	}
	// Store the preserved compression metadata.
	for k, v := range compressMetadata {
		srcInfo.UserDefined[k] = v
	}

	// We need to preserve the encryption headers set in EncryptRequest,
	// so we do not want to override them, copy them instead.
	for k, v := range encMetadata {
		srcInfo.UserDefined[k] = v
	}

	// Ensure that metadata does not contain sensitive information
	crypto.RemoveSensitiveEntries(srcInfo.UserDefined)

	// If we see legacy source, metadataOnly we have to overwrite the content.
	if srcInfo.Legacy {
		srcInfo.metadataOnly = false
	}

	// Check if x-amz-metadata-directive or x-amz-tagging-directive was not set to REPLACE and source,
	// destination are same objects. Apply this restriction also when
	// metadataOnly is true indicating that we are not overwriting the object.
	// if encryption is enabled we do not need explicit "REPLACE" metadata to
	// be enabled as well - this is to allow for key-rotation.
	if !isDirectiveReplace(r.Header.Get(xhttp.AmzMetadataDirective)) && !isDirectiveReplace(r.Header.Get(xhttp.AmzTagDirective)) &&
		srcInfo.metadataOnly && srcOpts.VersionID == "" && !objectEncryption {
		// If x-amz-metadata-directive is not set to REPLACE then we need
		// to error out if source and destination are same.
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidCopyDest), r.URL)
		return
	}

	remoteCallRequired := isRemoteCopyRequired(ctx, srcBucket, dstBucket, objectAPI)

	var objInfo ObjectInfo
	var os *objSweeper
	if remoteCallRequired {
		var dstRecords []dns.SrvRecord
		dstRecords, err = globalDNSConfig.Get(dstBucket)
		if err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}

		// Send PutObject request to appropriate instance (in federated deployment)
		core, rerr := getRemoteInstanceClient(r, getHostFromSrv(dstRecords))
		if rerr != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, rerr), r.URL)
			return
		}
		tag, err := tags.ParseObjectTags(objTags)
		if err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}
		// Remove the metadata for remote calls.
		delete(srcInfo.UserDefined, ReservedMetadataPrefix+"compression")
		delete(srcInfo.UserDefined, ReservedMetadataPrefix+"actual-size")
		opts := miniogo.PutObjectOptions{
			UserMetadata:         srcInfo.UserDefined,
			ServerSideEncryption: dstOpts.ServerSideEncryption,
			UserTags:             tag.ToMap(),
		}
		remoteObjInfo, rerr := core.PutObject(ctx, dstBucket, dstObject, srcInfo.Reader,
			srcInfo.Size, "", "", opts)
		if rerr != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, rerr), r.URL)
			return
		}
		objInfo.UserDefined = cloneMSS(opts.UserMetadata)
		objInfo.ETag = remoteObjInfo.ETag
		objInfo.ModTime = remoteObjInfo.LastModified
	} else {
		os = newObjSweeper(dstBucket, dstObject).WithVersioning(dstOpts.Versioned, dstOpts.VersionSuspended)
		// Get appropriate object info to identify the remote object to delete
		if !srcInfo.metadataOnly {
			goiOpts := os.GetOpts()
			if !globalTierConfigMgr.Empty() {
				if goi, gerr := getObjectInfo(ctx, dstBucket, dstObject, goiOpts); gerr == nil {
					os.SetTransitionState(goi.TransitionedObject)
				}
			}
		}

		copyObjectFn := objectAPI.CopyObject

		// Copy source object to destination, if source and destination
		// object is same then only metadata is updated.
		objInfo, err = copyObjectFn(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts)
		if err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}
	}

	origETag := objInfo.ETag
	objInfo.ETag = getDecryptedETag(r.Header, objInfo, false)
	response := generateCopyObjectResponse(objInfo.ETag, objInfo.ModTime)
	encodedSuccessResponse := encodeResponse(response)

	if dsc := mustReplicate(ctx, dstBucket, dstObject, objInfo.getMustReplicateOptions(replication.ObjectReplicationType, dstOpts)); dsc.ReplicateAny() {
		scheduleReplication(ctx, objInfo, objectAPI, dsc, replication.ObjectReplicationType)
	}

	setPutObjHeaders(w, objInfo, false)
	// We must not use the http.Header().Set method here because some (broken)
	// clients expect the x-amz-copy-source-version-id header key to be literally
	// "x-amz-copy-source-version-id"- not in canonicalized form, preserve it.
	if srcOpts.VersionID != "" {
		w.Header()[strings.ToLower(xhttp.AmzCopySourceVersionID)] = []string{srcOpts.VersionID}
	}

	// Write success response.
	writeSuccessResponseXML(w, encodedSuccessResponse)

	// Notify object created event.
	sendEvent(eventArgs{
		EventName:    event.ObjectCreatedCopy,
		BucketName:   dstBucket,
		Object:       objInfo,
		ReqParams:    extractReqParams(r),
		RespElements: extractRespElements(w),
		UserAgent:    r.UserAgent(),
		Host:         handlers.GetSourceIP(r),
	})

	asize, err := objInfo.GetActualSize()
	if err != nil {
		asize = objInfo.Size
	}

	defer globalCacheConfig.Set(&cache.ObjectInfo{
		Key:          objInfo.Name,
		Bucket:       objInfo.Bucket,
		ETag:         objInfo.ETag,
		ModTime:      objInfo.ModTime,
		Expires:      objInfo.ExpiresStr(),
		CacheControl: objInfo.CacheControl,
		Size:         asize,
		Metadata:     cleanReservedKeys(objInfo.UserDefined),
	})

	if !remoteCallRequired && !globalTierConfigMgr.Empty() {
		// Schedule object for immediate transition if eligible.
		objInfo.ETag = origETag
		enqueueTransitionImmediate(objInfo, lcEventSrc_s3CopyObject)
		// Remove the transitioned object whose object version is being overwritten.
		os.Sweep()
	}
}

// PutObjectHandler - PUT Object
// ----------
// This implementation of the PUT operation adds an object to a bucket.
// Notice: The S3 client can send secret keys in headers for encryption related jobs,
// the handler should ensure to remove these keys before sending them to the object layer.
// Currently these keys are:
//   - X-Amz-Server-Side-Encryption-Customer-Key
//   - X-Amz-Copy-Source-Server-Side-Encryption-Customer-Key
//
// The handler proceeds in this order: validate headers and payload size,
// authorize the request and verify its signature, enforce the hard bucket
// quota, optionally wrap the body for compression and encryption, write the
// object through the object layer, and finally set response headers, emit
// events and reply with headers only. Every failure writes an API error
// response and returns immediately.
func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "PutObject")
	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))

	objectAPI := api.ObjectAPI()
	if objectAPI == nil {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
		return
	}

	// Bucket and object name come from the route variables; the object
	// name is unescaped before use.
	vars := mux.Vars(r)
	bucket := vars["bucket"]
	object, err := unescapePath(vars["object"])
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// X-Amz-Copy-Source shouldn't be set for this call.
	if _, ok := r.Header[xhttp.AmzCopySource]; ok {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidCopySource), r.URL)
		return
	}

	// Validate storage class metadata if present
	if sc := r.Header.Get(xhttp.AmzStorageClass); sc != "" {
		if !storageclass.IsValid(sc) {
			writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidStorageClass), r.URL)
			return
		}
	}

	// Derive the client-supplied ETag from the request headers; any
	// failure is reported to the client as an invalid digest.
	clientETag, err := etag.FromContentMD5(r.Header)
	if err != nil {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidDigest), r.URL)
		return
	}

	// if Content-Length is unknown/missing, deny the request
	size := r.ContentLength
	rAuthType := getRequestAuthType(r)
	switch rAuthType {
	// Check signature types that must have content length
	case authTypeStreamingSigned, authTypeStreamingSignedTrailer, authTypeStreamingUnsignedTrailer:
		// For streaming uploads the payload size is taken from the
		// decoded-content-length header when that header is present.
		if sizeStr, ok := r.Header[xhttp.AmzDecodedContentLength]; ok {
			if sizeStr[0] == "" {
				writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMissingContentLength), r.URL)
				return
			}
			size, err = strconv.ParseInt(sizeStr[0], 10, 64)
			if err != nil {
				writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
				return
			}
		}
	}
	if size == -1 {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMissingContentLength), r.URL)
		return
	}

	// maximum Upload size for objects in a single operation
	if isMaxObjectSize(size) {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrEntityTooLarge), r.URL)
		return
	}

	metadata, err := extractMetadataFromReq(ctx, r)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// Object tags are only validated here; the raw header value is what
	// gets stored in the metadata map.
	if objTags := r.Header.Get(xhttp.AmzObjectTagging); objTags != "" {
		if _, err := tags.ParseObjectTags(objTags); err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}

		metadata[xhttp.AmzObjectTagging] = objTags
	}

	var (
		md5hex              = clientETag.String()
		sha256hex           = ""
		rd        io.Reader = r.Body
		s3Err     APIErrorCode
		putObject = objectAPI.PutObject
	)

	// Check if put is allowed
	if s3Err = isPutActionAllowed(ctx, rAuthType, bucket, object, r, policy.PutObjectAction); s3Err != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
		return
	}

	// Signature handling depends on the auth type: streaming types replace
	// the body reader with a chunked reader, V2 and V4 types verify the
	// request signature up front.
	switch rAuthType {
	case authTypeStreamingSigned, authTypeStreamingSignedTrailer:
		// Initialize stream signature verifier.
		rd, s3Err = newSignV4ChunkedReader(r, rAuthType == authTypeStreamingSignedTrailer)
		if s3Err != ErrNone {
			writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
			return
		}
	case authTypeStreamingUnsignedTrailer:
		// Initialize stream chunked reader with optional trailers.
		rd, s3Err = newUnsignedV4ChunkedReader(r, true)
		if s3Err != ErrNone {
			writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
			return
		}
	case authTypeSignedV2, authTypePresignedV2:
		s3Err = isReqAuthenticatedV2(r)
		if s3Err != ErrNone {
			writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
			return
		}

	case authTypePresigned, authTypeSigned:
		if s3Err = reqSignatureV4Verify(r, globalSite.Region, serviceS3); s3Err != ErrNone {
			writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
			return
		}
		// Only in this case is a content SHA256 handed to the hash
		// reader, and only when the request does not opt out of it.
		if !skipContentSha256Cksum(r) {
			sha256hex = getContentSha256Cksum(r, serviceS3)
		}
	}

	if _, ok := r.Header[xhttp.MinIOSourceReplicationCheck]; ok {
		// requests to just validate replication settings and permissions are not allowed to write data
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrReplicationPermissionCheckError), r.URL)
		return
	}

	if err := enforceBucketQuotaHard(ctx, bucket, size); err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}
	// Replica writes need the ReplicateObject permission in addition to
	// PutObject. The object is tagged as a replica together with the time
	// it was received.
	if r.Header.Get(xhttp.AmzBucketReplicationStatus) == replication.Replica.String() {
		if s3Err = isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, policy.ReplicateObjectAction); s3Err != ErrNone {
			writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
			return
		}
		metadata[ReservedMetadataPrefixLower+ReplicaStatus] = replication.Replica.String()
		metadata[ReservedMetadataPrefixLower+ReplicaTimestamp] = UTCNow().Format(time.RFC3339Nano)
		// Deferred, so it runs on every return path after this point,
		// including the error returns below.
		defer globalReplicationStats.UpdateReplicaStat(bucket, size)
	}

	// Check if bucket encryption is enabled
	// NOTE(review): Apply is handed r.Header and presumably adds SSE headers
	// when the bucket config or auto-encryption requires it - confirm.
	sseConfig, _ := globalBucketSSEConfigSys.Get(bucket)
	sseConfig.Apply(r.Header, sse.ApplyOptions{
		AutoEncrypt: globalAutoEncryption,
	})

	// When the cache config accepts this size, the body is teed into a
	// pooled buffer so the deferred cache update near the end of this
	// function can store the data as well.
	var buf *bytebufferpool.ByteBuffer
	if globalCacheConfig.MatchesSize(size) {
		buf = bytebufferpool.Get()
		defer bytebufferpool.Put(buf)
	}

	var reader io.Reader
	reader = rd
	if buf != nil {
		reader = io.TeeReader(rd, buf)
	}

	// actualSize keeps the client-visible size; size may be reset to -1
	// below once the stream is compressed.
	actualSize := size
	var idxCb func() []byte
	if isCompressible(r.Header, object) && size > minCompressibleSize {
		// Storing the compression metadata.
		metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV2
		metadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(size, 10)

		// The uncompressed stream is verified against the client supplied
		// digests by this inner reader, before compression is applied.
		actualReader, err := hash.NewReader(ctx, reader, size, md5hex, sha256hex, actualSize)
		if err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}
		if err = actualReader.AddChecksum(r, false); err != nil {
			writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidChecksum), r.URL)
			return
		}
		// Wrap the verified reader in the S2 compressing reader. idxCb is
		// later handed to the object layer through opts.IndexCB.
		var s2c io.ReadCloser
		wantEncryption := crypto.Requested(r.Header)
		s2c, idxCb = newS2CompressReader(actualReader, actualSize, wantEncryption)
		defer s2c.Close()

		reader = etag.Wrap(s2c, actualReader)
		size = -1   // Since compressed size is un-predictable.
		md5hex = "" // Do not try to verify the content.
		sha256hex = ""
	}

	var forceMD5 []byte
	// Optimization: If SSE-KMS and SSE-C did not request Content-Md5. Use uuid as etag. Optionally enable this also
	// for server that is started with `--no-compat`.
	if !etag.ContentMD5Requested(r.Header) && (crypto.S3KMS.IsRequested(r.Header) || crypto.SSEC.IsRequested(r.Header) || !globalServerCtxt.StrictS3Compat) {
		forceMD5 = mustGetUUIDBytes()
	}
	hashReader, err := hash.NewReaderWithOpts(ctx, reader, hash.Options{
		Size:       size,
		MD5Hex:     md5hex,
		SHA256Hex:  sha256hex,
		ActualSize: actualSize,
		DisableMD5: false,
		ForceMD5:   forceMD5,
	})
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}
	// size is negative here when the compression branch above reset it to -1.
	if err := hashReader.AddChecksum(r, size < 0); err != nil {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidChecksum), r.URL)
		return
	}

	rawReader := hashReader
	pReader := NewPutObjReader(rawReader)

	var opts ObjectOptions
	opts, err = putOptsFromReq(ctx, r, bucket, object, metadata)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}
	opts.IndexCB = idxCb

	// Install a precondition callback only when the request carries
	// conditional headers or asks to preserve an ETag. The callback writes
	// the error response itself when decrypting the object info fails.
	// NOTE(review): a true return presumably tells the object layer to
	// abort the write - confirm against the CheckPrecondFn contract.
	if opts.PreserveETag != "" ||
		r.Header.Get(xhttp.IfMatch) != "" ||
		r.Header.Get(xhttp.IfNoneMatch) != "" {
		opts.CheckPrecondFn = func(oi ObjectInfo) bool {
			if _, err := DecryptObjectInfo(&oi, r); err != nil {
				writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
				return true
			}
			return checkPreconditionsPUT(ctx, w, r, oi, opts)
		}
	}

	// The retention and legal-hold permission results are not enforced
	// here; they are passed on to checkPutObjectLockAllowed.
	retPerms := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, policy.PutObjectRetentionAction)
	holdPerms := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, policy.PutObjectLegalHoldAction)

	getObjectInfo := objectAPI.GetObjectInfo

	// Record the object lock settings in metadata when they are valid; any
	// lock error is reported only after both settings have been examined.
	retentionMode, retentionDate, legalHold, s3Err := checkPutObjectLockAllowed(ctx, r, bucket, object, getObjectInfo, retPerms, holdPerms)
	if s3Err == ErrNone && retentionMode.Valid() {
		metadata[strings.ToLower(xhttp.AmzObjectLockMode)] = string(retentionMode)
		metadata[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = amztime.ISO8601Format(retentionDate.UTC())
	}
	if s3Err == ErrNone && legalHold.Status.Valid() {
		metadata[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = string(legalHold.Status)
	}
	if s3Err != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
		return
	}
	// If the object must be replicated, store the pending status and a
	// timestamp in the metadata before the object is written.
	if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(metadata, "", "", replication.ObjectReplicationType, opts)); dsc.ReplicateAny() {
		metadata[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano)
		metadata[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus()
	}
	var objectEncryptionKey crypto.ObjectKey
	if crypto.Requested(r.Header) {
		// Reject header combinations that cannot be honoured: SSE-Copy
		// headers on a plain PUT, SSE-C combined with SSE-S3 or SSE-KMS,
		// and SSE-C on compressible objects.
		if crypto.SSECopy.IsRequested(r.Header) {
			writeErrorResponse(ctx, w, toAPIError(ctx, errInvalidEncryptionParameters), r.URL)
			return
		}

		if crypto.SSEC.IsRequested(r.Header) && crypto.S3.IsRequested(r.Header) {
			writeErrorResponse(ctx, w, toAPIError(ctx, crypto.ErrIncompatibleEncryptionMethod), r.URL)
			return
		}

		if crypto.SSEC.IsRequested(r.Header) && crypto.S3KMS.IsRequested(r.Header) {
			writeErrorResponse(ctx, w, toAPIError(ctx, crypto.ErrIncompatibleEncryptionMethod), r.URL)
			return
		}

		if crypto.SSEC.IsRequested(r.Header) && isCompressible(r.Header, object) {
			writeErrorResponse(ctx, w, toAPIError(ctx, crypto.ErrIncompatibleEncryptionWithCompression), r.URL)
			return
		}

		reader, objectEncryptionKey, err = EncryptRequest(hashReader, r, bucket, object, metadata)
		if err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}

		// The encrypted size is only computed when the plaintext size is
		// still known (size was not reset to -1 by compression).
		wantSize := int64(-1)
		if size >= 0 {
			info := ObjectInfo{Size: size}
			wantSize = info.EncryptedSize()
		}

		// do not try to verify encrypted content
		hashReader, err = hash.NewReader(ctx, etag.Wrap(reader, hashReader), wantSize, "", "", actualSize)
		if err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}
		pReader, err = pReader.WithEncryption(hashReader, &objectEncryptionKey)
		if err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}
		// With encryption on, the compression index callback is wrapped
		// with the object key before it reaches the object layer.
		if opts.IndexCB != nil {
			opts.IndexCB = compressionIndexEncrypter(objectEncryptionKey, opts.IndexCB)
		}
		opts.EncryptFn = metadataEncrypter(objectEncryptionKey)
	}

	// Ensure that metadata does not contain sensitive information
	crypto.RemoveSensitiveEntries(metadata)

	// Note: the local variable os shadows the os package from here to the
	// end of the function.
	os := newObjSweeper(bucket, object).WithVersioning(opts.Versioned, opts.VersionSuspended)
	if !globalTierConfigMgr.Empty() {
		// Get appropriate object info to identify the remote object to delete
		goiOpts := os.GetOpts()
		if goi, gerr := getObjectInfo(ctx, bucket, object, goiOpts); gerr == nil {
			os.SetTransitionState(goi.TransitionedObject)
		}
	}

	// Create the object..
	objInfo, err := putObject(ctx, bucket, object, pReader, opts)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// When the extract header is "true" and the name has the archive
	// suffix, update the stored object's metadata with zip information.
	if r.Header.Get(xMinIOExtract) == "true" && HasSuffix(object, archiveExt) {
		opts := ObjectOptions{VersionID: objInfo.VersionID, MTime: objInfo.ModTime}
		if _, err := updateObjectMetadataWithZipInfo(ctx, objectAPI, bucket, object, opts); err != nil {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}
	}

	// Keep the ETag returned by the object layer. The value adjusted below
	// is used for the response, events and cache, and origETag is restored
	// before the object is enqueued for transition.
	origETag := objInfo.ETag
	if kind, encrypted := crypto.IsEncrypted(objInfo.UserDefined); encrypted {
		switch kind {
		case crypto.S3:
			w.Header().Set(xhttp.AmzServerSideEncryption, xhttp.AmzEncryptionAES)
			objInfo.ETag, _ = DecryptETag(objectEncryptionKey, ObjectInfo{ETag: objInfo.ETag})
		case crypto.S3KMS:
			w.Header().Set(xhttp.AmzServerSideEncryption, xhttp.AmzEncryptionKMS)
			w.Header().Set(xhttp.AmzServerSideEncryptionKmsID, objInfo.KMSKeyID())
			if kmsCtx, ok := objInfo.UserDefined[crypto.MetaContext]; ok {
				w.Header().Set(xhttp.AmzServerSideEncryptionKmsContext, kmsCtx)
			}
			// Keep only the trailing 32 characters unless the ETag
			// contains exactly one '-' (presumably a multipart ETag).
			if len(objInfo.ETag) >= 32 && strings.Count(objInfo.ETag, "-") != 1 {
				objInfo.ETag = objInfo.ETag[len(objInfo.ETag)-32:]
			}
		case crypto.SSEC:
			w.Header().Set(xhttp.AmzServerSideEncryptionCustomerAlgorithm, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerAlgorithm))
			w.Header().Set(xhttp.AmzServerSideEncryptionCustomerKeyMD5, r.Header.Get(xhttp.AmzServerSideEncryptionCustomerKeyMD5))

			// Same trailing-32-character rule as in the SSE-KMS case.
			if len(objInfo.ETag) >= 32 && strings.Count(objInfo.ETag, "-") != 1 {
				objInfo.ETag = objInfo.ETag[len(objInfo.ETag)-32:]
			}
		}
	}
	// The replication decision is evaluated again now that the object
	// exists; this time the replication is actually scheduled.
	if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(metadata, "", "", replication.ObjectReplicationType, opts)); dsc.ReplicateAny() {
		scheduleReplication(ctx, objInfo, objectAPI, dsc, replication.ObjectReplicationType)
	}

	setPutObjHeaders(w, objInfo, false)

	// Update the cache on return. This defer was registered after the
	// bytebufferpool.Put defer above, so it runs first and buf is still
	// owned by this handler when its bytes are read.
	defer func() {
		var data []byte
		if buf != nil {
			data = buf.Bytes()
		}

		// Fall back to the stored size when the actual size is unavailable.
		asize, err := objInfo.GetActualSize()
		if err != nil {
			asize = objInfo.Size
		}

		globalCacheConfig.Set(&cache.ObjectInfo{
			Key:          objInfo.Name,
			Bucket:       objInfo.Bucket,
			ETag:         objInfo.ETag,
			ModTime:      objInfo.ModTime,
			Expires:      objInfo.ExpiresStr(),
			CacheControl: objInfo.CacheControl,
			Size:         asize,
			Metadata:     cleanReservedKeys(objInfo.UserDefined),
			Data:         data,
		})
	}()

	// Notify object created event.
	evt := eventArgs{
		EventName:    event.ObjectCreatedPut,
		BucketName:   bucket,
		Object:       objInfo,
		ReqParams:    extractReqParams(r),
		RespElements: extractRespElements(w),
		UserAgent:    r.UserAgent(),
		Host:         handlers.GetSourceIP(r),
	}
	sendEvent(evt)
	// When the version count exceeds the scanner threshold, send the same
	// event again as ObjectManyVersions and add an internal audit log entry.
	if objInfo.NumVersions > int(scannerExcessObjectVersions.Load()) {
		evt.EventName = event.ObjectManyVersions
		sendEvent(evt)

		auditLogInternal(context.Background(), AuditLogOptions{
			Event:     "scanner:manyversions",
			APIName:   "PutObject",
			Bucket:    objInfo.Bucket,
			Object:    objInfo.Name,
			VersionID: objInfo.VersionID,
			Status:    http.StatusText(http.StatusOK),
		})
	}

	// Do not send checksums in events to avoid leaks.
	// The checksum headers are therefore transferred to the response only
	// after the events above have captured the response elements.
	hash.TransferChecksumHeader(w, r)
	writeSuccessResponseHeadersOnly(w)

	// Tiering follow-up, only when tiers are configured.
	if !globalTierConfigMgr.Empty() {
		// Schedule object for immediate transition if eligible, using the
		// ETag as returned by the object layer.
		objInfo.ETag = origETag
		enqueueTransitionImmediate(objInfo, lcEventSrc_s3PutObject)
		// Remove the transitioned object whose object version is being overwritten.
		os.Sweep()
	}
}

// PutObjectExtractHandler - PUT Object extract is an extended API
// modeled on the AWS Snowball auto-extract feature: the request body is
// a tar stream, and each entry is stored as an individual object in the
// target bucket.
//
// The handler validates and authorizes the request once, wraps the body
// in a hash reader, and then hands it to untar together with the
// putObjectTar callback, which performs the per-entry write (optional
// compression and encryption, object lock checks, replication, cache
// update, event notification and tier sweeping). SSE-KMS requests are
// rejected as not implemented. On success the ETag response header is
// the MD5 of the whole uploaded stream.
func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "PutObjectExtract")
	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))

	objectAPI := api.ObjectAPI()
	if objectAPI == nil {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
		return
	}

	if crypto.S3KMS.IsRequested(r.Header) { // SSE-KMS is not supported
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
		return
	}

	vars := mux.Vars(r)
	bucket := vars["bucket"]
	object, err := unescapePath(vars["object"])
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// X-Amz-Copy-Source shouldn't be set for this call.
	if _, ok := r.Header[xhttp.AmzCopySource]; ok {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidCopySource), r.URL)
		return
	}

	// Validate storage class metadata if present
	sc := r.Header.Get(xhttp.AmzStorageClass)
	if sc != "" {
		if !storageclass.IsValid(sc) {
			writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidStorageClass), r.URL)
			return
		}
	}

	clientETag, err := etag.FromContentMD5(r.Header)
	if err != nil {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidDigest), r.URL)
		return
	}

	// if Content-Length is unknown/missing, deny the request
	size := r.ContentLength
	rAuthType := getRequestAuthType(r)
	if rAuthType == authTypeStreamingSigned || rAuthType == authTypeStreamingSignedTrailer {
		// For signed streaming uploads the payload size is taken from the
		// decoded-content-length header when that header is present.
		if sizeStr, ok := r.Header[xhttp.AmzDecodedContentLength]; ok {
			if sizeStr[0] == "" {
				writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMissingContentLength), r.URL)
				return
			}
			size, err = strconv.ParseInt(sizeStr[0], 10, 64)
			if err != nil {
				writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
				return
			}
		}
	}

	if size == -1 {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMissingContentLength), r.URL)
		return
	}

	// maximum Upload size for objects in a single operation
	if isMaxObjectSize(size) {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrEntityTooLarge), r.URL)
		return
	}

	// s3Err is shared with the putObjectTar closure below: the closure
	// records policy and object-lock failures in it, and the error path
	// after untar prefers it over the returned error.
	var (
		md5hex              = clientETag.String()
		sha256hex           = ""
		reader    io.Reader = r.Body
		s3Err     APIErrorCode
		putObject = objectAPI.PutObject
	)

	// Check if put is allowed
	if s3Err = isPutActionAllowed(ctx, rAuthType, bucket, object, r, policy.PutObjectAction); s3Err != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
		return
	}

	switch rAuthType {
	case authTypeStreamingSigned, authTypeStreamingSignedTrailer:
		// Initialize stream signature verifier.
		reader, s3Err = newSignV4ChunkedReader(r, rAuthType == authTypeStreamingSignedTrailer)
		if s3Err != ErrNone {
			writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
			return
		}
	case authTypeSignedV2, authTypePresignedV2:
		s3Err = isReqAuthenticatedV2(r)
		if s3Err != ErrNone {
			writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
			return
		}

	case authTypePresigned, authTypeSigned:
		if s3Err = reqSignatureV4Verify(r, globalSite.Region, serviceS3); s3Err != ErrNone {
			writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL)
			return
		}
		if !skipContentSha256Cksum(r) {
			sha256hex = getContentSha256Cksum(r, serviceS3)
		}
	}

	// hreader verifies the whole tar stream against the client supplied
	// digests; its running MD5 becomes the ETag of the response.
	hreader, err := hash.NewReader(ctx, reader, size, md5hex, sha256hex, size)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}
	if err = hreader.AddChecksum(r, false); err != nil {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidChecksum), r.URL)
		return
	}

	// The quota is checked against the size of the uploaded stream, not
	// against the individual tar entries.
	if err := enforceBucketQuotaHard(ctx, bucket, size); err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// Check if bucket encryption is enabled
	sseConfig, _ := globalBucketSSEConfigSys.Get(bucket)
	sseConfig.Apply(r.Header, sse.ApplyOptions{
		AutoEncrypt: globalAutoEncryption,
	})

	// The retention and legal-hold permission results are computed once
	// for the request and reused for every extracted entry.
	retPerms := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, policy.PutObjectRetentionAction)
	holdPerms := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, policy.PutObjectLegalHoldAction)

	getObjectInfo := objectAPI.GetObjectInfo

	// These are static for all objects extracted.
	reqParams := extractReqParams(r)
	respElements := map[string]string{
		"requestId": w.Header().Get(xhttp.AmzRequestID),
		"nodeId":    w.Header().Get(xhttp.AmzRequestHostID),
	}
	if sc == "" {
		sc = storageclass.STANDARD
	}

	// putObjectTar stores a single tar entry as the object named by its
	// object parameter, which shadows the outer object variable. A non-nil
	// return reports the failure to untar.
	putObjectTar := func(reader io.Reader, info os.FileInfo, object string) error {
		size := info.Size()

		metadata := map[string]string{
			xhttp.AmzStorageClass: sc, // save same storage-class as incoming stream.
		}

		actualSize := size
		var idxCb func() []byte
		if isCompressible(r.Header, object) && size > minCompressibleSize {
			// Storing the compression metadata.
			metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV2
			metadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(size, 10)

			actualReader, err := hash.NewReader(ctx, reader, size, "", "", actualSize)
			if err != nil {
				return err
			}

			// Wrap the entry in the S2 compressing reader; the returned
			// callback is later handed to the object layer via opts.IndexCB.
			wantEncryption := crypto.Requested(r.Header)
			s2c, cb := newS2CompressReader(actualReader, actualSize, wantEncryption)
			defer s2c.Close()
			idxCb = cb
			reader = etag.Wrap(s2c, actualReader)
			size = -1 // Since compressed size is un-predictable.
		}

		hashReader, err := hash.NewReader(ctx, reader, size, "", "", actualSize)
		if err != nil {
			return err
		}

		rawReader := hashReader
		pReader := NewPutObjReader(rawReader)

		if r.Header.Get(xhttp.AmzBucketReplicationStatus) == replication.Replica.String() {
			if s3Err = isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, policy.ReplicateObjectAction); s3Err != ErrNone {
				// err is nil at this point because hash.NewReader above
				// succeeded; returning it would report success and
				// silently skip this entry. Return a real error so the
				// denial recorded in s3Err reaches the client.
				return errors.New("replica write is not allowed by policy")
			}
			metadata[ReservedMetadataPrefixLower+ReplicaStatus] = replication.Replica.String()
			metadata[ReservedMetadataPrefixLower+ReplicaTimestamp] = UTCNow().Format(time.RFC3339Nano)
		}

		var (
			versionID string
			hdrs      http.Header
		)

		// When the tar entry carries PAX records, the version ID and the
		// metadata come from those records ("minio.versionId" and keys with
		// the "minio.metadata." prefix). Otherwise they come from the request.
		if tarHdrs, ok := info.Sys().(*tar.Header); ok && len(tarHdrs.PAXRecords) > 0 {
			versionID = tarHdrs.PAXRecords["minio.versionId"]
			hdrs = make(http.Header)
			for k, v := range tarHdrs.PAXRecords {
				if k == "minio.versionId" {
					continue
				}
				if strings.HasPrefix(k, "minio.metadata.") {
					k = strings.TrimPrefix(k, "minio.metadata.")
					hdrs.Set(k, v)
				}
			}
			m, err := extractMetadata(ctx, textproto.MIMEHeader(hdrs))
			if err != nil {
				return err
			}
			for k, v := range m {
				metadata[k] = v
			}
		} else {
			versionID = r.Form.Get(xhttp.VersionID)
			hdrs = r.Header
		}

		opts, err := putOpts(ctx, bucket, object, versionID, hdrs, metadata)
		if err != nil {
			return err
		}

		// Use the modification time from the tar entry, falling back to
		// the current time when it is at or before the Unix epoch.
		opts.MTime = info.ModTime()
		if opts.MTime.Unix() <= 0 {
			opts.MTime = UTCNow()
		}
		opts.IndexCB = idxCb

		retentionMode, retentionDate, legalHold, s3err := checkPutObjectLockAllowed(ctx, r, bucket, object, getObjectInfo, retPerms, holdPerms)
		if s3err == ErrNone && retentionMode.Valid() {
			metadata[strings.ToLower(xhttp.AmzObjectLockMode)] = string(retentionMode)
			metadata[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = amztime.ISO8601Format(retentionDate.UTC())
		}

		if s3err == ErrNone && legalHold.Status.Valid() {
			metadata[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = string(legalHold.Status)
		}

		if s3err != ErrNone {
			// Publish the API error code through the shared s3Err and
			// return a non-nil error to report the failure.
			s3Err = s3err
			return ObjectLocked{}
		}

		// If the object must be replicated, store the pending status and a
		// timestamp in the metadata before the object is written.
		if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(metadata, "", "", replication.ObjectReplicationType, opts)); dsc.ReplicateAny() {
			metadata[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano)
			metadata[ReservedMetadataPrefixLower+ReplicationStatus] = dsc.PendingStatus()

		}

		var objectEncryptionKey crypto.ObjectKey
		if crypto.Requested(r.Header) {
			if crypto.SSECopy.IsRequested(r.Header) {
				return errInvalidEncryptionParameters
			}

			reader, objectEncryptionKey, err = EncryptRequest(hashReader, r, bucket, object, metadata)
			if err != nil {
				return err
			}

			// The encrypted size is only computed when the plaintext size
			// is still known (size was not reset to -1 by compression).
			wantSize := int64(-1)
			if size >= 0 {
				info := ObjectInfo{Size: size}
				wantSize = info.EncryptedSize()
			}

			// do not try to verify encrypted content
			hashReader, err = hash.NewReader(ctx, etag.Wrap(reader, hashReader), wantSize, "", "", actualSize)
			if err != nil {
				return err
			}

			pReader, err = pReader.WithEncryption(hashReader, &objectEncryptionKey)
			if err != nil {
				return err
			}
		}
		// NOTE(review): unlike PutObjectHandler, this wrapping also runs
		// when encryption was not requested, in which case
		// objectEncryptionKey is still its zero value - confirm that this
		// is intended.
		if opts.IndexCB != nil {
			opts.IndexCB = compressionIndexEncrypter(objectEncryptionKey, opts.IndexCB)
		}

		// Ensure that metadata does not contain sensitive information
		crypto.RemoveSensitiveEntries(metadata)

		// Note: the local variable os shadows the os package for the rest
		// of this closure.
		os := newObjSweeper(bucket, object).WithVersioning(opts.Versioned, opts.VersionSuspended)
		if !globalTierConfigMgr.Empty() {
			// Get appropriate object info to identify the remote object to delete
			goiOpts := os.GetOpts()
			if goi, gerr := getObjectInfo(ctx, bucket, object, goiOpts); gerr == nil {
				os.SetTransitionState(goi.TransitionedObject)
			}
		}

		// Create the object..
		objInfo, err := putObject(ctx, bucket, object, pReader, opts)
		if err != nil {
			return err
		}

		// Keep the ETag returned by the object layer; the decrypted one is
		// used for replication, cache and events, and origETag is restored
		// before the object is enqueued for transition.
		origETag := objInfo.ETag
		objInfo.ETag = getDecryptedETag(r.Header, objInfo, false)

		if dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(metadata, "", "", replication.ObjectReplicationType, opts)); dsc.ReplicateAny() {
			scheduleReplication(ctx, objInfo, objectAPI, dsc, replication.ObjectReplicationType)
		}

		// Fall back to the stored size when the actual size is unavailable.
		asize, err := objInfo.GetActualSize()
		if err != nil {
			asize = objInfo.Size
		}

		// The cache entry is built here, because defer arguments are
		// evaluated immediately, but Set itself runs when the closure returns.
		defer globalCacheConfig.Set(&cache.ObjectInfo{
			Key:          objInfo.Name,
			Bucket:       objInfo.Bucket,
			ETag:         objInfo.ETag,
			ModTime:      objInfo.ModTime,
			Expires:      objInfo.ExpiresStr(),
			CacheControl: objInfo.CacheControl,
			Size:         asize,
			Metadata:     cleanReservedKeys(objInfo.UserDefined),
		})

		// Notify object created event.
		evt := eventArgs{
			EventName:    event.ObjectCreatedPut,
			BucketName:   bucket,
			Object:       objInfo,
			ReqParams:    reqParams,
			RespElements: respElements,
			UserAgent:    r.UserAgent(),
			Host:         handlers.GetSourceIP(r),
		}
		sendEvent(evt)

		// Tiering follow-up, only when tiers are configured.
		if !globalTierConfigMgr.Empty() {
			objInfo.ETag = origETag
			// Schedule object for immediate transition if eligible.
			enqueueTransitionImmediate(objInfo, lcEventSrc_s3PutObject)
			// Remove the transitioned object whose object version is being overwritten.
			os.Sweep()
		}

		return nil
	}

	// Extraction behaviour is controlled by request headers: whether to
	// ignore directories, whether to ignore errors, and an optional prefix
	// that is normalised to have no leading slash and a trailing one.
	var opts untarOptions
	opts.ignoreDirs = strings.EqualFold(r.Header.Get(xhttp.MinIOSnowballIgnoreDirs), "true")
	opts.ignoreErrs = strings.EqualFold(r.Header.Get(xhttp.MinIOSnowballIgnoreErrors), "true")
	opts.prefixAll = r.Header.Get(xhttp.MinIOSnowballPrefix)
	if opts.prefixAll != "" {
		opts.prefixAll = trimLeadingSlash(pathJoin(opts.prefixAll, slashSeparator))
	}

	if err = untar(ctx, hreader, putObjectTar, opts); err != nil {
		// Prefer the API error code recorded by the closure, if any.
		apiErr := errorCodes.ToAPIErr(s3Err)
		// If not set, convert or use BadRequest
		if s3Err == ErrNone {
			apiErr = toAPIError(ctx, err)
			if apiErr.Code == "InternalError" {
				// Convert generic internal errors to bad requests.
				apiErr = APIError{
					Code:           "BadRequest",
					Description:    err.Error(),
					HTTPStatusCode: http.StatusBadRequest,
				}
			}
		}
		writeErrorResponse(ctx, w, apiErr, r.URL)
		return
	}

	// The ETag of the response is the quoted hex MD5 of the uploaded stream.
	w.Header()[xhttp.ETag] = []string{`"` + hex.EncodeToString(hreader.MD5Current()) + `"`}
	hash.TransferChecksumHeader(w, r)
	writeSuccessResponseHeadersOnly(w)
}

// Delete objectAPIHandlers

// DeleteObjectHandler - delete an object
//
// Flow, as implemented below:
//   - authorize the request for policy.DeleteObjectAction and, when the
//     request carries the replica replication-status header, additionally for
//     policy.ReplicateDeleteAction;
//   - reject requests carrying the MinIOSourceReplicationCheck header, since
//     those only validate replication settings and must not delete data;
//   - reject prefix deletes (opts.DeletePrefix) on object-lock enabled buckets;
//   - call objectAPI.DeleteObject with callbacks that compute the replication
//     decision and enforce retention bypass rules;
//   - a missing object or version is answered with the no-content success
//     response and an ObjectRemovedNoOP event;
//   - on success, emit the delete (or delete-marker-created) event, schedule
//     delete replication when a replication/purge status is pending, and run
//     the object sweeper when the tier config manager is not empty.
func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "DeleteObject")

	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))

	vars := mux.Vars(r)
	bucket := vars["bucket"]
	object, err := unescapePath(vars["object"])
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	objectAPI := api.ObjectAPI()
	if objectAPI == nil {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
		return
	}

	if s3Error := checkRequestAuthType(ctx, r, policy.DeleteObjectAction, bucket, object); s3Error != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
		return
	}
	if _, ok := r.Header[xhttp.MinIOSourceReplicationCheck]; ok {
		// requests to just validate replication settings and permissions are not allowed to delete data
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrReplicationPermissionCheckError), r.URL)
		return
	}

	// A request is treated as an incoming replica delete when its
	// replication-status header equals the Replica status; such requests
	// need the ReplicateDeleteAction permission as well.
	replica := r.Header.Get(xhttp.AmzBucketReplicationStatus) == replication.Replica.String()
	if replica {
		if s3Error := checkRequestAuthType(ctx, r, policy.ReplicateDeleteAction, bucket, object); s3Error != ErrNone {
			writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
			return
		}
	}

	// When a DNS config is present, fail the request if the bucket lookup
	// returns an error other than dns.ErrNotImplemented.
	if globalDNSConfig != nil {
		_, err := globalDNSConfig.Get(bucket)
		if err != nil && err != dns.ErrNotImplemented {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}
	}

	opts, err := delOpts(ctx, r, bucket, object)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// The lookup error is deliberately ignored; rcfg is used as returned.
	rcfg, _ := globalBucketObjectLockSys.Get(bucket)
	if rcfg.LockEnabled && opts.DeletePrefix {
		apiErr := toAPIError(ctx, errInvalidArgument)
		apiErr.Description = "force-delete is forbidden on Object Locking enabled buckets"
		writeErrorResponse(ctx, w, apiErr, r.URL)
		return
	}

	// NOTE: the local name `os` shadows the standard library package `os`
	// for the remainder of this function.
	os := newObjSweeper(bucket, object).WithVersion(opts.VersionID).WithVersioning(opts.Versioned, opts.VersionSuspended)

	// Metadata callback: computes the replication decision for this delete
	// (skipped on replica requests) and records the transition state of the
	// object on the sweeper when the lookup (gerr) succeeded.
	opts.SetEvalMetadataFn(func(oi *ObjectInfo, gerr error) (dsc ReplicateDecision, err error) {
		if replica { // no need to check replication on receiver
			return dsc, nil
		}
		dsc = checkReplicateDelete(ctx, bucket, ObjectToDelete{
			ObjectV: ObjectV{
				ObjectName: object,
				VersionID:  opts.VersionID,
			},
		}, *oi, opts, gerr)
		// Mutations of objects on versioning suspended buckets
		// affect its null version. Through opts below we select
		// the null version's remote object to delete if
		// transitioned.
		if gerr == nil {
			os.SetTransitionState(oi.TransitionedObject)
		}
		return dsc, nil
	})

	// vID is the version the retention check below is evaluated against. It
	// is cleared for replica requests with an empty version purge status.
	vID := opts.VersionID
	if replica {
		opts.SetReplicaStatus(replication.Replica)
		if opts.VersionPurgeStatus().Empty() {
			// opts.VersionID holds delete marker version ID to replicate and not yet present on disk
			vID = ""
		}
	}
	// Retention callback: only evaluated when a version ID is targeted.
	// The inner `err` shadows the named result, so an object-not-found error
	// from the retention check is swallowed and the naked return yields nil.
	opts.SetEvalRetentionBypassFn(func(goi ObjectInfo, gerr error) (err error) {
		err = nil
		if vID != "" {
			err := enforceRetentionBypassForDelete(ctx, r, bucket, ObjectToDelete{
				ObjectV: ObjectV{
					ObjectName: object,
					VersionID:  vID,
				},
			}, goi, gerr)
			if err != nil && !isErrObjectNotFound(err) {
				return err
			}
		}
		return
	})

	deleteObject := objectAPI.DeleteObject

	// http://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectDELETE.html
	objInfo, err := deleteObject(ctx, bucket, object, opts)
	if err != nil {
		if _, ok := err.(BucketNotFound); ok {
			// When bucket doesn't exist specially handle it.
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}
		if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
			// Send an event when the object is not found
			objInfo.Name = object
			objInfo.VersionID = opts.VersionID
			sendEvent(eventArgs{
				EventName:    event.ObjectRemovedNoOP,
				BucketName:   bucket,
				Object:       objInfo,
				ReqParams:    extractReqParams(r),
				RespElements: extractRespElements(w),
				UserAgent:    r.UserAgent(),
				Host:         handlers.GetSourceIP(r),
			})
			// A missing object/version is still reported as success.
			writeSuccessNoContent(w)
			return
		}
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// An empty name in the returned info ends the request with a plain
	// success response: no headers, events, replication or sweeping.
	if objInfo.Name == "" {
		writeSuccessNoContent(w)
		return
	}

	// Deferred so the cache entry is removed when the handler returns.
	defer globalCacheConfig.Delete(bucket, object)

	setPutObjHeaders(w, objInfo, true)
	writeSuccessNoContent(w)

	eventName := event.ObjectRemovedDelete
	if objInfo.DeleteMarker {
		eventName = event.ObjectRemovedDeleteMarkerCreated
	}

	// Notify object deleted event.
	sendEvent(eventArgs{
		EventName:    eventName,
		BucketName:   bucket,
		Object:       objInfo,
		ReqParams:    extractReqParams(r),
		RespElements: extractRespElements(w),
		UserAgent:    r.UserAgent(),
		Host:         handlers.GetSourceIP(r),
	})

	// Queue the delete for replication when either the replication status or
	// the version purge status is pending. The version ID is reported as a
	// delete-marker version ID or a plain version ID, never both.
	if objInfo.ReplicationStatus == replication.Pending || objInfo.VersionPurgeStatus == Pending {
		dmVersionID := ""
		versionID := ""
		if objInfo.DeleteMarker {
			dmVersionID = objInfo.VersionID
		} else {
			versionID = objInfo.VersionID
		}
		dobj := DeletedObjectReplicationInfo{
			DeletedObject: DeletedObject{
				ObjectName:            object,
				VersionID:             versionID,
				DeleteMarkerVersionID: dmVersionID,
				DeleteMarkerMTime:     DeleteMarkerMTime{objInfo.ModTime},
				DeleteMarker:          objInfo.DeleteMarker,
				ReplicationState:      objInfo.ReplicationState(),
			},
			Bucket:    bucket,
			EventType: ReplicateIncomingDelete,
		}
		scheduleReplicationDelete(ctx, dobj, objectAPI)
	}

	// Run the sweeper for the version that was just deleted; it acts on the
	// transition state recorded by the metadata callback above, if any.
	if !globalTierConfigMgr.Empty() {
		os.Sweep()
	}
}

// PutObjectLegalHoldHandler - sets the legal hold status of an object.
//
// The request must be authorized for policy.PutObjectLegalHoldAction, carry a
// Content-MD5 header and target a bucket with object locking enabled. The
// legal hold document is parsed from the request body and written into the
// object's metadata through objectAPI.PutObjectMetadata; metadata replication
// is scheduled when the replication decision asks for it, and an
// ObjectCreatedPutLegalHold event is sent after the response.
func (api objectAPIHandlers) PutObjectLegalHoldHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "PutObjectLegalHold")

	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))

	pathVars := mux.Vars(r)
	bucket := pathVars["bucket"]
	object, err := unescapePath(pathVars["object"])
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	objectAPI := api.ObjectAPI()
	if objectAPI == nil {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
		return
	}

	// The caller must be allowed to change legal hold on this object.
	authErr := checkRequestAuthType(ctx, r, policy.PutObjectLegalHoldAction, bucket, object)
	if authErr != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(authErr), r.URL)
		return
	}

	// The bucket has to exist.
	if _, bktErr := objectAPI.GetBucketInfo(ctx, bucket, BucketOptions{}); bktErr != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, bktErr), r.URL)
		return
	}

	// Content-MD5 is mandatory for this call.
	if !hasContentMD5(r.Header) {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMissingContentMD5), r.URL)
		return
	}

	// Legal hold only applies to buckets with object locking enabled.
	lockCfg, _ := globalBucketObjectLockSys.Get(bucket)
	if !lockCfg.LockEnabled {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidBucketObjectLockConfiguration), r.URL)
		return
	}

	// Read at most Content-Length bytes of the legal hold document.
	body := io.LimitReader(r.Body, r.ContentLength)
	legalHold, err := objectlock.ParseObjectLegalHold(body)
	if err != nil {
		malformed := errorCodes.ToAPIErr(ErrMalformedXML)
		malformed.Description = err.Error()
		writeErrorResponse(ctx, w, malformed, r.URL)
		return
	}

	opts, err := getOpts(ctx, r, bucket, object)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// applyLegalHold stamps the legal hold status and its timestamp on the
	// object metadata and, when the object must be replicated, marks the
	// replication as pending.
	applyLegalHold := func(oi *ObjectInfo, _ error) (ReplicateDecision, error) {
		oi.UserDefined[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = strings.ToUpper(string(legalHold.Status))
		oi.UserDefined[ReservedMetadataPrefixLower+ObjectLockLegalHoldTimestamp] = UTCNow().Format(time.RFC3339Nano)

		decision := mustReplicate(ctx, bucket, object, oi.getMustReplicateOptions(replication.MetadataReplicationType, opts))
		if !decision.ReplicateAny() {
			return decision, nil
		}
		oi.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano)
		oi.UserDefined[ReservedMetadataPrefixLower+ReplicationStatus] = decision.PendingStatus()
		return decision, nil
	}

	metaOpts := ObjectOptions{
		MTime:          opts.MTime,
		VersionID:      opts.VersionID,
		EvalMetadataFn: applyLegalHold,
	}

	objInfo, err := objectAPI.PutObjectMetadata(ctx, bucket, object, metaOpts)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// Re-evaluate the decision on the stored object info and queue the
	// metadata change for replication if required.
	if decision := mustReplicate(ctx, bucket, object, objInfo.getMustReplicateOptions(replication.MetadataReplicationType, opts)); decision.ReplicateAny() {
		scheduleReplication(ctx, objInfo, objectAPI, decision, replication.MetadataReplicationType)
	}

	writeSuccessResponseHeadersOnly(w)

	// Notify object event.
	evt := eventArgs{
		EventName:    event.ObjectCreatedPutLegalHold,
		BucketName:   bucket,
		Object:       objInfo,
		ReqParams:    extractReqParams(r),
		RespElements: extractRespElements(w),
		UserAgent:    r.UserAgent(),
		Host:         handlers.GetSourceIP(r),
	}
	sendEvent(evt)
}

// GetObjectLegalHoldHandler - returns the legal hold status of an object.
//
// The request must be authorized for policy.GetObjectLegalHoldAction and the
// bucket must have object locking enabled. The legal hold is read from the
// object's user-defined metadata; when none is set the handler replies with
// ErrNoSuchObjectLockConfiguration. On success the legal hold is written as
// XML and an ObjectAccessedGetLegalHold event is sent.
func (api objectAPIHandlers) GetObjectLegalHoldHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "GetObjectLegalHold")

	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))

	pathVars := mux.Vars(r)
	bucket := pathVars["bucket"]
	object, err := unescapePath(pathVars["object"])
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	objectAPI := api.ObjectAPI()
	if objectAPI == nil {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
		return
	}

	authErr := checkRequestAuthType(ctx, r, policy.GetObjectLegalHoldAction, bucket, object)
	if authErr != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(authErr), r.URL)
		return
	}

	// Legal hold can only be queried on object-lock enabled buckets.
	lockCfg, _ := globalBucketObjectLockSys.Get(bucket)
	if !lockCfg.LockEnabled {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidBucketObjectLockConfiguration), r.URL)
		return
	}

	opts, err := getOpts(ctx, r, bucket, object)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	objInfo, err := objectAPI.GetObjectInfo(ctx, bucket, object, opts)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// The legal hold lives in the object's user-defined metadata.
	hold := objectlock.GetObjectLegalHoldMeta(objInfo.UserDefined)
	if hold.IsEmpty() {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNoSuchObjectLockConfiguration), r.URL)
		return
	}

	writeSuccessResponseXML(w, encodeResponse(hold))

	// Notify object legal hold accessed via a GET request.
	evt := eventArgs{
		EventName:    event.ObjectAccessedGetLegalHold,
		BucketName:   bucket,
		Object:       objInfo,
		ReqParams:    extractReqParams(r),
		RespElements: extractRespElements(w),
		UserAgent:    r.UserAgent(),
		Host:         handlers.GetSourceIP(r),
	}
	sendEvent(evt)
}

// PutObjectRetentionHandler - sets the retention configuration of an object.
//
// The request signature is validated to obtain the caller's credentials,
// which are later passed to the retention bypass check. The request must
// carry a Content-MD5 header and target a bucket with object locking enabled.
// The retention document is parsed from the request body and written into the
// object's metadata through objectAPI.PutObjectMetadata; an invalid mode
// stores empty mode and retain-until values. Metadata replication is
// scheduled when required, and an ObjectCreatedPutRetention event is sent
// after the response.
func (api objectAPIHandlers) PutObjectRetentionHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "PutObjectRetention")

	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))

	pathVars := mux.Vars(r)
	bucket := pathVars["bucket"]
	object, err := unescapePath(pathVars["object"])
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	objectAPI := api.ObjectAPI()
	if objectAPI == nil {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
		return
	}

	// Credentials and ownership feed the retention bypass check below.
	cred, owner, sigErr := validateSignature(getRequestAuthType(r), r)
	if sigErr != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(sigErr), r.URL)
		return
	}

	// The bucket has to exist.
	if _, bktErr := objectAPI.GetBucketInfo(ctx, bucket, BucketOptions{}); bktErr != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, bktErr), r.URL)
		return
	}

	// Content-MD5 is mandatory for this call.
	if !hasContentMD5(r.Header) {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMissingContentMD5), r.URL)
		return
	}

	// Retention only applies to buckets with object locking enabled.
	lockCfg, _ := globalBucketObjectLockSys.Get(bucket)
	if !lockCfg.LockEnabled {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidBucketObjectLockConfiguration), r.URL)
		return
	}

	objRetention, err := objectlock.ParseObjectRetention(r.Body)
	if err != nil {
		malformed := errorCodes.ToAPIErr(ErrMalformedXML)
		malformed.Description = err.Error()
		writeErrorResponse(ctx, w, malformed, r.URL)
		return
	}

	opts, err := getOpts(ctx, r, bucket, object)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// applyRetention enforces the retention bypass rules, then stamps the
	// retention mode, retain-until date and a retention timestamp on the
	// object metadata. When the object must be replicated it also marks the
	// replication as pending.
	applyRetention := func(oi *ObjectInfo, _ error) (ReplicateDecision, error) {
		var decision ReplicateDecision
		if bypassErr := enforceRetentionBypassForPut(ctx, r, *oi, objRetention, cred, owner); bypassErr != nil {
			return decision, bypassErr
		}

		// An invalid mode stores empty values for both keys.
		mode, retainUntil := "", ""
		if objRetention.Mode.Valid() {
			mode = string(objRetention.Mode)
			retainUntil = amztime.ISO8601Format(objRetention.RetainUntilDate.UTC())
		}
		oi.UserDefined[strings.ToLower(xhttp.AmzObjectLockMode)] = mode
		oi.UserDefined[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = retainUntil
		oi.UserDefined[ReservedMetadataPrefixLower+ObjectLockRetentionTimestamp] = UTCNow().Format(time.RFC3339Nano)

		decision = mustReplicate(ctx, bucket, object, oi.getMustReplicateOptions(replication.MetadataReplicationType, opts))
		if !decision.ReplicateAny() {
			return decision, nil
		}
		oi.UserDefined[ReservedMetadataPrefixLower+ReplicationTimestamp] = UTCNow().Format(time.RFC3339Nano)
		oi.UserDefined[ReservedMetadataPrefixLower+ReplicationStatus] = decision.PendingStatus()
		return decision, nil
	}

	metaOpts := ObjectOptions{
		MTime:          opts.MTime,
		VersionID:      opts.VersionID,
		EvalMetadataFn: applyRetention,
	}

	objInfo, err := objectAPI.PutObjectMetadata(ctx, bucket, object, metaOpts)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// Re-evaluate the decision on the stored object info and queue the
	// metadata change for replication if required.
	if decision := mustReplicate(ctx, bucket, object, objInfo.getMustReplicateOptions(replication.MetadataReplicationType, opts)); decision.ReplicateAny() {
		scheduleReplication(ctx, objInfo, objectAPI, decision, replication.MetadataReplicationType)
	}

	writeSuccessResponseHeadersOnly(w)

	// Notify object event.
	evt := eventArgs{
		EventName:    event.ObjectCreatedPutRetention,
		BucketName:   bucket,
		Object:       objInfo,
		ReqParams:    extractReqParams(r),
		RespElements: extractRespElements(w),
		UserAgent:    r.UserAgent(),
		Host:         handlers.GetSourceIP(r),
	}
	sendEvent(evt)
}

// GetObjectRetentionHandler - returns the retention configuration of an object.
//
// The request must be authorized for policy.GetObjectRetentionAction and the
// bucket must have object locking enabled. Retention is read from the
// object's user-defined metadata; when its mode is not valid the handler
// replies with ErrNoSuchObjectLockConfiguration. On success the retention is
// written as XML and an ObjectAccessedGetRetention event is sent.
func (api objectAPIHandlers) GetObjectRetentionHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "GetObjectRetention")
	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))

	pathVars := mux.Vars(r)
	bucket := pathVars["bucket"]
	object, err := unescapePath(pathVars["object"])
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	objectAPI := api.ObjectAPI()
	if objectAPI == nil {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
		return
	}

	authErr := checkRequestAuthType(ctx, r, policy.GetObjectRetentionAction, bucket, object)
	if authErr != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(authErr), r.URL)
		return
	}

	// Retention can only be queried on object-lock enabled buckets.
	lockCfg, _ := globalBucketObjectLockSys.Get(bucket)
	if !lockCfg.LockEnabled {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidBucketObjectLockConfiguration), r.URL)
		return
	}

	opts, err := getOpts(ctx, r, bucket, object)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	objInfo, err := objectAPI.GetObjectInfo(ctx, bucket, object, opts)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// Retention lives in the object's user-defined metadata.
	ret := objectlock.GetObjectRetentionMeta(objInfo.UserDefined)
	if !ret.Mode.Valid() {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNoSuchObjectLockConfiguration), r.URL)
		return
	}

	writeSuccessResponseXML(w, encodeResponse(ret))

	// Notify object retention accessed via a GET request.
	evt := eventArgs{
		EventName:    event.ObjectAccessedGetRetention,
		BucketName:   bucket,
		Object:       objInfo,
		ReqParams:    extractReqParams(r),
		RespElements: extractRespElements(w),
		UserAgent:    r.UserAgent(),
		Host:         handlers.GetSourceIP(r),
	}
	sendEvent(evt)
}

// ObjectTagSet holds the key/value tags of an object. Each entry is
// XML-encoded as a "Tag" element.
type ObjectTagSet struct {
	Tags []tags.Tag `xml:"Tag"`
}

// objectTagging is the XML document, rooted at a "Tagging" element, that
// wraps an ObjectTagSet under a "TagSet" element. GetObjectTaggingHandler
// encodes it as its response body.
type objectTagging struct {
	XMLName xml.Name      `xml:"Tagging"`
	TagSet  *ObjectTagSet `xml:"TagSet"`
}

// GetObjectTaggingHandler - GET object tagging
//
// The request is authenticated first and authorized only after the tags are
// known, so that authorization policies can be evaluated against the
// object's tags (they are exposed through the AmzObjectTagging request
// header).
//
// When the object or version is not found locally and proxy targets exist,
// the lookup is proxied to a replication target and the response carries the
// MinIOTaggingProxied header. If the proxy call yields neither tags nor an
// error, the original not-found error is returned to the client instead of
// building a response from a nil error.
//
// Tags are returned as XML, sorted by key.
func (api objectAPIHandlers) GetObjectTaggingHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "GetObjectTagging")
	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))

	vars := mux.Vars(r)
	bucket := vars["bucket"]
	object, err := unescapePath(vars["object"])
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	objAPI := api.ObjectAPI()
	if objAPI == nil {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
		return
	}

	if s3Error := authenticateRequest(ctx, r, policy.GetObjectTaggingAction); s3Error != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
		return
	}

	opts, err := getOpts(ctx, r, bucket, object)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	ot, err := objAPI.GetObjectTags(ctx, bucket, object, opts)
	if err != nil {
		// Only a missing object/version is eligible for proxying.
		if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}

		// if object/version is not found locally, but exists on peer site - proxy
		// the tagging request to peer site. The response to client will
		// return tags from peer site.
		proxytgts := getProxyTargets(ctx, bucket, object, opts)
		if proxytgts.Empty() {
			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
			return
		}

		globalReplicationStats.incProxy(bucket, getObjectTaggingAPI, false)
		// proxy to replication target if site replication is in place.
		peerTags, gerr := proxyGetTaggingToRepTarget(ctx, bucket, object, opts, proxytgts)
		if gerr.Err != nil || peerTags == nil {
			globalReplicationStats.incProxy(bucket, getObjectTaggingAPI, true)
			// The proxy may return no tags without reporting an error; in
			// that case surface the original not-found error rather than
			// converting a nil error into an API response.
			proxyErr := gerr.Err
			if proxyErr == nil {
				proxyErr = err
			}
			writeErrorResponse(ctx, w, toAPIError(ctx, proxyErr), r.URL)
			return
		}

		// overlay tags from peer site.
		ot = peerTags
		w.Header()[xhttp.MinIOTaggingProxied] = []string{"true"} // indicate that the request was proxied.
	}

	// Set this such that authorization policies can be applied on the object tags.
	if tagStr := ot.String(); tagStr != "" {
		r.Header.Set(xhttp.AmzObjectTagging, tagStr)
	}

	if s3Error := authorizeRequest(ctx, r, policy.GetObjectTaggingAction); s3Error != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
		return
	}

	if opts.VersionID != "" && opts.VersionID != nullVersionID {
		w.Header()[xhttp.AmzVersionID] = []string{opts.VersionID}
	}

	otags := &objectTagging{
		TagSet: &ObjectTagSet{},
	}

	tagMap := ot.ToMap()
	list := make([]tags.Tag, 0, len(tagMap))
	for k, v := range tagMap {
		list = append(list, tags.Tag{
			Key:   k,
			Value: v,
		})
	}
	// Always return in sorted order for tags; map iteration order is random.
	sort.Slice(list, func(i, j int) bool {
		return list[i].Key < list[j].Key
	})
	otags.TagSet.Tags = list

	writeSuccessResponseXML(w, encodeResponse(otags))
}

// PutObjectTaggingHandler - PUT object tagging
//
// The tag set is parsed from the request body (capped at 1MiB) and exposed
// through the AmzObjectTagging request header before the request is
// authorized for policy.PutObjectTaggingAction, so policies can be applied on
// the tags. When the object or version is not found locally and proxy targets
// exist, the request is proxied to a replication target and the response
// carries the MinIOTaggingProxied header. Otherwise the tags are stored via
// objAPI.PutObjectTags, metadata replication is scheduled when required, and
// an ObjectCreatedPutTagging event is sent.
func (api objectAPIHandlers) PutObjectTaggingHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "PutObjectTagging")
	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))

	pathVars := mux.Vars(r)
	bucket := pathVars["bucket"]
	object, err := unescapePath(pathVars["object"])
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	objAPI := api.ObjectAPI()
	if objAPI == nil {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
		return
	}

	// Tags XML will not be bigger than 1MiB in size, fail if its bigger.
	objTags, err := tags.ParseObjectXML(io.LimitReader(r.Body, 1<<20))
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// Set this such that authorization policies can be applied on the object tags.
	r.Header.Set(xhttp.AmzObjectTagging, objTags.String())

	// Allow putObjectTagging if policy action is set
	authErr := checkRequestAuthType(ctx, r, policy.PutObjectTaggingAction, bucket, object)
	if authErr != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(authErr), r.URL)
		return
	}

	opts, err := getOpts(ctx, r, bucket, object)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	objInfo, err := objAPI.GetObjectInfo(ctx, bucket, object, opts)
	if err != nil {
		// if object is not found locally, but exists on peer site - proxy
		// the tagging request to peer site. The response to client will
		// be 200 with extra header indicating that the request was proxied.
		notFound := isErrObjectNotFound(err) || isErrVersionNotFound(err)
		if notFound {
			if proxytgts := getProxyTargets(ctx, bucket, object, opts); !proxytgts.Empty() {
				globalReplicationStats.incProxy(bucket, putObjectTaggingAPI, false)
				// proxy to replication target if site replication is in place.
				if perr := proxyTaggingToRepTarget(ctx, bucket, object, objTags, opts, proxytgts); perr.Err != nil {
					globalReplicationStats.incProxy(bucket, putObjectTaggingAPI, true)
					writeErrorResponse(ctx, w, toAPIError(ctx, perr.Err), r.URL)
					return
				}
				w.Header()[xhttp.MinIOTaggingProxied] = []string{"true"}
				writeSuccessResponseHeadersOnly(w)
				// when tagging is proxied, the object version is not available to return
				// as header in the response, or ObjectInfo in the notification event.
				proxiedEvt := eventArgs{
					EventName:    event.ObjectCreatedPutTagging,
					BucketName:   bucket,
					ReqParams:    extractReqParams(r),
					RespElements: extractRespElements(w),
					UserAgent:    r.UserAgent(),
					Host:         handlers.GetSourceIP(r),
				}
				sendEvent(proxiedEvt)
				return
			}
		}
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}
	tagsStr := objTags.String()

	// Decide on replication using the new tags; when replicating, record the
	// pending status together with replication and tagging timestamps.
	replOpts := getMustReplicateOptions(objInfo.UserDefined, tagsStr, objInfo.ReplicationStatus, replication.MetadataReplicationType, opts)
	dsc := mustReplicate(ctx, bucket, object, replOpts)
	if dsc.ReplicateAny() {
		opts.UserDefined = map[string]string{
			ReservedMetadataPrefixLower + ReplicationTimestamp: UTCNow().Format(time.RFC3339Nano),
			ReservedMetadataPrefixLower + ReplicationStatus:    dsc.PendingStatus(),
			ReservedMetadataPrefixLower + TaggingTimestamp:     UTCNow().Format(time.RFC3339Nano),
		}
	}

	// Put object tags
	objInfo, err = objAPI.PutObjectTags(ctx, bucket, object, tagsStr, opts)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	if dsc.ReplicateAny() {
		scheduleReplication(ctx, objInfo, objAPI, dsc, replication.MetadataReplicationType)
	}

	// Only real (non-null) version IDs are reported back to the client.
	if vid := objInfo.VersionID; vid != "" && vid != nullVersionID {
		w.Header()[xhttp.AmzVersionID] = []string{vid}
	}

	writeSuccessResponseHeadersOnly(w)

	evt := eventArgs{
		EventName:    event.ObjectCreatedPutTagging,
		BucketName:   bucket,
		Object:       objInfo,
		ReqParams:    extractReqParams(r),
		RespElements: extractRespElements(w),
		UserAgent:    r.UserAgent(),
		Host:         handlers.GetSourceIP(r),
	}
	sendEvent(evt)
}

// DeleteObjectTaggingHandler - DELETE object tagging
//
// The object is looked up first so that its current tags can be exposed
// through the AmzObjectTagging request header before the request is
// authorized for policy.DeleteObjectTaggingAction. When the object or version
// is not found locally and proxy targets exist, the request is proxied to a
// replication target and the response carries the MinIOTaggingProxied header.
// Otherwise the tags are removed via objAPI.DeleteObjectTags, metadata
// replication is scheduled when required, and an ObjectCreatedDeleteTagging
// event is sent.
func (api objectAPIHandlers) DeleteObjectTaggingHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "DeleteObjectTagging")
	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))

	objAPI := api.ObjectAPI()
	if objAPI == nil {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
		return
	}

	pathVars := mux.Vars(r)
	bucket := pathVars["bucket"]
	object, err := unescapePath(pathVars["object"])
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	opts, err := getOpts(ctx, r, bucket, object)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	oi, err := objAPI.GetObjectInfo(ctx, bucket, object, opts)
	if err != nil {
		// if object is not found locally, but exists on peer site - proxy
		// the tagging request to peer site. The response to client will
		// be 200 OK with extra header indicating that the request was proxied.
		//
		// NOTE(review): this proxy branch runs before the
		// checkRequestAuthType call further down - confirm that is intended.
		notFound := isErrObjectNotFound(err) || isErrVersionNotFound(err)
		if notFound {
			if proxytgts := getProxyTargets(ctx, bucket, object, opts); !proxytgts.Empty() {
				globalReplicationStats.incProxy(bucket, removeObjectTaggingAPI, false)
				// proxy to replication target if active-active replication is in place.
				if perr := proxyTaggingToRepTarget(ctx, bucket, object, nil, opts, proxytgts); perr.Err != nil {
					globalReplicationStats.incProxy(bucket, removeObjectTaggingAPI, true)
					writeErrorResponse(ctx, w, toAPIError(ctx, perr.Err), r.URL)
					return
				}
				// when delete tagging is proxied, the object version/tags are not available to return
				// as header in the response, nor ObjectInfo in the notification event.
				w.Header()[xhttp.MinIOTaggingProxied] = []string{"true"}
				writeSuccessNoContent(w)
				proxiedEvt := eventArgs{
					EventName:    event.ObjectCreatedDeleteTagging,
					BucketName:   bucket,
					Object:       oi,
					ReqParams:    extractReqParams(r),
					RespElements: extractRespElements(w),
					UserAgent:    r.UserAgent(),
					Host:         handlers.GetSourceIP(r),
				}
				sendEvent(proxiedEvt)
				return
			}
		}
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// Set this such that authorization policies can be applied on the object tags.
	if oi.UserTags != "" {
		r.Header.Set(xhttp.AmzObjectTagging, oi.UserTags)
	}

	// Allow deleteObjectTagging if policy action is set
	authErr := checkRequestAuthType(ctx, r, policy.DeleteObjectTaggingAction, bucket, object)
	if authErr != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(authErr), r.URL)
		return
	}

	// When the object must be replicated, record the pending status and the
	// replication timestamp alongside the tag removal.
	dsc := mustReplicate(ctx, bucket, object, oi.getMustReplicateOptions(replication.MetadataReplicationType, opts))
	if dsc.ReplicateAny() {
		opts.UserDefined = map[string]string{
			ReservedMetadataPrefixLower + ReplicationTimestamp: UTCNow().Format(time.RFC3339Nano),
			ReservedMetadataPrefixLower + ReplicationStatus:    dsc.PendingStatus(),
		}
	}

	oi, err = objAPI.DeleteObjectTags(ctx, bucket, object, opts)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	if dsc.ReplicateAny() {
		scheduleReplication(ctx, oi, objAPI, dsc, replication.MetadataReplicationType)
	}

	// Only real (non-null) version IDs are reported back to the client.
	if vid := oi.VersionID; vid != "" && vid != nullVersionID {
		w.Header()[xhttp.AmzVersionID] = []string{vid}
	}
	writeSuccessNoContent(w)

	evt := eventArgs{
		EventName:    event.ObjectCreatedDeleteTagging,
		BucketName:   bucket,
		Object:       oi,
		ReqParams:    extractReqParams(r),
		RespElements: extractRespElements(w),
		UserAgent:    r.UserAgent(),
		Host:         handlers.GetSourceIP(r),
	}
	sendEvent(evt)
}

// PostRestoreObjectHandler - POST restore object handler.
// ----------
func (api objectAPIHandlers) PostRestoreObjectHandler(w http.ResponseWriter, r *http.Request) {
	ctx := newContext(r, w, "PostRestoreObject")
	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
	vars := mux.Vars(r)
	bucket := vars["bucket"]
	object, err := unescapePath(vars["object"])
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	// Fetch object stat info.
	objectAPI := api.ObjectAPI()
	if objectAPI == nil {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
		return
	}

	getObjectInfo := objectAPI.GetObjectInfo

	// Check for auth type to return S3 compatible error.
	if s3Error := checkRequestAuthType(ctx, r, policy.RestoreObjectAction, bucket, object); s3Error != ErrNone {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
		return
	}

	if r.ContentLength <= 0 {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrEmptyRequestBody), r.URL)
		return
	}
	opts, err := postRestoreOpts(ctx, r, bucket, object)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	objInfo, err := getObjectInfo(ctx, bucket, object, opts)
	if err != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}

	if objInfo.TransitionedObject.Status != lifecycle.TransitionComplete {
		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidObjectState), r.URL)
		return
	}

	rreq, err := parseRestoreRequest(io.LimitReader(r.Body, r.ContentLength))
	if err != nil {
		apiErr := errorCodes.ToAPIErr(ErrMalformedXML)
		apiErr.Description = err.Error()
		writeErrorResponse(ctx, w, apiErr, r.URL)
		return
	}
	// validate the request
	if err := rreq.validate(ctx, objectAPI); err != nil {
		apiErr := errorCodes.ToAPIErr(ErrMalformedXML)
		apiErr.Description = err.Error()
		writeErrorResponse(ctx, w, apiErr, r.URL)
		return
	}
	statusCode := http.StatusOK
	alreadyRestored := false
	if err == nil {
		if objInfo.RestoreOngoing && rreq.Type != SelectRestoreRequest {
			writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrObjectRestoreAlreadyInProgress), r.URL)
			return
		}
		if !objInfo.RestoreOngoing && !objInfo.RestoreExpires.IsZero() {
			statusCode = http.StatusAccepted
			alreadyRestored = true
		}
	}
	// set or upgrade restore expiry
	restoreExpiry := lifecycle.ExpectedExpiryTime(time.Now().UTC(), rreq.Days)
	metadata := cloneMSS(objInfo.UserDefined)

	// update self with restore metadata
	if rreq.Type != SelectRestoreRequest {
		objInfo.metadataOnly = true // Perform only metadata updates.
		metadata[xhttp.AmzRestoreExpiryDays] = strconv.Itoa(rreq.Days)
		metadata[xhttp.AmzRestoreRequestDate] = time.Now().UTC().Format(http.TimeFormat)
		if alreadyRestored {
			metadata[xhttp.AmzRestore] = completedRestoreObj(restoreExpiry).String()
		} else {
			metadata[xhttp.AmzRestore] = ongoingRestoreObj().String()
		}
		objInfo.UserDefined = metadata
		if _, err := objectAPI.CopyObject(GlobalContext, bucket, object, bucket, object, objInfo, ObjectOptions{
			VersionID: objInfo.VersionID,
		}, ObjectOptions{
			VersionID: objInfo.VersionID,
		}); err != nil {
			writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidObjectState), r.URL)
			return
		}
		// for previously restored object, just update the restore expiry
		if alreadyRestored {
			return
		}
	}

	restoreObject := mustGetUUID()
	if rreq.OutputLocation.S3.BucketName != "" {
		w.Header()[xhttp.AmzRestoreOutputPath] = []string{pathJoin(rreq.OutputLocation.S3.BucketName, rreq.OutputLocation.S3.Prefix, restoreObject)}
	}
	w.WriteHeader(statusCode)
	// Notify object restore started via a POST request.
	sendEvent(eventArgs{
		EventName:  event.ObjectRestorePost,
		BucketName: bucket,
		Object:     objInfo,
		ReqParams:  extractReqParams(r),
		UserAgent:  r.UserAgent(),
		Host:       handlers.GetSourceIP(r),
	})
	// now process the restore in background
	go func() {
		rctx := GlobalContext
		if !rreq.SelectParameters.IsEmpty() {
			actualSize, err := objInfo.GetActualSize()
			if err != nil {
				writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
				return
			}

			objectRSC := s3select.NewObjectReadSeekCloser(
				func(offset int64) (io.ReadCloser, error) {
					rs := &HTTPRangeSpec{
						IsSuffixLength: false,
						Start:          offset,
						End:            -1,
					}
					return getTransitionedObjectReader(rctx, bucket, object, rs, r.Header,
						objInfo, ObjectOptions{VersionID: objInfo.VersionID})
				},
				actualSize,
			)
			defer objectRSC.Close()
			if err = rreq.SelectParameters.Open(objectRSC); err != nil {
				if serr, ok := err.(s3select.SelectError); ok {
					encodedErrorResponse := encodeResponse(APIErrorResponse{
						Code:       serr.ErrorCode(),
						Message:    serr.ErrorMessage(),
						BucketName: bucket,
						Key:        object,
						Resource:   r.URL.Path,
						RequestID:  w.Header().Get(xhttp.AmzRequestID),
						HostID:     globalDeploymentID(),
					})
					writeResponse(w, serr.HTTPStatusCode(), encodedErrorResponse, mimeXML)
				} else {
					writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
				}
				return
			}
			nr := httptest.NewRecorder()
			rw := xhttp.NewResponseRecorder(nr)
			rw.LogErrBody = true
			rw.LogAllBody = true
			rreq.SelectParameters.Evaluate(rw)
			rreq.SelectParameters.Close()
			return
		}
		opts := ObjectOptions{
			Transition: TransitionOptions{
				RestoreRequest: rreq,
				RestoreExpiry:  restoreExpiry,
			},
			VersionID: objInfo.VersionID,
		}
		if err := objectAPI.RestoreTransitionedObject(rctx, bucket, object, opts); err != nil {
			logger.LogIf(ctx, fmt.Errorf("Unable to restore transitioned bucket/object %s/%s: %w", bucket, object, err))
			return
		}

		// Notify object restore completed via a POST request.
		sendEvent(eventArgs{
			EventName:  event.ObjectRestoreCompleted,
			BucketName: bucket,
			Object:     objInfo,
			ReqParams:  extractReqParams(r),
			UserAgent:  r.UserAgent(),
			Host:       handlers.GetSourceIP(r),
		})
	}()
}