mirror of https://github.com/minio/minio.git
allow quota enforcement to rely on older values (#17351)
PUT calls cannot afford large latency build-ups due to a contentious usage.json, or worse, failing with an unexpected error. This can happen when the file is being concurrently updated by the scanner or healed during a disk-replacement heal. These updates are fairly quick in theory, but stressed clusters can quickly show visible latency, and this can add up, leading to invalid errors returned during PUT.

It is perhaps okay for us to relax this error-return requirement instead: make sure we log that we are proceeding to take in requests while quota enforcement is using an older value. These values will reconcile themselves eventually, as the scanner overwrites usage.json.

Bonus: make sure that storage-rest-client sets ExpectTimeouts to be 'true', so that a DiskInfo() call with a context timeout does not prematurely disconnect the servers, leading to a longer healthCheck back-off routine. This can easily pile up while also causing active callers to disconnect, leading to quorum loss. DiskInfo() is actively used in the PUT and Multipart call paths for upgrading parity when disks are down; it in turn shouldn't cause more disks to go down.
This commit is contained in:
parent 75c6fc4f02
commit 2f9e2147f5
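As background for the diff below: the core change swaps a strict cache read for a relaxed one that can serve a stale value when a refresh fails. The following is a minimal, self-contained sketch of that relax-on-failure idea; the type and field names (relaxedCache, Relax, Update) are hypothetical and not MinIO's actual cache implementation.

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// relaxedCache caches one value; if refreshing fails and Relax is set,
// Get returns the last good value together with the refresh error.
type relaxedCache struct {
	mu      sync.Mutex
	TTL     time.Duration
	Relax   bool
	Update  func() (interface{}, error)
	value   interface{}
	fetched time.Time
}

func (c *relaxedCache) Get() (interface{}, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.value != nil && time.Since(c.fetched) < c.TTL {
		return c.value, nil // still fresh
	}
	v, err := c.Update()
	if err != nil {
		if c.Relax && c.value != nil {
			return c.value, err // stale value; caller may log and proceed
		}
		return nil, err
	}
	c.value, c.fetched = v, time.Now()
	return c.value, nil
}

func main() {
	calls := 0
	c := &relaxedCache{
		TTL:   10 * time.Second,
		Relax: true,
		Update: func() (interface{}, error) {
			calls++
			if calls > 1 {
				return nil, errors.New("usage.json busy") // simulated contention
			}
			return "usage-v1", nil
		},
	}
	v, err := c.Get() // first load succeeds
	fmt.Println(v, err)
	c.fetched = time.Time{} // force expiry for the demo
	v, err = c.Get()        // refresh fails; stale value returned with the error
	fmt.Println(v, err)     // usage-v1 usage.json busy
}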
@@ -47,9 +47,13 @@ func NewBucketQuotaSys() *BucketQuotaSys {
 // Init initialize bucket quota.
 func (sys *BucketQuotaSys) Init(objAPI ObjectLayer) {
 	sys.bucketStorageCache.Once.Do(func() {
-		sys.bucketStorageCache.TTL = 1 * time.Second
+		// Set this to 10 secs since its enough, as scanner
+		// does not update the bucket usage values frequently.
+		sys.bucketStorageCache.TTL = 10 * time.Second
+		// Rely on older value if usage loading fails from disk.
+		sys.bucketStorageCache.Relax = true
 		sys.bucketStorageCache.Update = func() (interface{}, error) {
-			ctx, done := context.WithTimeout(context.Background(), 5*time.Second)
+			ctx, done := context.WithTimeout(context.Background(), 1*time.Second)
 			defer done()
 
 			return loadDataUsageFromBackend(ctx, objAPI)
@@ -60,16 +64,20 @@ func (sys *BucketQuotaSys) Init(objAPI ObjectLayer) {
 // GetBucketUsageInfo return bucket usage info for a given bucket
 func (sys *BucketQuotaSys) GetBucketUsageInfo(bucket string) (BucketUsageInfo, error) {
 	v, err := sys.bucketStorageCache.Get()
-	if err != nil {
-		return BucketUsageInfo{}, err
+	if err != nil && v != nil {
+		logger.LogIf(GlobalContext, fmt.Errorf("unable to retrieve usage information for bucket: %s, relying on older value cached in-memory: err(%v)", bucket, err))
+	}
+	if v == nil {
+		logger.LogIf(GlobalContext, errors.New("unable to retrieve usage information for bucket: %s, no reliable usage value available - quota will not be enforced"))
 	}
 
+	var bui BucketUsageInfo
+
 	dui, ok := v.(DataUsageInfo)
-	if !ok {
-		return BucketUsageInfo{}, fmt.Errorf("internal error: Unexpected DUI data type: %T", v)
+	if ok {
+		bui = dui.BucketsUsage[bucket]
 	}
 
-	bui := dui.BucketsUsage[bucket]
 	return bui, nil
 }
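One detail worth noting in the hunk above: when the type assertion fails or the bucket is absent, bui stays the zero BucketUsageInfo, because a Go map lookup on a missing key (even on a nil map) returns the zero value. A minimal standalone illustration, using a stand-in BucketUsageInfo type rather than MinIO's:

package main

import "fmt"

// Stand-in for the real BucketUsageInfo; only Size matters here.
type BucketUsageInfo struct{ Size uint64 }

func main() {
	var usage map[string]BucketUsageInfo // nil map: reads are safe
	bui := usage["mybucket"]             // missing key yields the zero value
	fmt.Println(bui.Size)                // 0 -> quota checks see zero usage
}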
@@ -100,6 +108,10 @@ func (sys *BucketQuotaSys) enforceQuotaHard(ctx context.Context, bucket string,
 	}
 
 	if q != nil && q.Type == madmin.HardQuota && q.Quota > 0 {
+		if uint64(size) >= q.Quota { // check if file size already exceeds the quota
+			return BucketQuotaExceeded{Bucket: bucket}
+		}
+
 		bui, err := sys.GetBucketUsageInfo(bucket)
 		if err != nil {
 			return err
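The early return added above rejects an object that alone meets or exceeds the hard quota, before any usage lookup is needed. Below is a hedged sketch of how such an admission check composes; the helper name exceedsHardQuota and the exact comparison against cached usage are assumptions, since the diff cuts off mid-function:

package main

import "fmt"

// exceedsHardQuota is a hypothetical helper mirroring the check's shape:
// reject outright if the object alone cannot fit, otherwise compare the
// cached usage plus the incoming size against the quota.
func exceedsHardQuota(cachedUsage, objectSize, quota uint64) bool {
	if objectSize >= quota {
		return true // fast path: no usage lookup needed
	}
	return cachedUsage+objectSize >= quota
}

func main() {
	fmt.Println(exceedsHardQuota(0, 100, 100)) // true: object alone hits quota
	fmt.Println(exceedsHardQuota(60, 30, 100)) // false: 90 < 100
	fmt.Println(exceedsHardQuota(80, 30, 100)) // true: 110 >= 100
}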
@@ -151,10 +151,8 @@ func newlockRESTClient(endpoint Endpoint) *lockRESTClient {
 	}
 
 	restClient := rest.NewClient(serverURL, globalInternodeTransport, newCachedAuthToken())
-	restClient.ExpectTimeouts = true
 	// Use a separate client to avoid recursive calls.
 	healthClient := rest.NewClient(serverURL, globalInternodeTransport, newCachedAuthToken())
-	healthClient.ExpectTimeouts = true
 	healthClient.NoMetrics = true
 	restClient.HealthCheckFn = func() bool {
 		ctx, cancel := context.WithTimeout(context.Background(), restClient.HealthCheckTimeout)
@@ -200,8 +200,8 @@ func (di *distLockInstance) GetRLock(ctx context.Context, timeout *dynamicTimeou
 	}) {
 		timeout.LogFailure()
 		defer cancel()
-		if err := newCtx.Err(); err == context.Canceled {
-			return LockContext{ctx: ctx, cancel: func() {}}, err
+		if errors.Is(newCtx.Err(), context.Canceled) {
+			return LockContext{ctx: ctx, cancel: func() {}}, newCtx.Err()
 		}
 		return LockContext{ctx: ctx, cancel: func() {}}, OperationTimedOut{}
 	}
@@ -255,8 +255,8 @@ func (li *localLockInstance) GetLock(ctx context.Context, timeout *dynamicTimeou
 			li.ns.unlock(li.volume, li.paths[si], readLock)
 		}
 	}
-	if err := ctx.Err(); err == context.Canceled {
-		return LockContext{}, err
+	if errors.Is(ctx.Err(), context.Canceled) {
+		return LockContext{}, ctx.Err()
 	}
 	return LockContext{}, OperationTimedOut{}
 }
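Both lock hunks replace a direct == comparison with errors.Is. The difference matters once errors arrive wrapped, since errors.Is walks the unwrap chain while == compares only the outermost value. A minimal demonstration:

package main

import (
	"context"
	"errors"
	"fmt"
)

func main() {
	wrapped := fmt.Errorf("acquiring lock: %w", context.Canceled)
	fmt.Println(wrapped == context.Canceled)          // false: different value
	fmt.Println(errors.Is(wrapped, context.Canceled)) // true: unwraps the chain
}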
@@ -782,7 +782,6 @@ func newPeerRESTClient(peer *xnet.Host) *peerRESTClient {
 	restClient := rest.NewClient(serverURL, globalInternodeTransport, newCachedAuthToken())
 	// Use a separate client to avoid recursive calls.
 	healthClient := rest.NewClient(serverURL, globalInternodeTransport, newCachedAuthToken())
-	healthClient.ExpectTimeouts = true
 	healthClient.NoMetrics = true
 
 	// Construct a new health function.
@@ -349,7 +349,6 @@ func newPeerS3Client(peer *xnet.Host) *peerS3Client {
 	restClient := rest.NewClient(serverURL, globalInternodeTransport, newCachedAuthToken())
 	// Use a separate client to avoid recursive calls.
 	healthClient := rest.NewClient(serverURL, globalInternodeTransport, newCachedAuthToken())
-	healthClient.ExpectTimeouts = true
 	healthClient.NoMetrics = true
 
 	// Construct a new health function.
@@ -828,7 +828,6 @@ func newStorageRESTClient(endpoint Endpoint, healthcheck bool) *storageRESTClien
 	if healthcheck {
 		// Use a separate client to avoid recursive calls.
 		healthClient := rest.NewClient(serverURL, globalInternodeTransport, newCachedAuthToken())
-		healthClient.ExpectTimeouts = true
 		healthClient.NoMetrics = true
 		restClient.HealthCheckFn = func() bool {
 			ctx, cancel := context.WithTimeout(context.Background(), restClient.HealthCheckTimeout)
@@ -86,10 +86,6 @@ type Client struct {
 	// Should only be modified before any calls are made.
 	MaxErrResponseSize int64
 
-	// ExpectTimeouts indicates if context timeouts are expected.
-	// This will not mark the client offline in these cases.
-	ExpectTimeouts bool
-
 	// Avoid metrics update if set to true
 	NoMetrics bool
 
@@ -249,12 +245,14 @@ func (c *Client) Call(ctx context.Context, method string, values url.Values, bod
 		req.ContentLength = length
 	}
 
+	_, expectTimeouts := ctx.Deadline()
+
 	req, update := setupReqStatsUpdate(req)
 	defer update()
 
 	resp, err := c.httpClient.Do(req)
 	if err != nil {
-		if xnet.IsNetworkOrHostDown(err, c.ExpectTimeouts) {
+		if xnet.IsNetworkOrHostDown(err, expectTimeouts) {
 			if !c.NoMetrics {
 				atomic.AddUint64(&globalStats.errs, 1)
 			}
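The new `_, expectTimeouts := ctx.Deadline()` line derives "timeouts are expected" per call from whether the caller attached a deadline, replacing the client-wide ExpectTimeouts flag removed above. A small standalone check of that second return value:

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	_, ok := context.Background().Deadline()
	fmt.Println(ok) // false: no deadline, a timeout would be unexpected

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	_, ok = ctx.Deadline()
	fmt.Println(ok) // true: deadline set, timeouts are expected
}

With this, a DiskInfo() call that sets its own short context timeout no longer marks the peer offline when the deadline fires, which is exactly the healthCheck back-off pile-up described in the commit message.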
@@ -284,7 +282,7 @@ func (c *Client) Call(ctx context.Context, method string, values url.Values, bod
 		// Limit the ReadAll(), just in case, because of a bug, the server responds with large data.
 		b, err := io.ReadAll(io.LimitReader(resp.Body, c.MaxErrResponseSize))
 		if err != nil {
-			if xnet.IsNetworkOrHostDown(err, c.ExpectTimeouts) {
+			if xnet.IsNetworkOrHostDown(err, expectTimeouts) {
 				if !c.NoMetrics {
 					atomic.AddUint64(&globalStats.errs, 1)
 				}
@@ -299,8 +297,8 @@ func (c *Client) Call(ctx context.Context, method string, values url.Values, bod
 		}
 		return nil, errors.New(resp.Status)
 	}
-	if !c.NoMetrics && !c.ExpectTimeouts {
-		resp.Body = &respBodyMonitor{ReadCloser: resp.Body, expectTimeouts: c.ExpectTimeouts}
+	if !c.NoMetrics {
+		resp.Body = &respBodyMonitor{ReadCloser: resp.Body, expectTimeouts: expectTimeouts}
 	}
 	return resp.Body, nil
 }
|
Loading…
Reference in New Issue
Block a user