reduce number of middleware handlers (#13546)

- combine similar looking functionalities into single
  handlers, and remove unnecessary proxying of the
  requests at handler layer.

- remove bucket forwarding handler as part of default setup
  add it only if bucket federation is enabled.

Improvements observed for 1kiB object reads.
```
-------------------
Operation: GET
Operations: 4538555 -> 4595804
* Average: +1.26% (+0.2 MiB/s) throughput, +1.26% (+190.2) obj/s
* Fastest: +4.67% (+0.7 MiB/s) throughput, +4.67% (+739.8) obj/s
* 50% Median: +1.15% (+0.2 MiB/s) throughput, +1.15% (+173.9) obj/s
```
This commit is contained in:
Harshavardhana 2021-11-01 08:04:03 -07:00 committed by GitHub
parent 8ed7346273
commit 6d53e3c2d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 125 additions and 273 deletions

View File

@ -487,6 +487,26 @@ func setAuthHandler(h http.Handler) http.Handler {
// handler for validating incoming authorization headers.
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
aType := getRequestAuthType(r)
if aType == authTypeSigned || aType == authTypeSignedV2 || aType == authTypeStreamingSigned {
// Verify if date headers are set, if not reject the request
amzDate, errCode := parseAmzDateHeader(r)
if errCode != ErrNone {
// All our internal APIs are sensitive towards Date
// header, for all requests where Date header is not
// present we will reject such clients.
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(errCode), r.URL)
atomic.AddUint64(&globalHTTPStats.rejectedRequestsTime, 1)
return
}
// Verify if the request date header is shifted by less than globalMaxSkewTime parameter in the past
// or in the future, reject request otherwise.
curTime := UTCNow()
if curTime.Sub(amzDate) > globalMaxSkewTime || amzDate.Sub(curTime) > globalMaxSkewTime {
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrRequestTimeTooSkewed), r.URL)
atomic.AddUint64(&globalHTTPStats.rejectedRequestsTime, 1)
return
}
}
if isSupportedS3AuthType(aType) || aType == authTypeJWT || aType == authTypeSTS {
h.ServeHTTP(w, r)
return

View File

@ -25,21 +25,13 @@ const crossDomainXML = `<?xml version="1.0"?><!DOCTYPE cross-domain-policy SYSTE
// Standard path where an app would find cross domain policy information.
const crossDomainXMLEntity = "/crossdomain.xml"
// Cross domain policy implements http.Handler interface, implementing a custom ServerHTTP.
type crossDomainPolicy struct {
handler http.Handler
}
// A cross-domain policy file is an XML document that grants a web client, such as Adobe Flash Player
// or Adobe Acrobat (though not necessarily limited to these), permission to handle data across domains.
// When clients request content hosted on a particular source domain and that content make requests
// directed towards a domain other than its own, the remote domain needs to host a cross-domain
// policy file that grants access to the source domain, allowing the client to continue the transaction.
func setCrossDomainPolicy(h http.Handler) http.Handler {
return crossDomainPolicy{handler: h}
}
func (c crossDomainPolicy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Look for 'crossdomain.xml' in the incoming request.
switch r.URL.Path {
case crossDomainXMLEntity:
@ -48,6 +40,6 @@ func (c crossDomainPolicy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Request completed, no need to serve to other handlers.
return
}
// Continue to serve the request further.
c.handler.ServeHTTP(w, r)
h.ServeHTTP(w, r)
})
}

View File

@ -18,8 +18,6 @@
package cmd
import (
"context"
"errors"
"fmt"
"net"
"net/http"
@ -36,7 +34,6 @@ import (
"github.com/dustin/go-humanize"
"github.com/minio/minio-go/v7/pkg/set"
"github.com/minio/minio/internal/config"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/mountinfo"
"github.com/minio/pkg/env"
@ -743,72 +740,6 @@ func GetProxyEndpointLocalIndex(proxyEps []ProxyEndpoint) int {
return -1
}
func httpDo(clnt *http.Client, req *http.Request, f func(*http.Response, error) error) error {
ctx, cancel := context.WithTimeout(GlobalContext, 200*time.Millisecond)
defer cancel()
// Run the HTTP request in a goroutine and pass the response to f.
c := make(chan error, 1)
req = req.WithContext(ctx)
go func() { c <- f(clnt.Do(req)) }()
select {
case <-ctx.Done():
<-c // Wait for f to return.
return ctx.Err()
case err := <-c:
return err
}
}
func getOnlineProxyEndpointIdx() int {
type reqIndex struct {
Request *http.Request
Idx int
}
proxyRequests := make(map[*http.Client]reqIndex, len(globalProxyEndpoints))
for i, proxyEp := range globalProxyEndpoints {
proxyEp := proxyEp
serverURL := &url.URL{
Scheme: proxyEp.Scheme,
Host: proxyEp.Host,
Path: pathJoin(healthCheckPathPrefix, healthCheckLivenessPath),
}
req, err := http.NewRequest(http.MethodGet, serverURL.String(), nil)
if err != nil {
continue
}
proxyRequests[&http.Client{
Transport: proxyEp.Transport,
}] = reqIndex{
Request: req,
Idx: i,
}
}
for c, r := range proxyRequests {
if err := httpDo(c, r.Request, func(resp *http.Response, err error) error {
if err != nil {
return err
}
xhttp.DrainBody(resp.Body)
if resp.StatusCode != http.StatusOK {
return errors.New(resp.Status)
}
if v := resp.Header.Get(xhttp.MinIOServerStatus); v == unavailable {
return errors.New(v)
}
return nil
}); err != nil {
continue
}
return r.Idx
}
return -1
}
// GetProxyEndpoints - get all endpoints that can be used to proxy list request.
func GetProxyEndpoints(endpointServerPools EndpointServerPools) []ProxyEndpoint {
var proxyEps []ProxyEndpoint

View File

@ -269,7 +269,7 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
addrs = append(addrs, globalMinioAddr)
}
httpServer := xhttp.NewServer(addrs, criticalErrorHandler{corsHandler(router)}, getCert)
httpServer := xhttp.NewServer(addrs, setCriticalErrorHandler(corsHandler(router)), getCert)
httpServer.BaseContext = func(listener net.Listener) context.Context {
return GlobalContext
}

View File

@ -18,7 +18,6 @@
package cmd
import (
"context"
"net"
"net/http"
"path"
@ -37,42 +36,39 @@ import (
"github.com/minio/minio/internal/logger"
)
// Adds limiting body size middleware
const (
// Maximum allowed form data field values. 64MiB is a guessed practical value
// which is more than enough to accommodate any form data fields and headers.
const requestFormDataSize = 64 * humanize.MiByte
requestFormDataSize = 64 * humanize.MiByte
// For any HTTP request, request body should be not more than 16GiB + requestFormDataSize
// where, 16GiB is the maximum allowed object size for object upload.
const requestMaxBodySize = globalMaxObjectSize + requestFormDataSize
requestMaxBodySize = globalMaxObjectSize + requestFormDataSize
func setRequestSizeLimitHandler(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Restricting read data to a given maximum length
r.Body = http.MaxBytesReader(w, r.Body, requestMaxBodySize)
h.ServeHTTP(w, r)
})
}
const (
// Maximum size for http headers - See: https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html
maxHeaderSize = 8 * 1024
// Maximum size for user-defined metadata - See: https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html
maxUserDataSize = 2 * 1024
)
// ServeHTTP restricts the size of the http header to 8 KB and the size
// of the user-defined metadata to 2 KB.
func setRequestHeaderSizeLimitHandler(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if isHTTPHeaderSizeTooLarge(r.Header) {
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrMetadataTooLarge), r.URL)
atomic.AddUint64(&globalHTTPStats.rejectedRequestsHeader, 1)
return
// ReservedMetadataPrefix is the prefix of a metadata key which
// is reserved and for internal use only.
const (
ReservedMetadataPrefix = "X-Minio-Internal-"
ReservedMetadataPrefixLower = "x-minio-internal-"
)
// containsReservedMetadata returns true if the http.Header contains
// keys which are treated as metadata but are reserved for internal use
// and must not set by clients
func containsReservedMetadata(header http.Header) bool {
for key := range header {
if strings.HasPrefix(strings.ToLower(key), ReservedMetadataPrefixLower) {
return true
}
h.ServeHTTP(w, r)
})
}
return false
}
// isHTTPHeaderSizeTooLarge returns true if the provided
@ -96,37 +92,25 @@ func isHTTPHeaderSizeTooLarge(header http.Header) bool {
return false
}
// ReservedMetadataPrefix is the prefix of a metadata key which
// is reserved and for internal use only.
const (
ReservedMetadataPrefix = "X-Minio-Internal-"
ReservedMetadataPrefixLower = "x-minio-internal-"
)
// ServeHTTP fails if the request contains at least one reserved header which
// would be treated as metadata.
func filterReservedMetadata(h http.Handler) http.Handler {
// Limits body and header to specific allowed maximum limits as per S3/MinIO API requirements.
func setRequestLimitHandler(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Reject unsupported reserved metadata first before validation.
if containsReservedMetadata(r.Header) {
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrUnsupportedMetadata), r.URL)
return
}
if isHTTPHeaderSizeTooLarge(r.Header) {
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrMetadataTooLarge), r.URL)
atomic.AddUint64(&globalHTTPStats.rejectedRequestsHeader, 1)
return
}
// Restricting read data to a given maximum length
r.Body = http.MaxBytesReader(w, r.Body, requestMaxBodySize)
h.ServeHTTP(w, r)
})
}
// containsReservedMetadata returns true if the http.Header contains
// keys which are treated as metadata but are reserved for internal use
// and must not set by clients
func containsReservedMetadata(header http.Header) bool {
for key := range header {
if strings.HasPrefix(strings.ToLower(key), ReservedMetadataPrefixLower) {
return true
}
}
return false
}
// Reserved bucket.
const (
minioReservedBucket = "minio"
@ -134,24 +118,6 @@ const (
loginPathPrefix = SlashSeparator + "login"
)
func setRedirectHandler(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if !shouldProxy() || guessIsRPCReq(r) || guessIsBrowserReq(r) ||
guessIsHealthCheckReq(r) || guessIsMetricsReq(r) || isAdminReq(r) {
h.ServeHTTP(w, r)
return
}
// if this server is still initializing, proxy the request
// to any other online servers to avoid 503 for any incoming
// API calls.
if idx := getOnlineProxyEndpointIdx(); idx >= 0 {
proxyRequest(context.TODO(), w, r, globalProxyEndpoints[idx])
return
}
h.ServeHTTP(w, r)
})
}
func guessIsBrowserReq(r *http.Request) bool {
aType := getRequestAuthType(r)
return strings.Contains(r.Header.Get("User-Agent"), "Mozilla") &&
@ -174,10 +140,6 @@ func setBrowserRedirectHandler(h http.Handler) http.Handler {
})
}
func shouldProxy() bool {
return newObjectLayerFn() == nil
}
// Fetch redirect location if urlPath satisfies certain
// criteria. Some special names are considered to be
// redirectable, this is purely internal function and
@ -261,31 +223,21 @@ func isAdminReq(r *http.Request) bool {
return strings.HasPrefix(r.URL.Path, adminPathPrefix)
}
// Adds verification for incoming paths.
func setReservedBucketHandler(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// For all other requests reject access to reserved buckets
bucketName, _ := request2BucketObjectName(r)
if isMinioReservedBucket(bucketName) || isMinioMetaBucket(bucketName) {
if !guessIsRPCReq(r) && !guessIsBrowserReq(r) && !guessIsHealthCheckReq(r) && !guessIsMetricsReq(r) && !isAdminReq(r) {
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrAllAccessDisabled), r.URL)
return
}
}
h.ServeHTTP(w, r)
})
}
// Supported Amz date formats.
// Supported amz date formats.
var amzDateFormats = []string{
// Do not change this order, x-amz-date format is usually in
// iso8601Format rest are meant for relaxed handling of other
// odd SDKs that might be out there.
iso8601Format,
time.RFC1123,
time.RFC1123Z,
iso8601Format,
// Add new AMZ date formats here.
}
// Supported Amz date headers.
var amzDateHeaders = []string{
// Do not chane this order, x-amz-date value should be
// validated first.
"x-amz-date",
"date",
}
@ -314,34 +266,6 @@ func parseAmzDateHeader(req *http.Request) (time.Time, APIErrorCode) {
return time.Time{}, ErrMissingDateHeader
}
// setTimeValidityHandler to validate parsable time over http header
func setTimeValidityHandler(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
aType := getRequestAuthType(r)
if aType == authTypeSigned || aType == authTypeSignedV2 || aType == authTypeStreamingSigned {
// Verify if date headers are set, if not reject the request
amzDate, errCode := parseAmzDateHeader(r)
if errCode != ErrNone {
// All our internal APIs are sensitive towards Date
// header, for all requests where Date header is not
// present we will reject such clients.
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(errCode), r.URL)
atomic.AddUint64(&globalHTTPStats.rejectedRequestsTime, 1)
return
}
// Verify if the request date header is shifted by less than globalMaxSkewTime parameter in the past
// or in the future, reject request otherwise.
curTime := UTCNow()
if curTime.Sub(amzDate) > globalMaxSkewTime || amzDate.Sub(curTime) > globalMaxSkewTime {
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrRequestTimeTooSkewed), r.URL)
atomic.AddUint64(&globalHTTPStats.rejectedRequestsTime, 1)
return
}
}
h.ServeHTTP(w, r)
})
}
// setHttpStatsHandler sets a http Stats handler to gather HTTP statistics
func setHTTPStatsHandler(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@ -420,6 +344,23 @@ func setRequestValidityHandler(h http.Handler) http.Handler {
atomic.AddUint64(&globalHTTPStats.rejectedRequestsInvalid, 1)
return
}
// For all other requests reject access to reserved buckets
bucketName, _ := request2BucketObjectName(r)
if isMinioReservedBucket(bucketName) || isMinioMetaBucket(bucketName) {
if !guessIsRPCReq(r) && !guessIsBrowserReq(r) && !guessIsHealthCheckReq(r) && !guessIsMetricsReq(r) && !isAdminReq(r) {
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrAllAccessDisabled), r.URL)
return
}
}
// Deny SSE-C requests if not made over TLS
if !globalIsTLS && (crypto.SSEC.IsRequested(r.Header) || crypto.SSECopy.IsRequested(r.Header)) {
if r.Method == http.MethodHead {
writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrInsecureSSECustomerRequest))
} else {
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrInsecureSSECustomerRequest), r.URL)
}
return
}
h.ServeHTTP(w, r)
})
}
@ -429,8 +370,7 @@ func setRequestValidityHandler(h http.Handler) http.Handler {
// is obtained from centralized etcd configuration service.
func setBucketForwardingHandler(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if globalDNSConfig == nil || len(globalDomainNames) == 0 || !globalBucketFederation ||
guessIsHealthCheckReq(r) || guessIsMetricsReq(r) ||
if guessIsHealthCheckReq(r) || guessIsMetricsReq(r) ||
guessIsRPCReq(r) || guessIsLoginSTSReq(r) || isAdminReq(r) {
h.ServeHTTP(w, r)
return
@ -491,29 +431,23 @@ func setBucketForwardingHandler(h http.Handler) http.Handler {
})
}
// customHeaderHandler sets x-amz-request-id header.
// Previously, this value was set right before a response was sent to
// the client. So, logger and Error response XML were not using this
// value. This is set here so that this header can be logged as
// part of the log entry, Error response XML and auditing.
func addCustomHeaders(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Set custom headers such as x-amz-request-id for each request.
w.Header().Set(xhttp.AmzRequestID, mustGetRequestID(UTCNow()))
h.ServeHTTP(logger.NewResponseWriter(w), r)
})
}
// addSecurityHeaders adds various HTTP(S) response headers.
// addCustomHeaders adds various HTTP(S) response headers.
// Security Headers enable various security protections behaviors in the client's browser.
func addSecurityHeaders(h http.Handler) http.Handler {
func addCustomHeaders(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
header := w.Header()
header.Set("X-XSS-Protection", "1; mode=block") // Prevents against XSS attacks
header.Set("Content-Security-Policy", "block-all-mixed-content") // prevent mixed (HTTP / HTTPS content)
header.Set("X-Content-Type-Options", "nosniff") // Prevent mime-sniff
header.Set("Strict-Transport-Security", "max-age=31536000; includeSubDomains") // HSTS mitigates variants of MITM attacks
h.ServeHTTP(w, r)
// Previously, this value was set right before a response was sent to
// the client. So, logger and Error response XML were not using this
// value. This is set here so that this header can be logged as
// part of the log entry, Error response XML and auditing.
// Set custom headers such as x-amz-request-id for each request.
w.Header().Set(xhttp.AmzRequestID, mustGetRequestID(UTCNow()))
h.ServeHTTP(logger.NewResponseWriter(w), r)
})
}
@ -521,9 +455,8 @@ func addSecurityHeaders(h http.Handler) http.Handler {
// `panic(logger.ErrCritical)` as done by `logger.CriticalIf`.
//
// It should be always the first / highest HTTP handler.
type criticalErrorHandler struct{ handler http.Handler }
func (h criticalErrorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
func setCriticalErrorHandler(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer func() {
if err := recover(); err == logger.ErrCritical { // handle
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrInternalError), r.URL)
@ -532,21 +465,6 @@ func (h criticalErrorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
panic(err) // forward other panic calls
}
}()
h.handler.ServeHTTP(w, r)
}
// sseTLSHandler enforces certain rules for SSE requests which are made / must be made over TLS.
func setSSETLSHandler(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Deny SSE-C requests if not made over TLS
if !globalIsTLS && (crypto.SSEC.IsRequested(r.Header) || crypto.SSECopy.IsRequested(r.Header)) {
if r.Method == http.MethodHead {
writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrInsecureSSECustomerRequest))
} else {
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrInsecureSSECustomerRequest), r.URL)
}
return
}
h.ServeHTTP(w, r)
})
}

View File

@ -155,7 +155,7 @@ func TestSSETLSHandler(t *testing.T) {
r.Header = test.Header
r.URL = test.URL
h := setSSETLSHandler(okHandler)
h := setRequestValidityHandler(okHandler)
h.ServeHTTP(w, r)
switch {

View File

@ -28,6 +28,10 @@ import (
const unavailable = "offline"
func shouldProxy() bool {
return newObjectLayerFn() == nil
}
// ClusterCheckHandler returns if the server is ready for requests.
func ClusterCheckHandler(w http.ResponseWriter, r *http.Request) {
if globalIsGateway {

View File

@ -40,41 +40,24 @@ func registerDistErasureRouters(router *mux.Router, endpointServerPools Endpoint
// List of some generic handlers which are applied for all incoming requests.
var globalHandlers = []mux.MiddlewareFunc{
// filters HTTP headers which are treated as metadata and are reserved
// for internal use only.
filterReservedMetadata,
// Enforce rules specific for TLS requests
setSSETLSHandler,
// Auth handler verifies incoming authorization headers and
// routes them accordingly. Client receives a HTTP error for
// invalid/unsupported signatures.
setAuthHandler,
//
// Validates all incoming requests to have a valid date header.
setTimeValidityHandler,
// Validates if incoming request is for restricted buckets.
setReservedBucketHandler,
setAuthHandler,
// Redirect some pre-defined browser request paths to a static location prefix.
setBrowserRedirectHandler,
// Adds 'crossdomain.xml' policy handler to serve legacy flash clients.
setCrossDomainPolicy,
// Limits all header sizes to a maximum fixed limit
setRequestHeaderSizeLimitHandler,
// Limits all requests size to a maximum fixed limit
setRequestSizeLimitHandler,
// Limits all body and header sizes to a maximum fixed limit
setRequestLimitHandler,
// Network statistics
setHTTPStatsHandler,
// Validate all the incoming requests.
setRequestValidityHandler,
// Forward path style requests to actual host in a bucket federated setup.
setBucketForwardingHandler,
// set HTTP security headers such as Content-Security-Policy.
addSecurityHeaders,
// set x-amz-request-id header.
addCustomHeaders,
// add redirect handler to redirect
// requests when object layer is not
// initialized.
setRedirectHandler,
// Add new handlers here.
}
@ -104,6 +87,10 @@ func configureServerHandler(endpointServerPools EndpointServerPools) (http.Handl
// Add API router
registerAPIRouter(router)
// Enable bucket forwarding handler only if bucket federation is enabled.
if globalDNSConfig != nil && globalBucketFederation {
globalHandlers = append(globalHandlers, setBucketForwardingHandler)
}
router.Use(globalHandlers...)
return router, nil

View File

@ -511,7 +511,7 @@ func serverMain(ctx *cli.Context) {
addrs = append(addrs, globalMinioAddr)
}
httpServer := xhttp.NewServer(addrs, criticalErrorHandler{corsHandler(handler)}, getCert)
httpServer := xhttp.NewServer(addrs, setCriticalErrorHandler(corsHandler(handler)), getCert)
httpServer.BaseContext = func(listener net.Listener) context.Context {
return GlobalContext
}

View File

@ -334,7 +334,7 @@ func UnstartedTestServer(t TestErrHandler, instanceType string) TestServer {
}
// Run TestServer.
testServer.Server = httptest.NewUnstartedServer(criticalErrorHandler{corsHandler(httpHandler)})
testServer.Server = httptest.NewUnstartedServer(setCriticalErrorHandler(corsHandler(httpHandler)))
globalObjLayerMutex.Lock()
globalObjectAPI = objLayer