fix: support object-remaining-retention-days policy condition (#9259)

This PR also tries to simplify the approach taken in
the object-locking implementation by giving preferential
treatment to full validation.

This in turn has fixed a couple of bugs related to
how the policy should have been honored when ByPassGovernance
is provided.

Simplifies code a bit, but also duplicates code intentionally
for clarity due to complex nature of object locking
implementation.
This commit is contained in:
Harshavardhana
2020-04-06 13:44:16 -07:00
committed by GitHub
parent b9b1bfefe7
commit 43a3778b45
13 changed files with 677 additions and 309 deletions

View File

@@ -588,11 +588,17 @@ func (web *webAPIHandlers) RemoveObject(r *http.Request, args *RemoveObjectArgs,
if objectAPI == nil {
return toJSONError(ctx, errServerNotInitialized)
}
listObjects := objectAPI.ListObjects
getObjectInfo := objectAPI.GetObjectInfo
if web.CacheAPI() != nil {
getObjectInfo = web.CacheAPI().GetObjectInfo
}
deleteObjects := objectAPI.DeleteObjects
if web.CacheAPI() != nil {
deleteObjects = web.CacheAPI().DeleteObjects
}
claims, owner, authErr := webRequestAuthenticate(r)
if authErr != nil {
if authErr == errNoAuthToken {
@@ -688,6 +694,17 @@ next:
}) {
govBypassPerms = ErrNone
}
if globalIAMSys.IsAllowed(iampolicy.Args{
AccountName: claims.AccessKey,
Action: iampolicy.GetBucketObjectLockConfigurationAction,
BucketName: args.BucketName,
ConditionValues: getConditionValues(r, "", claims.AccessKey, claims.Map()),
IsOwner: owner,
ObjectName: objectName,
Claims: claims.Map(),
}) {
govBypassPerms = ErrNone
}
}
if authErr == errNoAuthToken {
// Check if object is allowed to be deleted anonymously
@@ -711,12 +728,44 @@ next:
}) {
govBypassPerms = ErrNone
}
// Check if object is allowed to be deleted anonymously
if globalPolicySys.IsAllowed(policy.Args{
Action: policy.GetBucketObjectLockConfigurationAction,
BucketName: args.BucketName,
ConditionValues: getConditionValues(r, "", "", nil),
IsOwner: false,
ObjectName: objectName,
}) {
govBypassPerms = ErrNone
}
}
if _, err := enforceRetentionBypassForDelete(ctx, r, args.BucketName, objectName, getObjectInfo, govBypassPerms); err != ErrNone {
if govBypassPerms != ErrNone {
return toJSONError(ctx, errAccessDenied)
}
if err = deleteObject(ctx, objectAPI, web.CacheAPI(), args.BucketName, objectName, r); err != nil {
break next
apiErr := ErrNone
// Deny if global WORM is enabled
if globalWORMEnabled {
opts, err := getOpts(ctx, r, args.BucketName, objectName)
if err != nil {
apiErr = toAPIErrorCode(ctx, err)
} else {
if _, err := getObjectInfo(ctx, args.BucketName, objectName, opts); err == nil {
apiErr = ErrMethodNotAllowed
}
}
}
if _, ok := globalBucketObjectLockConfig.Get(args.BucketName); ok && (apiErr == ErrNone) {
apiErr = enforceRetentionBypassForDeleteWeb(ctx, r, args.BucketName, objectName, getObjectInfo)
if apiErr != ErrNone && apiErr != ErrNoSuchKey {
return toJSONError(ctx, errAccessDenied)
}
}
if apiErr == ErrNone {
if err = deleteObject(ctx, objectAPI, web.CacheAPI(), args.BucketName, objectName, r); err != nil {
break next
}
}
continue
}
@@ -746,23 +795,34 @@ next:
}
}
// For directories, list the contents recursively and remove.
marker := ""
// Allocate new results channel to receive ObjectInfo.
objInfoCh := make(chan ObjectInfo)
// Walk through all objects
if err = objectAPI.Walk(ctx, args.BucketName, objectName, objInfoCh); err != nil {
break next
}
for {
var lo ListObjectsInfo
lo, err = listObjects(ctx, args.BucketName, objectName, marker, "", maxObjectList)
if err != nil {
var objects []string
for obj := range objInfoCh {
if len(objects) == maxObjectList {
// Reached maximum delete requests, attempt a delete for now.
break
}
objects = append(objects, obj.Name)
}
// Nothing to do.
if len(objects) == 0 {
break next
}
marker = lo.NextMarker
for _, obj := range lo.Objects {
err = deleteObject(ctx, objectAPI, web.CacheAPI(), args.BucketName, obj.Name, r)
if err != nil {
break next
}
}
if !lo.IsTruncated {
break
// Deletes a list of objects.
_, err = deleteObjects(ctx, args.BucketName, objects)
if err != nil {
logger.LogIf(ctx, err)
break next
}
}
}
@@ -1097,27 +1157,30 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
// Ensure that metadata does not contain sensitive information
crypto.RemoveSensitiveEntries(metadata)
getObjectInfo := objectAPI.GetObjectInfo
if web.CacheAPI() != nil {
getObjectInfo = web.CacheAPI().GetObjectInfo
}
// enforce object retention rules
retentionMode, retentionDate, legalHold, s3Err := checkPutObjectLockAllowed(ctx, r, bucket, object, getObjectInfo, retPerms, holdPerms)
if s3Err == ErrNone && retentionMode != "" {
opts.UserDefined[xhttp.AmzObjectLockMode] = string(retentionMode)
opts.UserDefined[xhttp.AmzObjectLockRetainUntilDate] = retentionDate.UTC().Format(time.RFC3339)
}
if s3Err == ErrNone && legalHold.Status != "" {
opts.UserDefined[xhttp.AmzObjectLockLegalHold] = string(legalHold.Status)
}
if s3Err != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r))
return
}
retentionRequested := objectlock.IsObjectLockRetentionRequested(r.Header)
legalHoldRequested := objectlock.IsObjectLockLegalHoldRequested(r.Header)
putObject := objectAPI.PutObject
getObjectInfo := objectAPI.GetObjectInfo
if web.CacheAPI() != nil {
putObject = web.CacheAPI().PutObject
getObjectInfo = web.CacheAPI().GetObjectInfo
}
if retentionRequested || legalHoldRequested {
// enforce object retention rules
retentionMode, retentionDate, legalHold, s3Err := checkPutObjectLockAllowed(ctx, r, bucket, object, getObjectInfo, retPerms, holdPerms)
if s3Err == ErrNone && retentionMode != "" {
opts.UserDefined[xhttp.AmzObjectLockMode] = string(retentionMode)
opts.UserDefined[xhttp.AmzObjectLockRetainUntilDate] = retentionDate.UTC().Format(time.RFC3339)
}
if s3Err == ErrNone && legalHold.Status != "" {
opts.UserDefined[xhttp.AmzObjectLockLegalHold] = string(legalHold.Status)
}
if s3Err != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r))
return
}
}
objInfo, err := putObject(context.Background(), bucket, object, pReader, opts)
@@ -1462,13 +1525,12 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) {
writeWebErrorResponse(w, errInvalidBucketName)
return
}
getObjectNInfo := objectAPI.GetObjectNInfo
if web.CacheAPI() != nil {
getObjectNInfo = web.CacheAPI().GetObjectNInfo
}
listObjects := objectAPI.ListObjects
archive := zip.NewWriter(w)
defer archive.Close()
@@ -1541,29 +1603,24 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) {
// If not a directory, compress the file and write it to response.
err := zipit(pathJoin(args.Prefix, object))
if err != nil {
logger.LogIf(ctx, err)
return
}
continue
}
// For directories, list the contents recursively and write the objects as compressed
// date to the response writer.
marker := ""
for {
lo, err := listObjects(ctx, args.BucketName, pathJoin(args.Prefix, object), marker, "",
maxObjectList)
if err != nil {
return
}
marker = lo.NextMarker
for _, obj := range lo.Objects {
err = zipit(obj.Name)
if err != nil {
return
}
}
if !lo.IsTruncated {
break
objInfoCh := make(chan ObjectInfo)
// Walk through all objects
if err := objectAPI.Walk(ctx, args.BucketName, pathJoin(args.Prefix, object), objInfoCh); err != nil {
logger.LogIf(ctx, err)
continue
}
for obj := range objInfoCh {
if err := zipit(obj.Name); err != nil {
logger.LogIf(ctx, err)
continue
}
}
}