Add Get/Put Bucket Lock Configuration API support (#8120)

This feature implements [PUT Bucket object lock configuration][1] and
[GET Bucket object lock configuration][2]. After object lock
configuration is set, existing and new objects are set to WORM for
specified duration. Currently Governance mode works exactly like
Compliance mode.

Fixes #8101

[1] https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTObjectLockConfiguration.html
[2] https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGETObjectLockConfiguration.html
This commit is contained in:
Bala FA
2019-11-13 04:20:18 +05:30
committed by kannappanr
parent 2dad14974e
commit fb48ca5020
18 changed files with 863 additions and 77 deletions

View File

@@ -111,6 +111,14 @@ func registerAPIRouter(router *mux.Router, encryptionEnabled, allowSSEKMS bool)
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(collectAPIStats("putobject", httpTraceHdrs(api.PutObjectHandler)))
// DeleteObject
bucket.Methods(http.MethodDelete).Path("/{object:.+}").HandlerFunc(collectAPIStats("deleteobject", httpTraceAll(api.DeleteObjectHandler)))
// PutObjectLegalHold
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(httpTraceHdrs(api.PutObjectLegalHoldHandler)).Queries("legal-hold", "").Queries("versionId", "")
// GetObjectLegalHold
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(httpTraceHdrs(api.GetObjectLegalHoldHandler)).Queries("legal-hold", "").Queries("versionId", "")
// PutObjectRetention
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(httpTraceHdrs(api.PutObjectRetentionHandler)).Queries("retention", "").Queries("versionId", "")
// GetObjectRetention
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(httpTraceHdrs(api.GetObjectRetentionHandler)).Queries("retention", "").Queries("versionId", "")
/// Bucket operations
// GetBucketLocation
@@ -127,8 +135,6 @@ func registerAPIRouter(router *mux.Router, encryptionEnabled, allowSSEKMS bool)
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("getbucketcors", httpTraceAll(api.GetBucketCorsHandler))).Queries("cors", "")
// GetBucketWebsiteHandler - this is a dummy call.
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("getbucketwebsite", httpTraceAll(api.GetBucketWebsiteHandler))).Queries("website", "")
// GetBucketVersioningHandler - this is a dummy call.
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("getbucketversion", httpTraceAll(api.GetBucketVersioningHandler))).Queries("versioning", "")
// GetBucketAccelerateHandler - this is a dummy call.
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("getbucketaccelerate", httpTraceAll(api.GetBucketAccelerateHandler))).Queries("accelerate", "")
// GetBucketRequestPaymentHandler - this is a dummy call.
@@ -146,6 +152,10 @@ func registerAPIRouter(router *mux.Router, encryptionEnabled, allowSSEKMS bool)
// DeleteBucketTaggingHandler
bucket.Methods(http.MethodDelete).HandlerFunc(collectAPIStats("deletebuckettagging", httpTraceAll(api.DeleteBucketTaggingHandler))).Queries("tagging", "")
// GetBucketObjectLockConfig
bucket.Methods(http.MethodGet).HandlerFunc(httpTraceAll(api.GetBucketObjectLockConfigHandler)).Queries("object-lock", "")
// GetBucketVersioning
bucket.Methods(http.MethodGet).HandlerFunc(httpTraceAll(api.GetBucketVersioningHandler)).Queries("versioning", "")
// GetBucketNotification
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("getbucketnotification", httpTraceAll(api.GetBucketNotificationHandler))).Queries("notification", "")
// ListenBucketNotification
@@ -163,6 +173,10 @@ func registerAPIRouter(router *mux.Router, encryptionEnabled, allowSSEKMS bool)
// PutBucketPolicy
bucket.Methods("PUT").HandlerFunc(collectAPIStats("putbucketpolicy", httpTraceAll(api.PutBucketPolicyHandler))).Queries("policy", "")
// PutBucketObjectLockConfig
bucket.Methods(http.MethodPut).HandlerFunc(httpTraceAll(api.PutBucketObjectLockConfigHandler)).Queries("object-lock", "")
// PutBucketVersioning
bucket.Methods(http.MethodPut).HandlerFunc(httpTraceAll(api.PutBucketVersioningHandler)).Queries("versioning", "")
// PutBucketNotification
bucket.Methods(http.MethodPut).HandlerFunc(collectAPIStats("putbucketnotification", httpTraceAll(api.PutBucketNotificationHandler))).Queries("notification", "")
// PutBucket

View File

