cleanup object-lock/bucket tagging for gateways (#9548)

This PR is to ensure that we call the relevant object
layer APIs for necessary S3 API level functionalities
allowing gateway implementations to return proper
errors as NotImplemented{}

This allows for all our tests in mint to behave
appropriately and can be handled appropriately as
well.
This commit is contained in:
Harshavardhana
2020-05-08 13:44:44 -07:00
committed by GitHub
parent 6885c72f32
commit a1de9cec58
49 changed files with 681 additions and 375 deletions

View File

@@ -19,10 +19,12 @@ package cmd
import (
"bytes"
"context"
"encoding/xml"
"errors"
"math"
"net/http"
"path"
"sync"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/auth"
@@ -30,6 +32,54 @@ import (
"github.com/minio/minio/pkg/bucket/policy"
)
// BucketObjectLockSys - map of bucket and retention configuration.
type BucketObjectLockSys struct {
sync.RWMutex
retentionMap map[string]*objectlock.Retention
}
// Set - set retention configuration.
func (sys *BucketObjectLockSys) Set(bucketName string, retention *objectlock.Retention) {
if globalIsGateway {
// no-op
return
}
sys.Lock()
sys.retentionMap[bucketName] = retention
sys.Unlock()
}
// Get - Get retention configuration.
func (sys *BucketObjectLockSys) Get(bucketName string) (r *objectlock.Retention, ok bool) {
if globalIsGateway {
// When gateway is enabled, no cached value
// is used to validate bucket object lock configuration
objAPI := newObjectLayerWithoutSafeModeFn()
if objAPI == nil {
return
}
lc, err := objAPI.GetBucketObjectLockConfig(GlobalContext, bucketName)
if err != nil {
return
}
return lc.ToRetention(), true
}
sys.RLock()
defer sys.RUnlock()
r, ok = sys.retentionMap[bucketName]
return r, ok
}
// Remove - removes retention sysuration.
func (sys *BucketObjectLockSys) Remove(bucketName string) {
sys.Lock()
delete(sys.retentionMap, bucketName)
sys.Unlock()
}
// Similar to enforceRetentionBypassForDelete but for WebUI
func enforceRetentionBypassForDeleteWeb(ctx context.Context, r *http.Request, bucket, object string, getObjectInfoFn GetObjectInfoFn, govBypassPerms bool) APIErrorCode {
opts, err := getOpts(ctx, r, bucket, object)
@@ -295,7 +345,7 @@ func checkPutObjectLockAllowed(ctx context.Context, r *http.Request, bucket, obj
retentionRequested := objectlock.IsObjectLockRetentionRequested(r.Header)
legalHoldRequested := objectlock.IsObjectLockLegalHoldRequested(r.Header)
retentionCfg, isWORMBucket := globalBucketObjectLockConfig.Get(bucket)
retentionCfg, isWORMBucket := globalBucketObjectLockSys.Get(bucket)
if !isWORMBucket {
if legalHoldRequested || retentionRequested {
return mode, retainDate, legalHold, ErrInvalidBucketObjectLockConfiguration
@@ -378,7 +428,74 @@ func checkPutObjectLockAllowed(ctx context.Context, r *http.Request, bucket, obj
return mode, retainDate, legalHold, ErrNone
}
func initBucketObjectLockConfig(buckets []BucketInfo, objAPI ObjectLayer) error {
func readBucketObjectLockConfig(ctx context.Context, objAPI ObjectLayer, bucket string) (*objectlock.Config, error) {
meta, err := loadBucketMetadata(ctx, objAPI, bucket)
if err != nil && err != errMetaDataConverted {
return nil, toObjectErr(err, bucket)
}
if !meta.LockEnabled {
return nil, BucketObjectLockConfigNotFound{Bucket: bucket}
}
configFile := path.Join(bucketConfigPrefix, bucket, objectLockConfig)
configData, err := readConfig(ctx, objAPI, configFile)
if err != nil {
if err != errConfigNotFound {
return nil, toObjectErr(err, bucket)
}
return objectlock.NewObjectLockConfig(), nil
}
cfg, err := objectlock.ParseObjectLockConfig(bytes.NewReader(configData))
if err != nil {
return nil, toObjectErr(err, bucket)
}
return cfg, nil
}
func saveBucketObjectLockConfig(ctx context.Context, objAPI ObjectLayer, bucket string, config *objectlock.Config) error {
meta, err := loadBucketMetadata(ctx, objAPI, bucket)
if err != nil && err != errMetaDataConverted {
return toObjectErr(err, bucket)
}
if !meta.LockEnabled {
return BucketObjectLockConfigNotFound{Bucket: bucket}
}
data, err := xml.Marshal(config)
if err != nil {
return toObjectErr(err, bucket)
}
configFile := path.Join(bucketConfigPrefix, bucket, objectLockConfig)
if err = saveConfig(ctx, objAPI, configFile, data); err != nil {
return toObjectErr(err, bucket)
}
return nil
}
// NewBucketObjectLockSys returns initialized BucketObjectLockSys
func NewBucketObjectLockSys() *BucketObjectLockSys {
return &BucketObjectLockSys{
retentionMap: make(map[string]*objectlock.Retention),
}
}
// Init - initializes bucket object lock config system for all buckets
func (sys *BucketObjectLockSys) Init(buckets []BucketInfo, objAPI ObjectLayer) error {
if objAPI == nil {
return errServerNotInitialized
}
// In gateway mode, we always fetch the bucket object lock configuration from the gateway backend.
// So, this is a no-op for gateway servers.
if globalIsGateway {
return nil
}
// Load BucketObjectLockSys once during boot.
return sys.load(buckets, objAPI)
}
func (sys *BucketObjectLockSys) load(buckets []BucketInfo, objAPI ObjectLayer) error {
for _, bucket := range buckets {
ctx := logger.SetReqInfo(GlobalContext, &logger.ReqInfo{BucketName: bucket.Name})
meta, err := loadBucketMetadata(ctx, objAPI, bucket.Name)
@@ -398,7 +515,7 @@ func initBucketObjectLockConfig(buckets []BucketInfo, objAPI ObjectLayer) error
configData, err := readConfig(ctx, objAPI, configFile)
if err != nil {
if errors.Is(err, errConfigNotFound) {
globalBucketObjectLockConfig.Set(bucket.Name, &objectlock.Retention{})
globalBucketObjectLockSys.Set(bucket.Name, &objectlock.Retention{})
continue
}
return err
@@ -412,7 +529,7 @@ func initBucketObjectLockConfig(buckets []BucketInfo, objAPI ObjectLayer) error
if config.Rule != nil {
retention = config.ToRetention()
}
globalBucketObjectLockConfig.Set(bucket.Name, retention)
globalBucketObjectLockSys.Set(bucket.Name, retention)
}
return nil
}