From f3a52cc195418875c8200c2106a4bd2a58ceac58 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 23 Apr 2024 21:08:47 -0700 Subject: [PATCH] simplify listener implementation setup customizations in right place (#19589) --- cmd/common-main.go | 2 + cmd/globals.go | 6 +-- cmd/server-main.go | 18 ++++++-- cmd/update.go | 2 +- cmd/utils.go | 25 ----------- internal/http/dial_linux.go | 45 ++++++++++---------- internal/http/dial_others.go | 7 +-- internal/http/listener.go | 82 ++++++++++++++++-------------------- internal/http/transports.go | 4 +- 9 files changed, 84 insertions(+), 107 deletions(-) diff --git a/cmd/common-main.go b/cmd/common-main.go index 440928f42..7c29a92d5 100644 --- a/cmd/common-main.go +++ b/cmd/common-main.go @@ -401,6 +401,8 @@ func buildServerCtxt(ctx *cli.Context, ctxt *serverCtxt) (err error) { ctxt.ConnWriteDeadline = ctx.Duration("conn-write-deadline") ctxt.ConnClientReadDeadline = ctx.Duration("conn-client-read-deadline") ctxt.ConnClientWriteDeadline = ctx.Duration("conn-client-write-deadline") + ctxt.SendBufSize = ctx.Int("send-buf-size") + ctxt.RecvBufSize = ctx.Int("recv-buf-size") ctxt.ShutdownTimeout = ctx.Duration("shutdown-timeout") ctxt.IdleTimeout = ctx.Duration("idle-timeout") diff --git a/cmd/globals.go b/cmd/globals.go index 3dfa37ee0..c52d9eb5f 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -171,7 +171,8 @@ type serverCtxt struct { ReadHeaderTimeout time.Duration MaxIdleConnsPerHost int - CrossDomainXML string + SendBufSize, RecvBufSize int + CrossDomainXML string // The layout of disks as interpreted Layout disksLayout } @@ -446,9 +447,6 @@ var ( subnetAdminPublicKey = []byte("-----BEGIN PUBLIC KEY-----\nMIIBCgKCAQEAyC+ol5v0FP+QcsR6d1KypR/063FInmNEFsFzbEwlHQyEQN3O7kNI\nwVDN1vqp1wDmJYmv4VZGRGzfFw1q+QV7K1TnysrEjrqpVxfxzDQCoUadAp8IxLLc\ns2fjyDNxnZjoC6fTID9C0khKnEa5fPZZc3Ihci9SiCGkPmyUyCGVSxWXIKqL2Lrj\nyDc0pGeEhWeEPqw6q8X2jvTC246tlzqpDeNsPbcv2KblXRcKniQNbBrizT37CKHQ\nM6hc9kugrZbFuo8U5/4RQvZPJnx/DVjLDyoKo2uzuVQs4s+iBrA5sSSLp8rPED/3\n6DgWw3e244Dxtrg972dIT1IOqgn7KUJzVQIDAQAB\n-----END PUBLIC KEY-----") subnetAdminPublicKeyDev = []byte("-----BEGIN PUBLIC KEY-----\nMIIBCgKCAQEArhQYXQd6zI4uagtVfthAPOt6i4AYHnEWCoNeAovM4MNl42I9uQFh\n3VHkbWj9Gpx9ghf6PgRgK+8FcFvy+StmGcXpDCiFywXX24uNhcZjscX1C4Esk0BW\nidfI2eXYkOlymD4lcK70SVgJvC693Qa7Z3FE1KU8Nfv2bkxEE4bzOkojX9t6a3+J\nR8X6Z2U8EMlH1qxJPgiPogELhWP0qf2Lq7GwSAflo1Tj/ytxvD12WrnE0Rrj/8yP\nSnp7TbYm91KocKMExlmvx3l2XPLxeU8nf9U0U+KOmorejD3MDMEPF+tlk9LB3JWP\nZqYYe38rfALVTn4RVJriUcNOoEpEyC0WEwIDAQAB\n-----END PUBLIC KEY-----") - globalConnReadDeadline time.Duration - globalConnWriteDeadline time.Duration - // dynamic sleeper to avoid thundering herd for trash folder expunge routine deleteCleanupSleeper = newDynamicSleeper(5, 25*time.Millisecond, false) diff --git a/cmd/server-main.go b/cmd/server-main.go index c4f83c77b..7acb8c71e 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -34,6 +34,7 @@ import ( "time" "github.com/coreos/go-systemd/v22/daemon" + "github.com/dustin/go-humanize" "github.com/minio/cli" "github.com/minio/madmin-go/v3" "github.com/minio/minio-go/v7" @@ -178,6 +179,18 @@ var ServerFlags = []cli.Flag{ Hidden: true, EnvVar: "MINIO_MEMLIMIT", }, + cli.IntFlag{ + Name: "send-buf-size", + Value: 4 * humanize.MiByte, + EnvVar: "MINIO_SEND_BUF_SIZE", + Hidden: true, + }, + cli.IntFlag{ + Name: "recv-buf-size", + Value: 4 * humanize.MiByte, + EnvVar: "MINIO_RECV_BUF_SIZE", + Hidden: true, + }, } var gatewayCmd = cli.Command{ @@ -367,6 +380,8 @@ func serverHandleCmdArgs(ctxt serverCtxt) { ClientReadTimeout: ctxt.ConnClientReadDeadline, ClientWriteTimeout: ctxt.ConnClientWriteDeadline, Interface: ctxt.Interface, + SendBufSize: ctxt.SendBufSize, + RecvBufSize: ctxt.RecvBufSize, } // allow transport to be HTTP/1.1 for proxying. @@ -388,9 +403,6 @@ func serverHandleCmdArgs(ctxt serverCtxt) { // (non-)minio process is listening on IPv4 of given port. // To avoid this error situation we check for port availability. logger.FatalIf(xhttp.CheckPortAvailability(globalMinioHost, globalMinioPort, globalTCPOptions), "Unable to start the server") - - globalConnReadDeadline = ctxt.ConnReadDeadline - globalConnWriteDeadline = ctxt.ConnWriteDeadline } func initAllSubsystems(ctx context.Context) { diff --git a/cmd/update.go b/cmd/update.go index 08b445847..481350026 100644 --- a/cmd/update.go +++ b/cmd/update.go @@ -420,7 +420,7 @@ func parseReleaseData(data string) (sha256Sum []byte, releaseTime time.Time, rel func getUpdateTransport(timeout time.Duration) http.RoundTripper { var updateTransport http.RoundTripper = &http.Transport{ Proxy: http.ProxyFromEnvironment, - DialContext: xhttp.NewCustomDialContext(timeout, globalTCPOptions), + DialContext: xhttp.NewInternodeDialContext(timeout, globalTCPOptions), IdleConnTimeout: timeout, TLSHandshakeTimeout: timeout, ExpectContinueTimeout: timeout, diff --git a/cmd/utils.go b/cmd/utils.go index aa1ad58e3..531272df6 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -26,7 +26,6 @@ import ( "errors" "fmt" "io" - "net" "net/http" "net/url" "os" @@ -48,7 +47,6 @@ import ( "github.com/minio/minio/internal/config" "github.com/minio/minio/internal/config/api" xtls "github.com/minio/minio/internal/config/identity/tls" - "github.com/minio/minio/internal/deadlineconn" "github.com/minio/minio/internal/fips" "github.com/minio/minio/internal/handlers" "github.com/minio/minio/internal/hash" @@ -636,7 +634,6 @@ const defaultDialTimeout = 5 * time.Second // NewHTTPTransportWithTimeout allows setting a timeout. func NewHTTPTransportWithTimeout(timeout time.Duration) *http.Transport { return xhttp.ConnSettings{ - DialContext: newCustomDialContext(), LookupHost: globalDNSCache.LookupHost, DialTimeout: defaultDialTimeout, RootCAs: globalRootCAs, @@ -647,32 +644,10 @@ func NewHTTPTransportWithTimeout(timeout time.Duration) *http.Transport { }.NewHTTPTransportWithTimeout(timeout) } -// newCustomDialContext setups a custom dialer for any external communication and proxies. -func newCustomDialContext() xhttp.DialContext { - return func(ctx context.Context, network, addr string) (net.Conn, error) { - dialer := &net.Dialer{ - Timeout: 15 * time.Second, - KeepAlive: 30 * time.Second, - } - - conn, err := dialer.DialContext(ctx, network, addr) - if err != nil { - return nil, err - } - - dconn := deadlineconn.New(conn). - WithReadDeadline(globalConnReadDeadline). - WithWriteDeadline(globalConnWriteDeadline) - - return dconn, nil - } -} - // NewRemoteTargetHTTPTransport returns a new http configuration // used while communicating with the remote replication targets. func NewRemoteTargetHTTPTransport(insecure bool) func() *http.Transport { return xhttp.ConnSettings{ - DialContext: newCustomDialContext(), LookupHost: globalDNSCache.LookupHost, RootCAs: globalRootCAs, CipherSuites: fips.TLSCiphersBackwardCompatible(), diff --git a/internal/http/dial_linux.go b/internal/http/dial_linux.go index fd2fb61c1..dec72f90d 100644 --- a/internal/http/dial_linux.go +++ b/internal/http/dial_linux.go @@ -39,9 +39,16 @@ func setTCPParametersFn(opts TCPOptions) func(network, address string, c syscall _ = unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_REUSEPORT, 1) + { + // Enable big buffers + _ = unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_SNDBUF, opts.SendBufSize) + + _ = unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_RCVBUF, opts.RecvBufSize) + } + // Enable TCP open - // https://lwn.net/Articles/508865/ - 16k queue size. - _ = syscall.SetsockoptInt(fd, syscall.SOL_TCP, unix.TCP_FASTOPEN, 16*1024) + // https://lwn.net/Articles/508865/ - 32k queue size. + _ = syscall.SetsockoptInt(fd, syscall.SOL_TCP, unix.TCP_FASTOPEN, 32*1024) // Enable TCP fast connect // TCPFastOpenConnect sets the underlying socket to use @@ -53,17 +60,22 @@ func setTCPParametersFn(opts TCPOptions) func(network, address string, c syscall // "Set TCP_QUICKACK. If you find a case where that makes things worse, let me know." _ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, unix.TCP_QUICKACK, 1) - // The time (in seconds) the connection needs to remain idle before - // TCP starts sending keepalive probes - _ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_KEEPIDLE, 15) + /// Enable keep-alive + { + _ = unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_KEEPALIVE, 1) - // Number of probes. - // ~ cat /proc/sys/net/ipv4/tcp_keepalive_probes (defaults to 9, we reduce it to 5) - _ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_KEEPCNT, 5) + // The time (in seconds) the connection needs to remain idle before + // TCP starts sending keepalive probes + _ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_KEEPIDLE, 15) - // Wait time after successful probe in seconds. - // ~ cat /proc/sys/net/ipv4/tcp_keepalive_intvl (defaults to 75 secs, we reduce it to 15 secs) - _ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_KEEPINTVL, 15) + // Number of probes. + // ~ cat /proc/sys/net/ipv4/tcp_keepalive_probes (defaults to 9, we reduce it to 5) + _ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_KEEPCNT, 5) + + // Wait time after successful probe in seconds. + // ~ cat /proc/sys/net/ipv4/tcp_keepalive_intvl (defaults to 75 secs, we reduce it to 15 secs) + _ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_KEEPINTVL, 15) + } // Set tcp user timeout in addition to the keep-alive - tcp-keepalive is not enough to close a socket // with dead end because tcp-keepalive is not fired when there is data in the socket buffer. @@ -100,14 +112,3 @@ func NewInternodeDialContext(dialTimeout time.Duration, opts TCPOptions) DialCon return dialer.DialContext(ctx, network, addr) } } - -// NewCustomDialContext setups a custom dialer for any external communication and proxies. -func NewCustomDialContext(dialTimeout time.Duration, opts TCPOptions) DialContext { - return func(ctx context.Context, network, addr string) (net.Conn, error) { - dialer := &net.Dialer{ - Timeout: dialTimeout, - Control: setTCPParametersFn(opts), - } - return dialer.DialContext(ctx, network, addr) - } -} diff --git a/internal/http/dial_others.go b/internal/http/dial_others.go index ccbcd24c7..fe548867f 100644 --- a/internal/http/dial_others.go +++ b/internal/http/dial_others.go @@ -39,11 +39,8 @@ func setTCPParametersFn(opts TCPOptions) func(network, address string, c syscall // DialContext is a function to make custom Dial for internode communications type DialContext func(ctx context.Context, network, address string) (net.Conn, error) -// NewInternodeDialContext setups a custom dialer for internode communication -var NewInternodeDialContext = NewCustomDialContext - -// NewCustomDialContext configures a custom dialer for internode communications -func NewCustomDialContext(dialTimeout time.Duration, _ TCPOptions) DialContext { +// NewInternodeDialContext configures a custom dialer for internode communications +func NewInternodeDialContext(dialTimeout time.Duration, _ TCPOptions) DialContext { return func(ctx context.Context, network, addr string) (net.Conn, error) { dialer := &net.Dialer{ Timeout: dialTimeout, diff --git a/internal/http/listener.go b/internal/http/listener.go index 7e818ad3e..a16e0076a 100644 --- a/internal/http/listener.go +++ b/internal/http/listener.go @@ -35,11 +35,11 @@ type acceptResult struct { // httpListener - HTTP listener capable of handling multiple server addresses. type httpListener struct { - opts TCPOptions - tcpListeners []*net.TCPListener // underlying TCP listeners. - acceptCh chan acceptResult // channel where all TCP listeners write accepted connection. - ctx context.Context - ctxCanceler context.CancelFunc + opts TCPOptions + listeners []net.Listener // underlying TCP listeners. + acceptCh chan acceptResult // channel where all TCP listeners write accepted connection. + ctx context.Context + ctxCanceler context.CancelFunc } // start - starts separate goroutine for each TCP listener. A valid new connection is passed to httpListener.acceptCh. @@ -57,18 +57,15 @@ func (listener *httpListener) start() { } // Closure to handle TCPListener until done channel is closed. - handleListener := func(idx int, tcpListener *net.TCPListener) { + handleListener := func(idx int, listener net.Listener) { for { - tcpConn, err := tcpListener.AcceptTCP() - if tcpConn != nil { - tcpConn.SetKeepAlive(true) - } - send(acceptResult{tcpConn, err, idx}) + conn, err := listener.Accept() + send(acceptResult{conn, err, idx}) } } // Start separate goroutine for each TCP listener to handle connection. - for idx, tcpListener := range listener.tcpListeners { + for idx, tcpListener := range listener.listeners { go handleListener(idx, tcpListener) } } @@ -91,8 +88,8 @@ func (listener *httpListener) Accept() (conn net.Conn, err error) { func (listener *httpListener) Close() (err error) { listener.ctxCanceler() - for i := range listener.tcpListeners { - listener.tcpListeners[i].Close() + for i := range listener.listeners { + listener.listeners[i].Close() } return nil @@ -100,8 +97,8 @@ func (listener *httpListener) Close() (err error) { // Addr - net.Listener interface compatible method returns net.Addr. In case of multiple TCP listeners, it returns '0.0.0.0' as IP address. func (listener *httpListener) Addr() (addr net.Addr) { - addr = listener.tcpListeners[0].Addr() - if len(listener.tcpListeners) == 1 { + addr = listener.listeners[0].Addr() + if len(listener.listeners) == 1 { return addr } @@ -116,8 +113,8 @@ func (listener *httpListener) Addr() (addr net.Addr) { // Addrs - returns all address information of TCP listeners. func (listener *httpListener) Addrs() (addrs []net.Addr) { - for i := range listener.tcpListeners { - addrs = append(addrs, listener.tcpListeners[i].Addr()) + for i := range listener.listeners { + addrs = append(addrs, listener.listeners[i].Addr()) } return addrs @@ -125,11 +122,16 @@ func (listener *httpListener) Addrs() (addrs []net.Addr) { // TCPOptions specify customizable TCP optimizations on raw socket type TCPOptions struct { - UserTimeout int // this value is expected to be in milliseconds - ClientReadTimeout time.Duration // When the net.Conn is idle for more than ReadTimeout duration, we close the connection on the client proactively. - ClientWriteTimeout time.Duration // When the net.Conn is idle for more than WriteTimeout duration, we close the connection on the client proactively. - Interface string // this is a VRF device passed via `--interface` flag - Trace func(msg string) // Trace when starting. + UserTimeout int // this value is expected to be in milliseconds + // When the net.Conn is idle for more than ReadTimeout duration, we close the connection on the client proactively. + ClientReadTimeout time.Duration + // When the net.Conn is idle for more than WriteTimeout duration, we close the connection on the client proactively. + ClientWriteTimeout time.Duration + + SendBufSize int // SO_SNDBUF size for the socket connection, NOTE: this sets server and client connection + RecvBufSize int // SO_RECVBUF size for the socket connection, NOTE: this sets server and client connection + Interface string // This is a VRF device passed via `--interface` flag + Trace func(msg string) // Trace when starting. } // newHTTPListener - creates new httpListener object which is interface compatible to net.Listener. @@ -137,7 +139,7 @@ type TCPOptions struct { // * listen to multiple addresses // * controls incoming connections only doing HTTP protocol func newHTTPListener(ctx context.Context, serverAddrs []string, opts TCPOptions) (listener *httpListener, listenErrs []error) { - tcpListeners := make([]*net.TCPListener, 0, len(serverAddrs)) + listeners := make([]net.Listener, 0, len(serverAddrs)) listenErrs = make([]error, len(serverAddrs)) // Unix listener with special TCP options. @@ -146,46 +148,36 @@ func newHTTPListener(ctx context.Context, serverAddrs []string, opts TCPOptions) } for i, serverAddr := range serverAddrs { - var ( - l net.Listener - e error - ) - if l, e = listenCfg.Listen(ctx, "tcp", serverAddr); e != nil { + l, e := listenCfg.Listen(ctx, "tcp", serverAddr) + if e != nil { if opts.Trace != nil { - opts.Trace(fmt.Sprint("listenCfg.Listen: ", e.Error())) + opts.Trace(fmt.Sprint("listenCfg.Listen: ", e)) } listenErrs[i] = e continue } - tcpListener, ok := l.(*net.TCPListener) - if !ok { - listenErrs[i] = fmt.Errorf("unexpected listener type found %v, expected net.TCPListener", l) - if opts.Trace != nil { - opts.Trace(fmt.Sprint("net.TCPListener: ", listenErrs[i].Error())) - } - continue - } if opts.Trace != nil { - opts.Trace(fmt.Sprint("adding listener to ", tcpListener.Addr())) + opts.Trace(fmt.Sprint("adding listener to ", l.Addr())) } - tcpListeners = append(tcpListeners, tcpListener) + + listeners = append(listeners, l) } - if len(tcpListeners) == 0 { + if len(listeners) == 0 { // No listeners initialized, no need to continue return } listener = &httpListener{ - tcpListeners: tcpListeners, - acceptCh: make(chan acceptResult, len(tcpListeners)), - opts: opts, + listeners: listeners, + acceptCh: make(chan acceptResult, len(listeners)), + opts: opts, } listener.ctx, listener.ctxCanceler = context.WithCancel(ctx) if opts.Trace != nil { - opts.Trace(fmt.Sprint("opening ", len(listener.tcpListeners), " listeners")) + opts.Trace(fmt.Sprint("opening ", len(listener.listeners), " listeners")) } listener.start() diff --git a/internal/http/transports.go b/internal/http/transports.go index a2da2dbbb..1c3fe5c76 100644 --- a/internal/http/transports.go +++ b/internal/http/transports.go @@ -72,8 +72,8 @@ func (s ConnSettings) getDefaultTransport(maxIdleConnsPerHost int) *http.Transpo Proxy: http.ProxyFromEnvironment, DialContext: dialContext, MaxIdleConnsPerHost: maxIdleConnsPerHost, - WriteBufferSize: 32 << 10, // 32KiB moving up from 4KiB default - ReadBufferSize: 32 << 10, // 32KiB moving up from 4KiB default + WriteBufferSize: 64 << 10, // 64KiB moving up from 4KiB default + ReadBufferSize: 64 << 10, // 64KiB moving up from 4KiB default IdleConnTimeout: 15 * time.Second, ResponseHeaderTimeout: 15 * time.Minute, // Conservative timeout is the default (for MinIO internode) TLSHandshakeTimeout: 10 * time.Second,