Refactor HTTP transports (#16222)

This commit is contained in:
Aditya Manthramurthy 2022-12-12 20:31:21 -08:00 committed by GitHub
parent 37e20f6ef2
commit 2d60bf8c50
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 232 additions and 177 deletions

View File

@ -134,7 +134,7 @@ func init() {
globalForwarder = handlers.NewForwarder(&handlers.Forwarder{
PassHost: true,
RoundTripper: newHTTPTransport(1 * time.Hour),
RoundTripper: NewHTTPTransportWithTimeout(1 * time.Hour),
Logger: func(err error) {
if err != nil && !errors.Is(err, context.Canceled) {
logger.LogIf(GlobalContext, err)

View File

@ -551,7 +551,7 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf
// Initialize remote instance transport once.
getRemoteInstanceTransportOnce.Do(func() {
getRemoteInstanceTransport = newHTTPTransport(apiConfig.RemoteTransportDeadline)
getRemoteInstanceTransport = NewHTTPTransportWithTimeout(apiConfig.RemoteTransportDeadline)
})
case config.CompressionSubSys:
cmpCfg, err := compress.LookupConfig(s[config.CompressionSubSys][config.Default])

View File

@ -19,7 +19,6 @@ package cmd
import (
"context"
"crypto/tls"
"errors"
"fmt"
"io"
@ -42,10 +41,8 @@ import (
"github.com/minio/minio/internal/bucket/bandwidth"
"github.com/minio/minio/internal/color"
"github.com/minio/minio/internal/config"
"github.com/minio/minio/internal/fips"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/rest"
"github.com/minio/pkg/certs"
"github.com/minio/pkg/env"
)
@ -236,19 +233,9 @@ func serverHandleCmdArgs(ctx *cli.Context) {
}
// allow transport to be HTTP/1.1 for proxying.
globalProxyTransport = newCustomHTTPProxyTransport(&tls.Config{
RootCAs: globalRootCAs,
CipherSuites: fips.TLSCiphers(),
CurvePreferences: fips.TLSCurveIDs(),
ClientSessionCache: tls.NewLRUClientSessionCache(tlsClientSessionCacheSize),
}, rest.DefaultTimeout)()
globalProxyTransport = NewCustomHTTPProxyTransport()()
globalProxyEndpoints = GetProxyEndpoints(globalEndpoints)
globalInternodeTransport = newInternodeHTTPTransport(&tls.Config{
RootCAs: globalRootCAs,
CipherSuites: fips.TLSCiphers(),
CurvePreferences: fips.TLSCurveIDs(),
ClientSessionCache: tls.NewLRUClientSessionCache(tlsClientSessionCacheSize),
}, rest.DefaultTimeout)()
globalInternodeTransport = NewInternodeHTTPTransport()()
globalRemoteTargetTransport = NewRemoteTargetHTTPTransport()()
// On macOS, if a process already listens on LOCALIPADDR:PORT, net.Listen() falls back

View File

@ -66,7 +66,6 @@ import (
"github.com/minio/minio/internal/crypto"
"github.com/minio/minio/internal/hash"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/rest"
"github.com/minio/pkg/bucket/policy"
)
@ -112,7 +111,7 @@ func TestMain(m *testing.M) {
// Initialize globalConsoleSys system
globalConsoleSys = NewConsoleLogger(context.Background())
globalInternodeTransport = newInternodeHTTPTransport(nil, rest.DefaultTimeout)()
globalInternodeTransport = NewInternodeHTTPTransport()()
initHelp()

View File

@ -38,7 +38,6 @@ import (
"runtime/trace"
"strings"
"sync"
"syscall"
"time"
"github.com/coreos/go-oidc"
@ -60,6 +59,7 @@ import (
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/logger/message/audit"
"github.com/minio/minio/internal/mcontext"
"github.com/minio/minio/internal/rest"
"github.com/minio/pkg/certs"
"github.com/minio/pkg/env"
xnet "github.com/minio/pkg/net"
@ -568,163 +568,73 @@ func ToS3ETag(etag string) string {
return etag
}
func newInternodeHTTPTransport(tlsConfig *tls.Config, dialTimeout time.Duration) func() http.RoundTripper {
// For more details about various values used here refer
// https://golang.org/pkg/net/http/#Transport documentation
tr := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: xhttp.DialContextWithDNSCache(globalDNSCache, xhttp.NewInternodeDialContext(dialTimeout)),
MaxIdleConnsPerHost: 1024,
WriteBufferSize: 32 << 10, // 32KiB moving up from 4KiB default
ReadBufferSize: 32 << 10, // 32KiB moving up from 4KiB default
IdleConnTimeout: 15 * time.Second,
ResponseHeaderTimeout: 15 * time.Minute, // Set conservative timeouts for MinIO internode.
TLSHandshakeTimeout: 15 * time.Second,
ExpectContinueTimeout: 15 * time.Second,
TLSClientConfig: tlsConfig,
// Go net/http automatically unzip if content-type is
// gzip disable this feature, as we are always interested
// in raw stream.
DisableCompression: true,
}
// https://github.com/golang/go/issues/23559
// https://github.com/golang/go/issues/42534
// https://github.com/golang/go/issues/43989
// https://github.com/golang/go/issues/33425
// https://github.com/golang/go/issues/29246
// if tlsConfig != nil {
// trhttp2, _ := http2.ConfigureTransports(tr)
// if trhttp2 != nil {
// // ReadIdleTimeout is the timeout after which a health check using ping
// // frame will be carried out if no frame is received on the
// // connection. 5 minutes is sufficient time for any idle connection.
// trhttp2.ReadIdleTimeout = 5 * time.Minute
// // PingTimeout is the timeout after which the connection will be closed
// // if a response to Ping is not received.
// trhttp2.PingTimeout = dialTimeout
// // DisableCompression, if true, prevents the Transport from
// // requesting compression with an "Accept-Encoding: gzip"
// trhttp2.DisableCompression = true
// }
// }
return func() http.RoundTripper {
return tr
}
// NewInternodeHTTPTransport returns a transport for internode MinIO
// connections.
func NewInternodeHTTPTransport() func() http.RoundTripper {
return xhttp.ConnSettings{
DNSCache: globalDNSCache,
DialTimeout: rest.DefaultTimeout,
RootCAs: globalRootCAs,
CipherSuites: fips.TLSCiphers(),
CurvePreferences: fips.TLSCurveIDs(),
EnableHTTP2: false,
}.NewInternodeHTTPTransport()
}
// Used by only proxied requests, specifically only supports HTTP/1.1
func newCustomHTTPProxyTransport(tlsConfig *tls.Config, dialTimeout time.Duration) func() *http.Transport {
// For more details about various values used here refer
// https://golang.org/pkg/net/http/#Transport documentation
tr := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: xhttp.DialContextWithDNSCache(globalDNSCache, xhttp.NewInternodeDialContext(dialTimeout)),
MaxIdleConnsPerHost: 1024,
WriteBufferSize: 32 << 10, // 32KiB moving up from 4KiB default
ReadBufferSize: 32 << 10, // 32KiB moving up from 4KiB default
IdleConnTimeout: 15 * time.Second,
ResponseHeaderTimeout: 30 * time.Minute, // Set larger timeouts for proxied requests.
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 10 * time.Second,
TLSClientConfig: tlsConfig,
// Go net/http automatically unzip if content-type is
// gzip disable this feature, as we are always interested
// in raw stream.
DisableCompression: true,
}
return func() *http.Transport {
return tr
}
}
func newCustomHTTPTransport(tlsConfig *tls.Config, dialTimeout time.Duration) func() *http.Transport {
// For more details about various values used here refer
// https://golang.org/pkg/net/http/#Transport documentation
tr := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: xhttp.DialContextWithDNSCache(globalDNSCache, xhttp.NewInternodeDialContext(dialTimeout)),
MaxIdleConnsPerHost: 1024,
WriteBufferSize: 32 << 10, // 32KiB moving up from 4KiB default
ReadBufferSize: 32 << 10, // 32KiB moving up from 4KiB default
IdleConnTimeout: 15 * time.Second,
ResponseHeaderTimeout: 3 * time.Minute,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 10 * time.Second,
TLSClientConfig: tlsConfig,
// Go net/http automatically unzip if content-type is
// gzip disable this feature, as we are always interested
// in raw stream.
DisableCompression: true,
}
// https://github.com/golang/go/issues/23559
// https://github.com/golang/go/issues/42534
// https://github.com/golang/go/issues/43989
// https://github.com/golang/go/issues/33425
// https://github.com/golang/go/issues/29246
// if tlsConfig != nil {
// trhttp2, _ := http2.ConfigureTransports(tr)
// if trhttp2 != nil {
// // ReadIdleTimeout is the timeout after which a health check using ping
// // frame will be carried out if no frame is received on the
// // connection. 5 minutes is sufficient time for any idle connection.
// trhttp2.ReadIdleTimeout = 5 * time.Minute
// // PingTimeout is the timeout after which the connection will be closed
// // if a response to Ping is not received.
// trhttp2.PingTimeout = dialTimeout
// // DisableCompression, if true, prevents the Transport from
// // requesting compression with an "Accept-Encoding: gzip"
// trhttp2.DisableCompression = true
// }
// }
return func() *http.Transport {
return tr
}
// NewCustomHTTPProxyTransport is used only for proxied requests, specifically
// only supports HTTP/1.1
func NewCustomHTTPProxyTransport() func() *http.Transport {
return xhttp.ConnSettings{
DNSCache: globalDNSCache,
DialTimeout: rest.DefaultTimeout,
RootCAs: globalRootCAs,
CipherSuites: fips.TLSCiphers(),
CurvePreferences: fips.TLSCurveIDs(),
EnableHTTP2: false,
}.NewCustomHTTPProxyTransport()
}
// NewHTTPTransportWithClientCerts returns a new http configuration
// used while communicating with the cloud backends.
func NewHTTPTransportWithClientCerts(clientCert, clientKey string) *http.Transport {
transport := newHTTPTransport(1 * time.Minute)
s := xhttp.ConnSettings{
DNSCache: globalDNSCache,
DialTimeout: 1 * time.Minute,
RootCAs: globalRootCAs,
EnableHTTP2: false,
}
if clientCert != "" && clientKey != "" {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
c, err := certs.NewManager(ctx, clientCert, clientKey, tls.LoadX509KeyPair)
transport, err := s.NewHTTPTransportWithClientCerts(ctx, clientCert, clientKey)
if err != nil {
logger.LogIf(ctx, fmt.Errorf("failed to load client key and cert, please check your endpoint configuration: %s",
err.Error()))
}
if c != nil {
c.UpdateReloadDuration(10 * time.Second)
c.ReloadOnSignal(syscall.SIGHUP) // allow reloads upon SIGHUP
transport.TLSClientConfig.GetClientCertificate = c.GetClientCertificate
}
return transport
}
return transport
return s.NewHTTPTransportWithTimeout(1 * time.Minute)
}
// NewHTTPTransport returns a new http configuration
// used while communicating with the cloud backends.
func NewHTTPTransport() *http.Transport {
return newHTTPTransport(1 * time.Minute)
return NewHTTPTransportWithTimeout(1 * time.Minute)
}
// Default values for dial timeout
const defaultDialTimeout = 5 * time.Second
func newHTTPTransport(timeout time.Duration) *http.Transport {
tr := newCustomHTTPTransport(&tls.Config{
RootCAs: globalRootCAs,
ClientSessionCache: tls.NewLRUClientSessionCache(tlsClientSessionCacheSize),
}, defaultDialTimeout)()
// Customize response header timeout
tr.ResponseHeaderTimeout = timeout
return tr
// NewHTTPTransportWithTimeout allows setting a timeout.
func NewHTTPTransportWithTimeout(timeout time.Duration) *http.Transport {
return xhttp.ConnSettings{
DNSCache: globalDNSCache,
DialTimeout: defaultDialTimeout,
RootCAs: globalRootCAs,
EnableHTTP2: false,
}.NewHTTPTransportWithTimeout(timeout)
}
type dialContext func(ctx context.Context, network, addr string) (net.Conn, error)
@ -753,29 +663,11 @@ func newCustomDialContext() dialContext {
// NewRemoteTargetHTTPTransport returns a new http configuration
// used while communicating with the remote replication targets.
func NewRemoteTargetHTTPTransport() func() *http.Transport {
// For more details about various values used here refer
// https://golang.org/pkg/net/http/#Transport documentation
tr := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: newCustomDialContext(),
MaxIdleConnsPerHost: 1024,
WriteBufferSize: 32 << 10, // 32KiB moving up from 4KiB default
ReadBufferSize: 32 << 10, // 32KiB moving up from 4KiB default
IdleConnTimeout: 15 * time.Second,
TLSHandshakeTimeout: 5 * time.Second,
ExpectContinueTimeout: 5 * time.Second,
TLSClientConfig: &tls.Config{
RootCAs: globalRootCAs,
ClientSessionCache: tls.NewLRUClientSessionCache(tlsClientSessionCacheSize),
},
// Go net/http automatically unzip if content-type is
// gzip disable this feature, as we are always interested
// in raw stream.
DisableCompression: true,
}
return func() *http.Transport {
return tr
}
return xhttp.ConnSettings{
DialContext: newCustomDialContext(),
RootCAs: globalRootCAs,
EnableHTTP2: false,
}.NewCustomHTTPProxyTransport()
}
// Load the json (typically from disk file).

View File

@ -42,7 +42,7 @@ func newWarmBackendMinIO(conf madmin.TierMinIO) (*warmBackendMinIO, error) {
creds := credentials.NewStaticV4(conf.AccessKey, conf.SecretKey, "")
getRemoteTierTargetInstanceTransportOnce.Do(func() {
getRemoteTierTargetInstanceTransport = newHTTPTransport(10 * time.Minute)
getRemoteTierTargetInstanceTransport = NewHTTPTransportWithTimeout(10 * time.Minute)
})
opts := &minio.Options{
Creds: creds,

View File

@ -117,7 +117,7 @@ func newWarmBackendS3(conf madmin.TierS3) (*warmBackendS3, error) {
creds = credentials.NewStaticV4(conf.AccessKey, conf.SecretKey, "")
}
getRemoteTierTargetInstanceTransportOnce.Do(func() {
getRemoteTierTargetInstanceTransport = newHTTPTransport(10 * time.Minute)
getRemoteTierTargetInstanceTransport = NewHTTPTransportWithTimeout(10 * time.Minute)
})
opts := &minio.Options{
Creds: creds,

177
internal/http/transports.go Normal file
View File

@ -0,0 +1,177 @@
// Copyright (c) 2015-2022 MinIO, Inc.
//
// # This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package http
import (
"context"
"crypto/tls"
"crypto/x509"
"net"
"net/http"
"syscall"
"time"
"github.com/minio/pkg/certs"
"github.com/rs/dnscache"
)
// tlsClientSessionCacheSize is the cache size for client sessions.
var tlsClientSessionCacheSize = 100
// ConnSettings - contains connection settings.
type ConnSettings struct {
// If this is non-nil, DNSCache and DialTimeout are ignored.
DialContext func(ctx context.Context, network, addr string) (net.Conn, error)
// Dial settings, used if DialContext is nil.
DNSCache *dnscache.Resolver
DialTimeout time.Duration
// TLS Settings
RootCAs *x509.CertPool
CipherSuites []uint16
CurvePreferences []tls.CurveID
// HTTP2
EnableHTTP2 bool
}
func (s ConnSettings) getDefaultTransport() *http.Transport {
dialContext := s.DialContext
if dialContext == nil {
dialContext = DialContextWithDNSCache(s.DNSCache, NewInternodeDialContext(s.DialTimeout))
}
tlsClientConfig := tls.Config{
RootCAs: s.RootCAs,
CipherSuites: s.CipherSuites,
CurvePreferences: s.CurvePreferences,
ClientSessionCache: tls.NewLRUClientSessionCache(tlsClientSessionCacheSize),
}
// For more details about various values used here refer
// https://golang.org/pkg/net/http/#Transport documentation
tr := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: dialContext,
MaxIdleConnsPerHost: 1024,
WriteBufferSize: 32 << 10, // 32KiB moving up from 4KiB default
ReadBufferSize: 32 << 10, // 32KiB moving up from 4KiB default
IdleConnTimeout: 15 * time.Second,
ResponseHeaderTimeout: 15 * time.Minute, // Conservative timeout is the default (for MinIO internode)
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 10 * time.Second,
TLSClientConfig: &tlsClientConfig,
ForceAttemptHTTP2: s.EnableHTTP2,
// Go net/http automatically unzip if content-type is
// gzip disable this feature, as we are always interested
// in raw stream.
DisableCompression: true,
}
// https://github.com/golang/go/issues/23559
// https://github.com/golang/go/issues/42534
// https://github.com/golang/go/issues/43989
// https://github.com/golang/go/issues/33425
// https://github.com/golang/go/issues/29246
// if tlsConfig != nil {
// trhttp2, _ := http2.ConfigureTransports(tr)
// if trhttp2 != nil {
// // ReadIdleTimeout is the timeout after which a health check using ping
// // frame will be carried out if no frame is received on the
// // connection. 5 minutes is sufficient time for any idle connection.
// trhttp2.ReadIdleTimeout = 5 * time.Minute
// // PingTimeout is the timeout after which the connection will be closed
// // if a response to Ping is not received.
// trhttp2.PingTimeout = dialTimeout
// // DisableCompression, if true, prevents the Transport from
// // requesting compression with an "Accept-Encoding: gzip"
// trhttp2.DisableCompression = true
// }
// }
return tr
}
// NewInternodeHTTPTransport returns transport for internode MinIO connections.
func (s ConnSettings) NewInternodeHTTPTransport() func() http.RoundTripper {
tr := s.getDefaultTransport()
// Settings specific to internode requests.
tr.TLSHandshakeTimeout = 15 * time.Minute
tr.ExpectContinueTimeout = 15 * time.Minute
return func() http.RoundTripper {
return tr
}
}
// NewCustomHTTPProxyTransport is used only for proxied requests, specifically
// only supports HTTP/1.1
func (s ConnSettings) NewCustomHTTPProxyTransport() func() *http.Transport {
s.EnableHTTP2 = false
tr := s.getDefaultTransport()
// Settings specific to proxied requests.
tr.ResponseHeaderTimeout = 30 * time.Minute
return func() *http.Transport {
return tr
}
}
// NewHTTPTransportWithTimeout allows setting a timeout for response headers
func (s ConnSettings) NewHTTPTransportWithTimeout(timeout time.Duration) *http.Transport {
tr := s.getDefaultTransport()
// Settings specific to this transport.
tr.ResponseHeaderTimeout = timeout
return tr
}
// NewHTTPTransportWithClientCerts returns a new http configuration used for
// communicating with client cert authentication.
func (s ConnSettings) NewHTTPTransportWithClientCerts(ctx context.Context, clientCert, clientKey string) (*http.Transport, error) {
transport := s.NewHTTPTransportWithTimeout(1 * time.Minute)
if clientCert != "" && clientKey != "" {
c, err := certs.NewManager(ctx, clientCert, clientKey, tls.LoadX509KeyPair)
if err != nil {
return nil, err
}
if c != nil {
c.UpdateReloadDuration(10 * time.Second)
c.ReloadOnSignal(syscall.SIGHUP) // allow reloads upon SIGHUP
transport.TLSClientConfig.GetClientCertificate = c.GetClientCertificate
}
}
return transport, nil
}
// NewRemoteTargetHTTPTransport returns a new http configuration
// used while communicating with the remote replication targets.
func (s ConnSettings) NewRemoteTargetHTTPTransport() func() *http.Transport {
tr := s.getDefaultTransport()
tr.TLSHandshakeTimeout = 5 * time.Second
tr.ExpectContinueTimeout = 5 * time.Second
tr.ResponseHeaderTimeout = 0
return func() *http.Transport {
return tr
}
}