fix: limit HTTP transport tuables to affordable values (#9383)

Close connections pro-actively in transient calls
This commit is contained in:
Klaus Post 2020-04-17 20:20:56 +02:00 committed by GitHub
parent d92db198d1
commit c4464e36c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 40 additions and 28 deletions

View File

@ -986,7 +986,7 @@ func (a adminAPIHandlers) TraceHandler(w http.ResponseWriter, r *http.Request) {
// Use buffered channel to take care of burst sends or slow w.Write() // Use buffered channel to take care of burst sends or slow w.Write()
traceCh := make(chan interface{}, 4000) traceCh := make(chan interface{}, 4000)
peers := getRestClients(globalEndpoints) peers := newPeerRestClients(globalEndpoints)
globalHTTPTrace.Subscribe(traceCh, ctx.Done(), func(entry interface{}) bool { globalHTTPTrace.Subscribe(traceCh, ctx.Done(), func(entry interface{}) bool {
return mustTrace(entry, trcAll, trcErr) return mustTrace(entry, trcAll, trcErr)
@ -1051,7 +1051,7 @@ func (a adminAPIHandlers) ConsoleLogHandler(w http.ResponseWriter, r *http.Reque
logCh := make(chan interface{}, 4000) logCh := make(chan interface{}, 4000)
peers := getRestClients(globalEndpoints) peers := newPeerRestClients(globalEndpoints)
globalConsoleSys.Subscribe(logCh, ctx.Done(), node, limitLines, logKind, nil) globalConsoleSys.Subscribe(logCh, ctx.Done(), node, limitLines, logKind, nil)
@ -1482,7 +1482,9 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Reque
func fetchLambdaInfo(cfg config.Config) []map[string][]madmin.TargetIDStatus { func fetchLambdaInfo(cfg config.Config) []map[string][]madmin.TargetIDStatus {
// Fetch the configured targets // Fetch the configured targets
targetList, err := notify.FetchRegisteredTargets(cfg, GlobalContext.Done(), NewGatewayHTTPTransport(), true, false) tr := NewGatewayHTTPTransport()
defer tr.CloseIdleConnections()
targetList, err := notify.FetchRegisteredTargets(cfg, GlobalContext.Done(), tr, true, false)
if err != nil && err != notify.ErrTargetsOffline { if err != nil && err != notify.ErrTargetsOffline {
logger.LogIf(GlobalContext, err) logger.LogIf(GlobalContext, err)
return nil return nil
@ -1605,12 +1607,8 @@ func checkConnection(endpointStr string, timeout time.Duration) error {
return pErr return pErr
} }
tr := newCustomHTTPTransport( tr := newCustomHTTPTransport(&tls.Config{RootCAs: globalRootCAs}, timeout)()
&tls.Config{RootCAs: globalRootCAs}, defer tr.CloseIdleConnections()
timeout,
0, /* Default value */
)()
if dErr := u.DialHTTP(tr); dErr != nil { if dErr := u.DialHTTP(tr); dErr != nil {
if urlErr, ok := dErr.(*url.Error); ok { if urlErr, ok := dErr.(*url.Error); ok {
// To treat "connection refused" errors as un reachable endpoint. // To treat "connection refused" errors as un reachable endpoint.

View File

@ -247,7 +247,7 @@ func newBootstrapRESTClient(endpoint Endpoint) (*bootstrapRESTClient, error) {
} }
} }
trFn := newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout, rest.DefaultRESTTimeout) trFn := newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout)
restClient, err := rest.NewClient(serverURL, trFn, newAuthToken) restClient, err := rest.NewClient(serverURL, trFn, newAuthToken)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -286,7 +286,7 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit
// Use buffered channel to take care of burst sends or slow w.Write() // Use buffered channel to take care of burst sends or slow w.Write()
listenCh := make(chan interface{}, 4000) listenCh := make(chan interface{}, 4000)
peers := getRestClients(globalEndpoints) peers := newPeerRestClients(globalEndpoints)
globalHTTPListen.Subscribe(listenCh, ctx.Done(), func(evI interface{}) bool { globalHTTPListen.Subscribe(listenCh, ctx.Done(), func(evI interface{}) bool {
ev, ok := evI.(event.Event) ev, ok := evI.(event.Event)

View File

@ -170,7 +170,7 @@ func newlockRESTClient(endpoint Endpoint) *lockRESTClient {
} }
} }
trFn := newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout, rest.DefaultRESTTimeout) trFn := newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout)
restClient, err := rest.NewClient(serverURL, trFn, newAuthToken) restClient, err := rest.NewClient(serverURL, trFn, newAuthToken)
if err != nil { if err != nil {
logger.LogIf(GlobalContext, err) logger.LogIf(GlobalContext, err)

View File

@ -1171,7 +1171,7 @@ func NewNotificationSys(endpoints EndpointZones) *NotificationSys {
targetList: event.NewTargetList(), targetList: event.NewTargetList(),
bucketRulesMap: make(map[string]event.RulesMap), bucketRulesMap: make(map[string]event.RulesMap),
bucketRemoteTargetRulesMap: make(map[string]map[event.TargetID]event.RulesMap), bucketRemoteTargetRulesMap: make(map[string]map[event.TargetID]event.RulesMap),
peerClients: getRestClients(endpoints), peerClients: newPeerRestClients(endpoints),
} }
} }

View File

@ -27,6 +27,7 @@ import (
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
@ -663,6 +664,10 @@ func getCpObjTagsFromHeader(ctx context.Context, r *http.Request, tags string) (
return tags, nil return tags, nil
} }
// getRemoteInstanceTransport contains a singleton roundtripper.
var getRemoteInstanceTransport http.RoundTripper
var getRemoteInstanceTransportOnce sync.Once
// Returns a minio-go Client configured to access remote host described by destDNSRecord // Returns a minio-go Client configured to access remote host described by destDNSRecord
// Applicable only in a federated deployment // Applicable only in a federated deployment
var getRemoteInstanceClient = func(r *http.Request, host string) (*miniogo.Core, error) { var getRemoteInstanceClient = func(r *http.Request, host string) (*miniogo.Core, error) {
@ -673,7 +678,10 @@ var getRemoteInstanceClient = func(r *http.Request, host string) (*miniogo.Core,
if err != nil { if err != nil {
return nil, err return nil, err
} }
core.SetCustomTransport(NewGatewayHTTPTransport()) getRemoteInstanceTransportOnce.Do(func() {
getRemoteInstanceTransport = NewGatewayHTTPTransport()
})
core.SetCustomTransport(getRemoteInstanceTransport)
return core, nil return core, nil
} }

View File

@ -1002,7 +1002,8 @@ func getRemoteHosts(endpointZones EndpointZones) []*xnet.Host {
return remoteHosts return remoteHosts
} }
func getRestClients(endpoints EndpointZones) []*peerRESTClient { // newPeerRestClients creates new peer clients.
func newPeerRestClients(endpoints EndpointZones) []*peerRESTClient {
peerHosts := getRemoteHosts(endpoints) peerHosts := getRemoteHosts(endpoints)
restClients := make([]*peerRESTClient, len(peerHosts)) restClients := make([]*peerRESTClient, len(peerHosts))
for i, host := range peerHosts { for i, host := range peerHosts {
@ -1019,7 +1020,6 @@ func getRestClients(endpoints EndpointZones) []*peerRESTClient {
// Returns a peer rest client. // Returns a peer rest client.
func newPeerRESTClient(peer *xnet.Host) (*peerRESTClient, error) { func newPeerRESTClient(peer *xnet.Host) (*peerRESTClient, error) {
scheme := "http" scheme := "http"
if globalIsSSL { if globalIsSSL {
scheme = "https" scheme = "https"
@ -1039,7 +1039,7 @@ func newPeerRESTClient(peer *xnet.Host) (*peerRESTClient, error) {
} }
} }
trFn := newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout, rest.DefaultRESTTimeout) trFn := newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout)
restClient, err := rest.NewClient(serverURL, trFn, newAuthToken) restClient, err := rest.NewClient(serverURL, trFn, newAuthToken)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -201,12 +201,12 @@ func IsServerResolvable(endpoint Endpoint) error {
} }
httpClient := &http.Client{ httpClient := &http.Client{
Transport: newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout, rest.DefaultRESTTimeout)(), Transport: newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout)(),
} }
defer httpClient.CloseIdleConnections()
resp, err := httpClient.Do(req) resp, err := httpClient.Do(req)
if err != nil { if err != nil {
httpClient.CloseIdleConnections()
return err return err
} }
defer xhttp.DrainBody(resp.Body) defer xhttp.DrainBody(resp.Body)

View File

@ -572,7 +572,7 @@ func newStorageRESTClient(endpoint Endpoint) *storageRESTClient {
} }
} }
trFn := newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout, rest.DefaultRESTTimeout) trFn := newCustomHTTPTransport(tlsConfig, rest.DefaultRESTTimeout)
restClient, err := rest.NewClient(serverURL, trFn, newAuthToken) restClient, err := rest.NewClient(serverURL, trFn, newAuthToken)
if err != nil { if err != nil {
logger.LogIf(GlobalContext, err) logger.LogIf(GlobalContext, err)

View File

@ -173,7 +173,6 @@ const (
// Default values used while communicating for internode communication. // Default values used while communicating for internode communication.
defaultDialTimeout = 5 * time.Second defaultDialTimeout = 5 * time.Second
defaultDialKeepAlive = 15 * time.Second
) )
// isMaxObjectSize - verify if max object size // isMaxObjectSize - verify if max object size
@ -464,14 +463,16 @@ func newCustomDialContext(dialTimeout, dialKeepAlive time.Duration) dialContext
} }
} }
func newCustomHTTPTransport(tlsConfig *tls.Config, dialTimeout, dialKeepAlive time.Duration) func() *http.Transport { func newCustomHTTPTransport(tlsConfig *tls.Config, dialTimeout time.Duration) func() *http.Transport {
// For more details about various values used here refer // For more details about various values used here refer
// https://golang.org/pkg/net/http/#Transport documentation // https://golang.org/pkg/net/http/#Transport documentation
tr := &http.Transport{ tr := &http.Transport{
Proxy: http.ProxyFromEnvironment, Proxy: http.ProxyFromEnvironment,
DialContext: newCustomDialContext(dialTimeout, dialKeepAlive), DialContext: newCustomDialContext(dialTimeout, 15*time.Second),
MaxIdleConnsPerHost: 256, MaxIdleConnsPerHost: 16,
IdleConnTimeout: time.Minute, MaxIdleConns: 16,
MaxConnsPerHost: 64, // This is used per drive/rpc host. More requests will block until free.
IdleConnTimeout: 1 * time.Minute,
ResponseHeaderTimeout: 3 * time.Minute, // Set conservative timeouts for MinIO internode. ResponseHeaderTimeout: 3 * time.Minute, // Set conservative timeouts for MinIO internode.
TLSHandshakeTimeout: 10 * time.Second, TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 10 * time.Second, ExpectContinueTimeout: 10 * time.Second,
@ -493,9 +494,14 @@ func newCustomHTTPTransport(tlsConfig *tls.Config, dialTimeout, dialKeepAlive ti
func NewGatewayHTTPTransport() *http.Transport { func NewGatewayHTTPTransport() *http.Transport {
tr := newCustomHTTPTransport(&tls.Config{ tr := newCustomHTTPTransport(&tls.Config{
RootCAs: globalRootCAs, RootCAs: globalRootCAs,
}, defaultDialTimeout, defaultDialKeepAlive)() }, defaultDialTimeout)()
// Set aggressive timeouts for gateway // Set aggressive timeouts for gateway
tr.ResponseHeaderTimeout = 30 * time.Second tr.ResponseHeaderTimeout = 30 * time.Second
// Allow more requests to be in flight.
tr.MaxConnsPerHost = 256
tr.MaxIdleConnsPerHost = 16
tr.MaxIdleConns = 256
return tr return tr
} }

View File

@ -2109,10 +2109,10 @@ func (web *webAPIHandlers) LoginSTS(r *http.Request, args *LoginSTSArgs, reply *
clnt := &http.Client{ clnt := &http.Client{
Transport: NewGatewayHTTPTransport(), Transport: NewGatewayHTTPTransport(),
} }
defer clnt.CloseIdleConnections()
resp, err := clnt.Do(req) resp, err := clnt.Do(req)
if err != nil { if err != nil {
clnt.CloseIdleConnections()
return toJSONError(ctx, err) return toJSONError(ctx, err)
} }
defer xhttp.DrainBody(resp.Body) defer xhttp.DrainBody(resp.Body)