@@ -44,6 +44,13 @@ import (
"github.com/minio/minio/pkg/sync/errgroup"
)
const (
getBucketVersioningResponse = `<VersioningConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/"/>`
objectLockConfig = "object-lock.xml"
objectLockEnabledConfigFile = "object-lock-enabled.json"
objectLockEnabledConfig = `{"x-amz-bucket-object-lock-enabled":true}`
)
// Check if there are buckets on server without corresponding entry in etcd backend and
// make entries. Here is the general flow
// - Range over all the available buckets
@@ -364,7 +371,7 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
}
// Deny if WORM is enabled
if globalWORMEnabled {
if _, isWORMBucket := isWORMEnabled(bucket); isWORMBucket {
// Not required to check whether given objects exist or not, because
// DeleteMultipleObject is always successful irrespective of object existence.
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r))
@@ -473,6 +480,16 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req
vars := mux.Vars(r)
bucket := vars["bucket"]
objectLockEnabled := false
if _, found := r.Header[http.CanonicalHeaderKey("x-amz-bucket-object-lock-enabled")]; found {
if r.Header.Get("x-amz-bucket-object-lock-enabled") != "true" {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidRequest), r.URL, guessIsBrowserReq(r))
return
}
objectLockEnabled = true
}
if s3Error := checkRequestAuthType(ctx, r, policy.CreateBucketAction, bucket, ""); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r))
return
@@ -500,6 +517,15 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
if objectLockEnabled {
configFile := path.Join(bucketConfigPrefix, bucket, objectLockEnabledConfigFile)
if err = saveConfig(ctx, objectAPI, configFile, []byte(objectLockEnabledConfig)); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
}
if err = globalDNSConfig.Put(bucket); err != nil {
objectAPI.DeleteBucket(ctx, bucket)
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
@@ -528,6 +554,14 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req
return
}
if objectLockEnabled {
configFile := path.Join(bucketConfigPrefix, bucket, objectLockEnabledConfigFile)
if err = saveConfig(ctx, objectAPI, configFile, []byte(objectLockEnabledConfig)); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
}
// Make sure to add Location information here only for bucket
w.Header().Set(xhttp.Location, path.Clean(r.URL.Path)) // Clean any trailing slashes.
@@ -858,6 +892,7 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http.
}
}
globalBucketRetentionConfig.Delete(bucket)
globalNotificationSys.RemoveNotification(bucket)
globalPolicySys.Remove(bucket)
globalNotificationSys.DeleteBucket(ctx, bucket)
@@ -867,3 +902,181 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http.
// Write success response.
writeSuccessNoContent(w)
}
// PutBucketVersioningHandler - PUT Bucket Versioning.
// ----------
// No-op. Available for API compatibility.
func (api objectAPIHandlers) PutBucketVersioningHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "PutBucketVersioning")
defer logger.AuditLog(w, r, "PutBucketVersioning", mustGetClaimsFromToken(r))
vars := mux.Vars(r)
bucket := vars["bucket"]
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r))
return
}
getBucketInfo := objectAPI.GetBucketInfo
if _, err := getBucketInfo(ctx, bucket); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
// Write success response.
writeSuccessResponseHeadersOnly(w)
}
// GetBucketVersioningHandler - GET Bucket Versioning.
// ----------
// No-op. Available for API compatibility.
func (api objectAPIHandlers) GetBucketVersioningHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "GetBucketVersioning")
defer logger.AuditLog(w, r, "GetBucketVersioning", mustGetClaimsFromToken(r))
vars := mux.Vars(r)
bucket := vars["bucket"]
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r))
return
}
getBucketInfo := objectAPI.GetBucketInfo
if _, err := getBucketInfo(ctx, bucket); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
// Write success response.
writeSuccessResponseXML(w, []byte(getBucketVersioningResponse))
}
// PutBucketObjectLockConfigHandler - PUT Bucket object lock configuration.
// ----------
// Places an Object Lock configuration on the specified bucket. The rule
// specified in the Object Lock configuration will be applied by default
// to every new object placed in the specified bucket.
func (api objectAPIHandlers) PutBucketObjectLockConfigHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "PutBucketObjectLockConfig")
defer logger.AuditLog(w, r, "PutBucketObjectLockConfig", mustGetClaimsFromToken(r))
vars := mux.Vars(r)
bucket := vars["bucket"]
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r))
return
}
// Deny if WORM is enabled
if globalWORMEnabled {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r))
return
}
config, err := parseObjectLockConfig(r.Body)
if err != nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMalformedXML), r.URL, guessIsBrowserReq(r))
return
}
configFile := path.Join(bucketConfigPrefix, bucket, objectLockEnabledConfigFile)
configData, err := readConfig(ctx, objectAPI, configFile)
if err != nil {
aerr := toAPIError(ctx, err)
if err == errConfigNotFound {
aerr = errorCodes.ToAPIErr(ErrMethodNotAllowed)
}
writeErrorResponse(ctx, w, aerr, r.URL, guessIsBrowserReq(r))
return
}
if string(configData) != objectLockEnabledConfig {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInternalError), r.URL, guessIsBrowserReq(r))
return
}
if config.Rule != nil {
data, err := xml.Marshal(config)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
configFile := path.Join(bucketConfigPrefix, bucket, objectLockConfig)
if err = saveConfig(ctx, objectAPI, configFile, data); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
retention := config.ToRetention()
globalBucketRetentionConfig.Set(bucket, retention)
globalNotificationSys.PutBucketObjectLockConfig(ctx, bucket, retention)
} else {
globalBucketRetentionConfig.Delete(bucket)
}
// Write success response.
writeSuccessResponseHeadersOnly(w)
}
// GetBucketObjectLockConfigHandler - GET Bucket object lock configuration.
// ----------
// Gets the Object Lock configuration for a bucket. The rule specified in
// the Object Lock configuration will be applied by default to every new
// object placed in the specified bucket.
func (api objectAPIHandlers) GetBucketObjectLockConfigHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "GetBucketObjectLockConfig")
defer logger.AuditLog(w, r, "GetBucketObjectLockConfig", mustGetClaimsFromToken(r))
vars := mux.Vars(r)
bucket := vars["bucket"]
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r))
return
}
configFile := path.Join(bucketConfigPrefix, bucket, objectLockEnabledConfigFile)
configData, err := readConfig(ctx, objectAPI, configFile)
if err != nil {
aerr := toAPIError(ctx, err)
if err == errConfigNotFound {
aerr = errorCodes.ToAPIErr(ErrMethodNotAllowed)
}
writeErrorResponse(ctx, w, aerr, r.URL, guessIsBrowserReq(r))
return
}
if string(configData) != objectLockEnabledConfig {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInternalError), r.URL, guessIsBrowserReq(r))
return
}
configFile = path.Join(bucketConfigPrefix, bucket, objectLockConfig)
configData, err = readConfig(ctx, objectAPI, configFile)
if err != nil {
if err != errConfigNotFound {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
if configData, err = xml.Marshal(newObjectLockConfig()); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
}
// Write success response.
writeSuccessResponseXML(w, configData)
}

