mirror of
https://github.com/minio/minio.git
synced 2025-11-09 05:34:56 -05:00
Add API's for managing bucket quota (#9379)
This PR allows setting a "hard" or "fifo" quota restriction at the bucket level. Buckets that have reached the FIFO quota configured, will automatically be cleaned up in FIFO manner until bucket usage drops to configured quota. If a bucket is configured with a "hard" quota ceiling, all further writes are disallowed.
This commit is contained in:
150
cmd/admin-handlers-quota.go
Normal file
150
cmd/admin-handlers-quota.go
Normal file
@@ -0,0 +1,150 @@
|
||||
/*
|
||||
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"path"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/minio/minio/cmd/config"
|
||||
"github.com/minio/minio/pkg/env"
|
||||
iampolicy "github.com/minio/minio/pkg/iam/policy"
|
||||
)
|
||||
|
||||
const (
|
||||
bucketQuotaConfigFile = "quota.json"
|
||||
)
|
||||
|
||||
// PutBucketQuotaConfigHandler - PUT Bucket quota configuration.
|
||||
// ----------
|
||||
// Places a quota configuration on the specified bucket. The quota
|
||||
// specified in the quota configuration will be applied by default
|
||||
// to enforce total quota for the specified bucket.
|
||||
func (a adminAPIHandlers) PutBucketQuotaConfigHandler(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := newContext(r, w, "PutBucketQuotaConfig")
|
||||
objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.SetBucketQuotaAdminAction)
|
||||
if objectAPI == nil {
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
vars := mux.Vars(r)
|
||||
bucket := vars["bucket"]
|
||||
|
||||
// Turn off quota commands if data usage info is unavailable.
|
||||
if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOff {
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminBucketQuotaDisabled), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
defer r.Body.Close()
|
||||
data, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInvalidRequest), r.URL)
|
||||
return
|
||||
}
|
||||
quotaCfg, err := parseBucketQuota(data)
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
configFile := path.Join(bucketConfigPrefix, bucket, bucketQuotaConfigFile)
|
||||
if err = saveConfig(ctx, objectAPI, configFile, data); err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
if quotaCfg.Quota > 0 {
|
||||
globalBucketQuotaSys.Set(bucket, quotaCfg)
|
||||
globalNotificationSys.PutBucketQuotaConfig(ctx, bucket, quotaCfg)
|
||||
|
||||
} else {
|
||||
globalBucketQuotaSys.Remove(bucket)
|
||||
globalNotificationSys.RemoveBucketQuotaConfig(ctx, bucket)
|
||||
|
||||
}
|
||||
|
||||
// Write success response.
|
||||
writeSuccessResponseHeadersOnly(w)
|
||||
}
|
||||
|
||||
// GetBucketQuotaConfigHandler - gets bucket quota configuration
|
||||
func (a adminAPIHandlers) GetBucketQuotaConfigHandler(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := newContext(r, w, "GetBucketQuotaConfig")
|
||||
|
||||
objectAPI, _ := validateAdminUsersReq(ctx, w, r, iampolicy.GetBucketQuotaAdminAction)
|
||||
if objectAPI == nil {
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
vars := mux.Vars(r)
|
||||
bucket := vars["bucket"]
|
||||
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
configFile := path.Join(bucketConfigPrefix, bucket, bucketQuotaConfigFile)
|
||||
configData, err := readConfig(ctx, objectAPI, configFile)
|
||||
if err != nil {
|
||||
if err != errConfigNotFound {
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, BucketQuotaConfigNotFound{Bucket: bucket}), r.URL)
|
||||
return
|
||||
}
|
||||
// Write success response.
|
||||
writeSuccessResponseJSON(w, configData)
|
||||
}
|
||||
|
||||
// RemoveBucketQuotaConfigHandler - removes Bucket quota configuration.
|
||||
// ----------
|
||||
// Removes quota configuration on the specified bucket.
|
||||
func (a adminAPIHandlers) RemoveBucketQuotaConfigHandler(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := newContext(r, w, "RemoveBucketQuotaConfig")
|
||||
objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.SetBucketQuotaAdminAction)
|
||||
if objectAPI == nil {
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
|
||||
return
|
||||
}
|
||||
vars := mux.Vars(r)
|
||||
bucket := vars["bucket"]
|
||||
|
||||
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
configFile := path.Join(bucketConfigPrefix, bucket, bucketQuotaConfigFile)
|
||||
if err := deleteConfig(ctx, objectAPI, configFile); err != nil {
|
||||
if err != errConfigNotFound {
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, BucketQuotaConfigNotFound{Bucket: bucket}), r.URL)
|
||||
return
|
||||
}
|
||||
globalBucketQuotaSys.Remove(bucket)
|
||||
globalNotificationSys.RemoveBucketQuotaConfig(ctx, bucket)
|
||||
// Write success response.
|
||||
writeSuccessNoContent(w)
|
||||
}
|
||||
@@ -86,7 +86,7 @@ func prepareAdminXLTestBed(ctx context.Context) (*adminXLTestBed, error) {
|
||||
|
||||
// Setup admin mgmt REST API handlers.
|
||||
adminRouter := mux.NewRouter()
|
||||
registerAdminRouter(adminRouter, true, true)
|
||||
registerAdminRouter(adminRouter, true, true, false)
|
||||
|
||||
return &adminXLTestBed{
|
||||
xlDirs: xlDirs,
|
||||
|
||||
@@ -35,7 +35,7 @@ const (
|
||||
type adminAPIHandlers struct{}
|
||||
|
||||
// registerAdminRouter - Add handler functions for each service REST API routes.
|
||||
func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps bool) {
|
||||
func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps, enableBucketQuotaOps bool) {
|
||||
|
||||
adminAPI := adminAPIHandlers{}
|
||||
// Admin router
|
||||
@@ -166,7 +166,19 @@ func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps bool)
|
||||
|
||||
// Set Group Status
|
||||
adminRouter.Methods(http.MethodPut).Path(adminVersion+"/set-group-status").HandlerFunc(httpTraceHdrs(adminAPI.SetGroupStatus)).Queries("group", "{group:.*}").Queries("status", "{status:.*}")
|
||||
}
|
||||
|
||||
// Quota operations
|
||||
if enableConfigOps && enableBucketQuotaOps {
|
||||
// GetBucketQuotaConfig
|
||||
adminRouter.Methods(http.MethodGet).Path(adminVersion+"/get-bucket-quota").HandlerFunc(
|
||||
httpTraceHdrs(adminAPI.GetBucketQuotaConfigHandler)).Queries("bucket", "{bucket:.*}")
|
||||
// PutBucketQuotaConfig
|
||||
adminRouter.Methods(http.MethodPut).Path(adminVersion+"/set-bucket-quota").HandlerFunc(
|
||||
httpTraceHdrs(adminAPI.PutBucketQuotaConfigHandler)).Queries("bucket", "{bucket:.*}")
|
||||
// RemoveBucketQuotaConfig
|
||||
adminRouter.Methods(http.MethodDelete).Path(adminVersion+"/remove-bucket-quota").HandlerFunc(
|
||||
httpTraceHdrs(adminAPI.RemoveBucketQuotaConfigHandler)).Queries("bucket", "{bucket:.*}")
|
||||
}
|
||||
|
||||
// -- Top APIs --
|
||||
|
||||
@@ -235,6 +235,10 @@ const (
|
||||
ErrAdminCredentialsMismatch
|
||||
ErrInsecureClientRequest
|
||||
ErrObjectTampered
|
||||
// Bucket Quota error codes
|
||||
ErrAdminBucketQuotaExceeded
|
||||
ErrAdminNoSuchQuotaConfiguration
|
||||
ErrAdminBucketQuotaDisabled
|
||||
|
||||
ErrHealNotImplemented
|
||||
ErrHealNoSuchProcess
|
||||
@@ -1089,6 +1093,21 @@ var errorCodes = errorCodeMap{
|
||||
Description: "Credentials in config mismatch with server environment variables",
|
||||
HTTPStatusCode: http.StatusServiceUnavailable,
|
||||
},
|
||||
ErrAdminBucketQuotaExceeded: {
|
||||
Code: "XMinioAdminBucketQuotaExceeded",
|
||||
Description: "Bucket quota exceeded",
|
||||
HTTPStatusCode: http.StatusBadRequest,
|
||||
},
|
||||
ErrAdminNoSuchQuotaConfiguration: {
|
||||
Code: "XMinioAdminNoSuchQuotaConfiguration",
|
||||
Description: "The quota configuration does not exist",
|
||||
HTTPStatusCode: http.StatusNotFound,
|
||||
},
|
||||
ErrAdminBucketQuotaDisabled: {
|
||||
Code: "XMinioAdminBucketQuotaDisabled",
|
||||
Description: "Quota specified but disk usage crawl is disabled on MinIO server",
|
||||
HTTPStatusCode: http.StatusBadRequest,
|
||||
},
|
||||
ErrInsecureClientRequest: {
|
||||
Code: "XMinioInsecureClientRequest",
|
||||
Description: "Cannot respond to plain-text request from TLS-encrypted server",
|
||||
@@ -1783,6 +1802,10 @@ func toAPIErrorCode(ctx context.Context, err error) (apiErr APIErrorCode) {
|
||||
apiErr = ErrNoSuchLifecycleConfiguration
|
||||
case BucketSSEConfigNotFound:
|
||||
apiErr = ErrNoSuchBucketSSEConfig
|
||||
case BucketQuotaConfigNotFound:
|
||||
apiErr = ErrAdminNoSuchQuotaConfiguration
|
||||
case BucketQuotaExceeded:
|
||||
apiErr = ErrAdminBucketQuotaExceeded
|
||||
case *event.ErrInvalidEventName:
|
||||
apiErr = ErrEventNotification
|
||||
case *event.ErrInvalidARN:
|
||||
|
||||
@@ -89,7 +89,7 @@ func lifecycleRound(ctx context.Context, objAPI ObjectLayer) error {
|
||||
}
|
||||
// Find the action that need to be executed
|
||||
if l.ComputeAction(obj.Name, obj.UserTags, obj.ModTime) == lifecycle.DeleteAction {
|
||||
if bucketHasLockConfig && enforceRetentionForLifecycle(ctx, obj) {
|
||||
if bucketHasLockConfig && enforceRetentionForDeletion(ctx, obj) {
|
||||
continue
|
||||
}
|
||||
objects = append(objects, obj.Name)
|
||||
|
||||
@@ -245,7 +245,7 @@ func (c *diskCache) purge(ctx context.Context) {
|
||||
// need to be cleaned up.
|
||||
expiry := UTCNow().Add(-cacheExpiryDays)
|
||||
// defaulting max hits count to 100
|
||||
scorer, err := newFileScorer(int64(toFree), time.Now().Unix(), 100)
|
||||
scorer, err := newFileScorer(toFree, time.Now().Unix(), 100)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return
|
||||
|
||||
@@ -285,7 +285,7 @@ func isMetadataSame(m1, m2 map[string]string) bool {
|
||||
}
|
||||
|
||||
type fileScorer struct {
|
||||
saveBytes int64
|
||||
saveBytes uint64
|
||||
now int64
|
||||
maxHits int
|
||||
// 1/size for consistent score.
|
||||
@@ -294,21 +294,21 @@ type fileScorer struct {
|
||||
// queue is a linked list of files we want to delete.
|
||||
// The list is kept sorted according to score, highest at top, lowest at bottom.
|
||||
queue list.List
|
||||
queuedBytes int64
|
||||
queuedBytes uint64
|
||||
}
|
||||
|
||||
type queuedFile struct {
|
||||
name string
|
||||
size int64
|
||||
size uint64
|
||||
score float64
|
||||
}
|
||||
|
||||
// newFileScorer allows to collect files to save a specific number of bytes.
|
||||
// Each file is assigned a score based on its age, size and number of hits.
|
||||
// A list of files is maintained
|
||||
func newFileScorer(saveBytes int64, now int64, maxHits int) (*fileScorer, error) {
|
||||
if saveBytes <= 0 {
|
||||
return nil, errors.New("newFileScorer: saveBytes <= 0")
|
||||
func newFileScorer(saveBytes uint64, now int64, maxHits int) (*fileScorer, error) {
|
||||
if saveBytes == 0 {
|
||||
return nil, errors.New("newFileScorer: saveBytes = 0")
|
||||
}
|
||||
if now < 0 {
|
||||
return nil, errors.New("newFileScorer: now < 0")
|
||||
@@ -325,7 +325,7 @@ func (f *fileScorer) addFile(name string, lastAccess time.Time, size int64, hits
|
||||
// Calculate how much we want to delete this object.
|
||||
file := queuedFile{
|
||||
name: name,
|
||||
size: size,
|
||||
size: uint64(size),
|
||||
}
|
||||
score := float64(f.now - lastAccess.Unix())
|
||||
// Size as fraction of how much we want to save, 0->1.
|
||||
@@ -353,7 +353,11 @@ func (f *fileScorer) addFile(name string, lastAccess time.Time, size int64, hits
|
||||
// Returns true if there still is a need to delete files (saveBytes >0),
|
||||
// false if no more bytes needs to be saved.
|
||||
func (f *fileScorer) adjustSaveBytes(n int64) bool {
|
||||
f.saveBytes += n
|
||||
if n < 0 {
|
||||
f.saveBytes -= ^uint64(n - 1)
|
||||
} else {
|
||||
f.saveBytes += uint64(n)
|
||||
}
|
||||
if f.saveBytes <= 0 {
|
||||
f.queue.Init()
|
||||
f.saveBytes = 0
|
||||
|
||||
@@ -176,10 +176,11 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
|
||||
}
|
||||
|
||||
enableIAMOps := globalEtcdClient != nil
|
||||
enableBucketQuotaOps := env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOn
|
||||
|
||||
// Enable IAM admin APIs if etcd is enabled, if not just enable basic
|
||||
// operations such as profiling, server info etc.
|
||||
registerAdminRouter(router, enableConfigOps, enableIAMOps)
|
||||
registerAdminRouter(router, enableConfigOps, enableIAMOps, enableBucketQuotaOps)
|
||||
|
||||
// Add healthcheck router
|
||||
registerHealthCheckRouter(router)
|
||||
|
||||
@@ -36,6 +36,7 @@ import (
|
||||
xhttp "github.com/minio/minio/cmd/http"
|
||||
"github.com/minio/minio/pkg/auth"
|
||||
objectlock "github.com/minio/minio/pkg/bucket/object/lock"
|
||||
|
||||
"github.com/minio/minio/pkg/certs"
|
||||
"github.com/minio/minio/pkg/event"
|
||||
"github.com/minio/minio/pkg/pubsub"
|
||||
@@ -217,6 +218,9 @@ var (
|
||||
|
||||
globalBucketObjectLockConfig = objectlock.NewBucketObjectLockConfig()
|
||||
|
||||
globalBucketQuotaSys = NewBucketQuotaSys()
|
||||
globalBucketStorageCache bucketStorageCache
|
||||
|
||||
// Disk cache drives
|
||||
globalCacheConfig cache.Config
|
||||
|
||||
|
||||
@@ -567,6 +567,7 @@ func (sys *NotificationSys) SetBucketPolicy(ctx context.Context, bucketName stri
|
||||
func (sys *NotificationSys) DeleteBucket(ctx context.Context, bucketName string) {
|
||||
globalNotificationSys.RemoveNotification(bucketName)
|
||||
globalBucketObjectLockConfig.Remove(bucketName)
|
||||
globalBucketQuotaSys.Remove(bucketName)
|
||||
globalPolicySys.Remove(bucketName)
|
||||
globalLifecycleSys.Remove(bucketName)
|
||||
|
||||
@@ -619,6 +620,23 @@ func (sys *NotificationSys) RemoveBucketObjectLockConfig(ctx context.Context, bu
|
||||
}()
|
||||
}
|
||||
|
||||
// RemoveBucketQuotaConfig - calls RemoveBucketQuotaConfig RPC call on all peers.
|
||||
func (sys *NotificationSys) RemoveBucketQuotaConfig(ctx context.Context, bucketName string) {
|
||||
go func() {
|
||||
ng := WithNPeers(len(sys.peerClients))
|
||||
for idx, client := range sys.peerClients {
|
||||
if client == nil {
|
||||
continue
|
||||
}
|
||||
client := client
|
||||
ng.Go(ctx, func() error {
|
||||
return client.RemoveBucketQuotaConfig(bucketName)
|
||||
}, idx, *client.host)
|
||||
}
|
||||
ng.Wait()
|
||||
}()
|
||||
}
|
||||
|
||||
// SetBucketLifecycle - calls SetBucketLifecycle on all peers.
|
||||
func (sys *NotificationSys) SetBucketLifecycle(ctx context.Context, bucketName string,
|
||||
bucketLifecycle *lifecycle.Lifecycle) {
|
||||
@@ -906,6 +924,26 @@ func (sys *NotificationSys) PutBucketObjectLockConfig(ctx context.Context, bucke
|
||||
}
|
||||
}
|
||||
|
||||
// PutBucketQuotaConfig - put bucket quota configuration to all peers.
|
||||
func (sys *NotificationSys) PutBucketQuotaConfig(ctx context.Context, bucketName string, q madmin.BucketQuota) {
|
||||
g := errgroup.WithNErrs(len(sys.peerClients))
|
||||
for index, client := range sys.peerClients {
|
||||
if client == nil {
|
||||
continue
|
||||
}
|
||||
index := index
|
||||
g.Go(func() error {
|
||||
return sys.peerClients[index].PutBucketQuotaConfig(bucketName, q)
|
||||
}, index)
|
||||
}
|
||||
for i, err := range g.Wait() {
|
||||
if err != nil {
|
||||
logger.GetReqInfo(ctx).AppendTags("remotePeer", sys.peerClients[i].host.String())
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NetOBDInfo - Net OBD information
|
||||
func (sys *NotificationSys) NetOBDInfo(ctx context.Context) madmin.ServerNetOBDInfo {
|
||||
var sortedGlobalEndpoints []string
|
||||
|
||||
@@ -269,6 +269,20 @@ func (e BucketSSEConfigNotFound) Error() string {
|
||||
return "No bucket encryption found for bucket: " + e.Bucket
|
||||
}
|
||||
|
||||
// BucketQuotaConfigNotFound - no bucket quota config found.
|
||||
type BucketQuotaConfigNotFound GenericError
|
||||
|
||||
func (e BucketQuotaConfigNotFound) Error() string {
|
||||
return "No quota config found for bucket : " + e.Bucket
|
||||
}
|
||||
|
||||
// BucketQuotaExceeded - bucket quota exceeded.
|
||||
type BucketQuotaExceeded GenericError
|
||||
|
||||
func (e BucketQuotaExceeded) Error() string {
|
||||
return "Bucket quota exceeded for bucket: " + e.Bucket
|
||||
}
|
||||
|
||||
/// Bucket related errors.
|
||||
|
||||
// BucketNameInvalid - bucketname provided is invalid.
|
||||
|
||||
@@ -899,6 +899,10 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
|
||||
}
|
||||
length = actualSize
|
||||
}
|
||||
if err := enforceBucketQuota(ctx, dstBucket, actualSize); err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
|
||||
var compressMetadata map[string]string
|
||||
// No need to compress for remote etcd calls
|
||||
@@ -1328,7 +1332,10 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
|
||||
sha256hex = getContentSha256Cksum(r, serviceS3)
|
||||
}
|
||||
}
|
||||
|
||||
if err := enforceBucketQuota(ctx, bucket, size); err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
// Check if bucket encryption is enabled
|
||||
_, encEnabled := globalBucketSSEConfigSys.Get(bucket)
|
||||
// This request header needs to be set prior to setting ObjectOptions
|
||||
@@ -1762,7 +1769,10 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if err := enforceBucketQuota(ctx, dstBucket, actualPartSize); err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
// Special care for CopyObjectPart
|
||||
if partRangeErr := checkCopyPartRangeWithSize(rs, actualPartSize); partRangeErr != nil {
|
||||
writeCopyPartErr(ctx, w, partRangeErr, r.URL, guessIsBrowserReq(r))
|
||||
@@ -2043,6 +2053,11 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
|
||||
}
|
||||
}
|
||||
|
||||
if err := enforceBucketQuota(ctx, bucket, size); err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
|
||||
actualSize := size
|
||||
|
||||
// get encryption options
|
||||
|
||||
@@ -98,9 +98,9 @@ func enforceRetentionBypassForDeleteWeb(ctx context.Context, r *http.Request, bu
|
||||
return ErrNone
|
||||
}
|
||||
|
||||
// enforceRetentionForLifecycle checks if it is appropriate to remove an
|
||||
// object according to locking configuration when this is lifecycle asking.
|
||||
func enforceRetentionForLifecycle(ctx context.Context, objInfo ObjectInfo) (locked bool) {
|
||||
// enforceRetentionForDeletion checks if it is appropriate to remove an
|
||||
// object according to locking configuration when this is lifecycle/ bucket quota asking.
|
||||
func enforceRetentionForDeletion(ctx context.Context, objInfo ObjectInfo) (locked bool) {
|
||||
lhold := objectlock.GetObjectLegalHoldMeta(objInfo.UserDefined)
|
||||
if lhold.Status.Valid() && lhold.Status == objectlock.LegalHoldOn {
|
||||
return true
|
||||
|
||||
@@ -543,6 +543,18 @@ func (client *peerRESTClient) cycleServerBloomFilter(ctx context.Context, req bl
|
||||
return &resp, gob.NewDecoder(respBody).Decode(&resp)
|
||||
}
|
||||
|
||||
// RemoveBucketQuotaConfig - Remove bucket quota config on the peer node.
|
||||
func (client *peerRESTClient) RemoveBucketQuotaConfig(bucket string) error {
|
||||
values := make(url.Values)
|
||||
values.Set(peerRESTBucket, bucket)
|
||||
respBody, err := client.call(peerRESTMethodBucketQuotaConfigRemove, values, nil, -1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer http.DrainBody(respBody)
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetBucketPolicy - Set bucket policy on the peer node.
|
||||
func (client *peerRESTClient) SetBucketPolicy(bucket string, bucketPolicy *policy.Policy) error {
|
||||
values := make(url.Values)
|
||||
@@ -662,6 +674,25 @@ func (client *peerRESTClient) PutBucketObjectLockConfig(bucket string, retention
|
||||
return nil
|
||||
}
|
||||
|
||||
// PutBucketQuotaConfig - PUT bucket quota configuration.
|
||||
func (client *peerRESTClient) PutBucketQuotaConfig(bucket string, q madmin.BucketQuota) error {
|
||||
values := make(url.Values)
|
||||
values.Set(peerRESTBucket, bucket)
|
||||
|
||||
var reader bytes.Buffer
|
||||
err := gob.NewEncoder(&reader).Encode(&q)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
respBody, err := client.call(peerRESTMethodPutBucketQuotaConfig, values, &reader, -1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer http.DrainBody(respBody)
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeletePolicy - delete a specific canned policy.
|
||||
func (client *peerRESTClient) DeletePolicy(policyName string) (err error) {
|
||||
values := make(url.Values)
|
||||
|
||||
@@ -63,6 +63,8 @@ const (
|
||||
peerRESTMethodBucketEncryptionRemove = "/removebucketencryption"
|
||||
peerRESTMethodPutBucketObjectLockConfig = "/putbucketobjectlockconfig"
|
||||
peerRESTMethodBucketObjectLockConfigRemove = "/removebucketobjectlockconfig"
|
||||
peerRESTMethodPutBucketQuotaConfig = "/putbucketquotaconfig"
|
||||
peerRESTMethodBucketQuotaConfigRemove = "/removebucketquotaconfig"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -599,6 +599,7 @@ func (s *peerRESTServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Requ
|
||||
globalNotificationSys.RemoveNotification(bucketName)
|
||||
globalPolicySys.Remove(bucketName)
|
||||
globalBucketObjectLockConfig.Remove(bucketName)
|
||||
globalBucketQuotaSys.Remove(bucketName)
|
||||
globalLifecycleSys.Remove(bucketName)
|
||||
|
||||
w.(http.Flusher).Flush()
|
||||
@@ -875,6 +876,54 @@ func (s *peerRESTServer) PutBucketObjectLockConfigHandler(w http.ResponseWriter,
|
||||
w.(http.Flusher).Flush()
|
||||
}
|
||||
|
||||
// PutBucketQuotaConfigHandler - handles PUT bucket quota configuration.
|
||||
func (s *peerRESTServer) PutBucketQuotaConfigHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if !s.IsValid(w, r) {
|
||||
s.writeErrorResponse(w, errors.New("Invalid request"))
|
||||
return
|
||||
}
|
||||
|
||||
vars := mux.Vars(r)
|
||||
bucketName := vars[peerRESTBucket]
|
||||
if bucketName == "" {
|
||||
s.writeErrorResponse(w, errors.New("Bucket name is missing"))
|
||||
return
|
||||
}
|
||||
|
||||
var quota madmin.BucketQuota
|
||||
if r.ContentLength < 0 {
|
||||
s.writeErrorResponse(w, errInvalidArgument)
|
||||
return
|
||||
}
|
||||
|
||||
err := gob.NewDecoder(r.Body).Decode("a)
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
globalBucketQuotaSys.Set(bucketName, quota)
|
||||
w.(http.Flusher).Flush()
|
||||
}
|
||||
|
||||
// RemoveBucketQuotaConfigHandler - handles DELETE bucket quota configuration.
|
||||
func (s *peerRESTServer) RemoveBucketQuotaConfigHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if !s.IsValid(w, r) {
|
||||
s.writeErrorResponse(w, errors.New("Invalid request"))
|
||||
return
|
||||
}
|
||||
|
||||
vars := mux.Vars(r)
|
||||
bucketName := vars[peerRESTBucket]
|
||||
if bucketName == "" {
|
||||
s.writeErrorResponse(w, errors.New("Bucket name is missing"))
|
||||
return
|
||||
}
|
||||
|
||||
globalBucketQuotaSys.Remove(bucketName)
|
||||
w.(http.Flusher).Flush()
|
||||
}
|
||||
|
||||
// ServerUpdateHandler - updates the current server.
|
||||
func (s *peerRESTServer) ServerUpdateHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if !s.IsValid(w, r) {
|
||||
@@ -1181,4 +1230,6 @@ func registerPeerRESTHandlers(router *mux.Router) {
|
||||
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLog).HandlerFunc(server.ConsoleLogHandler)
|
||||
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodPutBucketObjectLockConfig).HandlerFunc(httpTraceHdrs(server.PutBucketObjectLockConfigHandler)).Queries(restQueries(peerRESTBucket)...)
|
||||
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBucketObjectLockConfigRemove).HandlerFunc(httpTraceHdrs(server.RemoveBucketObjectLockConfigHandler)).Queries(restQueries(peerRESTBucket)...)
|
||||
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodPutBucketQuotaConfig).HandlerFunc(httpTraceHdrs(server.PutBucketQuotaConfigHandler)).Queries(restQueries(peerRESTBucket)...)
|
||||
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBucketQuotaConfigRemove).HandlerFunc(httpTraceHdrs(server.RemoveBucketQuotaConfigHandler)).Queries(restQueries(peerRESTBucket)...)
|
||||
}
|
||||
|
||||
267
cmd/quota.go
Normal file
267
cmd/quota.go
Normal file
@@ -0,0 +1,267 @@
|
||||
/*
|
||||
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"path"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio/cmd/config"
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
"github.com/minio/minio/pkg/env"
|
||||
"github.com/minio/minio/pkg/event"
|
||||
"github.com/minio/minio/pkg/madmin"
|
||||
)
|
||||
|
||||
// BucketQuotaSys - map of bucket and quota configuration.
|
||||
type BucketQuotaSys struct {
|
||||
sync.RWMutex
|
||||
quotaMap map[string]madmin.BucketQuota
|
||||
}
|
||||
|
||||
// Set - set quota configuration.
|
||||
func (sys *BucketQuotaSys) Set(bucketName string, q madmin.BucketQuota) {
|
||||
sys.Lock()
|
||||
sys.quotaMap[bucketName] = q
|
||||
sys.Unlock()
|
||||
}
|
||||
|
||||
// Get - Get quota configuration.
|
||||
func (sys *BucketQuotaSys) Get(bucketName string) (q madmin.BucketQuota, ok bool) {
|
||||
sys.RLock()
|
||||
defer sys.RUnlock()
|
||||
q, ok = sys.quotaMap[bucketName]
|
||||
return
|
||||
}
|
||||
|
||||
// Remove - removes quota configuration.
|
||||
func (sys *BucketQuotaSys) Remove(bucketName string) {
|
||||
sys.Lock()
|
||||
delete(sys.quotaMap, bucketName)
|
||||
sys.Unlock()
|
||||
}
|
||||
|
||||
// Exists - bucketName has Quota config set
|
||||
func (sys *BucketQuotaSys) Exists(bucketName string) bool {
|
||||
sys.RLock()
|
||||
_, ok := sys.quotaMap[bucketName]
|
||||
sys.RUnlock()
|
||||
return ok
|
||||
}
|
||||
|
||||
// Keys - list of buckets with quota configuration
|
||||
func (sys *BucketQuotaSys) Keys() []string {
|
||||
sys.RLock()
|
||||
defer sys.RUnlock()
|
||||
var keys []string
|
||||
for k := range sys.quotaMap {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
return keys
|
||||
}
|
||||
|
||||
// NewBucketQuotaSys returns initialized BucketQuotaSys
|
||||
func NewBucketQuotaSys() *BucketQuotaSys {
|
||||
return &BucketQuotaSys{quotaMap: map[string]madmin.BucketQuota{}}
|
||||
}
|
||||
|
||||
// parseBucketQuota parses BucketQuota from json
|
||||
func parseBucketQuota(data []byte) (quotaCfg madmin.BucketQuota, err error) {
|
||||
err = json.Unmarshal(data, "aCfg)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if !quotaCfg.Type.IsValid() {
|
||||
return quotaCfg, fmt.Errorf("Invalid quota type %s", quotaCfg.Type)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
type bucketStorageCache struct {
|
||||
bucketsSizes map[string]uint64
|
||||
lastUpdate time.Time
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func (b *bucketStorageCache) check(ctx context.Context, q madmin.BucketQuota, bucket string, size int64) error {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
if time.Since(b.lastUpdate) > 10*time.Second {
|
||||
dui, err := loadDataUsageFromBackend(ctx, newObjectLayerWithoutSafeModeFn())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.lastUpdate = time.Now()
|
||||
b.bucketsSizes = dui.BucketsSizes
|
||||
}
|
||||
currUsage := b.bucketsSizes[bucket]
|
||||
if (currUsage + uint64(size)) > q.Quota {
|
||||
return BucketQuotaExceeded{Bucket: bucket}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func enforceBucketQuota(ctx context.Context, bucket string, size int64) error {
|
||||
if size < 0 {
|
||||
return nil
|
||||
}
|
||||
q, ok := globalBucketQuotaSys.Get(bucket)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
return globalBucketStorageCache.check(ctx, q, bucket, size)
|
||||
}
|
||||
|
||||
func initBucketQuotaSys(buckets []BucketInfo, objAPI ObjectLayer) error {
|
||||
for _, bucket := range buckets {
|
||||
ctx := logger.SetReqInfo(GlobalContext, &logger.ReqInfo{BucketName: bucket.Name})
|
||||
configFile := path.Join(bucketConfigPrefix, bucket.Name, bucketQuotaConfigFile)
|
||||
configData, err := readConfig(ctx, objAPI, configFile)
|
||||
if err != nil {
|
||||
if errors.Is(err, errConfigNotFound) {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
quotaCfg, err := parseBucketQuota(configData)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
globalBucketQuotaSys.Set(bucket.Name, quotaCfg)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
const (
|
||||
bgQuotaInterval = 1 * time.Hour
|
||||
)
|
||||
|
||||
// initQuotaEnforcement starts the routine that deletes objects in bucket
|
||||
// that exceeds the FIFO quota
|
||||
func initQuotaEnforcement(ctx context.Context, objAPI ObjectLayer) {
|
||||
if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOn {
|
||||
go startBucketQuotaEnforcement(ctx, objAPI)
|
||||
}
|
||||
}
|
||||
|
||||
func startBucketQuotaEnforcement(ctx context.Context, objAPI ObjectLayer) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.NewTimer(bgQuotaInterval).C:
|
||||
logger.LogIf(ctx, enforceFIFOQuota(ctx, objAPI))
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// enforceFIFOQuota deletes objects in FIFO order until sufficient objects
|
||||
// have been deleted so as to bring bucket usage within quota
|
||||
func enforceFIFOQuota(ctx context.Context, objectAPI ObjectLayer) error {
|
||||
// Turn off quota enforcement if data usage info is unavailable.
|
||||
if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOff {
|
||||
return nil
|
||||
}
|
||||
for _, bucket := range globalBucketQuotaSys.Keys() {
|
||||
// Check if the current bucket has quota restrictions, if not skip it
|
||||
cfg, ok := globalBucketQuotaSys.Get(bucket)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if cfg.Type != madmin.FIFOQuota {
|
||||
continue
|
||||
}
|
||||
_, bucketHasLockConfig := globalBucketObjectLockConfig.Get(bucket)
|
||||
|
||||
dataUsageInfo, err := loadDataUsageFromBackend(ctx, objectAPI)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var toFree uint64
|
||||
if dataUsageInfo.BucketsSizes[bucket] > cfg.Quota {
|
||||
toFree = dataUsageInfo.BucketsSizes[bucket] - cfg.Quota
|
||||
}
|
||||
if toFree <= 0 {
|
||||
continue
|
||||
}
|
||||
// Allocate new results channel to receive ObjectInfo.
|
||||
objInfoCh := make(chan ObjectInfo)
|
||||
|
||||
// Walk through all objects
|
||||
if err := objectAPI.Walk(ctx, bucket, "", objInfoCh); err != nil {
|
||||
return err
|
||||
}
|
||||
// reuse the fileScorer used by disk cache to score entries by
|
||||
// ModTime to find the oldest objects in bucket to delete. In
|
||||
// the context of bucket quota enforcement - number of hits are
|
||||
// irrelevant.
|
||||
scorer, err := newFileScorer(toFree, time.Now().Unix(), 1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for obj := range objInfoCh {
|
||||
// skip objects currently under retention
|
||||
if bucketHasLockConfig && enforceRetentionForDeletion(ctx, obj) {
|
||||
continue
|
||||
}
|
||||
scorer.addFile(obj.Name, obj.ModTime, obj.Size, 1)
|
||||
}
|
||||
var objects []string
|
||||
numKeys := len(scorer.fileNames())
|
||||
for i, key := range scorer.fileNames() {
|
||||
objects = append(objects, key)
|
||||
if len(objects) < maxObjectList && (i < numKeys-1) {
|
||||
// skip deletion until maxObjectList or end of slice
|
||||
continue
|
||||
}
|
||||
|
||||
if len(objects) == 0 {
|
||||
break
|
||||
}
|
||||
// Deletes a list of objects.
|
||||
deleteErrs, err := objectAPI.DeleteObjects(ctx, bucket, objects)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
} else {
|
||||
for i := range deleteErrs {
|
||||
if deleteErrs[i] != nil {
|
||||
logger.LogIf(ctx, deleteErrs[i])
|
||||
continue
|
||||
}
|
||||
// Notify object deleted event.
|
||||
sendEvent(eventArgs{
|
||||
EventName: event.ObjectRemovedDelete,
|
||||
BucketName: bucket,
|
||||
Object: ObjectInfo{
|
||||
Name: objects[i],
|
||||
},
|
||||
Host: "Internal: [FIFO-QUOTA-EXPIRY]",
|
||||
})
|
||||
}
|
||||
objects = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -81,7 +81,7 @@ var globalHandlers = []HandlerFunc{
|
||||
}
|
||||
|
||||
// configureServer handler returns final handler for the http server.
|
||||
func configureServerHandler(endpointZones EndpointZones) (http.Handler, error) {
|
||||
func configureServerHandler(endpointZones EndpointZones, enableBucketQuotaOps bool) (http.Handler, error) {
|
||||
// Initialize router. `SkipClean(true)` stops gorilla/mux from
|
||||
// normalizing URL path minio/minio#3256
|
||||
router := mux.NewRouter().SkipClean(true).UseEncodedPath()
|
||||
@@ -95,7 +95,7 @@ func configureServerHandler(endpointZones EndpointZones) (http.Handler, error) {
|
||||
registerSTSRouter(router)
|
||||
|
||||
// Add Admin router, all APIs are enabled in server mode.
|
||||
registerAdminRouter(router, true, true)
|
||||
registerAdminRouter(router, true, true, enableBucketQuotaOps)
|
||||
|
||||
// Add healthcheck router
|
||||
registerHealthCheckRouter(router)
|
||||
|
||||
@@ -275,6 +275,10 @@ func initAllSubsystems(buckets []BucketInfo, newObject ObjectLayer) (err error)
|
||||
if err = initBucketObjectLockConfig(buckets, newObject); err != nil {
|
||||
return fmt.Errorf("Unable to initialize object lock system: %w", err)
|
||||
}
|
||||
// Initialize bucket quota system.
|
||||
if err = initBucketQuotaSys(buckets, newObject); err != nil {
|
||||
return fmt.Errorf("Unable to initialize bucket quota system: %w", err)
|
||||
}
|
||||
|
||||
// Initialize lifecycle system.
|
||||
if err = globalLifecycleSys.Init(buckets, newObject); err != nil {
|
||||
@@ -308,6 +312,7 @@ func startBackgroundOps(ctx context.Context, objAPI ObjectLayer) {
|
||||
|
||||
initDataUsageStats(ctx, objAPI)
|
||||
initDailyLifecycle(ctx, objAPI)
|
||||
initQuotaEnforcement(ctx, objAPI)
|
||||
}
|
||||
|
||||
// serverMain handler called for 'minio server' command.
|
||||
@@ -384,7 +389,8 @@ func serverMain(ctx *cli.Context) {
|
||||
|
||||
// Configure server.
|
||||
var handler http.Handler
|
||||
handler, err = configureServerHandler(globalEndpoints)
|
||||
enableBucketQuotaOps := env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOn
|
||||
handler, err = configureServerHandler(globalEndpoints, enableBucketQuotaOps)
|
||||
if err != nil {
|
||||
logger.Fatal(config.ErrUnexpectedError(err), "Unable to configure one of server's RPC services")
|
||||
}
|
||||
|
||||
@@ -322,7 +322,7 @@ func UnstartedTestServer(t TestErrHandler, instanceType string) TestServer {
|
||||
testServer.AccessKey = credentials.AccessKey
|
||||
testServer.SecretKey = credentials.SecretKey
|
||||
|
||||
httpHandler, err := configureServerHandler(testServer.Disks)
|
||||
httpHandler, err := configureServerHandler(testServer.Disks, false)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to configure one of the RPC services <ERROR> %s", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user