add configurable VRF interface and user-timeout (#17108)

This commit is contained in:
Harshavardhana 2023-05-03 14:12:25 -07:00 committed by GitHub
parent 90e2cc3d4c
commit 9571b0825e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 258 additions and 152 deletions

View File

@ -212,6 +212,7 @@ var (
globalTLSCerts *certs.Manager
globalHTTPServer *xhttp.Server
globalTCPOptions xhttp.TCPOptions
globalHTTPServerErrorCh = make(chan error)
globalOSSignalCh = make(chan os.Signal, 1)

View File

@ -207,19 +207,6 @@ func isHostIP(ipAddress string) bool {
return net.ParseIP(host) != nil
}
// checkPortAvailability - check if given host and port is already in use.
// Note: The check method tries to listen on given port and closes it.
// It is possible to have a disconnected client in this tiny window of time.
func checkPortAvailability(host, port string) (err error) {
l, err := net.Listen("tcp", net.JoinHostPort(host, port))
if err != nil {
return err
}
// As we are able to listen on this network, the port is not in use.
// Close the listener and continue check other networks.
return l.Close()
}
// extractHostPort - extracts host/port from many address formats
// such as, ":9000", "localhost:9000", "http://localhost:9000/"
func extractHostPort(hostAddr string) (string, string, error) {

View File

@ -20,9 +20,7 @@ package cmd
import (
"errors"
"fmt"
"net"
"reflect"
"runtime"
"testing"
"github.com/minio/minio-go/v7/pkg/set"
@ -180,61 +178,6 @@ func TestGetAPIEndpoints(t *testing.T) {
}
}
// Ask the kernel for a free open port.
func getFreePort() string {
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
if err != nil {
panic(err)
}
l, err := net.ListenTCP("tcp", addr)
if err != nil {
panic(err)
}
defer l.Close()
return fmt.Sprintf("%d", l.Addr().(*net.TCPAddr).Port)
}
// Tests for port availability logic written for server startup sequence.
func TestCheckPortAvailability(t *testing.T) {
// Make a port is not available.
port := getFreePort()
listener, err := net.Listen("tcp", net.JoinHostPort("", port))
if err != nil {
t.Fatalf("Unable to listen on port %v", port)
}
defer listener.Close()
testCases := []struct {
host string
port string
expectedErr error
}{
{"", port, fmt.Errorf("listen tcp :%v: bind: address already in use", port)},
{"127.0.0.1", port, fmt.Errorf("listen tcp 127.0.0.1:%v: bind: address already in use", port)},
{"", getFreePort(), nil},
}
for _, testCase := range testCases {
// On MS Windows and Mac, skip checking error case due to https://github.com/golang/go/issues/7598
if (runtime.GOOS == globalWindowsOSName || runtime.GOOS == globalMacOSName || runtime.GOOS == "solaris") && testCase.expectedErr != nil {
continue
}
err := checkPortAvailability(testCase.host, testCase.port)
switch {
case testCase.expectedErr == nil:
if err != nil {
t.Fatalf("error: expected = <nil>, got = %v", err)
}
case err == nil:
t.Fatalf("error: expected = %v, got = <nil>", testCase.expectedErr)
case testCase.expectedErr.Error() != err.Error():
t.Fatalf("error: expected = %v, got = %v", testCase.expectedErr, err)
}
}
}
func TestCheckLocalServerAddr(t *testing.T) {
testCases := []struct {
serverAddr string

View File

@ -107,6 +107,19 @@ var ServerFlags = []cli.Flag{
Value: 10 * time.Minute,
EnvVar: "MINIO_CONN_WRITE_DEADLINE",
},
cli.DurationFlag{
Name: "conn-user-timeout",
Usage: "custom TCP_USER_TIMEOUT for socket buffers",
Hidden: true,
Value: 10 * time.Minute,
EnvVar: "MINIO_CONN_USER_TIMEOUT",
},
cli.StringFlag{
Name: "interface",
Usage: "bind to right VRF device for MinIO services",
Hidden: true,
EnvVar: "MINIO_INTERFACE",
},
cli.StringSliceFlag{
Name: "ftp",
Usage: "enable and configure an FTP(Secure) server",
@ -264,11 +277,16 @@ func serverHandleCmdArgs(ctx *cli.Context) {
},
})
globalTCPOptions = xhttp.TCPOptions{
UserTimeout: int(ctx.Duration("conn-user-timeout").Milliseconds()),
Interface: ctx.String("interface"),
}
// On macOS, if a process already listens on LOCALIPADDR:PORT, net.Listen() falls back
// to IPv6 address ie minio will start listening on IPv6 address whereas another
// (non-)minio process is listening on IPv4 of given port.
// To avoid this error situation we check for port availability.
logger.FatalIf(checkPortAvailability(globalMinioHost, globalMinioPort), "Unable to start the server")
logger.FatalIf(xhttp.CheckPortAvailability(globalMinioHost, globalMinioPort, globalTCPOptions), "Unable to start the server")
globalIsErasure = (setupType == ErasureSetupType)
globalIsDistErasure = (setupType == DistErasureSetupType)
@ -570,7 +588,8 @@ func serverMain(ctx *cli.Context) {
UseIdleTimeout(ctx.Duration("idle-timeout")).
UseReadHeaderTimeout(ctx.Duration("read-header-timeout")).
UseBaseContext(GlobalContext).
UseCustomLogger(log.New(io.Discard, "", 0)) // Turn-off random logging by Go stdlib
UseCustomLogger(log.New(io.Discard, "", 0)). // Turn-off random logging by Go stdlib
UseTCPOptions(globalTCPOptions)
go func() {
globalHTTPServerErrorCh <- httpServer.Start(GlobalContext)

View File

@ -403,7 +403,7 @@ func parseReleaseData(data string) (sha256Sum []byte, releaseTime time.Time, rel
func getUpdateTransport(timeout time.Duration) http.RoundTripper {
var updateTransport http.RoundTripper = &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: xhttp.NewCustomDialContext(timeout),
DialContext: xhttp.NewCustomDialContext(timeout, globalTCPOptions),
IdleConnTimeout: timeout,
TLSHandshakeTimeout: timeout,
ExpectContinueTimeout: timeout,

View File

@ -574,6 +574,7 @@ func GetDefaultConnSettings() xhttp.ConnSettings {
DNSCache: globalDNSCache,
DialTimeout: rest.DefaultTimeout,
RootCAs: globalRootCAs,
TCPOptions: globalTCPOptions,
}
}
@ -587,6 +588,7 @@ func NewInternodeHTTPTransport() func() http.RoundTripper {
CipherSuites: fips.TLSCiphers(),
CurvePreferences: fips.TLSCurveIDs(),
EnableHTTP2: false,
TCPOptions: globalTCPOptions,
}.NewInternodeHTTPTransport()
}
@ -600,6 +602,7 @@ func NewCustomHTTPProxyTransport() func() *http.Transport {
CipherSuites: fips.TLSCiphers(),
CurvePreferences: fips.TLSCurveIDs(),
EnableHTTP2: false,
TCPOptions: globalTCPOptions,
}.NewCustomHTTPProxyTransport()
}
@ -610,6 +613,7 @@ func NewHTTPTransportWithClientCerts(clientCert, clientKey string) *http.Transpo
DNSCache: globalDNSCache,
DialTimeout: defaultDialTimeout,
RootCAs: globalRootCAs,
TCPOptions: globalTCPOptions,
EnableHTTP2: false,
}
@ -643,6 +647,7 @@ func NewHTTPTransportWithTimeout(timeout time.Duration) *http.Transport {
DNSCache: globalDNSCache,
DialTimeout: defaultDialTimeout,
RootCAs: globalRootCAs,
TCPOptions: globalTCPOptions,
EnableHTTP2: false,
}.NewHTTPTransportWithTimeout(timeout)
}
@ -677,6 +682,7 @@ func NewRemoteTargetHTTPTransport() func() *http.Transport {
DialContext: newCustomDialContext(),
DNSCache: globalDNSCache,
RootCAs: globalRootCAs,
TCPOptions: globalTCPOptions,
EnableHTTP2: false,
}.NewRemoteTargetHTTPTransport()
}

View File

@ -0,0 +1,58 @@
//go:build linux
// +build linux
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package http
import (
"context"
"net"
"syscall"
"time"
)
// CheckPortAvailability - check if given host and port is already in use.
// Note: The check method tries to listen on given port and closes it.
// It is possible to have a disconnected client in this tiny window of time.
func CheckPortAvailability(host, port string, opts TCPOptions) (err error) {
lc := &net.ListenConfig{
Control: func(network, address string, c syscall.RawConn) error {
c.Control(func(fdPtr uintptr) {
if opts.Interface != "" {
// When interface is specified look for specifically port availability on
// the specified interface if any.
_ = syscall.SetsockoptString(int(fdPtr), syscall.SOL_SOCKET, syscall.SO_BINDTODEVICE, opts.Interface)
}
})
return nil
},
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
l, err := lc.Listen(ctx, "tcp", net.JoinHostPort(host, port))
if err != nil {
return err
}
// As we are able to listen on this network, the port is not in use.
// Close the listener and continue check other networks.
return l.Close()
}

View File

@ -1,7 +1,7 @@
//go:build linux || darwin || dragonfly || freebsd || netbsd || openbsd || rumprun
// +build linux darwin dragonfly freebsd netbsd openbsd rumprun
//go:build !linux
// +build !linux
// Copyright (c) 2015-2021 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
@ -21,10 +21,26 @@
package http
import (
"context"
"net"
"time"
)
// Unix listener with special TCP options.
var listenCfg = net.ListenConfig{
Control: setTCPParameters,
// CheckPortAvailability - check if given host and port is already in use.
// Note: The check method tries to listen on given port and closes it.
// It is possible to have a disconnected client in this tiny window of time.
func CheckPortAvailability(host, port string, opts TCPOptions) (err error) {
lc := &net.ListenConfig{}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
l, err := lc.Listen(ctx, "tcp", net.JoinHostPort(host, port))
if err != nil {
return err
}
// As we are able to listen on this network, the port is not in use.
// Close the listener and continue check other networks.
return l.Close()
}

View File

@ -0,0 +1,64 @@
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package http
import (
"fmt"
"net"
"runtime"
"strconv"
"testing"
)
// Tests for port availability logic written for server startup sequence.
func TestCheckPortAvailability(t *testing.T) {
if runtime.GOOS != "linux" {
t.Skip()
}
l, err := net.Listen("tcp", "localhost:0") // ask kernel for a free port.
if err != nil {
t.Fatal(err)
}
defer l.Close()
port := l.Addr().(*net.TCPAddr).Port
testCases := []struct {
host string
port int
expectedErr error
}{
{"", port, fmt.Errorf("listen tcp :%v: bind: address already in use", port)},
{"127.0.0.1", port, fmt.Errorf("listen tcp 127.0.0.1:%v: bind: address already in use", port)},
}
for _, testCase := range testCases {
err := CheckPortAvailability(testCase.host, strconv.Itoa(testCase.port), TCPOptions{})
switch {
case testCase.expectedErr == nil:
if err != nil {
t.Fatalf("error: expected = <nil>, got = %v", err)
}
case err == nil:
t.Fatalf("error: expected = %v, got = <nil>", testCase.expectedErr)
case testCase.expectedErr.Error() != err.Error():
t.Fatalf("error: expected = %v, got = %v", testCase.expectedErr, err)
}
}
}

View File

@ -1,7 +1,7 @@
//go:build linux
// +build linux
// Copyright (c) 2015-2021 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
@ -29,7 +29,8 @@ import (
"golang.org/x/sys/unix"
)
func setTCPParameters(network, address string, c syscall.RawConn) error {
func setTCPParametersFn(opts TCPOptions) func(network, address string, c syscall.RawConn) error {
return func(network, address string, c syscall.RawConn) error {
c.Control(func(fdPtr uintptr) {
// got socket file descriptor to set parameters.
fd := int(fdPtr)
@ -69,31 +70,40 @@ func setTCPParameters(network, address string, c syscall.RawConn) error {
// https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die/
// This is a sensitive configuration, it is better to set it to high values, > 60 secs since it can
// affect clients reading data with a very slow pace (disappropriate with socket buffer sizes)
_ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, unix.TCP_USER_TIMEOUT, 10*60*1000)
_ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, unix.TCP_USER_TIMEOUT, opts.UserTimeout)
if opts.Interface != "" {
// Create socket on specific vrf device.
// To catch all kinds of special cases this filters specifically for loopback networks.
if ip := net.ParseIP(address); ip != nil && !ip.IsLoopback() {
_ = syscall.SetsockoptString(fd, syscall.SOL_SOCKET, syscall.SO_BINDTODEVICE, opts.Interface)
}
}
})
return nil
}
}
// DialContext is a function to make custom Dial for internode communications
type DialContext func(ctx context.Context, network, address string) (net.Conn, error)
// NewInternodeDialContext setups a custom dialer for internode communication
func NewInternodeDialContext(dialTimeout time.Duration) DialContext {
func NewInternodeDialContext(dialTimeout time.Duration, opts TCPOptions) DialContext {
return func(ctx context.Context, network, addr string) (net.Conn, error) {
dialer := &net.Dialer{
Timeout: dialTimeout,
Control: setTCPParameters,
Control: setTCPParametersFn(opts),
}
return dialer.DialContext(ctx, network, addr)
}
}
// NewCustomDialContext setups a custom dialer for any external communication and proxies.
func NewCustomDialContext(dialTimeout time.Duration) DialContext {
func NewCustomDialContext(dialTimeout time.Duration, opts TCPOptions) DialContext {
return func(ctx context.Context, network, addr string) (net.Conn, error) {
dialer := &net.Dialer{
Timeout: dialTimeout,
Control: setTCPParameters,
Control: setTCPParametersFn(opts),
}
return dialer.DialContext(ctx, network, addr)
}

View File

@ -30,8 +30,10 @@ import (
// TODO: if possible implement for non-linux platforms, not a priority at the moment
//
//nolint:unused
func setTCPParameters(string, string, syscall.RawConn) error {
func setTCPParametersFn(opts TCPOptions) func(network, address string, c syscall.RawConn) error {
return func(network, address string, c syscall.RawConn) error {
return nil
}
}
// DialContext is a function to make custom Dial for internode communications
@ -41,7 +43,7 @@ type DialContext func(ctx context.Context, network, address string) (net.Conn, e
var NewInternodeDialContext = NewCustomDialContext
// NewCustomDialContext configures a custom dialer for internode communications
func NewCustomDialContext(dialTimeout time.Duration) DialContext {
func NewCustomDialContext(dialTimeout time.Duration, _ TCPOptions) DialContext {
return func(ctx context.Context, network, addr string) (net.Conn, error) {
dialer := &net.Dialer{
Timeout: dialTimeout,

View File

@ -1,26 +0,0 @@
//go:build windows || plan9 || solaris
// +build windows plan9 solaris
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package http
import "net"
// Windows, plan9 specific listener.
var listenCfg = net.ListenConfig{}

View File

@ -118,11 +118,17 @@ func (listener *httpListener) Addrs() (addrs []net.Addr) {
return addrs
}
// TCPOptions specify customizable TCP optimizations on raw socket
type TCPOptions struct {
UserTimeout int // this value is expected to be in milliseconds
Interface string // this is a VRF device passed via `--interface` flag
}
// newHTTPListener - creates new httpListener object which is interface compatible to net.Listener.
// httpListener is capable to
// * listen to multiple addresses
// * controls incoming connections only doing HTTP protocol
func newHTTPListener(ctx context.Context, serverAddrs []string) (listener *httpListener, err error) {
func newHTTPListener(ctx context.Context, serverAddrs []string, opts TCPOptions) (listener *httpListener, err error) {
var tcpListeners []*net.TCPListener
// Close all opened listeners on error
@ -147,11 +153,16 @@ func newHTTPListener(ctx context.Context, serverAddrs []string) (listener *httpL
}
}
// Silently ignore failure to bind on DNS cached ipv6 loopback iff user specifies "localhost"
// Unix listener with special TCP options.
listenCfg := net.ListenConfig{
Control: setTCPParametersFn(opts),
}
// Silently ignore failure to bind on DNS cached ipv6 loopback if user specifies "localhost"
for _, serverAddr := range serverAddrs {
var l net.Listener
if l, err = listenCfg.Listen(ctx, "tcp", serverAddr); err != nil {
if isLocalhost && strings.HasPrefix(serverAddr, "[::1]") {
if isLocalhost && strings.HasPrefix(serverAddr, "[::1") {
continue
}
return nil, err
@ -160,7 +171,7 @@ func newHTTPListener(ctx context.Context, serverAddrs []string) (listener *httpL
tcpListener, ok := l.(*net.TCPListener)
if !ok {
err = fmt.Errorf("unexpected listener type found %v, expected net.TCPListener", l)
if isLocalhost && strings.HasPrefix(serverAddr, "[::1]") {
if isLocalhost && strings.HasPrefix(serverAddr, "[::1") {
continue
}
return nil, err

View File

@ -153,6 +153,7 @@ func TestNewHTTPListener(t *testing.T) {
for _, testCase := range testCases {
listener, err := newHTTPListener(context.Background(),
testCase.serverAddrs,
TCPOptions{},
)
if !testCase.expectedErr {
if err != nil {
@ -189,6 +190,7 @@ func TestHTTPListenerStartClose(t *testing.T) {
for i, testCase := range testCases {
listener, err := newHTTPListener(context.Background(),
testCase.serverAddrs,
TCPOptions{},
)
if err != nil {
if strings.Contains(err.Error(), "The requested address is not valid in its context") {
@ -239,6 +241,7 @@ func TestHTTPListenerAddr(t *testing.T) {
for i, testCase := range testCases {
listener, err := newHTTPListener(context.Background(),
testCase.serverAddrs,
TCPOptions{},
)
if err != nil {
if strings.Contains(err.Error(), "The requested address is not valid in its context") {
@ -286,6 +289,7 @@ func TestHTTPListenerAddrs(t *testing.T) {
for i, testCase := range testCases {
listener, err := newHTTPListener(context.Background(),
testCase.serverAddrs,
TCPOptions{},
)
if err != nil {
if strings.Contains(err.Error(), "The requested address is not valid in its context") {

View File

@ -61,6 +61,7 @@ const (
type Server struct {
http.Server
Addrs []string // addresses on which the server listens for new connection.
TCPOptions TCPOptions // all the configurable TCP conn specific configurable options.
ShutdownTimeout time.Duration // timeout used for graceful server shutdown.
listenerMutex sync.Mutex // to guard 'listener' field.
listener *httpListener // HTTP listener for all 'Addrs' field.
@ -87,6 +88,7 @@ func (srv *Server) Start(ctx context.Context) (err error) {
listener, err = newHTTPListener(
ctx,
srv.Addrs,
srv.TCPOptions,
)
if err != nil {
return err
@ -213,6 +215,12 @@ func (srv *Server) UseCustomLogger(l *log.Logger) *Server {
return srv
}
// UseTCPOptions use custom TCP options on raw socket
func (srv *Server) UseTCPOptions(opts TCPOptions) *Server {
srv.TCPOptions = opts
return srv
}
// NewServer - creates new HTTP server using given arguments.
func NewServer(addrs []string) *Server {
httpServer := &Server{

View File

@ -49,12 +49,15 @@ type ConnSettings struct {
// HTTP2
EnableHTTP2 bool
// TCP Options
TCPOptions TCPOptions
}
func (s ConnSettings) getDefaultTransport() *http.Transport {
dialContext := s.DialContext
if dialContext == nil {
dialContext = DialContextWithDNSCache(s.DNSCache, NewInternodeDialContext(s.DialTimeout))
dialContext = DialContextWithDNSCache(s.DNSCache, NewInternodeDialContext(s.DialTimeout, s.TCPOptions))
}
tlsClientConfig := tls.Config{