diff --git a/cmd/common-main.go b/cmd/common-main.go index 66fe5a487..2c13d2917 100644 --- a/cmd/common-main.go +++ b/cmd/common-main.go @@ -51,7 +51,6 @@ import ( "github.com/minio/minio/internal/color" "github.com/minio/minio/internal/config" "github.com/minio/minio/internal/handlers" - xhttp "github.com/minio/minio/internal/http" "github.com/minio/minio/internal/kms" "github.com/minio/minio/internal/logger" "github.com/minio/pkg/certs" @@ -59,6 +58,7 @@ import ( "github.com/minio/pkg/ellipses" "github.com/minio/pkg/env" xnet "github.com/minio/pkg/net" + "github.com/rs/dnscache" ) // serverDebugLog will enable debug printing @@ -71,17 +71,34 @@ func init() { logger.Init(GOPATH, GOROOT) logger.RegisterError(config.FmtError) - if IsKubernetes() || IsDocker() || IsBOSH() || IsDCOS() || IsPCFTile() { - // 30 seconds matches the orchestrator DNS TTLs, have - // a 5 second timeout to lookup from DNS servers. - globalDNSCache = xhttp.NewDNSCache(30*time.Second, 5*time.Second, logger.LogOnceIf) - } else { - // On bare-metals DNS do not change often, so it is - // safe to assume a higher timeout upto 10 minutes. - globalDNSCache = xhttp.NewDNSCache(10*time.Minute, 5*time.Second, logger.LogOnceIf) - } initGlobalContext() + options := dnscache.ResolverRefreshOptions{ + ClearUnused: true, + PersistOnFailure: false, + } + + // Call to refresh will refresh names in cache. If you pass true, it will also + // remove cached names not looked up since the last call to Refresh. It is a good idea + // to call this method on a regular interval. + go func() { + var t *time.Ticker + if IsKubernetes() || IsDocker() || IsBOSH() || IsDCOS() || IsPCFTile() { + t = time.NewTicker(1 * time.Minute) + } else { + t = time.NewTicker(10 * time.Minute) + } + defer t.Stop() + for { + select { + case <-t.C: + globalDNSCache.RefreshWithOptions(options) + case <-GlobalContext.Done(): + return + } + } + }() + globalForwarder = handlers.NewForwarder(&handlers.Forwarder{ PassHost: true, RoundTripper: newGatewayHTTPTransport(1 * time.Hour), diff --git a/cmd/gateway-main.go b/cmd/gateway-main.go index 1190572a7..d969a88d9 100644 --- a/cmd/gateway-main.go +++ b/cmd/gateway-main.go @@ -154,8 +154,6 @@ func ValidateGatewayArguments(serverAddr, endpointAddr string) error { // StartGateway - handler for 'minio gateway '. func StartGateway(ctx *cli.Context, gw Gateway) { - defer globalDNSCache.Stop() - signal.Notify(globalOSSignalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT) go handleSignals() @@ -268,7 +266,7 @@ func StartGateway(ctx *cli.Context, gw Gateway) { return GlobalContext } go func() { - globalHTTPServerErrorCh <- httpServer.Start() + globalHTTPServerErrorCh <- httpServer.Start(GlobalContext) }() globalObjLayerMutex.Lock() diff --git a/cmd/globals.go b/cmd/globals.go index 49198e3cc..74eae64f9 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -30,6 +30,7 @@ import ( "github.com/minio/minio/internal/bucket/bandwidth" "github.com/minio/minio/internal/handlers" "github.com/minio/minio/internal/kms" + "github.com/rs/dnscache" "github.com/dustin/go-humanize" "github.com/minio/minio/internal/auth" @@ -309,7 +310,9 @@ var ( globalProxyTransport http.RoundTripper - globalDNSCache *xhttp.DNSCache + globalDNSCache = &dnscache.Resolver{ + Timeout: 5 * time.Second, + } globalForwarder *handlers.Forwarder diff --git a/cmd/server-main.go b/cmd/server-main.go index 1cb14ae51..f5b0a565b 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -431,8 +431,6 @@ func (lw nullWriter) Write(b []byte) (int, error) { // serverMain handler called for 'minio server' command. func serverMain(ctx *cli.Context) { - defer globalDNSCache.Stop() - signal.Notify(globalOSSignalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT) go handleSignals() @@ -496,14 +494,15 @@ func serverMain(ctx *cli.Context) { getCert = globalTLSCerts.GetCertificate } - httpServer := xhttp.NewServer([]string{globalMinioAddr}, criticalErrorHandler{corsHandler(handler)}, getCert) + httpServer := xhttp.NewServer([]string{globalMinioAddr}, + criticalErrorHandler{corsHandler(handler)}, getCert) httpServer.BaseContext = func(listener net.Listener) context.Context { return GlobalContext } // Turn-off random logging by Go internally httpServer.ErrorLog = log.New(&nullWriter{}, "", 0) go func() { - globalHTTPServerErrorCh <- httpServer.Start() + globalHTTPServerErrorCh <- httpServer.Start(GlobalContext) }() setHTTPServer(httpServer) diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index cacab8bb3..f7ceaefa9 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -63,7 +63,6 @@ import ( "github.com/minio/minio/internal/config" "github.com/minio/minio/internal/crypto" "github.com/minio/minio/internal/hash" - xhttp "github.com/minio/minio/internal/http" "github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/rest" "github.com/minio/pkg/bucket/policy" @@ -106,8 +105,6 @@ func TestMain(m *testing.M) { // Initialize globalConsoleSys system globalConsoleSys = NewConsoleLogger(context.Background()) - globalDNSCache = xhttp.NewDNSCache(3*time.Second, 10*time.Second, logger.LogOnceIf) - globalInternodeTransport = newInternodeHTTPTransport(nil, rest.DefaultTimeout)() initHelp() diff --git a/go.mod b/go.mod index b18fb36c8..ff05c5a37 100644 --- a/go.mod +++ b/go.mod @@ -69,12 +69,12 @@ require ( github.com/prometheus/client_model v0.2.0 github.com/prometheus/procfs v0.7.3 github.com/rs/cors v1.7.0 + github.com/rs/dnscache v0.0.0-20210201191234-295bba877686 github.com/secure-io/sio-go v0.3.1 github.com/shirou/gopsutil/v3 v3.21.7 github.com/streadway/amqp v1.0.0 github.com/tinylib/msgp v1.1.6-0.20210521143832-0becd170c402 github.com/valyala/bytebufferpool v1.0.0 - github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c github.com/yargevad/filepathx v1.0.0 go.etcd.io/etcd/api/v3 v3.5.0-beta.4 diff --git a/go.sum b/go.sum index 4226d7cc2..5773ebfa5 100644 --- a/go.sum +++ b/go.sum @@ -1268,6 +1268,8 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR github.com/rogpeppe/go-internal v1.5.2/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik= github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= +github.com/rs/dnscache v0.0.0-20210201191234-295bba877686 h1:IJ6Df0uxPDtNoByV0KkzVKNseWvZFCNM/S9UoyOMCSI= +github.com/rs/dnscache v0.0.0-20210201191234-295bba877686/go.mod h1:qe5TWALJ8/a1Lqznoc5BDHpYX/8HU60Hm2AwRmqzxqA= github.com/rs/xid v1.2.1 h1:mhH9Nq+C1fY2l1XIpgxIiUOfNpRBYH1kKcr+qfKgjRc= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU= @@ -1423,7 +1425,6 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasthttp v1.2.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk8LWSxF3s= github.com/valyala/quicktemplate v1.2.0/go.mod h1:EH+4AkTd43SvgIbQHYu59/cJyxDoOVRUAfrukLPuGJ4= -github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a h1:0R4NLDRDZX6JcmhJgXi5E4b8Wg84ihbmUKp/GvSPEzc= github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= github.com/vdemeester/k8s-pkg-credentialprovider v1.17.4/go.mod h1:inCTmtUdr5KJbreVojo06krnTgaeAz/Z7lynpPk/Q2c= github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv3vaXspKw= diff --git a/internal/http/dial_dnscache.go b/internal/http/dial_dnscache.go index 667113cf9..a0e991e74 100644 --- a/internal/http/dial_dnscache.go +++ b/internal/http/dial_dnscache.go @@ -19,15 +19,11 @@ package http import ( "context" - "math/rand" "net" - "sync" "time" -) -var randPerm = func(n int) []int { - return rand.Perm(n) -} + "github.com/rs/dnscache" +) // DialContextWithDNSCache is a helper function which returns `net.DialContext` function. // It randomly fetches an IP from the DNS cache and dials it by the given dial @@ -39,7 +35,7 @@ var randPerm = func(n int) []int { // // In this function, it uses functions from `rand` package. To make it really random, // you MUST call `rand.Seed` and change the value from the default in your application -func DialContextWithDNSCache(cache *DNSCache, baseDialCtx DialContext) DialContext { +func DialContextWithDNSCache(resolver *dnscache.Resolver, baseDialCtx DialContext) DialContext { if baseDialCtx == nil { // This is same as which `http.DefaultTransport` uses. baseDialCtx = (&net.Dialer{ @@ -47,147 +43,29 @@ func DialContextWithDNSCache(cache *DNSCache, baseDialCtx DialContext) DialConte KeepAlive: 30 * time.Second, }).DialContext } - return func(ctx context.Context, network, host string) (net.Conn, error) { - h, p, err := net.SplitHostPort(host) + return func(ctx context.Context, network, addr string) (conn net.Conn, err error) { + host, port, err := net.SplitHostPort(addr) if err != nil { return nil, err } - // Fetch DNS result from cache. - // - // ctxLookup is only used for canceling DNS Lookup. - ctxLookup, cancelF := context.WithTimeout(ctx, cache.lookupTimeout) - defer cancelF() - addrs, err := cache.Fetch(ctxLookup, h) + if net.ParseIP(host) != nil { + // For IP only setups there is no need for DNS lookups. + return baseDialCtx(ctx, "tcp", addr) + } + + ips, err := resolver.LookupHost(ctx, host) if err != nil { return nil, err } - var firstErr error - for _, randomIndex := range randPerm(len(addrs)) { - conn, err := baseDialCtx(ctx, "tcp", net.JoinHostPort(addrs[randomIndex], p)) + for _, ip := range ips { + conn, err = baseDialCtx(ctx, "tcp", net.JoinHostPort(ip, port)) if err == nil { - return conn, nil - } - if firstErr == nil { - firstErr = err + break } } - return nil, firstErr + return } } - -// defaultFreq is default frequency a resolver refreshes DNS cache. -var ( - defaultFreq = 3 * time.Second - defaultLookupTimeout = 10 * time.Second -) - -// DNSCache is DNS cache resolver which cache DNS resolve results in memory. -type DNSCache struct { - resolver *net.Resolver - lookupTimeout time.Duration - loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}) - - cache sync.Map - doneOnce sync.Once - doneCh chan struct{} -} - -// NewDNSCache initializes DNS cache resolver and starts auto refreshing -// in a new goroutine. To stop auto refreshing, call `Stop()` function. -// Once `Stop()` is called auto refreshing cannot be resumed. -func NewDNSCache(freq time.Duration, lookupTimeout time.Duration, loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})) *DNSCache { - if freq <= 0 { - freq = defaultFreq - } - - if lookupTimeout <= 0 { - lookupTimeout = defaultLookupTimeout - } - - // PreferGo controls whether Go's built-in DNS resolver - // is preferred on platforms where it's available, since - // we do not compile with CGO, FIPS builds are CGO based - // enable this to enforce Go resolver. - defaultResolver := &net.Resolver{ - PreferGo: true, - } - - r := &DNSCache{ - resolver: defaultResolver, - lookupTimeout: lookupTimeout, - loggerOnce: loggerOnce, - doneCh: make(chan struct{}), - } - - rnd := rand.New(rand.NewSource(time.Now().UnixNano())) - - timer := time.NewTimer(freq) - go func() { - defer timer.Stop() - - for { - select { - case <-timer.C: - // Make sure that refreshes on DNS do not be attempted - // at the same time, allows for reduced load on the - // DNS servers. - timer.Reset(time.Duration(rnd.Float64() * float64(freq))) - - r.Refresh() - case <-r.doneCh: - return - } - } - }() - - return r -} - -// LookupHost lookups address list from DNS server, persist the results -// in-memory cache. `Fetch` is used to obtain the values for a given host. -func (r *DNSCache) LookupHost(ctx context.Context, host string) ([]string, error) { - addrs, err := r.resolver.LookupHost(ctx, host) - if err != nil { - return nil, err - } - - r.cache.Store(host, addrs) - return addrs, nil -} - -// Fetch fetches IP list from the cache. If IP list of the given addr is not in the cache, -// then it lookups from DNS server by `Lookup` function. -func (r *DNSCache) Fetch(ctx context.Context, host string) ([]string, error) { - addrs, ok := r.cache.Load(host) - if ok { - return addrs.([]string), nil - } - return r.LookupHost(ctx, host) -} - -// Refresh refreshes IP list cache, automatically. -func (r *DNSCache) Refresh() { - var hosts []string - r.cache.Range(func(k, v interface{}) bool { - hosts = append(hosts, k.(string)) - return true - }) - - for _, host := range hosts { - ctx, cancelF := context.WithTimeout(context.Background(), r.lookupTimeout) - if _, err := r.LookupHost(ctx, host); err != nil { - r.loggerOnce(ctx, err, host) - } - cancelF() - } -} - -// Stop stops auto refreshing. -func (r *DNSCache) Stop() { - r.doneOnce.Do(func() { - close(r.doneCh) - }) -} diff --git a/internal/http/dial_dnscache_test.go b/internal/http/dial_dnscache_test.go deleted file mode 100644 index 18e6edcf7..000000000 --- a/internal/http/dial_dnscache_test.go +++ /dev/null @@ -1,209 +0,0 @@ -// Copyright (c) 2015-2021 MinIO, Inc. -// -// This file is part of MinIO Object Storage stack -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package http - -import ( - "context" - "errors" - "fmt" - "math/rand" - "net" - "runtime" - "testing" - "time" -) - -var ( - testFreq = 1 * time.Second - testDefaultLookupTimeout = 1 * time.Second -) - -func logOnce(ctx context.Context, err error, id interface{}, errKind ...interface{}) { - // no-op -} - -func testDNSCache(t *testing.T) *DNSCache { - t.Helper() // skip printing file and line information from this function - return NewDNSCache(testFreq, testDefaultLookupTimeout, logOnce) -} - -func TestDialContextWithDNSCache(t *testing.T) { - resolver := &DNSCache{} - resolver.cache.Store("play.min.io", []string{ - "127.0.0.1", - "127.0.0.2", - "127.0.0.3", - }) - cases := []struct { - permF func(n int) []int - dialF DialContext - }{ - { - permF: func(n int) []int { - return []int{0} - }, - dialF: func(ctx context.Context, network, addr string) (net.Conn, error) { - if got, want := addr, net.JoinHostPort("127.0.0.1", "443"); got != want { - t.Fatalf("got addr %q, want %q", got, want) - } - return nil, nil - }, - }, - { - permF: func(n int) []int { - return []int{1} - }, - dialF: func(ctx context.Context, network, addr string) (net.Conn, error) { - if got, want := addr, net.JoinHostPort("127.0.0.2", "443"); got != want { - t.Fatalf("got addr %q, want %q", got, want) - } - return nil, nil - }, - }, - { - permF: func(n int) []int { - return []int{2} - }, - dialF: func(ctx context.Context, network, addr string) (net.Conn, error) { - if got, want := addr, net.JoinHostPort("127.0.0.3", "443"); got != want { - t.Fatalf("got addr %q, want %q", got, want) - } - return nil, nil - }, - }, - } - - origFunc := randPerm - defer func() { - randPerm = origFunc - }() - - for _, tc := range cases { - t.Run("", func(t *testing.T) { - randPerm = tc.permF - if _, err := DialContextWithDNSCache(resolver, tc.dialF)(context.Background(), "tcp", "play.min.io:443"); err != nil { - t.Fatalf("err: %s", err) - } - }) - } - -} - -func TestDialContextWithDNSCacheRand(t *testing.T) { - rand.Seed(time.Now().UTC().UnixNano()) - defer func() { - rand.Seed(1) - }() - - resolver := &DNSCache{} - resolver.cache.Store("play.min.io", []string{ - "127.0.0.1", - "127.0.0.2", - "127.0.0.3", - }) - - count := make(map[string]int) - dialF := func(ctx context.Context, network, addr string) (net.Conn, error) { - count[addr]++ - return nil, nil - } - - for i := 0; i < 100; i++ { - if _, err := DialContextWithDNSCache(resolver, dialF)(context.Background(), "tcp", "play.min.io:443"); err != nil { - t.Fatalf("err: %s", err) - } - } - - for _, c := range count { - got := float32(c) / float32(100) - if got < float32(0.1) { - t.Fatalf("expected 0.1 rate got %f", got) - } - } -} - -// Verify without port Dial fails, Go stdlib net.Dial expects port -func TestDialContextWithDNSCacheScenario1(t *testing.T) { - resolver := testDNSCache(t) - if _, err := DialContextWithDNSCache(resolver, nil)(context.Background(), "tcp", "play.min.io"); err == nil { - t.Fatalf("expect to fail") // expected port - } -} - -// Verify if the host lookup function failed to return addresses -func TestDialContextWithDNSCacheScenario2(t *testing.T) { - if runtime.GOOS == "windows" { - // Windows doesn't use Dial to connect - // so there is no way this test will work - // as expected. - t.Skip() - } - - res := testDNSCache(t) - originalResolver := res.resolver - defer func() { - res.resolver = originalResolver - }() - - res.resolver = &net.Resolver{ - PreferGo: true, - Dial: func(ctx context.Context, network, address string) (net.Conn, error) { - return nil, fmt.Errorf("err") - }, - } - - if _, err := DialContextWithDNSCache(res, nil)(context.Background(), "tcp", "min.io:443"); err == nil { - t.Fatalf("expect to fail") - } -} - -// Verify we always return the first error from net.Dial failure -func TestDialContextWithDNSCacheScenario3(t *testing.T) { - resolver := &DNSCache{} - resolver.cache.Store("min.io", []string{ - "1.1.1.1", - "2.2.2.2", - "3.3.3.3", - }) - origFunc := randPerm - randPerm = func(n int) []int { - return []int{0, 1, 2} - } - defer func() { - randPerm = origFunc - }() - - want := errors.New("error1") - dialF := func(ctx context.Context, network, addr string) (net.Conn, error) { - if addr == net.JoinHostPort("1.1.1.1", "443") { - return nil, want // first error should be returned - } - if addr == net.JoinHostPort("2.2.2.2", "443") { - return nil, fmt.Errorf("error2") - } - if addr == net.JoinHostPort("3.3.3.3", "443") { - return nil, fmt.Errorf("error3") - } - return nil, nil - } - - _, got := DialContextWithDNSCache(resolver, dialF)(context.Background(), "tcp", "min.io:443") - if got != want { - t.Fatalf("got error %v, want %v", got, want) - } -} diff --git a/internal/http/dial_linux.go b/internal/http/dial_linux.go index 520332951..bc05ef614 100644 --- a/internal/http/dial_linux.go +++ b/internal/http/dial_linux.go @@ -29,11 +29,19 @@ import ( "golang.org/x/sys/unix" ) -func setInternalTCPParameters(c syscall.RawConn) error { - return c.Control(func(fdPtr uintptr) { +func setTCPParameters(network, address string, c syscall.RawConn) error { + c.Control(func(fdPtr uintptr) { // got socket file descriptor to set parameters. fd := int(fdPtr) + _ = unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_REUSEADDR, 1) + + _ = unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_REUSEPORT, 1) + + // Enable TCP open + // https://lwn.net/Articles/508865/ - 16k queue size. + _ = syscall.SetsockoptInt(fd, syscall.SOL_TCP, unix.TCP_FASTOPEN, 16*1024) + // Enable TCP fast connect // TCPFastOpenConnect sets the underlying socket to use // the TCP fast open connect. This feature is supported @@ -44,22 +52,8 @@ func setInternalTCPParameters(c syscall.RawConn) error { // "Set TCP_QUICKACK. If you find a case where that makes things worse, let me know." _ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, unix.TCP_QUICKACK, 1) - // The time (in seconds) the connection needs to remain idle before - // TCP starts sending keepalive probes, set this to 5 secs - // system defaults to 7200 secs!!! - _ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_KEEPIDLE, 5) - - // Number of probes. - // ~ cat /proc/sys/net/ipv4/tcp_keepalive_probes (defaults to 9, we reduce it to 5) - // 9 - _ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_KEEPCNT, 5) - - // Wait time after successful probe in seconds. - // ~ cat /proc/sys/net/ipv4/tcp_keepalive_intvl (defaults to 75 secs, we reduce it to 3 secs) - // 75 - _ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_KEEPINTVL, 3) - }) + return nil } // DialContext is a function to make custom Dial for internode communications @@ -70,9 +64,7 @@ func NewInternodeDialContext(dialTimeout time.Duration) DialContext { return func(ctx context.Context, network, addr string) (net.Conn, error) { dialer := &net.Dialer{ Timeout: dialTimeout, - Control: func(network, address string, c syscall.RawConn) error { - return setInternalTCPParameters(c) - }, + Control: setTCPParameters, } return dialer.DialContext(ctx, network, addr) } @@ -83,22 +75,7 @@ func NewCustomDialContext(dialTimeout time.Duration) DialContext { return func(ctx context.Context, network, addr string) (net.Conn, error) { dialer := &net.Dialer{ Timeout: dialTimeout, - Control: func(network, address string, c syscall.RawConn) error { - return c.Control(func(fdPtr uintptr) { - // got socket file descriptor to set parameters. - fd := int(fdPtr) - - // Enable TCP fast connect - // TCPFastOpenConnect sets the underlying socket to use - // the TCP fast open connect. This feature is supported - // since Linux 4.11. - _ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, unix.TCP_FASTOPEN_CONNECT, 1) - - // Enable TCP quick ACK, John Nagle says - // "Set TCP_QUICKACK. If you find a case where that makes things worse, let me know." - _ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, unix.TCP_QUICKACK, 1) - }) - }, + Control: setTCPParameters, } return dialer.DialContext(ctx, network, addr) } diff --git a/internal/http/dial_others.go b/internal/http/dial_others.go index d36357b63..c07ed1e40 100644 --- a/internal/http/dial_others.go +++ b/internal/http/dial_others.go @@ -29,7 +29,7 @@ import ( // TODO: if possible implement for non-linux platforms, not a priority at the moment //nolint:deadcode -func setInternalTCPParameters(c syscall.RawConn) error { +func setTCPParameters(string, string, syscall.RawConn) error { return nil } diff --git a/internal/http/listen_nix.go b/internal/http/listen_nix.go index ef464e25b..aad382c34 100644 --- a/internal/http/listen_nix.go +++ b/internal/http/listen_nix.go @@ -22,19 +22,9 @@ package http import ( "net" - - "github.com/valyala/tcplisten" ) -var cfg = &tcplisten.Config{ - ReusePort: true, - DeferAccept: true, - FastOpen: true, - // Bump up the soMaxConn value from 128 to 65535 to - // handle large incoming concurrent requests. - Backlog: 65535, -} - // Unix listener with special TCP options. -var listen = cfg.NewListener -var fallbackListen = net.Listen +var listenCfg = net.ListenConfig{ + Control: setTCPParameters, +} diff --git a/internal/http/listen_others.go b/internal/http/listen_others.go index dc7876c60..38e35985e 100644 --- a/internal/http/listen_others.go +++ b/internal/http/listen_others.go @@ -23,5 +23,4 @@ package http import "net" // Windows, plan9 specific listener. -var listen = net.Listen -var fallbackListen = net.Listen +var listenCfg = net.ListenConfig{} diff --git a/internal/http/listener.go b/internal/http/listener.go index 06dd20a41..23d0b14f5 100644 --- a/internal/http/listener.go +++ b/internal/http/listener.go @@ -18,11 +18,9 @@ package http import ( + "context" "fmt" - "io" "net" - "os" - "sync" "syscall" ) @@ -33,45 +31,22 @@ type acceptResult struct { // httpListener - HTTP listener capable of handling multiple server addresses. type httpListener struct { - mutex sync.Mutex // to guard Close() method. tcpListeners []*net.TCPListener // underlaying TCP listeners. acceptCh chan acceptResult // channel where all TCP listeners write accepted connection. - doneCh chan struct{} // done channel for TCP listener goroutines. -} - -// isRoutineNetErr returns true if error is due to a network timeout, -// connect reset or io.EOF and false otherwise -func isRoutineNetErr(err error) bool { - if err == nil { - return false - } - if nErr, ok := err.(*net.OpError); ok { - // Check if the error is a tcp connection reset - if syscallErr, ok := nErr.Err.(*os.SyscallError); ok { - if errno, ok := syscallErr.Err.(syscall.Errno); ok { - return errno == syscall.ECONNRESET - } - } - // Check if the error is a timeout - return nErr.Timeout() - } - // check for io.EOF and also some times io.EOF is wrapped is another error type. - return err == io.EOF || err.Error() == "EOF" + ctx context.Context + ctxCanceler context.CancelFunc } // start - starts separate goroutine for each TCP listener. A valid new connection is passed to httpListener.acceptCh. func (listener *httpListener) start() { - listener.acceptCh = make(chan acceptResult) - listener.doneCh = make(chan struct{}) - // Closure to send acceptResult to acceptCh. // It returns true if the result is sent else false if returns when doneCh is closed. - send := func(result acceptResult, doneCh <-chan struct{}) bool { + send := func(result acceptResult) bool { select { case listener.acceptCh <- result: // Successfully written to acceptCh return true - case <-doneCh: + case <-listener.ctx.Done(): // As stop signal is received, close accepted connection. if result.conn != nil { result.conn.Close() @@ -81,56 +56,52 @@ func (listener *httpListener) start() { } // Closure to handle single connection. - handleConn := func(tcpConn *net.TCPConn, doneCh <-chan struct{}) { + handleConn := func(tcpConn *net.TCPConn) { tcpConn.SetKeepAlive(true) - send(acceptResult{tcpConn, nil}, doneCh) + send(acceptResult{tcpConn, nil}) } // Closure to handle TCPListener until done channel is closed. - handleListener := func(tcpListener *net.TCPListener, doneCh <-chan struct{}) { + handleListener := func(tcpListener *net.TCPListener) { for { tcpConn, err := tcpListener.AcceptTCP() if err != nil { // Returns when send fails. - if !send(acceptResult{nil, err}, doneCh) { + if !send(acceptResult{nil, err}) { return } } else { - go handleConn(tcpConn, doneCh) + go handleConn(tcpConn) } } } // Start separate goroutine for each TCP listener to handle connection. for _, tcpListener := range listener.tcpListeners { - go handleListener(tcpListener, listener.doneCh) + go handleListener(tcpListener) } } // Accept - reads from httpListener.acceptCh for one of previously accepted TCP connection and returns the same. func (listener *httpListener) Accept() (conn net.Conn, err error) { - result, ok := <-listener.acceptCh - if ok { - return result.conn, result.err + select { + case result, ok := <-listener.acceptCh: + if ok { + return result.conn, result.err + } + case <-listener.ctx.Done(): } - return nil, syscall.EINVAL } // Close - closes underneath all TCP listeners. func (listener *httpListener) Close() (err error) { - listener.mutex.Lock() - defer listener.mutex.Unlock() - if listener.doneCh == nil { - return syscall.EINVAL - } + listener.ctxCanceler() for i := range listener.tcpListeners { listener.tcpListeners[i].Close() } - close(listener.doneCh) - listener.doneCh = nil return nil } @@ -163,7 +134,7 @@ func (listener *httpListener) Addrs() (addrs []net.Addr) { // httpListener is capable to // * listen to multiple addresses // * controls incoming connections only doing HTTP protocol -func newHTTPListener(serverAddrs []string) (listener *httpListener, err error) { +func newHTTPListener(ctx context.Context, serverAddrs []string) (listener *httpListener, err error) { var tcpListeners []*net.TCPListener @@ -181,10 +152,8 @@ func newHTTPListener(serverAddrs []string) (listener *httpListener, err error) { for _, serverAddr := range serverAddrs { var l net.Listener - if l, err = listen("tcp", serverAddr); err != nil { - if l, err = fallbackListen("tcp", serverAddr); err != nil { - return nil, err - } + if l, err = listenCfg.Listen(ctx, "tcp", serverAddr); err != nil { + return nil, err } tcpListener, ok := l.(*net.TCPListener) @@ -197,7 +166,9 @@ func newHTTPListener(serverAddrs []string) (listener *httpListener, err error) { listener = &httpListener{ tcpListeners: tcpListeners, + acceptCh: make(chan acceptResult, len(tcpListeners)), } + listener.ctx, listener.ctxCanceler = context.WithCancel(ctx) listener.start() return listener, nil diff --git a/internal/http/listener_test.go b/internal/http/listener_test.go index 727ee67a4..bde323c51 100644 --- a/internal/http/listener_test.go +++ b/internal/http/listener_test.go @@ -18,10 +18,8 @@ package http import ( + "context" "crypto/tls" - "errors" - "fmt" - "io" "net" "strconv" "strings" @@ -150,7 +148,7 @@ func TestNewHTTPListener(t *testing.T) { } for _, testCase := range testCases { - listener, err := newHTTPListener( + listener, err := newHTTPListener(context.Background(), testCase.serverAddrs, ) @@ -183,7 +181,7 @@ func TestHTTPListenerStartClose(t *testing.T) { } for i, testCase := range testCases { - listener, err := newHTTPListener( + listener, err := newHTTPListener(context.Background(), testCase.serverAddrs, ) if err != nil { @@ -226,7 +224,7 @@ func TestHTTPListenerAddr(t *testing.T) { } for i, testCase := range testCases { - listener, err := newHTTPListener( + listener, err := newHTTPListener(context.Background(), testCase.serverAddrs, ) if err != nil { @@ -266,7 +264,7 @@ func TestHTTPListenerAddrs(t *testing.T) { } for i, testCase := range testCases { - listener, err := newHTTPListener( + listener, err := newHTTPListener(context.Background(), testCase.serverAddrs, ) if err != nil { @@ -290,50 +288,3 @@ func TestHTTPListenerAddrs(t *testing.T) { listener.Close() } } - -type myTimeoutErr struct { - timeout bool -} - -func (m *myTimeoutErr) Error() string { return fmt.Sprintf("myTimeoutErr: %v", m.timeout) } -func (m *myTimeoutErr) Timeout() bool { return m.timeout } - -// Test for ignoreErr helper function -func TestIgnoreErr(t *testing.T) { - testCases := []struct { - err error - want bool - }{ - { - err: io.EOF, - want: true, - }, - { - err: &net.OpError{Err: &myTimeoutErr{timeout: true}}, - want: true, - }, - { - err: errors.New("EOF"), - want: true, - }, - { - err: &net.OpError{Err: &myTimeoutErr{timeout: false}}, - want: false, - }, - { - err: io.ErrUnexpectedEOF, - want: false, - }, - { - err: nil, - want: false, - }, - } - - for i, tc := range testCases { - if actual := isRoutineNetErr(tc.err); actual != tc.want { - t.Errorf("Test case %d: Expected %v but got %v for %v", i+1, - tc.want, actual, tc.err) - } - } -} diff --git a/internal/http/server.go b/internal/http/server.go index ad953d2ea..d558d5e9f 100644 --- a/internal/http/server.go +++ b/internal/http/server.go @@ -18,6 +18,7 @@ package http import ( + "context" "crypto/tls" "errors" "io/ioutil" @@ -29,7 +30,6 @@ import ( humanize "github.com/dustin/go-humanize" - "github.com/minio/minio-go/v7/pkg/set" "github.com/minio/minio/internal/config" "github.com/minio/minio/internal/config/api" "github.com/minio/minio/internal/fips" @@ -64,7 +64,7 @@ func (srv *Server) GetRequestCount() int { } // Start - start HTTP server -func (srv *Server) Start() (err error) { +func (srv *Server) Start(ctx context.Context) (err error) { // Take a copy of server fields. var tlsConfig *tls.Config if srv.TLSConfig != nil { @@ -72,12 +72,11 @@ func (srv *Server) Start() (err error) { } handler := srv.Handler // if srv.Handler holds non-synced state -> possible data race - addrs := set.CreateStringSet(srv.Addrs...).ToSlice() // copy and remove duplicates - // Create new HTTP listener. var listener *httpListener listener, err = newHTTPListener( - addrs, + ctx, + srv.Addrs, ) if err != nil { return err