Add ObjectTagging Support (#8754)

This PR adds support for AWS S3 ObjectTagging API as explained here
https://docs.aws.amazon.com/AmazonS3/latest/dev/object-tagging.html
This commit is contained in:
Nitish Tiwari
2020-01-20 22:15:59 +05:30
committed by kannappanr
parent dd93eee1e3
commit 61c17c8933
26 changed files with 887 additions and 87 deletions

View File

@@ -50,6 +50,7 @@ import (
"github.com/minio/minio/pkg/ioutil"
"github.com/minio/minio/pkg/policy"
"github.com/minio/minio/pkg/s3select"
"github.com/minio/minio/pkg/tagging"
sha256 "github.com/minio/sha256-simd"
"github.com/minio/sio"
)
@@ -604,13 +605,13 @@ func getCpObjMetadataFromHeader(ctx context.Context, r *http.Request, userMeta m
// if x-amz-metadata-directive says REPLACE then
// we extract metadata from the input headers.
if isMetadataReplace(r.Header) {
if isDirectiveReplace(r.Header.Get(xhttp.AmzMetadataDirective)) {
return extractMetadata(ctx, r)
}
// if x-amz-metadata-directive says COPY then we
// return the default metadata.
if isMetadataCopy(r.Header) {
if isDirectiveCopy(r.Header.Get(xhttp.AmzMetadataDirective)) {
return defaultMeta, nil
}
@@ -618,6 +619,24 @@ func getCpObjMetadataFromHeader(ctx context.Context, r *http.Request, userMeta m
return defaultMeta, nil
}
// Extract tags relevant for an CopyObject operation based on conditional
// header values specified in X-Amz-Tagging-Directive.
func getCpObjTagsFromHeader(ctx context.Context, r *http.Request, tags string) (string, error) {
// if x-amz-tagging-directive says REPLACE then
// we extract tags from the input headers.
if isDirectiveReplace(r.Header.Get(xhttp.AmzTagDirective)) {
if tags := r.Header.Get(xhttp.AmzObjectTagging); tags != "" {
return extractTags(ctx, tags)
}
// Copy is default behavior if x-amz-tagging-directive is set, but x-amz-tagging is
// is not set
return tags, nil
}
// Copy is default behavior if x-amz-tagging-directive is not set.
return tags, nil
}
// Returns a minio-go Client configured to access remote host described by destDNSRecord
// Applicable only in a federated deployment
var getRemoteInstanceClient = func(r *http.Request, host string) (*miniogo.Core, error) {
@@ -737,11 +756,17 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
}
// Check if metadata directive is valid.
if !isMetadataDirectiveValid(r.Header) {
if !isDirectiveValid(r.Header.Get(xhttp.AmzMetadataDirective)) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidMetadataDirective), r.URL, guessIsBrowserReq(r))
return
}
// check if tag directive is valid
if !isDirectiveValid(r.Header.Get(xhttp.AmzTagDirective)) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidTagDirective), r.URL, guessIsBrowserReq(r))
return
}
// This request header needs to be set prior to setting ObjectOptions
if globalAutoEncryption && !crypto.SSEC.IsRequested(r.Header) {
r.Header.Add(crypto.SSEHeader, crypto.SSEAlgorithmAES256)
@@ -795,7 +820,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
defer gr.Close()
srcInfo := gr.ObjInfo
/// maximum Upload size for object in a single CopyObject operation.
// maximum Upload size for object in a single CopyObject operation.
if isMaxObjectSize(srcInfo.Size) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrEntityTooLarge), r.URL, guessIsBrowserReq(r))
return
@@ -968,6 +993,17 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
tags, err := getCpObjTagsFromHeader(ctx, r, srcInfo.UserTags)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
if tags != "" {
srcInfo.UserDefined[xhttp.AmzObjectTagging] = tags
}
getObjectInfo := objectAPI.GetObjectInfo
if api.CacheAPI() != nil {
getObjectInfo = api.CacheAPI().GetObjectInfo
@@ -1002,12 +1038,13 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
// Ensure that metadata does not contain sensitive information
crypto.RemoveSensitiveEntries(srcInfo.UserDefined)
// Check if x-amz-metadata-directive was not set to REPLACE and source,
// desination are same objects. Apply this restriction also when
// Check if x-amz-metadata-directive or x-amz-tagging-directive was not set to REPLACE and source,
// destination are same objects. Apply this restriction also when
// metadataOnly is true indicating that we are not overwriting the object.
// if encryption is enabled we do not need explicit "REPLACE" metadata to
// be enabled as well - this is to allow for key-rotation.
if !isMetadataReplace(r.Header) && srcInfo.metadataOnly && !crypto.IsEncrypted(srcInfo.UserDefined) {
if !isDirectiveReplace(r.Header.Get(xhttp.AmzMetadataDirective)) && !isDirectiveReplace(r.Header.Get(xhttp.AmzTagDirective)) &&
srcInfo.metadataOnly && !crypto.IsEncrypted(srcInfo.UserDefined) {
// If x-amz-metadata-directive is not set to REPLACE then we need
// to error out if source and destination are same.
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidCopyDest), r.URL, guessIsBrowserReq(r))
@@ -1155,6 +1192,14 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
return
}
if tags := r.Header.Get(http.CanonicalHeaderKey(xhttp.AmzObjectTagging)); tags != "" {
metadata[xhttp.AmzObjectTagging], err = extractTags(ctx, tags)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
}
if rAuthType == authTypeStreamingSigned {
if contentEncoding, ok := metadata["content-encoding"]; ok {
contentEncoding = trimAwsChunkedContentEncoding(contentEncoding)
@@ -2774,3 +2819,103 @@ func (api objectAPIHandlers) GetObjectRetentionHandler(w http.ResponseWriter, r
Host: handlers.GetSourceIP(r),
})
}
// GetObjectTaggingHandler - GET object tagging
func (api objectAPIHandlers) GetObjectTaggingHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "GetObjectTagging")
defer logger.AuditLog(w, r, "GetObjectTagging", mustGetClaimsFromToken(r))
vars := mux.Vars(r)
bucket := vars["bucket"]
object := vars["object"]
objAPI := api.ObjectAPI()
if objAPI == nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r))
return
}
// Allow getObjectTagging if policy action is set.
if s3Error := checkRequestAuthType(ctx, r, policy.GetObjectTaggingAction, bucket, object); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r))
return
}
// Get object tags
tags, err := objAPI.GetObjectTag(ctx, bucket, object)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
writeSuccessResponseXML(w, encodeResponse(tags))
}
// PutObjectTaggingHandler - PUT object tagging
func (api objectAPIHandlers) PutObjectTaggingHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "PutObjectTagging")
defer logger.AuditLog(w, r, "PutObjectTagging", mustGetClaimsFromToken(r))
vars := mux.Vars(r)
bucket := vars["bucket"]
object := vars["object"]
objAPI := api.ObjectAPI()
if objAPI == nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r))
return
}
// Allow putObjectTagging if policy action is set
if s3Error := checkRequestAuthType(ctx, r, policy.PutObjectTaggingAction, bucket, object); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r))
return
}
tagging, err := tagging.ParseTagging(io.LimitReader(r.Body, r.ContentLength))
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
// Put object tags
err = objAPI.PutObjectTag(ctx, bucket, object, tagging.String())
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
writeSuccessResponseHeadersOnly(w)
}
// DeleteObjectTaggingHandler - DELETE object tagging
func (api objectAPIHandlers) DeleteObjectTaggingHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "DeleteObjectTagging")
defer logger.AuditLog(w, r, "DeleteObjectTagging", mustGetClaimsFromToken(r))
vars := mux.Vars(r)
bucket := vars["bucket"]
object := vars["object"]
objAPI := api.ObjectAPI()
if objAPI == nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r))
return
}
// Allow deleteObjectTagging if policy action is set
if s3Error := checkRequestAuthType(ctx, r, policy.DeleteObjectTaggingAction, bucket, object); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r))
return
}
// Delete object tags
err := objAPI.DeleteObjectTag(ctx, bucket, object)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
writeSuccessResponseHeadersOnly(w)
}