Add support for object locking with legal hold. (#8634)

This commit is contained in:
poornas
2020-01-16 15:41:56 -08:00
committed by kannappanr
parent ba758361b3
commit 60e60f68dd
21 changed files with 1559 additions and 517 deletions

View File

@@ -34,6 +34,7 @@ import (
"github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/event"
"github.com/minio/minio/pkg/hash"
"github.com/minio/minio/pkg/objectlock"
"github.com/minio/minio/pkg/policy"
)
@@ -1611,14 +1612,16 @@ func toAPIErrorCode(ctx context.Context, err error) (apiErr APIErrorCode) {
apiErr = ErrOperationTimedOut
case errDiskNotFound:
apiErr = ErrSlowDown
case errInvalidRetentionDate:
case objectlock.ErrInvalidRetentionDate:
apiErr = ErrInvalidRetentionDate
case errPastObjectLockRetainDate:
case objectlock.ErrPastObjectLockRetainDate:
apiErr = ErrPastObjectLockRetainDate
case errUnknownWORMModeDirective:
case objectlock.ErrUnknownWORMModeDirective:
apiErr = ErrUnknownWORMModeDirective
case errObjectLockInvalidHeaders:
case objectlock.ErrObjectLockInvalidHeaders:
apiErr = ErrObjectLockInvalidHeaders
case objectlock.ErrMalformedXML:
apiErr = ErrMalformedXML
}
// Compression errors

View File

@@ -111,20 +111,21 @@ func registerAPIRouter(router *mux.Router, encryptionEnabled, allowSSEKMS bool)
bucket.Methods(http.MethodPost).Path("/{object:.+}").HandlerFunc(collectAPIStats("selectobjectcontent", httpTraceHdrs(api.SelectObjectContentHandler))).Queries("select", "").Queries("select-type", "2")
// GetObjectRetention
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(collectAPIStats("getobjectretention", httpTraceHdrs(api.GetObjectRetentionHandler))).Queries("retention", "")
// GetObjectLegalHold
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(collectAPIStats("getobjectlegalhold", httpTraceHdrs(api.GetObjectLegalHoldHandler))).Queries("legal-hold", "")
// GetObject
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(collectAPIStats("getobject", httpTraceHdrs(api.GetObjectHandler)))
// CopyObject
bucket.Methods(http.MethodPut).Path("/{object:.+}").HeadersRegexp(xhttp.AmzCopySource, ".*?(\\/|%2F).*?").HandlerFunc(collectAPIStats("copyobject", httpTraceAll(api.CopyObjectHandler)))
// PutObjectRetention
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(collectAPIStats("putobjectretention", httpTraceHdrs(api.PutObjectRetentionHandler))).Queries("retention", "")
// PutObjectLegalHold
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(collectAPIStats("putobjectlegalhold", httpTraceHdrs(api.PutObjectLegalHoldHandler))).Queries("legal-hold", "")
// PutObject
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(collectAPIStats("putobject", httpTraceHdrs(api.PutObjectHandler)))
// DeleteObject
bucket.Methods(http.MethodDelete).Path("/{object:.+}").HandlerFunc(collectAPIStats("deleteobject", httpTraceAll(api.DeleteObjectHandler)))
// PutObjectLegalHold
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(collectAPIStats("putobjectlegalhold", httpTraceHdrs(api.PutObjectLegalHoldHandler))).Queries("legal-hold", "")
// GetObjectLegalHold
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(collectAPIStats("getobjectlegalhold", httpTraceHdrs(api.GetObjectLegalHoldHandler))).Queries("legal-hold", "")
/// Bucket operations
// GetBucketLocation

View File

@@ -40,6 +40,7 @@ import (
"github.com/minio/minio/pkg/handlers"
"github.com/minio/minio/pkg/hash"
iampolicy "github.com/minio/minio/pkg/iam/policy"
"github.com/minio/minio/pkg/objectlock"
"github.com/minio/minio/pkg/policy"
"github.com/minio/minio/pkg/sync/errgroup"
)
@@ -577,14 +578,14 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req
return
}
if objectLockEnabled {
if objectLockEnabled && !globalIsGateway {
configFile := path.Join(bucketConfigPrefix, bucket, bucketObjectLockEnabledConfigFile)
if err = saveConfig(ctx, objectAPI, configFile, []byte(bucketObjectLockEnabledConfig)); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
globalBucketObjectLockConfig.Set(bucket, Retention{})
globalNotificationSys.PutBucketObjectLockConfig(ctx, bucket, Retention{})
globalBucketObjectLockConfig.Set(bucket, objectlock.Retention{})
globalNotificationSys.PutBucketObjectLockConfig(ctx, bucket, objectlock.Retention{})
}
// Make sure to add Location information here only for bucket
@@ -1005,7 +1006,7 @@ func (api objectAPIHandlers) PutBucketObjectLockConfigHandler(w http.ResponseWri
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r))
return
}
config, err := parseObjectLockConfig(r.Body)
config, err := objectlock.ParseObjectLockConfig(r.Body)
if err != nil {
apiErr := errorCodes.ToAPIErr(ErrMalformedXML)
apiErr.Description = err.Error()
@@ -1099,7 +1100,7 @@ func (api objectAPIHandlers) GetBucketObjectLockConfigHandler(w http.ResponseWri
return
}
if configData, err = xml.Marshal(newObjectLockConfig()); err != nil {
if configData, err = xml.Marshal(objectlock.NewObjectLockConfig()); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}

View File

@@ -32,6 +32,7 @@ import (
"github.com/minio/minio/cmd/config/cache"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/color"
"github.com/minio/minio/pkg/objectlock"
"github.com/minio/minio/pkg/sync/errgroup"
"github.com/minio/minio/pkg/wildcard"
)
@@ -251,7 +252,13 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
c.cacheStats.incMiss()
return c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
}
// skip cache for objects with locks
objRetention := objectlock.GetObjectRetentionMeta(objInfo.UserDefined)
legalHold := objectlock.GetObjectLegalHoldMeta(objInfo.UserDefined)
if objRetention.Mode != objectlock.Invalid || legalHold.Status != "" {
c.cacheStats.incMiss()
return c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
}
if cacheErr == nil {
// if ETag matches for stale cache entry, serve from cache
if cacheReader.ObjInfo.ETag == objInfo.ETag {
@@ -596,8 +603,9 @@ func (c *cacheObjects) PutObject(ctx context.Context, bucket, object string, r *
}
// skip cache for objects with locks
objRetention := getObjectRetentionMeta(opts.UserDefined)
if objRetention.Mode == Governance || objRetention.Mode == Compliance {
objRetention := objectlock.GetObjectRetentionMeta(opts.UserDefined)
legalHold := objectlock.GetObjectLegalHoldMeta(opts.UserDefined)
if objRetention.Mode != objectlock.Invalid || legalHold.Status != "" {
dcache.Delete(ctx, bucket, object)
return putObjectFn(ctx, bucket, object, r, opts)
}

View File

@@ -37,6 +37,7 @@ import (
"github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/certs"
"github.com/minio/minio/pkg/event"
"github.com/minio/minio/pkg/objectlock"
"github.com/minio/minio/pkg/pubsub"
)
@@ -201,7 +202,7 @@ var (
// Is worm enabled
globalWORMEnabled bool
globalBucketObjectLockConfig = newBucketObjectLockConfig()
globalBucketObjectLockConfig = objectlock.NewBucketObjectLockConfig()
// Disk cache drives
globalCacheConfig cache.Config

View File

@@ -36,6 +36,7 @@ import (
"github.com/minio/minio/pkg/lifecycle"
"github.com/minio/minio/pkg/madmin"
xnet "github.com/minio/minio/pkg/net"
"github.com/minio/minio/pkg/objectlock"
"github.com/minio/minio/pkg/policy"
"github.com/minio/minio/pkg/sync/errgroup"
)
@@ -668,7 +669,7 @@ func (sys *NotificationSys) initBucketObjectLockConfig(objAPI ObjectLayer) error
if string(bucketObjLockData) != bucketObjectLockEnabledConfig {
// this should never happen
logger.LogIf(ctx, errMalformedBucketObjectConfig)
logger.LogIf(ctx, objectlock.ErrMalformedBucketObjectConfig)
continue
}
@@ -677,17 +678,17 @@ func (sys *NotificationSys) initBucketObjectLockConfig(objAPI ObjectLayer) error
if err != nil {
if err == errConfigNotFound {
globalBucketObjectLockConfig.Set(bucket.Name, Retention{})
globalBucketObjectLockConfig.Set(bucket.Name, objectlock.Retention{})
continue
}
return err
}
config, err := parseObjectLockConfig(bytes.NewReader(configData))
config, err := objectlock.ParseObjectLockConfig(bytes.NewReader(configData))
if err != nil {
return err
}
retention := Retention{}
retention := objectlock.Retention{}
if config.Rule != nil {
retention = config.ToRetention()
}
@@ -874,7 +875,7 @@ func (sys *NotificationSys) Send(args eventArgs) []event.TargetIDErr {
}
// PutBucketObjectLockConfig - put bucket object lock configuration to all peers.
func (sys *NotificationSys) PutBucketObjectLockConfig(ctx context.Context, bucketName string, retention Retention) {
func (sys *NotificationSys) PutBucketObjectLockConfig(ctx context.Context, bucketName string, retention objectlock.Retention) {
g := errgroup.WithNErrs(len(sys.peerClients))
for index, client := range sys.peerClients {
if client == nil {

View File

@@ -44,6 +44,8 @@ import (
"github.com/minio/minio/pkg/event"
"github.com/minio/minio/pkg/handlers"
"github.com/minio/minio/pkg/hash"
"github.com/minio/minio/pkg/objectlock"
iampolicy "github.com/minio/minio/pkg/iam/policy"
"github.com/minio/minio/pkg/ioutil"
"github.com/minio/minio/pkg/policy"
@@ -209,8 +211,10 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r
return
}
getRetPerms := checkRequestAuthType(ctx, r, policy.GetObjectRetentionAction, bucket, object)
legalHoldPerms := checkRequestAuthType(ctx, r, policy.GetObjectLegalHoldAction, bucket, object)
// filter object lock metadata if permission does not permit
objInfo.UserDefined = filterObjectLockMetadata(ctx, r, bucket, object, objInfo.UserDefined, false, getRetPerms)
objInfo.UserDefined = objectlock.FilterObjectLockMetadata(objInfo.UserDefined, getRetPerms != ErrNone, legalHoldPerms != ErrNone)
if err = s3Select.Open(getObject); err != nil {
if serr, ok := err.(s3select.SelectError); ok {
@@ -353,7 +357,10 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req
// filter object lock metadata if permission does not permit
getRetPerms := checkRequestAuthType(ctx, r, policy.GetObjectRetentionAction, bucket, object)
objInfo.UserDefined = filterObjectLockMetadata(ctx, r, bucket, object, objInfo.UserDefined, false, getRetPerms)
legalHoldPerms := checkRequestAuthType(ctx, r, policy.GetObjectLegalHoldAction, bucket, object)
// filter object lock metadata if permission does not permit
objInfo.UserDefined = objectlock.FilterObjectLockMetadata(objInfo.UserDefined, getRetPerms != ErrNone, legalHoldPerms != ErrNone)
if objectAPI.IsEncryptionSupported() {
objInfo.UserDefined = CleanMinioInternalMetadataKeys(objInfo.UserDefined)
@@ -518,7 +525,10 @@ func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Re
// filter object lock metadata if permission does not permit
getRetPerms := checkRequestAuthType(ctx, r, policy.GetObjectRetentionAction, bucket, object)
objInfo.UserDefined = filterObjectLockMetadata(ctx, r, bucket, object, objInfo.UserDefined, false, getRetPerms)
legalHoldPerms := checkRequestAuthType(ctx, r, policy.GetObjectLegalHoldAction, bucket, object)
// filter object lock metadata if permission does not permit
objInfo.UserDefined = objectlock.FilterObjectLockMetadata(objInfo.UserDefined, getRetPerms != ErrNone, legalHoldPerms != ErrNone)
if objectAPI.IsEncryptionSupported() {
if _, err = DecryptObjectInfo(&objInfo, r.Header); err != nil {
@@ -962,16 +972,19 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
if api.CacheAPI() != nil {
getObjectInfo = api.CacheAPI().GetObjectInfo
}
isCpy := true
getRetPerms := checkRequestAuthType(ctx, r, policy.GetObjectRetentionAction, srcBucket, srcObject)
srcInfo.UserDefined = filterObjectLockMetadata(ctx, r, srcBucket, srcObject, srcInfo.UserDefined, isCpy, getRetPerms)
srcInfo.UserDefined = objectlock.FilterObjectLockMetadata(srcInfo.UserDefined, true, true)
retPerms := isPutActionAllowed(getRequestAuthType(r), dstBucket, dstObject, r, iampolicy.PutObjectRetentionAction)
holdPerms := isPutActionAllowed(getRequestAuthType(r), dstBucket, dstObject, r, iampolicy.PutObjectLegalHoldAction)
// apply default bucket configuration/governance headers for dest side.
retentionMode, retentionDate, s3Err := checkPutObjectRetentionAllowed(ctx, r, dstBucket, dstObject, getObjectInfo, retPerms)
retentionMode, retentionDate, legalHold, s3Err := checkPutObjectLockAllowed(ctx, r, dstBucket, dstObject, getObjectInfo, retPerms, holdPerms)
if s3Err == ErrNone && retentionMode != "" {
srcInfo.UserDefined[xhttp.AmzObjectLockMode] = string(retentionMode)
srcInfo.UserDefined[xhttp.AmzObjectLockRetainUntilDate] = retentionDate.UTC().Format(time.RFC3339)
}
if s3Err == ErrNone && legalHold.Status != "" {
srcInfo.UserDefined[xhttp.AmzObjectLockLegalHold] = string(legalHold.Status)
}
if s3Err != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r))
return
@@ -1251,11 +1264,16 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
putObject = api.CacheAPI().PutObject
}
retPerms := isPutActionAllowed(rAuthType, bucket, object, r, iampolicy.PutObjectRetentionAction)
retentionMode, retentionDate, s3Err := checkPutObjectRetentionAllowed(ctx, r, bucket, object, getObjectInfo, retPerms)
holdPerms := isPutActionAllowed(getRequestAuthType(r), bucket, object, r, iampolicy.PutObjectLegalHoldAction)
retentionMode, retentionDate, legalHold, s3Err := checkPutObjectLockAllowed(ctx, r, bucket, object, getObjectInfo, retPerms, holdPerms)
if s3Err == ErrNone && retentionMode != "" {
metadata[strings.ToLower(xhttp.AmzObjectLockMode)] = string(retentionMode)
metadata[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = retentionDate.UTC().Format(time.RFC3339)
}
if s3Err == ErrNone && legalHold.Status != "" {
metadata[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = string(legalHold.Status)
}
if s3Err != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r))
return
@@ -1409,11 +1427,16 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r
return
}
retPerms := isPutActionAllowed(getRequestAuthType(r), bucket, object, r, iampolicy.PutObjectRetentionAction)
retentionMode, retentionDate, s3Err := checkPutObjectRetentionAllowed(ctx, r, bucket, object, objectAPI.GetObjectInfo, retPerms)
holdPerms := isPutActionAllowed(getRequestAuthType(r), bucket, object, r, iampolicy.PutObjectLegalHoldAction)
retentionMode, retentionDate, legalHold, s3Err := checkPutObjectLockAllowed(ctx, r, bucket, object, objectAPI.GetObjectInfo, retPerms, holdPerms)
if s3Err == ErrNone && retentionMode != "" {
metadata[strings.ToLower(xhttp.AmzObjectLockMode)] = string(retentionMode)
metadata[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = retentionDate.UTC().Format(time.RFC3339)
}
if s3Err == ErrNone && legalHold.Status != "" {
metadata[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = string(legalHold.Status)
}
if s3Err != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r))
return
@@ -2215,14 +2238,16 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
}
// Reject retention or governance headers if set, CompleteMultipartUpload spec
// does not use these headers, and should not be passed down to checkPutObjectRetentionAllowed
if isObjectLockRequested(r.Header) || isObjectLockGovernanceBypassSet(r.Header) {
// does not use these headers, and should not be passed down to checkPutObjectLockAllowed
if objectlock.IsObjectLockRequested(r.Header) || objectlock.IsObjectLockGovernanceBypassSet(r.Header) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidRequest), r.URL, guessIsBrowserReq(r))
return
}
// Enforce object lock governance in case a competing upload finalized first.
retPerms := isPutActionAllowed(getRequestAuthType(r), bucket, object, r, iampolicy.PutObjectRetentionAction)
if _, _, s3Err := checkPutObjectRetentionAllowed(ctx, r, bucket, object, objectAPI.GetObjectInfo, retPerms); s3Err != ErrNone {
holdPerms := isPutActionAllowed(getRequestAuthType(r), bucket, object, r, iampolicy.PutObjectLegalHoldAction)
if _, _, _, s3Err := checkPutObjectLockAllowed(ctx, r, bucket, object, objectAPI.GetObjectInfo, retPerms, holdPerms); s3Err != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r))
return
}
@@ -2467,23 +2492,81 @@ func (api objectAPIHandlers) PutObjectLegalHoldHandler(w http.ResponseWriter, r
return
}
// Check permissions to perform this legal hold operation
if s3Err := isPutActionAllowed(getRequestAuthType(r), bucket, object, r, policy.PutObjectLegalHoldAction); s3Err != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r))
return
}
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
// Get Content-Md5 sent by client and verify if valid
md5Bytes, err := checkValidMD5(r.Header)
if err != nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidDigest), r.URL, guessIsBrowserReq(r))
return
}
if _, ok := globalBucketObjectLockConfig.Get(bucket); !ok {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidBucketObjectLockConfiguration), r.URL, guessIsBrowserReq(r))
return
}
legalHold, err := objectlock.ParseObjectLegalHold(r.Body)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
// verify Content-MD5 sum of request body if this header set
if len(md5Bytes) > 0 {
data, err := xml.Marshal(legalHold)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
if hex.EncodeToString(md5Bytes) != getMD5Hash(data) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidDigest), r.URL, guessIsBrowserReq(r))
return
}
}
getObjectInfo := objectAPI.GetObjectInfo
if api.CacheAPI() != nil {
getObjectInfo = api.CacheAPI().GetObjectInfo
}
opts, err := getOpts(ctx, r, bucket, object)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
if _, err = getObjectInfo(ctx, bucket, object, opts); err != nil {
objInfo, err := getObjectInfo(ctx, bucket, object, opts)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = strings.ToUpper(string(legalHold.Status))
objInfo.metadataOnly = true
if _, err = objectAPI.CopyObject(ctx, bucket, object, bucket, object, objInfo, ObjectOptions{}, ObjectOptions{}); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r))
writeSuccessNoContent(w)
// Notify object event.
sendEvent(eventArgs{
EventName: event.ObjectCreatedPutLegalHold,
BucketName: bucket,
Object: objInfo,
ReqParams: extractReqParams(r),
RespElements: extractRespElements(w),
UserAgent: r.UserAgent(),
Host: handlers.GetSourceIP(r),
})
}
// GetObjectLegalHoldHandler - get legal hold configuration to object,
@@ -2506,6 +2589,10 @@ func (api objectAPIHandlers) GetObjectLegalHoldHandler(w http.ResponseWriter, r
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r))
return
}
if s3Error := checkRequestAuthType(ctx, r, policy.GetObjectLegalHoldAction, bucket, object); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r))
return
}
getObjectInfo := objectAPI.GetObjectInfo
if api.CacheAPI() != nil {
@@ -2518,12 +2605,25 @@ func (api objectAPIHandlers) GetObjectLegalHoldHandler(w http.ResponseWriter, r
return
}
if _, err = getObjectInfo(ctx, bucket, object, opts); err != nil {
objInfo, err := getObjectInfo(ctx, bucket, object, opts)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r))
legalHold := objectlock.GetObjectLegalHoldMeta(objInfo.UserDefined)
writeSuccessResponseXML(w, encodeResponse(legalHold))
// Notify object legal hold accessed via a GET request.
sendEvent(eventArgs{
EventName: event.ObjectAccessedGetLegalHold,
BucketName: bucket,
Object: objInfo,
ReqParams: extractReqParams(r),
RespElements: extractRespElements(w),
UserAgent: r.UserAgent(),
Host: handlers.GetSourceIP(r),
})
}
// PutObjectRetentionHandler - set object hold configuration to object,
@@ -2568,7 +2668,7 @@ func (api objectAPIHandlers) PutObjectRetentionHandler(w http.ResponseWriter, r
return
}
objRetention, err := parseObjectRetention(r.Body)
objRetention, err := objectlock.ParseObjectRetention(r.Body)
if err != nil {
apiErr := errorCodes.ToAPIErr(ErrMalformedXML)
apiErr.Description = err.Error()
@@ -2660,7 +2760,7 @@ func (api objectAPIHandlers) GetObjectRetentionHandler(w http.ResponseWriter, r
return
}
retention := getObjectRetentionMeta(objInfo.UserDefined)
retention := objectlock.GetObjectRetentionMeta(objInfo.UserDefined)
writeSuccessResponseXML(w, encodeResponse(retention))
// Notify object retention accessed via a GET request.

View File

@@ -18,387 +18,12 @@ package cmd
import (
"context"
"encoding/xml"
"errors"
"fmt"
"io"
"net/http"
"strings"
"sync"
"time"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/env"
"github.com/minio/minio/pkg/objectlock"
)
// RetentionMode - object retention mode.
type RetentionMode string
const (
// Governance - governance mode.
Governance RetentionMode = "GOVERNANCE"
// Compliance - compliance mode.
Compliance RetentionMode = "COMPLIANCE"
// Invalid - invalid retention mode.
Invalid RetentionMode = ""
)
func parseRetentionMode(modeStr string) (mode RetentionMode) {
switch strings.ToUpper(modeStr) {
case "GOVERNANCE":
mode = Governance
case "COMPLIANCE":
mode = Compliance
default:
mode = Invalid
}
return mode
}
var (
errMalformedBucketObjectConfig = errors.New("Invalid bucket object lock config")
errInvalidRetentionDate = errors.New("Date must be provided in ISO 8601 format")
errPastObjectLockRetainDate = errors.New("the retain until date must be in the future")
errUnknownWORMModeDirective = errors.New("unknown WORM mode directive")
errObjectLockMissingContentMD5 = errors.New("Content-MD5 HTTP header is required for Put Object requests with Object Lock parameters")
errObjectLockInvalidHeaders = errors.New("x-amz-object-lock-retain-until-date and x-amz-object-lock-mode must both be supplied")
)
const (
ntpServerEnv = "MINIO_NTP_SERVER"
)
var (
ntpServer = env.Get(ntpServerEnv, "")
)
// Retention - bucket level retention configuration.
type Retention struct {
Mode RetentionMode
Validity time.Duration
}
// IsEmpty - returns whether retention is empty or not.
func (r Retention) IsEmpty() bool {
return r.Mode == "" || r.Validity == 0
}
// Retain - check whether given date is retainable by validity time.
func (r Retention) Retain(created time.Time) bool {
t, err := UTCNowNTP()
if err != nil {
logger.LogIf(context.Background(), err)
// Retain
return true
}
return globalWORMEnabled || created.Add(r.Validity).After(t)
}
// BucketObjectLockConfig - map of bucket and retention configuration.
type BucketObjectLockConfig struct {
sync.RWMutex
retentionMap map[string]Retention
}
// Set - set retention configuration.
func (config *BucketObjectLockConfig) Set(bucketName string, retention Retention) {
config.Lock()
config.retentionMap[bucketName] = retention
config.Unlock()
}
// Get - Get retention configuration.
func (config *BucketObjectLockConfig) Get(bucketName string) (r Retention, ok bool) {
config.RLock()
defer config.RUnlock()
r, ok = config.retentionMap[bucketName]
return r, ok
}
// Remove - removes retention configuration.
func (config *BucketObjectLockConfig) Remove(bucketName string) {
config.Lock()
delete(config.retentionMap, bucketName)
config.Unlock()
}
func newBucketObjectLockConfig() *BucketObjectLockConfig {
return &BucketObjectLockConfig{
retentionMap: map[string]Retention{},
}
}
// DefaultRetention - default retention configuration.
type DefaultRetention struct {
XMLName xml.Name `xml:"DefaultRetention"`
Mode RetentionMode `xml:"Mode"`
Days *uint64 `xml:"Days"`
Years *uint64 `xml:"Years"`
}
// Maximum support retention days and years supported by AWS S3.
const (
// This tested by using `mc lock` command
maximumRetentionDays = 36500
maximumRetentionYears = 100
)
// UnmarshalXML - decodes XML data.
func (dr *DefaultRetention) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error {
// Make subtype to avoid recursive UnmarshalXML().
type defaultRetention DefaultRetention
retention := defaultRetention{}
if err := d.DecodeElement(&retention, &start); err != nil {
return err
}
switch string(retention.Mode) {
case "GOVERNANCE", "COMPLIANCE":
default:
return fmt.Errorf("unknown retention mode %v", retention.Mode)
}
if retention.Days == nil && retention.Years == nil {
return fmt.Errorf("either Days or Years must be specified")
}
if retention.Days != nil && retention.Years != nil {
return fmt.Errorf("either Days or Years must be specified, not both")
}
if retention.Days != nil {
if *retention.Days == 0 {
return fmt.Errorf("Default retention period must be a positive integer value for 'Days'")
}
if *retention.Days > maximumRetentionDays {
return fmt.Errorf("Default retention period too large for 'Days' %d", *retention.Days)
}
} else if *retention.Years == 0 {
return fmt.Errorf("Default retention period must be a positive integer value for 'Years'")
} else if *retention.Years > maximumRetentionYears {
return fmt.Errorf("Default retention period too large for 'Years' %d", *retention.Years)
}
*dr = DefaultRetention(retention)
return nil
}
// ObjectLockConfig - object lock configuration specified in
// https://docs.aws.amazon.com/AmazonS3/latest/API/Type_API_ObjectLockConfiguration.html
type ObjectLockConfig struct {
XMLNS string `xml:"xmlns,attr,omitempty"`
XMLName xml.Name `xml:"ObjectLockConfiguration"`
ObjectLockEnabled string `xml:"ObjectLockEnabled"`
Rule *struct {
DefaultRetention DefaultRetention `xml:"DefaultRetention"`
} `xml:"Rule,omitempty"`
}
// UnmarshalXML - decodes XML data.
func (config *ObjectLockConfig) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error {
// Make subtype to avoid recursive UnmarshalXML().
type objectLockConfig ObjectLockConfig
parsedConfig := objectLockConfig{}
if err := d.DecodeElement(&parsedConfig, &start); err != nil {
return err
}
if parsedConfig.ObjectLockEnabled != "Enabled" {
return fmt.Errorf("only 'Enabled' value is allowd to ObjectLockEnabled element")
}
*config = ObjectLockConfig(parsedConfig)
return nil
}
// ToRetention - convert to Retention type.
func (config *ObjectLockConfig) ToRetention() (r Retention) {
if config.Rule != nil {
r.Mode = config.Rule.DefaultRetention.Mode
t, err := UTCNowNTP()
if err != nil {
logger.LogIf(context.Background(), err)
// Do not change any configuration
// upon NTP failure.
return r
}
if config.Rule.DefaultRetention.Days != nil {
r.Validity = t.AddDate(0, 0, int(*config.Rule.DefaultRetention.Days)).Sub(t)
} else {
r.Validity = t.AddDate(int(*config.Rule.DefaultRetention.Years), 0, 0).Sub(t)
}
}
return r
}
func parseObjectLockConfig(reader io.Reader) (*ObjectLockConfig, error) {
config := ObjectLockConfig{}
if err := xml.NewDecoder(reader).Decode(&config); err != nil {
return nil, err
}
return &config, nil
}
func newObjectLockConfig() *ObjectLockConfig {
return &ObjectLockConfig{
ObjectLockEnabled: "Enabled",
}
}
// RetentionDate is a embedded type containing time.Time to unmarshal
// Date in Retention
type RetentionDate struct {
time.Time
}
// UnmarshalXML parses date from Expiration and validates date format
func (rDate *RetentionDate) UnmarshalXML(d *xml.Decoder, startElement xml.StartElement) error {
var dateStr string
err := d.DecodeElement(&dateStr, &startElement)
if err != nil {
return err
}
// While AWS documentation mentions that the date specified
// must be present in ISO 8601 format, in reality they allow
// users to provide RFC 3339 compliant dates.
retDate, err := time.Parse(time.RFC3339, dateStr)
if err != nil {
return errInvalidRetentionDate
}
*rDate = RetentionDate{retDate}
return nil
}
// MarshalXML encodes expiration date if it is non-zero and encodes
// empty string otherwise
func (rDate *RetentionDate) MarshalXML(e *xml.Encoder, startElement xml.StartElement) error {
if *rDate == (RetentionDate{time.Time{}}) {
return nil
}
return e.EncodeElement(rDate.Format(time.RFC3339), startElement)
}
// ObjectRetention specified in
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObjectRetention.html
type ObjectRetention struct {
XMLNS string `xml:"xmlns,attr,omitempty"`
XMLName xml.Name `xml:"Retention"`
Mode RetentionMode `xml:"Mode,omitempty"`
RetainUntilDate RetentionDate `xml:"RetainUntilDate,omitempty"`
}
func parseObjectRetention(reader io.Reader) (*ObjectRetention, error) {
ret := ObjectRetention{}
if err := xml.NewDecoder(reader).Decode(&ret); err != nil {
return nil, err
}
if ret.Mode != Compliance && ret.Mode != Governance {
return &ret, errUnknownWORMModeDirective
}
t, err := UTCNowNTP()
if err != nil {
logger.LogIf(context.Background(), err)
return &ret, errPastObjectLockRetainDate
}
if ret.RetainUntilDate.Before(t) {
return &ret, errPastObjectLockRetainDate
}
return &ret, nil
}
func isObjectLockRetentionRequested(h http.Header) bool {
if _, ok := h[xhttp.AmzObjectLockMode]; ok {
return true
}
if _, ok := h[xhttp.AmzObjectLockRetainUntilDate]; ok {
return true
}
return false
}
func isObjectLockLegalHoldRequested(h http.Header) bool {
_, ok := h[xhttp.AmzObjectLockLegalHold]
return ok
}
func isObjectLockGovernanceBypassSet(h http.Header) bool {
v, ok := h[xhttp.AmzObjectLockBypassGovernance]
if !ok {
return false
}
val := strings.Join(v, "")
return strings.ToLower(val) == "true"
}
func isObjectLockRequested(h http.Header) bool {
return isObjectLockLegalHoldRequested(h) || isObjectLockRetentionRequested(h)
}
func parseObjectLockRetentionHeaders(h http.Header) (rmode RetentionMode, r RetentionDate, err error) {
retMode, ok := h[xhttp.AmzObjectLockMode]
if ok {
rmode = parseRetentionMode(strings.Join(retMode, ""))
if rmode == Invalid {
return rmode, r, errUnknownWORMModeDirective
}
}
var retDate time.Time
dateStr, ok := h[xhttp.AmzObjectLockRetainUntilDate]
if ok {
// While AWS documentation mentions that the date specified
// must be present in ISO 8601 format, in reality they allow
// users to provide RFC 3339 compliant dates.
retDate, err = time.Parse(time.RFC3339, strings.Join(dateStr, ""))
if err != nil {
return rmode, r, errInvalidRetentionDate
}
t, err := UTCNowNTP()
if err != nil {
logger.LogIf(context.Background(), err)
return rmode, r, errPastObjectLockRetainDate
}
if retDate.Before(t) {
return rmode, r, errPastObjectLockRetainDate
}
}
if len(retMode) == 0 || len(dateStr) == 0 {
return rmode, r, errObjectLockInvalidHeaders
}
return rmode, RetentionDate{retDate}, nil
}
func getObjectRetentionMeta(meta map[string]string) ObjectRetention {
var mode RetentionMode
var retainTill RetentionDate
if modeStr, ok := meta[strings.ToLower(xhttp.AmzObjectLockMode)]; ok {
mode = parseRetentionMode(modeStr)
}
if tillStr, ok := meta[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)]; ok {
if t, e := time.Parse(time.RFC3339, tillStr); e == nil {
retainTill = RetentionDate{t.UTC()}
}
}
return ObjectRetention{Mode: mode, RetainUntilDate: retainTill}
}
// enforceRetentionBypassForDelete enforces whether an existing object under governance can be deleted
// with governance bypass headers set in the request.
// Objects under site wide WORM can never be overwritten.
@@ -424,16 +49,19 @@ func enforceRetentionBypassForDelete(ctx context.Context, r *http.Request, bucke
}
return oi, toAPIErrorCode(ctx, err)
}
ret := getObjectRetentionMeta(oi.UserDefined)
ret := objectlock.GetObjectRetentionMeta(oi.UserDefined)
lhold := objectlock.GetObjectLegalHoldMeta(oi.UserDefined)
if lhold.Status == objectlock.ON {
return oi, ErrObjectLocked
}
// Here bucket does not support object lock
if ret.Mode == Invalid {
if ret.Mode == objectlock.Invalid {
return oi, ErrNone
}
if ret.Mode != Compliance && ret.Mode != Governance {
if ret.Mode != objectlock.Compliance && ret.Mode != objectlock.Governance {
return oi, ErrUnknownWORMModeDirective
}
t, err := UTCNowNTP()
t, err := objectlock.UTCNowNTP()
if err != nil {
logger.LogIf(ctx, err)
return oi, ErrObjectLocked
@@ -441,7 +69,7 @@ func enforceRetentionBypassForDelete(ctx context.Context, r *http.Request, bucke
if ret.RetainUntilDate.Before(t) {
return oi, ErrNone
}
if isObjectLockGovernanceBypassSet(r.Header) && ret.Mode == Governance && govBypassPerm == ErrNone {
if objectlock.IsObjectLockGovernanceBypassSet(r.Header) && ret.Mode == objectlock.Governance && govBypassPerm == ErrNone {
return oi, ErrNone
}
return oi, ErrObjectLocked
@@ -453,7 +81,7 @@ func enforceRetentionBypassForDelete(ctx context.Context, r *http.Request, bucke
// For objects in "Governance" mode, overwrite is allowed if a) object retention date is past OR
// governance bypass headers are set and user has governance bypass permissions.
// Objects in compliance mode can be overwritten only if retention date is being extended. No mode change is permitted.
func enforceRetentionBypassForPut(ctx context.Context, r *http.Request, bucket, object string, getObjectInfoFn GetObjectInfoFn, govBypassPerm APIErrorCode, objRetention *ObjectRetention) (oi ObjectInfo, s3Err APIErrorCode) {
func enforceRetentionBypassForPut(ctx context.Context, r *http.Request, bucket, object string, getObjectInfoFn GetObjectInfoFn, govBypassPerm APIErrorCode, objRetention *objectlock.ObjectRetention) (oi ObjectInfo, s3Err APIErrorCode) {
if globalWORMEnabled {
return oi, ErrObjectLocked
}
@@ -474,24 +102,24 @@ func enforceRetentionBypassForPut(ctx context.Context, r *http.Request, bucket,
return oi, toAPIErrorCode(ctx, err)
}
ret := getObjectRetentionMeta(oi.UserDefined)
ret := objectlock.GetObjectRetentionMeta(oi.UserDefined)
// no retention metadata on object
if ret.Mode == Invalid {
if ret.Mode == objectlock.Invalid {
if _, isWORMBucket := globalBucketObjectLockConfig.Get(bucket); !isWORMBucket {
return oi, ErrInvalidBucketObjectLockConfiguration
}
return oi, ErrNone
}
t, err := UTCNowNTP()
t, err := objectlock.UTCNowNTP()
if err != nil {
logger.LogIf(ctx, err)
return oi, ErrObjectLocked
}
if ret.Mode == Compliance {
if ret.Mode == objectlock.Compliance {
// Compliance retention mode cannot be changed and retention period cannot be shortened as per
// https://docs.aws.amazon.com/AmazonS3/latest/dev/object-lock-overview.html#object-lock-retention-modes
if objRetention.Mode != Compliance || objRetention.RetainUntilDate.Before(ret.RetainUntilDate.Time) {
if objRetention.Mode != objectlock.Compliance || objRetention.RetainUntilDate.Before(ret.RetainUntilDate.Time) {
return oi, ErrObjectLocked
}
if objRetention.RetainUntilDate.Before(t) {
@@ -500,8 +128,8 @@ func enforceRetentionBypassForPut(ctx context.Context, r *http.Request, bucket,
return oi, ErrNone
}
if ret.Mode == Governance {
if !isObjectLockGovernanceBypassSet(r.Header) {
if ret.Mode == objectlock.Governance {
if !objectlock.IsObjectLockGovernanceBypassSet(r.Header) {
if objRetention.RetainUntilDate.Before(t) {
return oi, ErrInvalidRetentionDate
}
@@ -515,111 +143,104 @@ func enforceRetentionBypassForPut(ctx context.Context, r *http.Request, bucket,
return oi, ErrNone
}
// checkPutObjectRetentionAllowed enforces object retention policy for requests with WORM headers
// checkPutObjectLockAllowed enforces object retention policy and legal hold policy
// for requests with WORM headers
// See https://docs.aws.amazon.com/AmazonS3/latest/dev/object-lock-managing.html for the spec.
// For non-existing objects with object retention headers set, this method returns ErrNone if bucket has
// locking enabled and user has requisite permissions (s3:PutObjectRetention)
// If object exists on object store and site wide WORM enabled - this method
// returns an error. For objects in "Governance" mode, overwrite is allowed if the retention date has expired.
// For objects in "Compliance" mode, retention date cannot be shortened, and mode cannot be altered.
func checkPutObjectRetentionAllowed(ctx context.Context, r *http.Request, bucket, object string, getObjectInfoFn GetObjectInfoFn, retentionPermErr APIErrorCode) (RetentionMode, RetentionDate, APIErrorCode) {
var mode RetentionMode
var retainDate RetentionDate
// For objects with legal hold header set, the s3:PutObjectLegalHold permission is expected to be set
// Both legal hold and retention can be applied independently on an object
func checkPutObjectLockAllowed(ctx context.Context, r *http.Request, bucket, object string, getObjectInfoFn GetObjectInfoFn, retentionPermErr, legalHoldPermErr APIErrorCode) (objectlock.Mode, objectlock.RetentionDate, objectlock.ObjectLegalHold, APIErrorCode) {
var mode objectlock.Mode
var retainDate objectlock.RetentionDate
var legalHold objectlock.ObjectLegalHold
retention, isWORMBucket := globalBucketObjectLockConfig.Get(bucket)
retentionRequested := isObjectLockRequested(r.Header)
retentionRequested := objectlock.IsObjectLockRetentionRequested(r.Header)
legalHoldRequested := objectlock.IsObjectLockLegalHoldRequested(r.Header)
var objExists bool
opts, err := getOpts(ctx, r, bucket, object)
if err != nil {
return mode, retainDate, toAPIErrorCode(ctx, err)
return mode, retainDate, legalHold, toAPIErrorCode(ctx, err)
}
if objInfo, err := getObjectInfoFn(ctx, bucket, object, opts); err == nil {
objExists = true
r := getObjectRetentionMeta(objInfo.UserDefined)
if globalWORMEnabled || r.Mode == Compliance {
return mode, retainDate, ErrObjectLocked
r := objectlock.GetObjectRetentionMeta(objInfo.UserDefined)
if globalWORMEnabled || r.Mode == objectlock.Compliance {
return mode, retainDate, legalHold, ErrObjectLocked
}
mode = r.Mode
retainDate = r.RetainUntilDate
legalHold = objectlock.GetObjectLegalHoldMeta(objInfo.UserDefined)
// Disallow overwriting an object on legal hold
if legalHold.Status == "ON" {
return mode, retainDate, legalHold, ErrObjectLocked
}
}
if legalHoldRequested {
if !isWORMBucket {
return mode, retainDate, legalHold, ErrInvalidBucketObjectLockConfiguration
}
var lerr error
if legalHold, lerr = objectlock.ParseObjectLockLegalHoldHeaders(r.Header); lerr != nil {
return mode, retainDate, legalHold, toAPIErrorCode(ctx, err)
}
}
if retentionRequested {
if !isWORMBucket {
return mode, retainDate, ErrInvalidBucketObjectLockConfiguration
return mode, retainDate, legalHold, ErrInvalidBucketObjectLockConfiguration
}
rMode, rDate, err := parseObjectLockRetentionHeaders(r.Header)
legalHold, err := objectlock.ParseObjectLockLegalHoldHeaders(r.Header)
if err != nil {
return mode, retainDate, toAPIErrorCode(ctx, err)
return mode, retainDate, legalHold, toAPIErrorCode(ctx, err)
}
rMode, rDate, err := objectlock.ParseObjectLockRetentionHeaders(r.Header)
if err != nil {
return mode, retainDate, legalHold, toAPIErrorCode(ctx, err)
}
// AWS S3 just creates a new version of object when an object is being overwritten.
t, err := UTCNowNTP()
t, err := objectlock.UTCNowNTP()
if err != nil {
logger.LogIf(ctx, err)
return mode, retainDate, ErrObjectLocked
return mode, retainDate, legalHold, ErrObjectLocked
}
if objExists && retainDate.After(t) {
return mode, retainDate, ErrObjectLocked
return mode, retainDate, legalHold, ErrObjectLocked
}
if rMode == Invalid {
return mode, retainDate, toAPIErrorCode(ctx, errObjectLockInvalidHeaders)
if rMode == objectlock.Invalid {
return mode, retainDate, legalHold, toAPIErrorCode(ctx, objectlock.ErrObjectLockInvalidHeaders)
}
if retentionPermErr != ErrNone {
return mode, retainDate, retentionPermErr
return mode, retainDate, legalHold, retentionPermErr
}
return rMode, rDate, ErrNone
return rMode, rDate, legalHold, ErrNone
}
if !retentionRequested && isWORMBucket {
if retention.IsEmpty() && (mode == Compliance || mode == Governance) {
return mode, retainDate, ErrObjectLocked
if retention.IsEmpty() && (mode == objectlock.Compliance || mode == objectlock.Governance) {
return mode, retainDate, legalHold, ErrObjectLocked
}
if retentionPermErr != ErrNone {
return mode, retainDate, retentionPermErr
return mode, retainDate, legalHold, retentionPermErr
}
t, err := UTCNowNTP()
t, err := objectlock.UTCNowNTP()
if err != nil {
logger.LogIf(ctx, err)
return mode, retainDate, ErrObjectLocked
return mode, retainDate, legalHold, ErrObjectLocked
}
// AWS S3 just creates a new version of object when an object is being overwritten.
if objExists && retainDate.After(t) {
return mode, retainDate, ErrObjectLocked
return mode, retainDate, legalHold, ErrObjectLocked
}
if !legalHoldRequested {
// inherit retention from bucket configuration
return retention.Mode, objectlock.RetentionDate{Time: t.Add(retention.Validity)}, legalHold, ErrNone
}
// inherit retention from bucket configuration
return retention.Mode, RetentionDate{t.Add(retention.Validity)}, ErrNone
}
return mode, retainDate, ErrNone
}
// filter object lock metadata if s3:GetObjectRetention permission is denied or if isCopy flag set.
func filterObjectLockMetadata(ctx context.Context, r *http.Request, bucket, object string, metadata map[string]string, isCopy bool, getRetPerms APIErrorCode) map[string]string {
// Copy on write
dst := metadata
var copied bool
delKey := func(key string) {
if _, ok := metadata[key]; !ok {
return
}
if !copied {
dst = make(map[string]string, len(metadata))
for k, v := range metadata {
dst[k] = v
}
copied = true
}
delete(dst, key)
}
ret := getObjectRetentionMeta(metadata)
if ret.Mode == Invalid || isCopy {
delKey(xhttp.AmzObjectLockMode)
delKey(xhttp.AmzObjectLockRetainUntilDate)
return metadata
}
if getRetPerms == ErrNone {
return dst
}
delKey(xhttp.AmzObjectLockMode)
delKey(xhttp.AmzObjectLockRetainUntilDate)
return dst
return mode, retainDate, legalHold, ErrNone
}

View File

@@ -35,6 +35,7 @@ import (
"github.com/minio/minio/pkg/lifecycle"
"github.com/minio/minio/pkg/madmin"
xnet "github.com/minio/minio/pkg/net"
"github.com/minio/minio/pkg/objectlock"
"github.com/minio/minio/pkg/policy"
trace "github.com/minio/minio/pkg/trace"
)
@@ -430,7 +431,7 @@ func (client *peerRESTClient) PutBucketNotification(bucket string, rulesMap even
}
// PutBucketObjectLockConfig - PUT bucket object lock configuration.
func (client *peerRESTClient) PutBucketObjectLockConfig(bucket string, retention Retention) error {
func (client *peerRESTClient) PutBucketObjectLockConfig(bucket string, retention objectlock.Retention) error {
values := make(url.Values)
values.Set(peerRESTBucket, bucket)

View File

@@ -33,6 +33,7 @@ import (
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/event"
"github.com/minio/minio/pkg/lifecycle"
objectlock "github.com/minio/minio/pkg/objectlock"
"github.com/minio/minio/pkg/policy"
trace "github.com/minio/minio/pkg/trace"
)
@@ -845,7 +846,7 @@ func (s *peerRESTServer) PutBucketObjectLockConfigHandler(w http.ResponseWriter,
return
}
var retention Retention
var retention objectlock.Retention
if r.ContentLength < 0 {
s.writeErrorResponse(w, errInvalidArgument)
return

View File

@@ -39,7 +39,6 @@ import (
"sync"
"time"
"github.com/beevik/ntp"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/handlers"
@@ -354,17 +353,6 @@ func UTCNow() time.Time {
return time.Now().UTC()
}
// UTCNowNTP - is similar in functionality to UTCNow()
// but only used when we do not wish to rely on system
// time.
func UTCNowNTP() (time.Time, error) {
// ntp server is disabled
if ntpServer == "" {
return UTCNow(), nil
}
return ntp.Time(ntpServer)
}
// GenETag - generate UUID based ETag
func GenETag() string {
return ToS3ETag(getMD5Hash([]byte(mustGetUUID())))

View File

@@ -50,6 +50,7 @@ import (
"github.com/minio/minio/pkg/hash"
iampolicy "github.com/minio/minio/pkg/iam/policy"
"github.com/minio/minio/pkg/ioutil"
"github.com/minio/minio/pkg/objectlock"
"github.com/minio/minio/pkg/policy"
)
@@ -928,6 +929,7 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
object := vars["object"]
retPerms := ErrAccessDenied
holdPerms := ErrAccessDenied
claims, owner, authErr := webRequestAuthenticate(r)
if authErr != nil {
@@ -974,7 +976,17 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
}) {
retPerms = ErrNone
}
if globalIAMSys.IsAllowed(iampolicy.Args{
AccountName: claims.AccessKey(),
Action: iampolicy.PutObjectLegalHoldAction,
BucketName: bucket,
ConditionValues: getConditionValues(r, "", claims.AccessKey(), claims.Map()),
IsOwner: owner,
ObjectName: object,
Claims: claims.Map(),
}) {
holdPerms = ErrNone
}
}
// Check if bucket is a reserved bucket name or invalid.
@@ -1068,11 +1080,14 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
getObjectInfo = web.CacheAPI().GetObjectInfo
}
// enforce object retention rules
retentionMode, retentionDate, s3Err := checkPutObjectRetentionAllowed(ctx, r, bucket, object, getObjectInfo, retPerms)
retentionMode, retentionDate, legalHold, s3Err := checkPutObjectLockAllowed(ctx, r, bucket, object, getObjectInfo, retPerms, holdPerms)
if s3Err == ErrNone && retentionMode != "" {
opts.UserDefined[xhttp.AmzObjectLockMode] = string(retentionMode)
opts.UserDefined[xhttp.AmzObjectLockRetainUntilDate] = retentionDate.UTC().Format(time.RFC3339)
}
if s3Err == ErrNone && legalHold.Status != "" {
opts.UserDefined[xhttp.AmzObjectLockLegalHold] = string(legalHold.Status)
}
if s3Err != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r))
return
@@ -1130,6 +1145,7 @@ func (web *webAPIHandlers) Download(w http.ResponseWriter, r *http.Request) {
token := r.URL.Query().Get("token")
getRetPerms := ErrAccessDenied
legalHoldPerms := ErrAccessDenied
claims, owner, authErr := webTokenAuthenticate(token)
if authErr != nil {
@@ -1154,6 +1170,15 @@ func (web *webAPIHandlers) Download(w http.ResponseWriter, r *http.Request) {
}) {
getRetPerms = ErrNone
}
if globalPolicySys.IsAllowed(policy.Args{
Action: policy.GetObjectLegalHoldAction,
BucketName: bucket,
ConditionValues: getConditionValues(r, "", "", nil),
IsOwner: false,
ObjectName: object,
}) {
legalHoldPerms = ErrNone
}
} else {
writeWebErrorResponse(w, authErr)
return
@@ -1185,6 +1210,17 @@ func (web *webAPIHandlers) Download(w http.ResponseWriter, r *http.Request) {
}) {
getRetPerms = ErrNone
}
if globalIAMSys.IsAllowed(iampolicy.Args{
AccountName: claims.AccessKey(),
Action: iampolicy.GetObjectLegalHoldAction,
BucketName: bucket,
ConditionValues: getConditionValues(r, "", claims.AccessKey(), claims.Map()),
IsOwner: owner,
ObjectName: object,
Claims: claims.Map(),
}) {
legalHoldPerms = ErrNone
}
}
// Check if bucket is a reserved bucket name or invalid.
@@ -1209,7 +1245,7 @@ func (web *webAPIHandlers) Download(w http.ResponseWriter, r *http.Request) {
objInfo := gr.ObjInfo
// filter object lock metadata if permission does not permit
objInfo.UserDefined = filterObjectLockMetadata(ctx, r, bucket, object, objInfo.UserDefined, false, getRetPerms)
objInfo.UserDefined = objectlock.FilterObjectLockMetadata(objInfo.UserDefined, getRetPerms != ErrNone, legalHoldPerms != ErrNone)
if objectAPI.IsEncryptionSupported() {
if _, err = DecryptObjectInfo(&objInfo, r.Header); err != nil {
@@ -1304,6 +1340,8 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) {
token := r.URL.Query().Get("token")
claims, owner, authErr := webTokenAuthenticate(token)
var getRetPerms []APIErrorCode
var legalHoldPerms []APIErrorCode
if authErr != nil {
if authErr == errNoAuthToken {
for _, object := range args.Objects {
@@ -1329,6 +1367,18 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) {
retentionPerm = ErrNone
}
getRetPerms = append(getRetPerms, retentionPerm)
legalHoldPerm := ErrAccessDenied
if globalPolicySys.IsAllowed(policy.Args{
Action: policy.GetObjectLegalHoldAction,
BucketName: args.BucketName,
ConditionValues: getConditionValues(r, "", "", nil),
IsOwner: false,
ObjectName: pathJoin(args.Prefix, object),
}) {
legalHoldPerm = ErrNone
}
legalHoldPerms = append(legalHoldPerms, legalHoldPerm)
}
} else {
writeWebErrorResponse(w, authErr)
@@ -1354,7 +1404,7 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) {
retentionPerm := ErrAccessDenied
if globalIAMSys.IsAllowed(iampolicy.Args{
AccountName: claims.AccessKey(),
Action: iampolicy.GetObjectAction,
Action: iampolicy.GetObjectRetentionAction,
BucketName: args.BucketName,
ConditionValues: getConditionValues(r, "", claims.AccessKey(), claims.Map()),
IsOwner: owner,
@@ -1364,6 +1414,20 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) {
retentionPerm = ErrNone
}
getRetPerms = append(getRetPerms, retentionPerm)
legalHoldPerm := ErrAccessDenied
if globalIAMSys.IsAllowed(iampolicy.Args{
AccountName: claims.AccessKey(),
Action: iampolicy.GetObjectLegalHoldAction,
BucketName: args.BucketName,
ConditionValues: getConditionValues(r, "", claims.AccessKey(), claims.Map()),
IsOwner: owner,
ObjectName: pathJoin(args.Prefix, object),
Claims: claims.Map(),
}) {
legalHoldPerm = ErrNone
}
legalHoldPerms = append(legalHoldPerms, legalHoldPerm)
}
}
@@ -1394,7 +1458,7 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) {
info := gr.ObjInfo
// filter object lock metadata if permission does not permit
info.UserDefined = filterObjectLockMetadata(ctx, r, args.BucketName, objectName, info.UserDefined, false, getRetPerms[i])
info.UserDefined = objectlock.FilterObjectLockMetadata(info.UserDefined, getRetPerms[i] != ErrNone, legalHoldPerms[i] != ErrNone)
if info.IsCompressed() {
// For reporting, set the file size to the uncompressed size.