Add rate limiter for S3 API layer (#9196)

- total number of S3 API calls per server
- maximum wait duration for any S3 API call

This implementation is primarily meant for situations
where HDDs are not capable enough to handle the incoming
workload and there is no way to throttle the client.

This feature allows MinIO server to throttle itself
such that we do not overwhelm the HDDs.
This commit is contained in:
Harshavardhana 2020-03-24 12:43:40 -07:00 committed by GitHub
parent 791821d590
commit 6f6a2214fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 250 additions and 73 deletions

View File

@ -211,6 +211,7 @@ const (
ErrInvalidResourceName
ErrServerNotInitialized
ErrOperationTimedOut
ErrOperationMaxedOut
ErrInvalidRequest
// MinIO storage class error codes
ErrInvalidStorageClass
@ -1085,6 +1086,11 @@ var errorCodes = errorCodeMap{
Description: "A timeout occurred while trying to lock a resource",
HTTPStatusCode: http.StatusRequestTimeout,
},
ErrOperationMaxedOut: {
Code: "XMinioServerTimedOut",
Description: "A timeout exceeded while waiting to proceed with the request",
HTTPStatusCode: http.StatusRequestTimeout,
},
ErrUnsupportedMetadata: {
Code: "InvalidArgument",
Description: "Your metadata headers are not supported.",

View File

@ -1,5 +1,5 @@
/*
* MinIO Cloud Storage, (C) 2016 MinIO, Inc.
* MinIO Cloud Storage, (C) 2016-2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -18,9 +18,14 @@ package cmd
import (
"net/http"
"strconv"
"time"
"github.com/gorilla/mux"
"github.com/minio/minio/cmd/config"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/env"
)
func newHTTPServerFn() *xhttp.Server {
@ -87,142 +92,229 @@ func registerAPIRouter(router *mux.Router, encryptionEnabled, allowSSEKMS bool)
}
routers = append(routers, apiRouter.PathPrefix("/{bucket}").Subrouter())
requestsMax, _ := strconv.Atoi(env.Get(config.EnvAPIRequestsMax, "0"))
if requestsMax < 0 {
requestsMax = 0 // 0 means unlimited
}
var (
enabled bool
requestsMaxPerServer int
requestsDeadline time.Duration
)
if len(globalEndpoints.Hosts()) == 0 {
requestsMaxPerServer = requestsMax
} else {
requestsMaxPerServer = requestsMax / len(globalEndpoints.Hosts())
}
enabled = requestsMaxPerServer > 0
if enabled {
var err error
requestsDeadline, err = time.ParseDuration(env.Get(config.EnvAPIRequestsDeadline, "10s"))
if err != nil {
logger.Info("(%s) parsing environment value MINIO_API_REQUESTS_DEADLINE, defaulting to 10 seconds", err)
requestsDeadline = 10 * time.Second
}
}
requestsMaxCh := make(chan struct{}, requestsMaxPerServer)
for _, bucket := range routers {
// Object operations
// HeadObject
bucket.Methods(http.MethodHead).Path("/{object:.+}").HandlerFunc(collectAPIStats("headobject", httpTraceAll(api.HeadObjectHandler)))
bucket.Methods(http.MethodHead).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("headobject", httpTraceAll(api.HeadObjectHandler)), enabled, requestsMaxCh, requestsDeadline))
// CopyObjectPart
bucket.Methods(http.MethodPut).Path("/{object:.+}").HeadersRegexp(xhttp.AmzCopySource, ".*?(\\/|%2F).*?").HandlerFunc(collectAPIStats("copyobjectpart", httpTraceAll(api.CopyObjectPartHandler))).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
bucket.Methods(http.MethodPut).Path("/{object:.+}").HeadersRegexp(xhttp.AmzCopySource, ".*?(\\/|%2F).*?").HandlerFunc(maxClients(collectAPIStats("copyobjectpart", httpTraceAll(api.CopyObjectPartHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
// PutObjectPart
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(collectAPIStats("putobjectpart", httpTraceHdrs(api.PutObjectPartHandler))).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("putobjectpart", httpTraceHdrs(api.PutObjectPartHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
// ListObjectParts
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(collectAPIStats("listobjectparts", httpTraceAll(api.ListObjectPartsHandler))).Queries("uploadId", "{uploadId:.*}")
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("listobjectparts", httpTraceAll(api.ListObjectPartsHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("uploadId", "{uploadId:.*}")
// CompleteMultipartUpload
bucket.Methods(http.MethodPost).Path("/{object:.+}").HandlerFunc(collectAPIStats("completemutipartupload", httpTraceAll(api.CompleteMultipartUploadHandler))).Queries("uploadId", "{uploadId:.*}")
bucket.Methods(http.MethodPost).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("completemutipartupload", httpTraceAll(api.CompleteMultipartUploadHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("uploadId", "{uploadId:.*}")
// NewMultipartUpload
bucket.Methods(http.MethodPost).Path("/{object:.+}").HandlerFunc(collectAPIStats("newmultipartupload", httpTraceAll(api.NewMultipartUploadHandler))).Queries("uploads", "")
bucket.Methods(http.MethodPost).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("newmultipartupload", httpTraceAll(api.NewMultipartUploadHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("uploads", "")
// AbortMultipartUpload
bucket.Methods(http.MethodDelete).Path("/{object:.+}").HandlerFunc(collectAPIStats("abortmultipartupload", httpTraceAll(api.AbortMultipartUploadHandler))).Queries("uploadId", "{uploadId:.*}")
bucket.Methods(http.MethodDelete).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("abortmultipartupload", httpTraceAll(api.AbortMultipartUploadHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("uploadId", "{uploadId:.*}")
// GetObjectACL - this is a dummy call.
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(collectAPIStats("getobjectacl", httpTraceHdrs(api.GetObjectACLHandler))).Queries("acl", "")
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("getobjectacl", httpTraceHdrs(api.GetObjectACLHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("acl", "")
// PutObjectACL - this is a dummy call.
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(collectAPIStats("putobjectacl", httpTraceHdrs(api.PutObjectACLHandler))).Queries("acl", "")
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("putobjectacl", httpTraceHdrs(api.PutObjectACLHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("acl", "")
// GetObjectTagging
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(collectAPIStats("getobjecttagging", httpTraceHdrs(api.GetObjectTaggingHandler))).Queries("tagging", "")
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("getobjecttagging", httpTraceHdrs(api.GetObjectTaggingHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("tagging", "")
// PutObjectTagging
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(collectAPIStats("putobjecttagging", httpTraceHdrs(api.PutObjectTaggingHandler))).Queries("tagging", "")
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("putobjecttagging", httpTraceHdrs(api.PutObjectTaggingHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("tagging", "")
// DeleteObjectTagging
bucket.Methods(http.MethodDelete).Path("/{object:.+}").HandlerFunc(collectAPIStats("deleteobjecttagging", httpTraceHdrs(api.DeleteObjectTaggingHandler))).Queries("tagging", "")
bucket.Methods(http.MethodDelete).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("deleteobjecttagging", httpTraceHdrs(api.DeleteObjectTaggingHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("tagging", "")
// SelectObjectContent
bucket.Methods(http.MethodPost).Path("/{object:.+}").HandlerFunc(collectAPIStats("selectobjectcontent", httpTraceHdrs(api.SelectObjectContentHandler))).Queries("select", "").Queries("select-type", "2")
bucket.Methods(http.MethodPost).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("selectobjectcontent", httpTraceHdrs(api.SelectObjectContentHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("select", "").Queries("select-type", "2")
// GetObjectRetention
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(collectAPIStats("getobjectretention", httpTraceAll(api.GetObjectRetentionHandler))).Queries("retention", "")
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("getobjectretention", httpTraceAll(api.GetObjectRetentionHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("retention", "")
// GetObjectLegalHold
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(collectAPIStats("getobjectlegalhold", httpTraceAll(api.GetObjectLegalHoldHandler))).Queries("legal-hold", "")
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("getobjectlegalhold", httpTraceAll(api.GetObjectLegalHoldHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("legal-hold", "")
// GetObject
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(collectAPIStats("getobject", httpTraceHdrs(api.GetObjectHandler)))
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("getobject", httpTraceHdrs(api.GetObjectHandler)), enabled, requestsMaxCh, requestsDeadline))
// CopyObject
bucket.Methods(http.MethodPut).Path("/{object:.+}").HeadersRegexp(xhttp.AmzCopySource, ".*?(\\/|%2F).*?").HandlerFunc(collectAPIStats("copyobject", httpTraceAll(api.CopyObjectHandler)))
bucket.Methods(http.MethodPut).Path("/{object:.+}").HeadersRegexp(xhttp.AmzCopySource, ".*?(\\/|%2F).*?").HandlerFunc(maxClients(collectAPIStats("copyobject", httpTraceAll(api.CopyObjectHandler)), enabled, requestsMaxCh, requestsDeadline))
// PutObjectRetention
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(collectAPIStats("putobjectretention", httpTraceAll(api.PutObjectRetentionHandler))).Queries("retention", "")
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("putobjectretention", httpTraceAll(api.PutObjectRetentionHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("retention", "")
// PutObjectLegalHold
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(collectAPIStats("putobjectlegalhold", httpTraceAll(api.PutObjectLegalHoldHandler))).Queries("legal-hold", "")
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("putobjectlegalhold", httpTraceAll(api.PutObjectLegalHoldHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("legal-hold", "")
// PutObject
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(collectAPIStats("putobject", httpTraceHdrs(api.PutObjectHandler)))
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("putobject", httpTraceHdrs(api.PutObjectHandler)), enabled, requestsMaxCh, requestsDeadline))
// DeleteObject
bucket.Methods(http.MethodDelete).Path("/{object:.+}").HandlerFunc(collectAPIStats("deleteobject", httpTraceAll(api.DeleteObjectHandler)))
bucket.Methods(http.MethodDelete).Path("/{object:.+}").HandlerFunc(
maxClients(collectAPIStats("deleteobject", httpTraceAll(api.DeleteObjectHandler)), enabled, requestsMaxCh, requestsDeadline))
/// Bucket operations
// GetBucketLocation
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("getbucketlocation", httpTraceAll(api.GetBucketLocationHandler))).Queries("location", "")
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketlocation", httpTraceAll(api.GetBucketLocationHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("location", "")
// GetBucketPolicy
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("getbucketpolicy", httpTraceAll(api.GetBucketPolicyHandler))).Queries("policy", "")
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketpolicy", httpTraceAll(api.GetBucketPolicyHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("policy", "")
// GetBucketLifecycle
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("getbucketlifecycle", httpTraceAll(api.GetBucketLifecycleHandler))).Queries("lifecycle", "")
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketlifecycle", httpTraceAll(api.GetBucketLifecycleHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("lifecycle", "")
// GetBucketEncryption
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("getbucketencryption", httpTraceAll(api.GetBucketEncryptionHandler))).Queries("encryption", "")
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketencryption", httpTraceAll(api.GetBucketEncryptionHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("encryption", "")
// Dummy Bucket Calls
// GetBucketACL -- this is a dummy call.
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("getbucketacl", httpTraceAll(api.GetBucketACLHandler))).Queries("acl", "")
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketacl", httpTraceAll(api.GetBucketACLHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("acl", "")
// PutBucketACL -- this is a dummy call.
bucket.Methods(http.MethodPut).HandlerFunc(collectAPIStats("putbucketacl", httpTraceAll(api.PutBucketACLHandler))).Queries("acl", "")
bucket.Methods(http.MethodPut).HandlerFunc(
maxClients(collectAPIStats("putbucketacl", httpTraceAll(api.PutBucketACLHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("acl", "")
// GetBucketCors - this is a dummy call.
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("getbucketcors", httpTraceAll(api.GetBucketCorsHandler))).Queries("cors", "")
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketcors", httpTraceAll(api.GetBucketCorsHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("cors", "")
// GetBucketWebsiteHandler - this is a dummy call.
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("getbucketwebsite", httpTraceAll(api.GetBucketWebsiteHandler))).Queries("website", "")
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketwebsite", httpTraceAll(api.GetBucketWebsiteHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("website", "")
// GetBucketAccelerateHandler - this is a dummy call.
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("getbucketaccelerate", httpTraceAll(api.GetBucketAccelerateHandler))).Queries("accelerate", "")
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketaccelerate", httpTraceAll(api.GetBucketAccelerateHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("accelerate", "")
// GetBucketRequestPaymentHandler - this is a dummy call.
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("getbucketrequestpayment", httpTraceAll(api.GetBucketRequestPaymentHandler))).Queries("requestPayment", "")
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketrequestpayment", httpTraceAll(api.GetBucketRequestPaymentHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("requestPayment", "")
// GetBucketLoggingHandler - this is a dummy call.
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("getbucketlogging", httpTraceAll(api.GetBucketLoggingHandler))).Queries("logging", "")
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketlogging", httpTraceAll(api.GetBucketLoggingHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("logging", "")
// GetBucketLifecycleHandler - this is a dummy call.
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("getbucketlifecycle", httpTraceAll(api.GetBucketLifecycleHandler))).Queries("lifecycle", "")
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketlifecycle", httpTraceAll(api.GetBucketLifecycleHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("lifecycle", "")
// GetBucketReplicationHandler - this is a dummy call.
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("getbucketreplication", httpTraceAll(api.GetBucketReplicationHandler))).Queries("replication", "")
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketreplication", httpTraceAll(api.GetBucketReplicationHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("replication", "")
// GetBucketTaggingHandler - this is a dummy call.
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("getbuckettagging", httpTraceAll(api.GetBucketTaggingHandler))).Queries("tagging", "")
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbuckettagging", httpTraceAll(api.GetBucketTaggingHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("tagging", "")
//DeleteBucketWebsiteHandler
bucket.Methods(http.MethodDelete).HandlerFunc(collectAPIStats("deletebucketwebsite", httpTraceAll(api.DeleteBucketWebsiteHandler))).Queries("website", "")
bucket.Methods(http.MethodDelete).HandlerFunc(
maxClients(collectAPIStats("deletebucketwebsite", httpTraceAll(api.DeleteBucketWebsiteHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("website", "")
// DeleteBucketTaggingHandler
bucket.Methods(http.MethodDelete).HandlerFunc(collectAPIStats("deletebuckettagging", httpTraceAll(api.DeleteBucketTaggingHandler))).Queries("tagging", "")
bucket.Methods(http.MethodDelete).HandlerFunc(
maxClients(collectAPIStats("deletebuckettagging", httpTraceAll(api.DeleteBucketTaggingHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("tagging", "")
// GetBucketObjectLockConfig
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("getbucketobjectlockconfiguration", httpTraceAll(api.GetBucketObjectLockConfigHandler))).Queries("object-lock", "")
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketobjectlockconfiguration", httpTraceAll(api.GetBucketObjectLockConfigHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("object-lock", "")
// GetBucketVersioning
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("getbucketversioning", httpTraceAll(api.GetBucketVersioningHandler))).Queries("versioning", "")
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketversioning", httpTraceAll(api.GetBucketVersioningHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("versioning", "")
// GetBucketNotification
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("getbucketnotification", httpTraceAll(api.GetBucketNotificationHandler))).Queries("notification", "")
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("getbucketnotification", httpTraceAll(api.GetBucketNotificationHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("notification", "")
// ListenBucketNotification
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("listenbucketnotification", httpTraceAll(api.ListenBucketNotificationHandler))).Queries("events", "{events:.*}")
// ListMultipartUploads
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("listmultipartuploads", httpTraceAll(api.ListMultipartUploadsHandler))).Queries("uploads", "")
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("listmultipartuploads", httpTraceAll(api.ListMultipartUploadsHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("uploads", "")
// ListObjectsV2M
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("listobjectsv2M", httpTraceAll(api.ListObjectsV2MHandler))).Queries("list-type", "2", "metadata", "true")
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("listobjectsv2M", httpTraceAll(api.ListObjectsV2MHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("list-type", "2", "metadata", "true")
// ListObjectsV2
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("listobjectsv2", httpTraceAll(api.ListObjectsV2Handler))).Queries("list-type", "2")
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("listobjectsv2", httpTraceAll(api.ListObjectsV2Handler)), enabled, requestsMaxCh, requestsDeadline)).Queries("list-type", "2")
// ListBucketVersions
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("listbucketversions", httpTraceAll(api.ListBucketObjectVersionsHandler))).Queries("versions", "")
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("listbucketversions", httpTraceAll(api.ListBucketObjectVersionsHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("versions", "")
// ListObjectsV1 (Legacy)
bucket.Methods(http.MethodGet).HandlerFunc(collectAPIStats("listobjectsv1", httpTraceAll(api.ListObjectsV1Handler)))
bucket.Methods(http.MethodGet).HandlerFunc(
maxClients(collectAPIStats("listobjectsv1", httpTraceAll(api.ListObjectsV1Handler)), enabled, requestsMaxCh, requestsDeadline))
// PutBucketLifecycle
bucket.Methods(http.MethodPut).HandlerFunc(collectAPIStats("putbucketlifecycle", httpTraceAll(api.PutBucketLifecycleHandler))).Queries("lifecycle", "")
bucket.Methods(http.MethodPut).HandlerFunc(
maxClients(collectAPIStats("putbucketlifecycle", httpTraceAll(api.PutBucketLifecycleHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("lifecycle", "")
// PutBucketEncryption
bucket.Methods(http.MethodPut).HandlerFunc(collectAPIStats("putbucketencryption", httpTraceAll(api.PutBucketEncryptionHandler))).Queries("encryption", "")
bucket.Methods(http.MethodPut).HandlerFunc(
maxClients(collectAPIStats("putbucketencryption", httpTraceAll(api.PutBucketEncryptionHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("encryption", "")
// PutBucketPolicy
bucket.Methods(http.MethodPut).HandlerFunc(collectAPIStats("putbucketpolicy", httpTraceAll(api.PutBucketPolicyHandler))).Queries("policy", "")
bucket.Methods(http.MethodPut).HandlerFunc(
maxClients(collectAPIStats("putbucketpolicy", httpTraceAll(api.PutBucketPolicyHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("policy", "")
// PutBucketObjectLockConfig
bucket.Methods(http.MethodPut).HandlerFunc(collectAPIStats("putbucketobjectlockconfig", httpTraceAll(api.PutBucketObjectLockConfigHandler))).Queries("object-lock", "")
bucket.Methods(http.MethodPut).HandlerFunc(
maxClients(collectAPIStats("putbucketobjectlockconfig", httpTraceAll(api.PutBucketObjectLockConfigHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("object-lock", "")
// PutBucketVersioning
bucket.Methods(http.MethodPut).HandlerFunc(collectAPIStats("putbucketversioning", httpTraceAll(api.PutBucketVersioningHandler))).Queries("versioning", "")
bucket.Methods(http.MethodPut).HandlerFunc(
maxClients(collectAPIStats("putbucketversioning", httpTraceAll(api.PutBucketVersioningHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("versioning", "")
// PutBucketNotification
bucket.Methods(http.MethodPut).HandlerFunc(collectAPIStats("putbucketnotification", httpTraceAll(api.PutBucketNotificationHandler))).Queries("notification", "")
bucket.Methods(http.MethodPut).HandlerFunc(
maxClients(collectAPIStats("putbucketnotification", httpTraceAll(api.PutBucketNotificationHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("notification", "")
// PutBucket
bucket.Methods(http.MethodPut).HandlerFunc(collectAPIStats("putbucket", httpTraceAll(api.PutBucketHandler)))
bucket.Methods(http.MethodPut).HandlerFunc(
maxClients(collectAPIStats("putbucket", httpTraceAll(api.PutBucketHandler)), enabled, requestsMaxCh, requestsDeadline))
// HeadBucket
bucket.Methods(http.MethodHead).HandlerFunc(collectAPIStats("headbucket", httpTraceAll(api.HeadBucketHandler)))
bucket.Methods(http.MethodHead).HandlerFunc(
maxClients(collectAPIStats("headbucket", httpTraceAll(api.HeadBucketHandler)), enabled, requestsMaxCh, requestsDeadline))
// PostPolicy
bucket.Methods(http.MethodPost).HeadersRegexp(xhttp.ContentType, "multipart/form-data*").HandlerFunc(collectAPIStats("postpolicybucket", httpTraceHdrs(api.PostPolicyBucketHandler)))
bucket.Methods(http.MethodPost).HeadersRegexp(xhttp.ContentType, "multipart/form-data*").HandlerFunc(
maxClients(collectAPIStats("postpolicybucket", httpTraceHdrs(api.PostPolicyBucketHandler)), enabled, requestsMaxCh, requestsDeadline))
// DeleteMultipleObjects
bucket.Methods(http.MethodPost).HandlerFunc(collectAPIStats("deletemultipleobjects", httpTraceAll(api.DeleteMultipleObjectsHandler))).Queries("delete", "")
bucket.Methods(http.MethodPost).HandlerFunc(
maxClients(collectAPIStats("deletemultipleobjects", httpTraceAll(api.DeleteMultipleObjectsHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("delete", "")
// DeleteBucketPolicy
bucket.Methods(http.MethodDelete).HandlerFunc(collectAPIStats("deletebucketpolicy", httpTraceAll(api.DeleteBucketPolicyHandler))).Queries("policy", "")
bucket.Methods(http.MethodDelete).HandlerFunc(
maxClients(collectAPIStats("deletebucketpolicy", httpTraceAll(api.DeleteBucketPolicyHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("policy", "")
// DeleteBucketLifecycle
bucket.Methods(http.MethodDelete).HandlerFunc(collectAPIStats("deletebucketlifecycle", httpTraceAll(api.DeleteBucketLifecycleHandler))).Queries("lifecycle", "")
bucket.Methods(http.MethodDelete).HandlerFunc(
maxClients(collectAPIStats("deletebucketlifecycle", httpTraceAll(api.DeleteBucketLifecycleHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("lifecycle", "")
// DeleteBucketEncryption
bucket.Methods(http.MethodDelete).HandlerFunc(collectAPIStats("deletebucketencryption", httpTraceAll(api.DeleteBucketEncryptionHandler))).Queries("encryption", "")
bucket.Methods(http.MethodDelete).HandlerFunc(
maxClients(collectAPIStats("deletebucketencryption", httpTraceAll(api.DeleteBucketEncryptionHandler)), enabled, requestsMaxCh, requestsDeadline)).Queries("encryption", "")
// DeleteBucket
bucket.Methods(http.MethodDelete).HandlerFunc(collectAPIStats("deletebucket", httpTraceAll(api.DeleteBucketHandler)))
bucket.Methods(http.MethodDelete).HandlerFunc(
maxClients(collectAPIStats("deletebucket", httpTraceAll(api.DeleteBucketHandler)), enabled, requestsMaxCh, requestsDeadline))
}
/// Root operation
// ListBuckets
apiRouter.Methods(http.MethodGet).Path(SlashSeparator).HandlerFunc(collectAPIStats("listbuckets", httpTraceAll(api.ListBucketsHandler)))
apiRouter.Methods(http.MethodGet).Path(SlashSeparator).HandlerFunc(
maxClients(collectAPIStats("listbuckets", httpTraceAll(api.ListBucketsHandler)), enabled, requestsMaxCh, requestsDeadline))
// If none of the routes match add default error handler routes
apiRouter.NotFoundHandler = http.HandlerFunc(collectAPIStats("notfound", httpTraceAll(errorResponseHandler)))

View File

@ -75,7 +75,7 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) {
}
// Wait and proceed if there are active requests
waitForLowHTTPReq(int32(globalEndpoints.Nodes()))
waitForLowHTTPReq(int32(globalEndpoints.NEndpoints()))
var res madmin.HealResultItem
var err error

View File

@ -62,9 +62,9 @@ func (s1 ServerSystemConfig) Diff(s2 ServerSystemConfig) error {
return fmt.Errorf("Expected platform '%s', found to be running '%s'",
s1.MinioPlatform, s2.MinioPlatform)
}
if s1.MinioEndpoints.Nodes() != s2.MinioEndpoints.Nodes() {
return fmt.Errorf("Expected number of endpoints %d, seen %d", s1.MinioEndpoints.Nodes(),
s2.MinioEndpoints.Nodes())
if s1.MinioEndpoints.NEndpoints() != s2.MinioEndpoints.NEndpoints() {
return fmt.Errorf("Expected number of endpoints %d, seen %d", s1.MinioEndpoints.NEndpoints(),
s2.MinioEndpoints.NEndpoints())
}
for i, ep := range s1.MinioEndpoints {
@ -110,7 +110,7 @@ func registerBootstrapRESTHandlers(router *mux.Router) {
httpTraceHdrs(server.VerifyHandler))
}
// client to talk to bootstrap Nodes.
// client to talk to bootstrap NEndpoints.
type bootstrapRESTClient struct {
endpoint Endpoint
restClient *rest.Client

View File

@ -33,6 +33,10 @@ const (
EnvPublicIPs = "MINIO_PUBLIC_IPS"
EnvEndpoints = "MINIO_ENDPOINTS"
// API sub-system
EnvAPIRequestsMax = "MINIO_API_REQUESTS_MAX"
EnvAPIRequestsDeadline = "MINIO_API_REQUESTS_DEADLINE"
EnvUpdate = "MINIO_UPDATE"
EnvWorm = "MINIO_WORM" // legacy

View File

@ -107,7 +107,7 @@ func lifecycleRound(ctx context.Context, objAPI ObjectLayer) error {
break
}
waitForLowHTTPReq(int32(globalEndpoints.Nodes()))
waitForLowHTTPReq(int32(globalEndpoints.NEndpoints()))
// Deletes a list of objects.
deleteErrs, err := objAPI.DeleteObjects(ctx, bucket.Name, objects)

View File

@ -220,14 +220,28 @@ func (l EndpointZones) HTTPS() bool {
return l[0].Endpoints.HTTPS()
}
// Nodes - returns all nodes count
func (l EndpointZones) Nodes() (count int) {
// NEndpoints - returns all nodes count
func (l EndpointZones) NEndpoints() (count int) {
for _, ep := range l {
count += len(ep.Endpoints)
}
return count
}
// Hosts - returns list of unique hosts
func (l EndpointZones) Hosts() []string {
foundSet := set.NewStringSet()
for _, ep := range l {
for _, endpoint := range ep.Endpoints {
if foundSet.Contains(endpoint.Host) {
continue
}
foundSet.Add(endpoint.Host)
}
}
return foundSet.ToSlice()
}
// Endpoints - list of same type of endpoint.
type Endpoints []Endpoint

View File

@ -28,6 +28,7 @@ import (
"net/url"
"regexp"
"strings"
"time"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
@ -358,6 +359,31 @@ func httpTraceHdrs(f http.HandlerFunc) http.HandlerFunc {
}
}
// maxClients throttles the S3 API calls
func maxClients(f http.HandlerFunc, enabled bool, requestsMaxCh chan struct{}, requestsDeadline time.Duration) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !enabled {
f.ServeHTTP(w, r)
return
}
select {
case requestsMaxCh <- struct{}{}:
defer func() { <-requestsMaxCh }()
f.ServeHTTP(w, r)
case <-time.NewTimer(requestsDeadline).C:
// Send a http timeout message
writeErrorResponse(r.Context(), w,
errorCodes.ToAPIErr(ErrOperationMaxedOut),
r.URL, guessIsBrowserReq(r))
return
case <-r.Context().Done():
return
}
}
}
func collectAPIStats(api string, f http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {

View File

@ -638,7 +638,7 @@ func listIAMConfigItems(objectAPI ObjectLayer, pathPrefix string, dirs bool,
if !globalSafeMode {
// Slow down listing and loading for config items to
// reduce load on the server
waitForLowHTTPReq(int32(globalEndpoints.Nodes()))
waitForLowHTTPReq(int32(globalEndpoints.NEndpoints()))
}
marker = lo.NextMarker

View File

@ -41,7 +41,7 @@ import (
trace "github.com/minio/minio/pkg/trace"
)
// client to talk to peer Nodes.
// client to talk to peer NEndpoints.
type peerRESTClient struct {
host *xnet.Host
restClient *rest.Client

View File

@ -472,7 +472,7 @@ func serverMain(ctx *cli.Context) {
func newObjectLayer(endpointZones EndpointZones) (newObject ObjectLayer, err error) {
// For FS only, directly use the disk.
if endpointZones.Nodes() == 1 {
if endpointZones.NEndpoints() == 1 {
// Initialize new FS object layer.
return NewFSObjectLayer(endpointZones[0].Endpoints[0].Path)
}

View File

@ -1584,7 +1584,7 @@ func getRandomDisks(N int) ([]string, error) {
// Initialize object layer with the supplied disks, objectLayer is nil upon any error.
func newTestObjectLayer(endpointZones EndpointZones) (newObject ObjectLayer, err error) {
// For FS only, directly use the disk.
if endpointZones.Nodes() == 1 {
if endpointZones.NEndpoints() == 1 {
// Initialize new FS object layer.
return NewFSObjectLayer(endpointZones[0].Endpoints[0].Path)
}

35
docs/throttle/README.md Normal file
View File

@ -0,0 +1,35 @@
# MinIO Server Throttling Guide [![Slack](https://slack.min.io/slack?type=svg)](https://slack.min.io) [![Docker Pulls](https://img.shields.io/docker/pulls/minio/minio.svg?maxAge=604800)](https://hub.docker.com/r/minio/minio/)
MinIO server allows to throttle incoming requests:
- limit the number of active requests allowed across the cluster
- limit the wait duration for each request in the queue
These values are enabled using environment variables *only*.
## Configuring connection limit
If you have traditional spinning (hdd) drives, some applications with high concurrency might require MinIO cluster to be tuned such that to avoid random I/O on the drives. The way to convert high concurrent I/O into a sequential I/O is by reducing the number of concurrent operations allowed per cluster. This allows MinIO cluster to be operationally resilient to such workloads, while also making sure the drives are at optimal efficiency and responsive.
Example: Limit a MinIO cluster to accept at max 1600 simultaneous S3 API requests across 8 servers.
```sh
export MINIO_API_REQUESTS_MAX=1600
export MINIO_ACCESS_KEY=your-access-key
export MINIO_SECRET_KEY=your-secret-key
minio server http://server{1...8}/mnt/hdd{1...16}
```
> NOTE: Setting MINIO_API_REQUESTS_MAX=0 means unlimited and that is the default behavior. These values need to be set based on your deployment requirements and application.
## Configuring connection (wait) deadline
This value works in conjunction with max connection setting, setting this value allows for long waiting requests to quickly time out when there is no slot available to perform the request.
This will reduce the pileup of waiting requests when clients are not configured with timeouts. Default wait time is *10 seconds* if *MINIO_API_REQUESTS_MAX* is enabled. This may need to be tuned to your application needs.
Example: Limit a MinIO cluster to accept at max 1600 simultaneous S3 API requests across 8 servers, and set the wait deadline of *2 minutes* per API operation.
```sh
export MINIO_API_REQUESTS_MAX=1600
export MINIO_API_REQUESTS_DEADLINE=2m
export MINIO_ACCESS_KEY=your-access-key
export MINIO_SECRET_KEY=your-secret-key
minio server http://server{1...8}/mnt/hdd{1...16}
```