support tagging based policy conditions (#15763)

This commit is contained in:
Harshavardhana
2022-09-28 11:25:46 -07:00
committed by GitHub
parent 4f1ff9c4d9
commit 41b633f5ea
9 changed files with 189 additions and 117 deletions

View File

@@ -27,6 +27,7 @@ import (
jsoniter "github.com/json-iterator/go"
miniogopolicy "github.com/minio/minio-go/v7/pkg/policy"
"github.com/minio/minio-go/v7/pkg/tags"
"github.com/minio/minio/internal/handlers"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger"
@@ -125,6 +126,20 @@ func getConditionValues(r *http.Request, lc string, username string, claims map[
cloneHeader := r.Header.Clone()
if userTags := cloneHeader.Get(xhttp.AmzObjectTagging); userTags != "" {
tag, _ := tags.ParseObjectTags(userTags)
if tag != nil {
tagMap := tag.ToMap()
keys := make([]string, 0, len(tagMap))
for k, v := range tagMap {
args[pathJoin("ExistingObjectTag", k)] = []string{v}
args[pathJoin("RequestObjectTag", k)] = []string{v}
keys = append(keys, k)
}
args["RequestObjectTagKeys"] = keys
}
}
for _, objLock := range []string{
xhttp.AmzObjectLockMode,
xhttp.AmzObjectLockLegalHold,
@@ -137,6 +152,9 @@ func getConditionValues(r *http.Request, lc string, username string, claims map[
}
for key, values := range cloneHeader {
if strings.EqualFold(key, xhttp.AmzObjectTagging) {
continue
}
if existingValues, found := args[key]; found {
args[key] = append(existingValues, values...)
} else {

View File

@@ -418,6 +418,7 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
return checkPreconditions(ctx, w, r, oi, opts)
}
var proxy proxyResult
gr, err := getObjectNInfo(ctx, bucket, object, rs, r.Header, readLock, opts)
if err != nil {
@@ -463,6 +464,7 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
w.Header()[xhttp.AmzVersionID] = []string{gr.ObjInfo.VersionID}
w.Header()[xhttp.AmzDeleteMarker] = []string{strconv.FormatBool(gr.ObjInfo.DeleteMarker)}
}
QueueReplicationHeal(ctx, bucket, gr.ObjInfo)
}
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
@@ -472,29 +474,29 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
objInfo := gr.ObjInfo
// Automatically remove the object/version is an expiry lifecycle rule can be applied
if lc, err := globalLifecycleSys.Get(bucket); err == nil {
rcfg, _ := globalBucketObjectLockSys.Get(bucket)
action := evalActionFromLifecycle(ctx, *lc, rcfg, objInfo, false)
var success bool
switch action {
case lifecycle.DeleteVersionAction, lifecycle.DeleteAction:
success = applyExpiryRule(objInfo, false, action == lifecycle.DeleteVersionAction)
case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction:
// Restored object delete would be still allowed to proceed as success
// since transition behavior is slightly different.
applyExpiryRule(objInfo, true, action == lifecycle.DeleteRestoredVersionAction)
}
if success {
writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrNoSuchKey))
return
if !proxy.Proxy { // apply lifecycle rules only for local requests
// Automatically remove the object/version is an expiry lifecycle rule can be applied
if lc, err := globalLifecycleSys.Get(bucket); err == nil {
rcfg, _ := globalBucketObjectLockSys.Get(bucket)
action := evalActionFromLifecycle(ctx, *lc, rcfg, objInfo, false)
var success bool
switch action {
case lifecycle.DeleteVersionAction, lifecycle.DeleteAction:
success = applyExpiryRule(objInfo, false, action == lifecycle.DeleteVersionAction)
case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction:
// Restored object delete would be still allowed to proceed as success
// since transition behavior is slightly different.
applyExpiryRule(objInfo, true, action == lifecycle.DeleteRestoredVersionAction)
}
if success {
writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrNoSuchKey))
return
}
}
QueueReplicationHeal(ctx, bucket, gr.ObjInfo)
}
// Queue failed/pending replication automatically
if !proxy.Proxy {
QueueReplicationHeal(ctx, bucket, objInfo)
}
// filter object lock metadata if permission does not permit
getRetPerms := checkRequestAuthType(ctx, r, policy.GetObjectRetentionAction, bucket, object)
legalHoldPerms := checkRequestAuthType(ctx, r, policy.GetObjectLegalHoldAction, bucket, object)
@@ -632,6 +634,36 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob
return
}
// Get request range.
var rs *HTTPRangeSpec
rangeHeader := r.Header.Get(xhttp.Range)
objInfo, err := getObjectInfo(ctx, bucket, object, opts)
var proxy proxyResult
if err != nil {
// proxy HEAD to replication target if active-active replication configured on bucket
proxytgts := getProxyTargets(ctx, bucket, object, opts)
if !proxytgts.Empty() {
if rangeHeader != "" {
rs, _ = parseRequestRangeSpec(rangeHeader)
}
var oi ObjectInfo
oi, proxy = proxyHeadToReplicationTarget(ctx, bucket, object, rs, opts, proxytgts)
if proxy.Proxy {
objInfo = oi
}
if proxy.Err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, proxy.Err), r.URL)
return
}
}
}
if err == nil && objInfo.UserTags != "" {
// Set this such that authorization policies can be applied on the object tags.
r.Header.Set(xhttp.AmzObjectTagging, objInfo.UserTags)
}
if s3Error := checkRequestAuthType(ctx, r, policy.GetObjectAction, bucket, object); s3Error != ErrNone {
if getRequestAuthType(r) == authTypeAnonymous {
// As per "Permission" section in
@@ -653,7 +685,6 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob
ConditionValues: getConditionValues(r, "", "", nil),
IsOwner: false,
}) {
_, err = getObjectInfo(ctx, bucket, object, opts)
if toAPIError(ctx, err).Code == "NoSuchKey" {
s3Error = ErrNoSuchKey
}
@@ -662,67 +693,46 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob
writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(s3Error))
return
}
// Get request range.
var rs *HTTPRangeSpec
rangeHeader := r.Header.Get(xhttp.Range)
objInfo, err := getObjectInfo(ctx, bucket, object, opts)
var proxy proxyResult
if err != nil {
var oi ObjectInfo
// proxy HEAD to replication target if active-active replication configured on bucket
proxytgts := getProxyTargets(ctx, bucket, object, opts)
if !proxytgts.Empty() {
if rangeHeader != "" {
rs, _ = parseRequestRangeSpec(rangeHeader)
if err != nil && !proxy.Proxy {
if globalBucketVersioningSys.PrefixEnabled(bucket, object) {
switch {
case !objInfo.VersionPurgeStatus.Empty():
w.Header()[xhttp.MinIODeleteReplicationStatus] = []string{string(objInfo.VersionPurgeStatus)}
case !objInfo.ReplicationStatus.Empty() && objInfo.DeleteMarker:
w.Header()[xhttp.MinIODeleteMarkerReplicationStatus] = []string{string(objInfo.ReplicationStatus)}
}
oi, proxy = proxyHeadToReplicationTarget(ctx, bucket, object, rs, opts, proxytgts)
if proxy.Proxy {
objInfo = oi
}
}
if !proxy.Proxy {
if globalBucketVersioningSys.PrefixEnabled(bucket, object) {
switch {
case !objInfo.VersionPurgeStatus.Empty():
w.Header()[xhttp.MinIODeleteReplicationStatus] = []string{string(objInfo.VersionPurgeStatus)}
case !objInfo.ReplicationStatus.Empty() && objInfo.DeleteMarker:
w.Header()[xhttp.MinIODeleteMarkerReplicationStatus] = []string{string(objInfo.ReplicationStatus)}
}
// Versioning enabled quite possibly object is deleted might be delete-marker
// if present set the headers, no idea why AWS S3 sets these headers.
if objInfo.VersionID != "" && objInfo.DeleteMarker {
w.Header()[xhttp.AmzVersionID] = []string{objInfo.VersionID}
w.Header()[xhttp.AmzDeleteMarker] = []string{strconv.FormatBool(objInfo.DeleteMarker)}
}
// Versioning enabled quite possibly object is deleted might be delete-marker
// if present set the headers, no idea why AWS S3 sets these headers.
if objInfo.VersionID != "" && objInfo.DeleteMarker {
w.Header()[xhttp.AmzVersionID] = []string{objInfo.VersionID}
w.Header()[xhttp.AmzDeleteMarker] = []string{strconv.FormatBool(objInfo.DeleteMarker)}
}
QueueReplicationHeal(ctx, bucket, objInfo)
writeErrorResponseHeadersOnly(w, toAPIError(ctx, err))
return
}
writeErrorResponseHeadersOnly(w, toAPIError(ctx, err))
return
}
// Automatically remove the object/version is an expiry lifecycle rule can be applied
if lc, err := globalLifecycleSys.Get(bucket); err == nil {
rcfg, _ := globalBucketObjectLockSys.Get(bucket)
action := evalActionFromLifecycle(ctx, *lc, rcfg, objInfo, false)
var success bool
switch action {
case lifecycle.DeleteVersionAction, lifecycle.DeleteAction:
success = applyExpiryRule(objInfo, false, action == lifecycle.DeleteVersionAction)
case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction:
// Restored object delete would be still allowed to proceed as success
// since transition behavior is slightly different.
applyExpiryRule(objInfo, true, action == lifecycle.DeleteRestoredVersionAction)
if !proxy.Proxy { // apply lifecycle rules only locally not for proxied requests
// Automatically remove the object/version is an expiry lifecycle rule can be applied
if lc, err := globalLifecycleSys.Get(bucket); err == nil {
rcfg, _ := globalBucketObjectLockSys.Get(bucket)
action := evalActionFromLifecycle(ctx, *lc, rcfg, objInfo, false)
var success bool
switch action {
case lifecycle.DeleteVersionAction, lifecycle.DeleteAction:
success = applyExpiryRule(objInfo, false, action == lifecycle.DeleteVersionAction)
case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction:
// Restored object delete would be still allowed to proceed as success
// since transition behavior is slightly different.
applyExpiryRule(objInfo, true, action == lifecycle.DeleteRestoredVersionAction)
}
if success {
writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrNoSuchKey))
return
}
}
if success {
writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrNoSuchKey))
return
}
}
// Queue failed/pending replication automatically
if !proxy.Proxy {
QueueReplicationHeal(ctx, bucket, objInfo)
}
@@ -3197,15 +3207,19 @@ func (api objectAPIHandlers) PutObjectTaggingHandler(w http.ResponseWriter, r *h
return
}
// Allow putObjectTagging if policy action is set
if s3Error := checkRequestAuthType(ctx, r, policy.PutObjectTaggingAction, bucket, object); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
// Tags XML will not be bigger than 1MiB in size, fail if its bigger.
tags, err := tags.ParseObjectXML(io.LimitReader(r.Body, 1<<20))
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
tags, err := tags.ParseObjectXML(io.LimitReader(r.Body, r.ContentLength))
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
// Set this such that authorization policies can be applied on the object tags.
r.Header.Set(xhttp.AmzObjectTagging, tags.String())
// Allow putObjectTagging if policy action is set
if s3Error := checkRequestAuthType(ctx, r, policy.PutObjectTaggingAction, bucket, object); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
return
}
@@ -3283,12 +3297,6 @@ func (api objectAPIHandlers) DeleteObjectTaggingHandler(w http.ResponseWriter, r
return
}
// Allow deleteObjectTagging if policy action is set
if s3Error := checkRequestAuthType(ctx, r, policy.DeleteObjectTaggingAction, bucket, object); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
return
}
opts, err := getOpts(ctx, r, bucket, object)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
@@ -3300,6 +3308,18 @@ func (api objectAPIHandlers) DeleteObjectTaggingHandler(w http.ResponseWriter, r
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
if userTags := oi.UserTags; userTags != "" {
// Set this such that authorization policies can be applied on the object tags.
r.Header.Set(xhttp.AmzObjectTagging, oi.UserTags)
}
// Allow deleteObjectTagging if policy action is set
if s3Error := checkRequestAuthType(ctx, r, policy.DeleteObjectTaggingAction, bucket, object); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
return
}
dsc := mustReplicate(ctx, bucket, object, getMustReplicateOptions(oi, replication.MetadataReplicationType, opts))
if dsc.ReplicateAny() {
opts.UserDefined = make(map[string]string)

View File

@@ -30,6 +30,7 @@ import (
"time"
"github.com/gorilla/mux"
"github.com/minio/minio-go/v7/pkg/tags"
sse "github.com/minio/minio/internal/bucket/encryption"
objectlock "github.com/minio/minio/internal/bucket/object/lock"
"github.com/minio/minio/internal/bucket/replication"
@@ -129,6 +130,20 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r
return
}
if objTags := r.Header.Get(xhttp.AmzObjectTagging); objTags != "" {
if !objectAPI.IsTaggingSupported() {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
if _, err := tags.ParseObjectTags(objTags); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
metadata[xhttp.AmzObjectTagging] = objTags
}
retPerms := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, iampolicy.PutObjectRetentionAction)
holdPerms := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, iampolicy.PutObjectLegalHoldAction)