View File

@@ -48,12 +48,6 @@ func (api objectAPIHandlers) GetBucketWebsiteHandler(w http.ResponseWriter, r *h
w.(http.Flusher).Flush()
}
// GetBucketVersioning - GET bucket versioning, a dummy api
func (api objectAPIHandlers) GetBucketVersioningHandler(w http.ResponseWriter, r *http.Request) {
writeSuccessResponseHeadersOnly(w)
w.(http.Flusher).Flush()
}
// GetBucketAccelerate - GET bucket accelerate, a dummy api
func (api objectAPIHandlers) GetBucketAccelerateHandler(w http.ResponseWriter, r *http.Request) {
writeSuccessResponseHeadersOnly(w)

View File

@@ -683,8 +683,8 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
}
// Deny if WORM is enabled
if globalWORMEnabled {
if _, err = fsStatFile(ctx, pathJoin(fs.fsPath, bucket, object)); err == nil {
if retention, isWORMBucket := isWORMEnabled(bucket); isWORMBucket {
if fi, err := fsStatFile(ctx, pathJoin(fs.fsPath, bucket, object)); err == nil && retention.Retain(fi.ModTime()) {
return ObjectInfo{}, ObjectAlreadyExists{Bucket: bucket, Object: object}
}
}

View File

@@ -925,8 +925,8 @@ func (fs *FSObjects) putObject(ctx context.Context, bucket string, object string
// Entire object was written to the temp location, now it's safe to rename it to the actual location.
fsNSObjPath := pathJoin(fs.fsPath, bucket, object)
// Deny if WORM is enabled
if globalWORMEnabled {
if _, err = fsStatFile(ctx, fsNSObjPath); err == nil {
if retention, isWORMBucket := isWORMEnabled(bucket); isWORMBucket {
if fi, err := fsStatFile(ctx, fsNSObjPath); err == nil && retention.Retain(fi.ModTime()) {
return ObjectInfo{}, ObjectAlreadyExists{Bucket: bucket, Object: object}
}
}

View File

@@ -202,6 +202,8 @@ var (
// Is worm enabled
globalWORMEnabled bool
globalBucketRetentionConfig = newBucketRetentionConfig()
// Disk cache drives
globalCacheConfig cache.Config

View File

@@ -789,6 +789,35 @@ func (sys *NotificationSys) load(buckets []BucketInfo, objAPI ObjectLayer) error
return nil
}
func (sys *NotificationSys) initBucketRetentionConfig(objAPI ObjectLayer) error {
buckets, err := objAPI.ListBuckets(context.Background())
if err != nil {
return err
}
for _, bucket := range buckets {
ctx := logger.SetReqInfo(context.Background(), &logger.ReqInfo{BucketName: bucket.Name})
configFile := path.Join(bucketConfigPrefix, bucket.Name, objectLockConfig)
configData, err := readConfig(ctx, objAPI, configFile)
if err != nil {
if err == errConfigNotFound {
continue
}
return err
}
config, err := parseObjectLockConfig(bytes.NewReader(configData))
if err != nil {
return err
}
if config.Rule != nil {
globalBucketRetentionConfig.Set(bucket.Name, config.ToRetention())
}
}
return nil
}
// Init - initializes notification system from notification.xml and listener.json of all buckets.
func (sys *NotificationSys) Init(buckets []BucketInfo, objAPI ObjectLayer) error {
if objAPI == nil {
@@ -808,7 +837,8 @@ func (sys *NotificationSys) Init(buckets []BucketInfo, objAPI ObjectLayer) error
// - Read quorum is lost just after the initialization
// of the object layer.
retryTimerCh := newRetryTimerSimple(doneCh)
for {
stop := false
for !stop {
select {
case <-retryTimerCh:
if err := sys.load(buckets, objAPI); err != nil {
@@ -820,6 +850,26 @@ func (sys *NotificationSys) Init(buckets []BucketInfo, objAPI ObjectLayer) error
}
return err
}
stop = true
case <-globalOSSignalCh:
return fmt.Errorf("Initializing Notification sub-system gracefully stopped")
}
}
// Initializing bucket retention config needs a retry mechanism if
// read quorum is lost just after the initialization of the object layer.
for {
select {
case <-retryTimerCh:
if err := sys.initBucketRetentionConfig(objAPI); err != nil {
if err == errDiskNotFound ||
strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) ||
strings.Contains(err.Error(), InsufficientWriteQuorum{}.Error()) {
logger.Info("Waiting for bucket retention configuration to be initialized..")
continue
}
return err
}
return nil
case <-globalOSSignalCh:
return fmt.Errorf("Initializing Notification sub-system gracefully stopped")
@@ -926,6 +976,25 @@ func (sys *NotificationSys) Send(args eventArgs) []event.TargetIDErr {
return sys.send(args.BucketName, args.ToEvent(), targetIDs...)
}
// PutBucketObjectLockConfig - put bucket object lock configuration to all peers.
func (sys *NotificationSys) PutBucketObjectLockConfig(ctx context.Context, bucketName string, retention Retention) {
var wg sync.WaitGroup
for _, client := range sys.peerClients {
if client == nil {
continue
}
wg.Add(1)
go func(client *peerRESTClient) {
defer wg.Done()
if err := client.PutBucketObjectLockConfig(bucketName, retention); err != nil {
logger.GetReqInfo(ctx).AppendTags("remotePeer", client.host.Name)
logger.LogIf(ctx, err)
}
}(client)
}
wg.Wait()
}
// NetReadPerfInfo - Network read performance information.
func (sys *NotificationSys) NetReadPerfInfo(size int64) []ServerNetReadPerfInfo {
reply := make([]ServerNetReadPerfInfo, len(sys.peerClients))

View File

@@ -736,8 +736,10 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
}
cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject))
if globalWORMEnabled { // Deny if WORM is enabled.
if _, err := objectAPI.GetObjectInfo(ctx, dstBucket, dstObject, dstOpts); err == nil {
// Deny if WORM is enabled.
if retention, isWORMBucket := isWORMEnabled(dstBucket); isWORMBucket {
if oi, err := objectAPI.GetObjectInfo(ctx, dstBucket, dstObject, dstOpts); err == nil && retention.Retain(oi.ModTime) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r))
return
}
@@ -1202,11 +1204,6 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
getObjectInfo := objectAPI.GetObjectInfo
if api.CacheAPI() != nil {
getObjectInfo = api.CacheAPI().GetObjectInfo
putObject = api.CacheAPI().PutObject
}
rawReader := hashReader
pReader := NewPutObjReader(rawReader, nil, nil)
@@ -1220,8 +1217,8 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
}
// Deny if WORM is enabled
if globalWORMEnabled {
if _, err = getObjectInfo(ctx, bucket, object, opts); err == nil {
if retention, isWORMBucket := isWORMEnabled(bucket); isWORMBucket {
if oi, err := objectAPI.GetObjectInfo(ctx, bucket, object, opts); err == nil && retention.Retain(oi.ModTime) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r))
return
}
@@ -1347,8 +1344,8 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r
}
// Deny if WORM is enabled
if globalWORMEnabled {
if _, err = objectAPI.GetObjectInfo(ctx, bucket, object, opts); err == nil {
if retention, isWORMBucket := isWORMEnabled(bucket); isWORMBucket {
if oi, err := objectAPI.GetObjectInfo(ctx, bucket, object, opts); err == nil && retention.Retain(oi.ModTime) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r))
return
}
@@ -1520,8 +1517,8 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
}
// Deny if WORM is enabled
if globalWORMEnabled {
if _, err = objectAPI.GetObjectInfo(ctx, dstBucket, dstObject, dstOpts); err == nil {
if retention, isWORMBucket := isWORMEnabled(dstBucket); isWORMBucket {
if oi, err := objectAPI.GetObjectInfo(ctx, dstBucket, dstObject, dstOpts); err == nil && retention.Retain(oi.ModTime) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r))
return
}
@@ -1897,8 +1894,8 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
pReader := NewPutObjReader(rawReader, nil, nil)
// Deny if WORM is enabled
if globalWORMEnabled {
if _, err = objectAPI.GetObjectInfo(ctx, bucket, object, opts); err == nil {
if retention, isWORMBucket := isWORMEnabled(bucket); isWORMBucket {
if oi, err := objectAPI.GetObjectInfo(ctx, bucket, object, opts); err == nil && retention.Retain(oi.ModTime) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r))
return
}
@@ -2006,8 +2003,8 @@ func (api objectAPIHandlers) AbortMultipartUploadHandler(w http.ResponseWriter,
}
// Deny if WORM is enabled
if globalWORMEnabled {
if _, err := objectAPI.GetObjectInfo(ctx, bucket, object, ObjectOptions{}); err == nil {
if retention, isWORMBucket := isWORMEnabled(bucket); isWORMBucket {
if oi, err := objectAPI.GetObjectInfo(ctx, bucket, object, ObjectOptions{}); err == nil && retention.Retain(oi.ModTime) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r))
return
}
@@ -2182,8 +2179,8 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
}
// Deny if WORM is enabled
if globalWORMEnabled {
if _, err := objectAPI.GetObjectInfo(ctx, bucket, object, ObjectOptions{}); err == nil {
if retention, isWORMBucket := isWORMEnabled(bucket); isWORMBucket {
if oi, err := objectAPI.GetObjectInfo(ctx, bucket, object, ObjectOptions{}); err == nil && retention.Retain(oi.ModTime) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r))
return
}
@@ -2377,7 +2374,7 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
}
// Deny if WORM is enabled
if globalWORMEnabled {
if _, isWORMBucket := isWORMEnabled(bucket); isWORMBucket {
// Not required to check whether given object exists or not, because
// DeleteObject is always successful irrespective of object existence.
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r))
@@ -2404,3 +2401,163 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
}
writeSuccessNoContent(w)
}
// PutObjectLegalHoldHandler - set legal hold configuration to object,
func (api objectAPIHandlers) PutObjectLegalHoldHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "PutObjectLegalHold")
defer logger.AuditLog(w, r, "PutObjectLegalHold", mustGetClaimsFromToken(r))
vars := mux.Vars(r)
bucket := vars["bucket"]
object := vars["object"]
if vid := r.URL.Query().Get("versionId"); vid != "" && vid != "null" {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNoSuchVersion), r.URL, guessIsBrowserReq(r))
return
}
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r))
return
}
getObjectInfo := objectAPI.GetObjectInfo
if api.CacheAPI() != nil {
getObjectInfo = api.CacheAPI().GetObjectInfo
}
opts, err := getOpts(ctx, r, bucket, object)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
if _, err = getObjectInfo(ctx, bucket, object, opts); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r))
}
// GetObjectLegalHoldHandler - get legal hold configuration to object,
func (api objectAPIHandlers) GetObjectLegalHoldHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "GetObjectLegalHold")
defer logger.AuditLog(w, r, "GetObjectLegalHold", mustGetClaimsFromToken(r))
vars := mux.Vars(r)
bucket := vars["bucket"]
object := vars["object"]
if vid := r.URL.Query().Get("versionId"); vid != "" && vid != "null" {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNoSuchVersion), r.URL, guessIsBrowserReq(r))
return
}
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r))
return
}
getObjectInfo := objectAPI.GetObjectInfo
if api.CacheAPI() != nil {
getObjectInfo = api.CacheAPI().GetObjectInfo
}
opts, err := getOpts(ctx, r, bucket, object)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
if _, err = getObjectInfo(ctx, bucket, object, opts); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r))
}
// PutObjectRetentionHandler - set legal hold configuration to object,
func (api objectAPIHandlers) PutObjectRetentionHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "PutObjectRetention")
defer logger.AuditLog(w, r, "PutObjectRetention", mustGetClaimsFromToken(r))
vars := mux.Vars(r)
bucket := vars["bucket"]
object := vars["object"]
if vid := r.URL.Query().Get("versionId"); vid != "" && vid != "null" {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNoSuchVersion), r.URL, guessIsBrowserReq(r))
return
}
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r))
return
}
getObjectInfo := objectAPI.GetObjectInfo
if api.CacheAPI() != nil {
getObjectInfo = api.CacheAPI().GetObjectInfo
}
opts, err := getOpts(ctx, r, bucket, object)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
if _, err = getObjectInfo(ctx, bucket, object, opts); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r))
}
// GetObjectRetentionHandler - get legal hold configuration to object,
func (api objectAPIHandlers) GetObjectRetentionHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "GetObjectRetention")
defer logger.AuditLog(w, r, "GetObjectRetention", mustGetClaimsFromToken(r))
vars := mux.Vars(r)
bucket := vars["bucket"]
object := vars["object"]
if vid := r.URL.Query().Get("versionId"); vid != "" && vid != "null" {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNoSuchVersion), r.URL, guessIsBrowserReq(r))
return
}
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r))
return
}
getObjectInfo := objectAPI.GetObjectInfo
if api.CacheAPI() != nil {
getObjectInfo = api.CacheAPI().GetObjectInfo
}
opts, err := getOpts(ctx, r, bucket, object)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
if _, err = getObjectInfo(ctx, bucket, object, opts); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r))
}

191
cmd/object-lock.go Normal file
View File

@@ -0,0 +1,191 @@
/*
* MinIO Cloud Storage, (C) 2016, 2017 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"encoding/xml"
"fmt"
"io"
"sync"
"time"
)
// RetentionMode - object retention mode.
type RetentionMode string
const (
// Governance - governance mode.
Governance RetentionMode = "GOVERNANCE"
// Compliance - compliance mode.
Compliance RetentionMode = "COMPLIANCE"
)
// Retention - bucket level retention configuration.
type Retention struct {
Mode RetentionMode
Validity time.Duration
}
// IsEmpty - returns whether retention is empty or not.
func (r Retention) IsEmpty() bool {
return r.Mode == "" || r.Validity == 0
}
// Retain - check whether given date is retainable by validity time.
func (r Retention) Retain(created time.Time) bool {
return globalWORMEnabled || created.Add(r.Validity).After(time.Now())
}
// BucketRetentionConfig - map of bucket and retention configuration.
type BucketRetentionConfig struct {
sync.RWMutex
retentionMap map[string]Retention
}
// Set - set retention configuration.
func (config *BucketRetentionConfig) Set(bucketName string, retention Retention) {
config.Lock()
config.retentionMap[bucketName] = retention
config.Unlock()
}
// Get - Get retention configuration.
func (config *BucketRetentionConfig) Get(bucketName string) (r Retention, ok bool) {
config.RLock()
defer config.RUnlock()
r, ok = config.retentionMap[bucketName]
return r, ok
}
// Delete - delete retention configuration.
func (config *BucketRetentionConfig) Delete(bucketName string) {
config.Lock()
delete(config.retentionMap, bucketName)
config.Unlock()
}
func newBucketRetentionConfig() *BucketRetentionConfig {
return &BucketRetentionConfig{
retentionMap: map[string]Retention{},
}
}
// DefaultRetention - default retention configuration.
type DefaultRetention struct {
XMLName xml.Name `xml:"DefaultRetention"`
Mode RetentionMode `xml:"Mode"`
Days *uint64 `xml:"Days"`
Years *uint64 `xml:"Years"`
}
// UnmarshalXML - decodes XML data.
func (dr *DefaultRetention) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error {
// Make subtype to avoid recursive UnmarshalXML().
type defaultRetention DefaultRetention
retention := defaultRetention{}
if err := d.DecodeElement(&retention, &start); err != nil {
return err
}
switch string(retention.Mode) {
case "GOVERNANCE", "COMPLIANCE":
default:
return fmt.Errorf("unknown retention mode %v", retention.Mode)
}
if retention.Days == nil && retention.Years == nil {
return fmt.Errorf("either Days or Years must be specified")
}
if retention.Days != nil && retention.Years != nil {
return fmt.Errorf("either Days or Years must be specified, not both")
}
if retention.Days != nil {
if *retention.Days == 0 {
return fmt.Errorf("Days should not be zero")
}
} else if *retention.Years == 0 {
return fmt.Errorf("Years should not be zero")
}
*dr = DefaultRetention(retention)
return nil
}
// ObjectLockConfig - object lock configuration specified in
// https://docs.aws.amazon.com/AmazonS3/latest/API/Type_API_ObjectLockConfiguration.html
type ObjectLockConfig struct {
XMLNS string `xml:"xmlns,attr,omitempty"`
XMLName xml.Name `xml:"ObjectLockConfiguration"`
ObjectLockEnabled string `xml:"ObjectLockEnabled"`
Rule *struct {
DefaultRetention DefaultRetention `xml:"DefaultRetention"`
} `xml:"Rule,omitempty"`
}
// UnmarshalXML - decodes XML data.
func (config *ObjectLockConfig) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error {
// Make subtype to avoid recursive UnmarshalXML().
type objectLockConfig ObjectLockConfig
parsedConfig := objectLockConfig{}
if err := d.DecodeElement(&parsedConfig, &start); err != nil {
return err
}
if parsedConfig.ObjectLockEnabled != "Enabled" {
return fmt.Errorf("only 'Enabled' value is allowd to ObjectLockEnabled element")
}
*config = ObjectLockConfig(parsedConfig)
return nil
}
// ToRetention - convert to Retention type.
func (config *ObjectLockConfig) ToRetention() (r Retention) {
if config.Rule != nil {
r.Mode = config.Rule.DefaultRetention.Mode
utcNow := time.Now().UTC()
if config.Rule.DefaultRetention.Days != nil {
r.Validity = utcNow.AddDate(0, 0, int(*config.Rule.DefaultRetention.Days)).Sub(utcNow)
} else {
r.Validity = utcNow.AddDate(int(*config.Rule.DefaultRetention.Years), 0, 0).Sub(utcNow)
}
}
return r
}
func parseObjectLockConfig(reader io.Reader) (*ObjectLockConfig, error) {
config := ObjectLockConfig{}
if err := xml.NewDecoder(reader).Decode(&config); err != nil {
return nil, err
}
return &config, nil
}
func newObjectLockConfig() *ObjectLockConfig {
return &ObjectLockConfig{
ObjectLockEnabled: "Enabled",
}
}

View File

@@ -430,6 +430,25 @@ func (client *peerRESTClient) PutBucketNotification(bucket string, rulesMap even
return nil
}
// PutBucketObjectLockConfig - PUT bucket object lock configuration.
func (client *peerRESTClient) PutBucketObjectLockConfig(bucket string, retention Retention) error {
values := make(url.Values)
values.Set(peerRESTBucket, bucket)
var reader bytes.Buffer
err := gob.NewEncoder(&reader).Encode(&retention)
if err != nil {
return err
}
respBody, err := client.call(peerRESTMethodPutBucketObjectLockConfig, values, &reader, -1)
if err != nil {
return err
}
defer http.DrainBody(respBody)
return nil
}
// DeletePolicy - delete a specific canned policy.
func (client *peerRESTClient) DeletePolicy(policyName string) (err error) {
values := make(url.Values)

View File

@@ -24,40 +24,41 @@ const (
)
const (
peerRESTMethodNetReadPerfInfo = "/netreadperfinfo"
peerRESTMethodCollectNetPerfInfo = "/collectnetperfinfo"
peerRESTMethodServerInfo = "/serverinfo"
peerRESTMethodCPULoadInfo = "/cpuloadinfo"
peerRESTMethodMemUsageInfo = "/memusageinfo"
peerRESTMethodDrivePerfInfo = "/driveperfinfo"
peerRESTMethodDeleteBucket = "/deletebucket"
peerRESTMethodServerUpdate = "/serverupdate"
peerRESTMethodSignalService = "/signalservice"
peerRESTMethodBackgroundHealStatus = "/backgroundhealstatus"
peerRESTMethodBackgroundOpsStatus = "/backgroundopsstatus"
peerRESTMethodGetLocks = "/getlocks"
peerRESTMethodBucketPolicyRemove = "/removebucketpolicy"
peerRESTMethodLoadUser = "/loaduser"
peerRESTMethodDeleteUser = "/deleteuser"
peerRESTMethodLoadPolicy = "/loadpolicy"
peerRESTMethodLoadPolicyMapping = "/loadpolicymapping"
peerRESTMethodDeletePolicy = "/deletepolicy"
peerRESTMethodLoadUsers = "/loadusers"
peerRESTMethodLoadGroup = "/loadgroup"
peerRESTMethodStartProfiling = "/startprofiling"
peerRESTMethodDownloadProfilingData = "/downloadprofilingdata"
peerRESTMethodBucketPolicySet = "/setbucketpolicy"
peerRESTMethodBucketNotificationPut = "/putbucketnotification"
peerRESTMethodBucketNotificationListen = "/listenbucketnotification"
peerRESTMethodReloadFormat = "/reloadformat"
peerRESTMethodTargetExists = "/targetexists"
peerRESTMethodSendEvent = "/sendevent"
peerRESTMethodTrace = "/trace"
peerRESTMethodBucketLifecycleSet = "/setbucketlifecycle"
peerRESTMethodBucketLifecycleRemove = "/removebucketlifecycle"
peerRESTMethodLog = "/log"
peerRESTMethodHardwareCPUInfo = "/cpuhardwareinfo"
peerRESTMethodHardwareNetworkInfo = "/networkhardwareinfo"
peerRESTMethodNetReadPerfInfo = "/netreadperfinfo"
peerRESTMethodCollectNetPerfInfo = "/collectnetperfinfo"
peerRESTMethodServerInfo = "/serverinfo"
peerRESTMethodCPULoadInfo = "/cpuloadinfo"
peerRESTMethodMemUsageInfo = "/memusageinfo"
peerRESTMethodDrivePerfInfo = "/driveperfinfo"
peerRESTMethodDeleteBucket = "/deletebucket"
peerRESTMethodServerUpdate = "/serverupdate"
peerRESTMethodSignalService = "/signalservice"
peerRESTMethodBackgroundHealStatus = "/backgroundhealstatus"
peerRESTMethodBackgroundOpsStatus = "/backgroundopsstatus"
peerRESTMethodGetLocks = "/getlocks"
peerRESTMethodBucketPolicyRemove = "/removebucketpolicy"
peerRESTMethodLoadUser = "/loaduser"
peerRESTMethodDeleteUser = "/deleteuser"
peerRESTMethodLoadPolicy = "/loadpolicy"
peerRESTMethodLoadPolicyMapping = "/loadpolicymapping"
peerRESTMethodDeletePolicy = "/deletepolicy"
peerRESTMethodLoadUsers = "/loadusers"
peerRESTMethodLoadGroup = "/loadgroup"
peerRESTMethodStartProfiling = "/startprofiling"
peerRESTMethodDownloadProfilingData = "/downloadprofilingdata"
peerRESTMethodBucketPolicySet = "/setbucketpolicy"
peerRESTMethodBucketNotificationPut = "/putbucketnotification"
peerRESTMethodBucketNotificationListen = "/listenbucketnotification"
peerRESTMethodReloadFormat = "/reloadformat"
peerRESTMethodTargetExists = "/targetexists"
peerRESTMethodSendEvent = "/sendevent"
peerRESTMethodTrace = "/trace"
peerRESTMethodBucketLifecycleSet = "/setbucketlifecycle"
peerRESTMethodBucketLifecycleRemove = "/removebucketlifecycle"
peerRESTMethodLog = "/log"
peerRESTMethodHardwareCPUInfo = "/cpuhardwareinfo"
peerRESTMethodHardwareNetworkInfo = "/networkhardwareinfo"
peerRESTMethodPutBucketObjectLockConfig = "putbucketobjectlockconfig"
)
const (

View File

@@ -506,6 +506,7 @@ func (s *peerRESTServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Requ
globalNotificationSys.RemoveNotification(bucketName)
globalPolicySys.Remove(bucketName)
globalBucketRetentionConfig.Delete(bucketName)
w.(http.Flusher).Flush()
}
@@ -756,6 +757,36 @@ func (s *peerRESTServer) PutBucketNotificationHandler(w http.ResponseWriter, r *
w.(http.Flusher).Flush()
}
// PutBucketObjectLockConfigHandler - handles PUT bucket object lock configuration.
func (s *peerRESTServer) PutBucketObjectLockConfigHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
vars := mux.Vars(r)
bucketName := vars[peerRESTBucket]
if bucketName == "" {
s.writeErrorResponse(w, errors.New("Bucket name is missing"))
return
}
var retention Retention
if r.ContentLength < 0 {
s.writeErrorResponse(w, errInvalidArgument)
return
}
err := gob.NewDecoder(r.Body).Decode(&retention)
if err != nil {
s.writeErrorResponse(w, err)
return
}
globalBucketRetentionConfig.Set(bucketName, retention)
w.(http.Flusher).Flush()
}
type listenBucketNotificationReq struct {
EventNames []event.Name `json:"eventNames"`
Pattern string `json:"pattern"`
@@ -992,6 +1023,7 @@ func (s *peerRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool {
func registerPeerRESTHandlers(router *mux.Router) {
server := &peerRESTServer{}
subrouter := router.PathPrefix(peerRESTPrefix).Subrouter()
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodPutBucketObjectLockConfig).HandlerFunc(httpTraceHdrs(server.PutBucketObjectLockConfigHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodNetReadPerfInfo).HandlerFunc(httpTraceHdrs(server.NetReadPerfInfoHandler)).Queries(restQueries(peerRESTNetPerfSize)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodCollectNetPerfInfo).HandlerFunc(httpTraceHdrs(server.CollectNetPerfInfoHandler)).Queries(restQueries(peerRESTNetPerfSize)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetLocks).HandlerFunc(httpTraceHdrs(server.GetLocksHandler))

View File

@@ -492,3 +492,11 @@ func getMinioMode() string {
func iamPolicyName() string {
return globalOpenIDConfig.ClaimPrefix + iampolicy.PolicyName
}
func isWORMEnabled(bucket string) (Retention, bool) {
if globalWORMEnabled {
return Retention{}, true
}
return globalBucketRetentionConfig.Get(bucket)
}

View File

@@ -668,8 +668,8 @@ next:
// If not a directory, remove the object.
if !hasSuffix(objectName, SlashSeparator) && objectName != "" {
// Deny if WORM is enabled
if globalWORMEnabled {
if _, err = objectAPI.GetObjectInfo(ctx, args.BucketName, objectName, ObjectOptions{}); err == nil {
if retention, isWORMBucket := isWORMEnabled(args.BucketName); isWORMBucket {
if oi, err := objectAPI.GetObjectInfo(ctx, args.BucketName, objectName, ObjectOptions{}); err == nil && retention.Retain(oi.ModTime) {
return toJSONError(ctx, errMethodNotAllowed)
}
}
@@ -1029,8 +1029,8 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
crypto.RemoveSensitiveEntries(metadata)
// Deny if WORM is enabled
if globalWORMEnabled {
if _, err = objectAPI.GetObjectInfo(ctx, bucket, object, opts); err == nil {
if retention, isWORMBucket := isWORMEnabled(bucket); isWORMBucket {
if oi, err := objectAPI.GetObjectInfo(ctx, bucket, object, opts); err == nil && retention.Retain(oi.ModTime) {
writeWebErrorResponse(w, errMethodNotAllowed)
return
}

View File

@@ -761,8 +761,10 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
if xl.isObject(bucket, object) {
// Deny if WORM is enabled
if globalWORMEnabled {
return ObjectInfo{}, ObjectAlreadyExists{Bucket: bucket, Object: object}
if retention, isWORMBucket := isWORMEnabled(bucket); isWORMBucket {
if oi, err := xl.getObjectInfo(ctx, bucket, object); err == nil && retention.Retain(oi.ModTime) {
return ObjectInfo{}, ObjectAlreadyExists{Bucket: bucket, Object: object}
}
}
// Rename if an object already exists to temporary location.

View File

@@ -653,8 +653,10 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string,
if xl.isObject(bucket, object) {
// Deny if WORM is enabled
if globalWORMEnabled {
return ObjectInfo{}, ObjectAlreadyExists{Bucket: bucket, Object: object}
if retention, isWORMBucket := isWORMEnabled(bucket); isWORMBucket {
if oi, err := xl.getObjectInfo(ctx, bucket, object); err == nil && retention.Retain(oi.ModTime) {
return ObjectInfo{}, ObjectAlreadyExists{Bucket: bucket, Object: object}
}
}
// Rename if an object already exists to temporary location.