From 3d5750f31c27bb318b791a182e63050f663eddaf Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 5 Oct 2021 10:13:04 -0700 Subject: [PATCH] update and use rs/dnscache implementation instead of custom (#13348) additionally optimize for IP only setups, avoid doing unnecessary lookups if the Dial addr is an IP. allow support for multiple listeners on same socket, this is mainly meant for future purposes. --- cmd/common-main.go | 37 +++-- cmd/gateway-main.go | 4 +- cmd/globals.go | 5 +- cmd/server-main.go | 7 +- cmd/test-utils_test.go | 3 - go.mod | 2 +- go.sum | 3 +- internal/http/dial_dnscache.go | 152 ++------------------ internal/http/dial_dnscache_test.go | 209 ---------------------------- internal/http/dial_linux.go | 49 ++----- internal/http/dial_others.go | 2 +- internal/http/listen_nix.go | 16 +-- internal/http/listen_others.go | 3 +- internal/http/listener.go | 75 +++------- internal/http/listener_test.go | 59 +------- internal/http/server.go | 9 +- 16 files changed, 103 insertions(+), 532 deletions(-) delete mode 100644 internal/http/dial_dnscache_test.go diff --git a/cmd/common-main.go b/cmd/common-main.go index 66fe5a487..2c13d2917 100644 --- a/cmd/common-main.go +++ b/cmd/common-main.go @@ -51,7 +51,6 @@ import ( "github.com/minio/minio/internal/color" "github.com/minio/minio/internal/config" "github.com/minio/minio/internal/handlers" - xhttp "github.com/minio/minio/internal/http" "github.com/minio/minio/internal/kms" "github.com/minio/minio/internal/logger" "github.com/minio/pkg/certs" @@ -59,6 +58,7 @@ import ( "github.com/minio/pkg/ellipses" "github.com/minio/pkg/env" xnet "github.com/minio/pkg/net" + "github.com/rs/dnscache" ) // serverDebugLog will enable debug printing @@ -71,17 +71,34 @@ func init() { logger.Init(GOPATH, GOROOT) logger.RegisterError(config.FmtError) - if IsKubernetes() || IsDocker() || IsBOSH() || IsDCOS() || IsPCFTile() { - // 30 seconds matches the orchestrator DNS TTLs, have - // a 5 second timeout to lookup from DNS servers. - globalDNSCache = xhttp.NewDNSCache(30*time.Second, 5*time.Second, logger.LogOnceIf) - } else { - // On bare-metals DNS do not change often, so it is - // safe to assume a higher timeout upto 10 minutes. - globalDNSCache = xhttp.NewDNSCache(10*time.Minute, 5*time.Second, logger.LogOnceIf) - } initGlobalContext() + options := dnscache.ResolverRefreshOptions{ + ClearUnused: true, + PersistOnFailure: false, + } + + // Call to refresh will refresh names in cache. If you pass true, it will also + // remove cached names not looked up since the last call to Refresh. It is a good idea + // to call this method on a regular interval. + go func() { + var t *time.Ticker + if IsKubernetes() || IsDocker() || IsBOSH() || IsDCOS() || IsPCFTile() { + t = time.NewTicker(1 * time.Minute) + } else { + t = time.NewTicker(10 * time.Minute) + } + defer t.Stop() + for { + select { + case <-t.C: + globalDNSCache.RefreshWithOptions(options) + case <-GlobalContext.Done(): + return + } + } + }() + globalForwarder = handlers.NewForwarder(&handlers.Forwarder{ PassHost: true, RoundTripper: newGatewayHTTPTransport(1 * time.Hour), diff --git a/cmd/gateway-main.go b/cmd/gateway-main.go index 1190572a7..d969a88d9 100644 --- a/cmd/gateway-main.go +++ b/cmd/gateway-main.go @@ -154,8 +154,6 @@ func ValidateGatewayArguments(serverAddr, endpointAddr string) error { // StartGateway - handler for 'minio gateway '. func StartGateway(ctx *cli.Context, gw Gateway) { - defer globalDNSCache.Stop() - signal.Notify(globalOSSignalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT) go handleSignals() @@ -268,7 +266,7 @@ func StartGateway(ctx *cli.Context, gw Gateway) { return GlobalContext } go func() { - globalHTTPServerErrorCh <- httpServer.Start() + globalHTTPServerErrorCh <- httpServer.Start(GlobalContext) }() globalObjLayerMutex.Lock() diff --git a/cmd/globals.go b/cmd/globals.go index 49198e3cc..74eae64f9 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -30,6 +30,7 @@ import ( "github.com/minio/minio/internal/bucket/bandwidth" "github.com/minio/minio/internal/handlers" "github.com/minio/minio/internal/kms" + "github.com/rs/dnscache" "github.com/dustin/go-humanize" "github.com/minio/minio/internal/auth" @@ -309,7 +310,9 @@ var ( globalProxyTransport http.RoundTripper - globalDNSCache *xhttp.DNSCache + globalDNSCache = &dnscache.Resolver{ + Timeout: 5 * time.Second, + } globalForwarder *handlers.Forwarder diff --git a/cmd/server-main.go b/cmd/server-main.go index 1cb14ae51..f5b0a565b 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -431,8 +431,6 @@ func (lw nullWriter) Write(b []byte) (int, error) { // serverMain handler called for 'minio server' command. func serverMain(ctx *cli.Context) { - defer globalDNSCache.Stop() - signal.Notify(globalOSSignalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT) go handleSignals() @@ -496,14 +494,15 @@ func serverMain(ctx *cli.Context) { getCert = globalTLSCerts.GetCertificate } - httpServer := xhttp.NewServer([]string{globalMinioAddr}, criticalErrorHandler{corsHandler(handler)}, getCert) + httpServer := xhttp.NewServer([]string{globalMinioAddr}, + criticalErrorHandler{corsHandler(handler)}, getCert) httpServer.BaseContext = func(listener net.Listener) context.Context { return GlobalContext } // Turn-off random logging by Go internally httpServer.ErrorLog = log.New(&nullWriter{}, "", 0) go func() { - globalHTTPServerErrorCh <- httpServer.Start() + globalHTTPServerErrorCh <- httpServer.Start(GlobalContext) }() setHTTPServer(httpServer) diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index cacab8bb3..f7ceaefa9 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -63,7 +63,6 @@ import ( "github.com/minio/minio/internal/config" "github.com/minio/minio/internal/crypto" "github.com/minio/minio/internal/hash" - xhttp "github.com/minio/minio/internal/http" "github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/rest" "github.com/minio/pkg/bucket/policy" @@ -106,8 +105,6 @@ func TestMain(m *testing.M) { // Initialize globalConsoleSys system globalConsoleSys = NewConsoleLogger(context.Background()) - globalDNSCache = xhttp.NewDNSCache(3*time.Second, 10*time.Second, logger.LogOnceIf) - globalInternodeTransport = newInternodeHTTPTransport(nil, rest.DefaultTimeout)() initHelp() diff --git a/go.mod b/go.mod index b18fb36c8..ff05c5a37 100644 --- a/go.mod +++ b/go.mod @@ -69,12 +69,12 @@ require ( github.com/prometheus/client_model v0.2.0 github.com/prometheus/procfs v0.7.3 github.com/rs/cors v1.7.0 + github.com/rs/dnscache v0.0.0-20210201191234-295bba877686 github.com/secure-io/sio-go v0.3.1 github.com/shirou/gopsutil/v3 v3.21.7 github.com/streadway/amqp v1.0.0 github.com/tinylib/msgp v1.1.6-0.20210521143832-0becd170c402 github.com/valyala/bytebufferpool v1.0.0 - github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c github.com/yargevad/filepathx v1.0.0 go.etcd.io/etcd/api/v3 v3.5.0-beta.4 diff --git a/go.sum b/go.sum index 4226d7cc2..5773ebfa5 100644 --- a/go.sum +++ b/go.sum @@ -1268,6 +1268,8 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR github.com/rogpeppe/go-internal v1.5.2/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik= github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= +github.com/rs/dnscache v0.0.0-20210201191234-295bba877686 h1:IJ6Df0uxPDtNoByV0KkzVKNseWvZFCNM/S9UoyOMCSI= +github.com/rs/dnscache v0.0.0-20210201191234-295bba877686/go.mod h1:qe5TWALJ8/a1Lqznoc5BDHpYX/8HU60Hm2AwRmqzxqA= github.com/rs/xid v1.2.1 h1:mhH9Nq+C1fY2l1XIpgxIiUOfNpRBYH1kKcr+qfKgjRc= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU= @@ -1423,7 +1425,6 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasthttp v1.2.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk8LWSxF3s= github.com/valyala/quicktemplate v1.2.0/go.mod h1:EH+4AkTd43SvgIbQHYu59/cJyxDoOVRUAfrukLPuGJ4= -github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a h1:0R4NLDRDZX6JcmhJgXi5E4b8Wg84ihbmUKp/GvSPEzc= github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= github.com/vdemeester/k8s-pkg-credentialprovider v1.17.4/go.mod h1:inCTmtUdr5KJbreVojo06krnTgaeAz/Z7lynpPk/Q2c= github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv3vaXspKw= diff --git a/internal/http/dial_dnscache.go b/internal/http/dial_dnscache.go index 667113cf9..a0e991e74 100644 --- a/internal/http/dial_dnscache.go +++ b/internal/http/dial_dnscache.go @@ -19,15 +19,11 @@ package http import ( "context" - "math/rand" "net" - "sync" "time" -) -var randPerm = func(n int) []int { - return rand.Perm(n) -} + "github.com/rs/dnscache" +) // DialContextWithDNSCache is a helper function which returns `net.DialContext` function. // It randomly fetches an IP from the DNS cache and dials it by the given dial @@ -39,7 +35,7 @@ var randPerm = func(n int) []int { // // In this function, it uses functions from `rand` package. To make it really random, // you MUST call `rand.Seed` and change the value from the default in your application -func DialContextWithDNSCache(cache *DNSCache, baseDialCtx DialContext) DialContext { +func DialContextWithDNSCache(resolver *dnscache.Resolver, baseDialCtx DialContext) DialContext { if baseDialCtx == nil { // This is same as which `http.DefaultTransport` uses. baseDialCtx = (&net.Dialer{ @@ -47,147 +43,29 @@ func DialContextWithDNSCache(cache *DNSCache, baseDialCtx DialContext) DialConte KeepAlive: 30 * time.Second, }).DialContext } - return func(ctx context.Context, network, host string) (net.Conn, error) { - h, p, err := net.SplitHostPort(host) + return func(ctx context.Context, network, addr string) (conn net.Conn, err error) { + host, port, err := net.SplitHostPort(addr) if err != nil { return nil, err } - // Fetch DNS result from cache. - // - // ctxLookup is only used for canceling DNS Lookup. - ctxLookup, cancelF := context.WithTimeout(ctx, cache.lookupTimeout) - defer cancelF() - addrs, err := cache.Fetch(ctxLookup, h) + if net.ParseIP(host) != nil { + // For IP only setups there is no need for DNS lookups. + return baseDialCtx(ctx, "tcp", addr) + } + + ips, err := resolver.LookupHost(ctx, host) if err != nil { return nil, err } - var firstErr error - for _, randomIndex := range randPerm(len(addrs)) { - conn, err := baseDialCtx(ctx, "tcp", net.JoinHostPort(addrs[randomIndex], p)) + for _, ip := range ips { + conn, err = baseDialCtx(ctx, "tcp", net.JoinHostPort(ip, port)) if err == nil { - return conn, nil - } - if firstErr == nil { - firstErr = err + break } } - return nil, firstErr + return } } - -// defaultFreq is default frequency a resolver refreshes DNS cache. -var ( - defaultFreq = 3 * time.Second - defaultLookupTimeout = 10 * time.Second -) - -// DNSCache is DNS cache resolver which cache DNS resolve results in memory. -type DNSCache struct { - resolver *net.Resolver - lookupTimeout time.Duration - loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}) - - cache sync.Map - doneOnce sync.Once - doneCh chan struct{} -} - -// NewDNSCache initializes DNS cache resolver and starts auto refreshing -// in a new goroutine. To stop auto refreshing, call `Stop()` function. -// Once `Stop()` is called auto refreshing cannot be resumed. -func NewDNSCache(freq time.Duration, lookupTimeout time.Duration, loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})) *DNSCache { - if freq <= 0 { - freq = defaultFreq - } - - if lookupTimeout <= 0 { - lookupTimeout = defaultLookupTimeout - } - - // PreferGo controls whether Go's built-in DNS resolver - // is preferred on platforms where it's available, since - // we do not compile with CGO, FIPS builds are CGO based - // enable this to enforce Go resolver. - defaultResolver := &net.Resolver{ - PreferGo: true, - } - - r := &DNSCache{ - resolver: defaultResolver, - lookupTimeout: lookupTimeout, - loggerOnce: loggerOnce, - doneCh: make(chan struct{}), - } - - rnd := rand.New(rand.NewSource(time.Now().UnixNano())) - - timer := time.NewTimer(freq) - go func() { - defer timer.Stop() - - for { - select { - case <-timer.C: - // Make sure that refreshes on DNS do not be attempted - // at the same time, allows for reduced load on the - // DNS servers. - timer.Reset(time.Duration(rnd.Float64() * float64(freq))) - - r.Refresh() - case <-r.doneCh: - return - } - } - }() - - return r -} - -// LookupHost lookups address list from DNS server, persist the results -// in-memory cache. `Fetch` is used to obtain the values for a given host. -func (r *DNSCache) LookupHost(ctx context.Context, host string) ([]string, error) { - addrs, err := r.resolver.LookupHost(ctx, host) - if err != nil { - return nil, err - } - - r.cache.Store(host, addrs) - return addrs, nil -} - -// Fetch fetches IP list from the cache. If IP list of the given addr is not in the cache, -// then it lookups from DNS server by `Lookup` function. -func (r *DNSCache) Fetch(ctx context.Context, host string) ([]string, error) { - addrs, ok := r.cache.Load(host) - if ok { - return addrs.([]string), nil - } - return r.LookupHost(ctx, host) -} - -// Refresh refreshes IP list cache, automatically. -func (r *DNSCache) Refresh() { - var hosts []string - r.cache.Range(func(k, v interface{}) bool { - hosts = append(hosts, k.(string)) - return true - }) - - for _, host := range hosts { - ctx, cancelF := context.WithTimeout(context.Background(), r.lookupTimeout) - if _, err := r.LookupHost(ctx, host); err != nil { - r.loggerOnce(ctx, err, host) - } - cancelF() - } -} - -// Stop stops auto refreshing. -func (r *DNSCache) Stop() { - r.doneOnce.Do(func() { - close(r.doneCh) - }) -} diff --git a/internal/http/dial_dnscache_test.go b/internal/http/dial_dnscache_test.go deleted file mode 100644 index 18e6edcf7..000000000 --- a/internal/http/dial_dnscache_test.go +++ /dev/null @@ -1,209 +0,0 @@ -// Copyright (c) 2015-2021 MinIO, Inc. -// -// This file is part of MinIO Object Storage stack -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package http - -import ( - "context" - "errors" - "fmt" - "math/rand" - "net" - "runtime" - "testing" - "time" -) - -var ( - testFreq = 1 * time.Second - testDefaultLookupTimeout = 1 * time.Second -) - -func logOnce(ctx context.Context, err error, id interface{}, errKind ...interface{}) { - // no-op -} - -func testDNSCache(t *testing.T) *DNSCache { - t.Helper() // skip printing file and line information from this function - return NewDNSCache(testFreq, testDefaultLookupTimeout, logOnce) -} - -func TestDialContextWithDNSCache(t *testing.T) { - resolver := &DNSCache{} - resolver.cache.Store("play.min.io", []string{ - "127.0.0.1", - "127.0.0.2", - "127.0.0.3", - }) - cases := []struct { - permF func(n int) []int - dialF DialContext - }{ - { - permF: func(n int) []int { - return []int{0} - }, - dialF: func(ctx context.Context, network, addr string) (net.Conn, error) { - if got, want := addr, net.JoinHostPort("127.0.0.1", "443"); got != want { - t.Fatalf("got addr %q, want %q", got, want) - } - return nil, nil - }, - }, - { - permF: func(n int) []int { - return []int{1} - }, - dialF: func(ctx context.Context, network, addr string) (net.Conn, error) { - if got, want := addr, net.JoinHostPort("127.0.0.2", "443"); got != want { - t.Fatalf("got addr %q, want %q", got, want) - } - return nil, nil - }, - }, - { - permF: func(n int) []int { - return []int{2} - }, - dialF: func(ctx context.Context, network, addr string) (net.Conn, error) { - if got, want := addr, net.JoinHostPort("127.0.0.3", "443"); got != want { - t.Fatalf("got addr %q, want %q", got, want) - } - return nil, nil - }, - }, - } - - origFunc := randPerm - defer func() { - randPerm = origFunc - }() - - for _, tc := range cases { - t.Run("", func(t *testing.T) { - randPerm = tc.permF - if _, err := DialContextWithDNSCache(resolver, tc.dialF)(context.Background(), "tcp", "play.min.io:443"); err != nil { - t.Fatalf("err: %s", err) - } - }) - } - -} - -func TestDialContextWithDNSCacheRand(t *testing.T) { - rand.Seed(time.Now().UTC().UnixNano()) - defer func() { - rand.Seed(1) - }() - - resolver := &DNSCache{} - resolver.cache.Store("play.min.io", []string{ - "127.0.0.1", - "127.0.0.2", - "127.0.0.3", - }) - - count := make(map[string]int) - dialF := func(ctx context.Context, network, addr string) (net.Conn, error) { - count[addr]++ - return nil, nil - } - - for i := 0; i < 100; i++ { - if _, err := DialContextWithDNSCache(resolver, dialF)(context.Background(), "tcp", "play.min.io:443"); err != nil { - t.Fatalf("err: %s", err) - } - } - - for _, c := range count { - got := float32(c) / float32(100) - if got < float32(0.1) { - t.Fatalf("expected 0.1 rate got %f", got) - } - } -} - -// Verify without port Dial fails, Go stdlib net.Dial expects port -func TestDialContextWithDNSCacheScenario1(t *testing.T) { - resolver := testDNSCache(t) - if _, err := DialContextWithDNSCache(resolver, nil)(context.Background(), "tcp", "play.min.io"); err == nil { - t.Fatalf("expect to fail") // expected port - } -} - -// Verify if the host lookup function failed to return addresses -func TestDialContextWithDNSCacheScenario2(t *testing.T) { - if runtime.GOOS == "windows" { - // Windows doesn't use Dial to connect - // so there is no way this test will work - // as expected. - t.Skip() - } - - res := testDNSCache(t) - originalResolver := res.resolver - defer func() { - res.resolver = originalResolver - }() - - res.resolver = &net.Resolver{ - PreferGo: true, - Dial: func(ctx context.Context, network, address string) (net.Conn, error) { - return nil, fmt.Errorf("err") - }, - } - - if _, err := DialContextWithDNSCache(res, nil)(context.Background(), "tcp", "min.io:443"); err == nil { - t.Fatalf("expect to fail") - } -} - -// Verify we always return the first error from net.Dial failure -func TestDialContextWithDNSCacheScenario3(t *testing.T) { - resolver := &DNSCache{} - resolver.cache.Store("min.io", []string{ - "1.1.1.1", - "2.2.2.2", - "3.3.3.3", - }) - origFunc := randPerm - randPerm = func(n int) []int { - return []int{0, 1, 2} - } - defer func() { - randPerm = origFunc - }() - - want := errors.New("error1") - dialF := func(ctx context.Context, network, addr string) (net.Conn, error) { - if addr == net.JoinHostPort("1.1.1.1", "443") { - return nil, want // first error should be returned - } - if addr == net.JoinHostPort("2.2.2.2", "443") { - return nil, fmt.Errorf("error2") - } - if addr == net.JoinHostPort("3.3.3.3", "443") { - return nil, fmt.Errorf("error3") - } - return nil, nil - } - - _, got := DialContextWithDNSCache(resolver, dialF)(context.Background(), "tcp", "min.io:443") - if got != want { - t.Fatalf("got error %v, want %v", got, want) - } -} diff --git a/internal/http/dial_linux.go b/internal/http/dial_linux.go index 520332951..bc05ef614 100644 --- a/internal/http/dial_linux.go +++ b/internal/http/dial_linux.go @@ -29,11 +29,19 @@ import ( "golang.org/x/sys/unix" ) -func setInternalTCPParameters(c syscall.RawConn) error { - return c.Control(func(fdPtr uintptr) { +func setTCPParameters(network, address string, c syscall.RawConn) error { + c.Control(func(fdPtr uintptr) { // got socket file descriptor to set parameters. fd := int(fdPtr) + _ = unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_REUSEADDR, 1) + + _ = unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_REUSEPORT, 1) + + // Enable TCP open + // https://lwn.net/Articles/508865/ - 16k queue size. + _ = syscall.SetsockoptInt(fd, syscall.SOL_TCP, unix.TCP_FASTOPEN, 16*1024) + // Enable TCP fast connect // TCPFastOpenConnect sets the underlying socket to use // the TCP fast open connect. This feature is supported @@ -44,22 +52,8 @@ func setInternalTCPParameters(c syscall.RawConn) error { // "Set TCP_QUICKACK. If you find a case where that makes things worse, let me know." _ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, unix.TCP_QUICKACK, 1) - // The time (in seconds) the connection needs to remain idle before - // TCP starts sending keepalive probes, set this to 5 secs - // system defaults to 7200 secs!!! - _ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_KEEPIDLE, 5) - - // Number of probes. - // ~ cat /proc/sys/net/ipv4/tcp_keepalive_probes (defaults to 9, we reduce it to 5) - // 9 - _ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_KEEPCNT, 5) - - // Wait time after successful probe in seconds. - // ~ cat /proc/sys/net/ipv4/tcp_keepalive_intvl (defaults to 75 secs, we reduce it to 3 secs) - // 75 - _ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_KEEPINTVL, 3) - }) + return nil } // DialContext is a function to make custom Dial for internode communications @@ -70,9 +64,7 @@ func NewInternodeDialContext(dialTimeout time.Duration) DialContext { return func(ctx context.Context, network, addr string) (net.Conn, error) { dialer := &net.Dialer{ Timeout: dialTimeout, - Control: func(network, address string, c syscall.RawConn) error { - return setInternalTCPParameters(c) - }, + Control: setTCPParameters, } return dialer.DialContext(ctx, network, addr) } @@ -83,22 +75,7 @@ func NewCustomDialContext(dialTimeout time.Duration) DialContext { return func(ctx context.Context, network, addr string) (net.Conn, error) { dialer := &net.Dialer{ Timeout: dialTimeout, - Control: func(network, address string, c syscall.RawConn) error { - return c.Control(func(fdPtr uintptr) { - // got socket file descriptor to set parameters. - fd := int(fdPtr) - - // Enable TCP fast connect - // TCPFastOpenConnect sets the underlying socket to use - // the TCP fast open connect. This feature is supported - // since Linux 4.11. - _ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, unix.TCP_FASTOPEN_CONNECT, 1) - - // Enable TCP quick ACK, John Nagle says - // "Set TCP_QUICKACK. If you find a case where that makes things worse, let me know." - _ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, unix.TCP_QUICKACK, 1) - }) - }, + Control: setTCPParameters, } return dialer.DialContext(ctx, network, addr) } diff --git a/internal/http/dial_others.go b/internal/http/dial_others.go index d36357b63..c07ed1e40 100644 --- a/internal/http/dial_others.go +++ b/internal/http/dial_others.go @@ -29,7 +29,7 @@ import ( // TODO: if possible implement for non-linux platforms, not a priority at the moment //nolint:deadcode -func setInternalTCPParameters(c syscall.RawConn) error { +func setTCPParameters(string, string, syscall.RawConn) error { return nil } diff --git a/internal/http/listen_nix.go b/internal/http/listen_nix.go index ef464e25b..aad382c34 100644 --- a/internal/http/listen_nix.go +++ b/internal/http/listen_nix.go @@ -22,19 +22,9 @@ package http import ( "net" - - "github.com/valyala/tcplisten" ) -var cfg = &tcplisten.Config{ - ReusePort: true, - DeferAccept: true, - FastOpen: true, - // Bump up the soMaxConn value from 128 to 65535 to - // handle large incoming concurrent requests. - Backlog: 65535, -} - // Unix listener with special TCP options. -var listen = cfg.NewListener -var fallbackListen = net.Listen +var listenCfg = net.ListenConfig{ + Control: setTCPParameters, +} diff --git a/internal/http/listen_others.go b/internal/http/listen_others.go index dc7876c60..38e35985e 100644 --- a/internal/http/listen_others.go +++ b/internal/http/listen_others.go @@ -23,5 +23,4 @@ package http import "net" // Windows, plan9 specific listener. -var listen = net.Listen -var fallbackListen = net.Listen +var listenCfg = net.ListenConfig{} diff --git a/internal/http/listener.go b/internal/http/listener.go index 06dd20a41..23d0b14f5 100644 --- a/internal/http/listener.go +++ b/internal/http/listener.go @@ -18,11 +18,9 @@ package http import ( + "context" "fmt" - "io" "net" - "os" - "sync" "syscall" ) @@ -33,45 +31,22 @@ type acceptResult struct { // httpListener - HTTP listener capable of handling multiple server addresses. type httpListener struct { - mutex sync.Mutex // to guard Close() method. tcpListeners []*net.TCPListener // underlaying TCP listeners. acceptCh chan acceptResult // channel where all TCP listeners write accepted connection. - doneCh chan struct{} // done channel for TCP listener goroutines. -} - -// isRoutineNetErr returns true if error is due to a network timeout, -// connect reset or io.EOF and false otherwise -func isRoutineNetErr(err error) bool { - if err == nil { - return false - } - if nErr, ok := err.(*net.OpError); ok { - // Check if the error is a tcp connection reset - if syscallErr, ok := nErr.Err.(*os.SyscallError); ok { - if errno, ok := syscallErr.Err.(syscall.Errno); ok { - return errno == syscall.ECONNRESET - } - } - // Check if the error is a timeout - return nErr.Timeout() - } - // check for io.EOF and also some times io.EOF is wrapped is another error type. - return err == io.EOF || err.Error() == "EOF" + ctx context.Context + ctxCanceler context.CancelFunc } // start - starts separate goroutine for each TCP listener. A valid new connection is passed to httpListener.acceptCh. func (listener *httpListener) start() { - listener.acceptCh = make(chan acceptResult) - listener.doneCh = make(chan struct{}) - // Closure to send acceptResult to acceptCh. // It returns true if the result is sent else false if returns when doneCh is closed. - send := func(result acceptResult, doneCh <-chan struct{}) bool { + send := func(result acceptResult) bool { select { case listener.acceptCh <- result: // Successfully written to acceptCh return true - case <-doneCh: + case <-listener.ctx.Done(): // As stop signal is received, close accepted connection. if result.conn != nil { result.conn.Close() @@ -81,56 +56,52 @@ func (listener *httpListener) start() { } // Closure to handle single connection. - handleConn := func(tcpConn *net.TCPConn, doneCh <-chan struct{}) { + handleConn := func(tcpConn *net.TCPConn) { tcpConn.SetKeepAlive(true) - send(acceptResult{tcpConn, nil}, doneCh) + send(acceptResult{tcpConn, nil}) } // Closure to handle TCPListener until done channel is closed. - handleListener := func(tcpListener *net.TCPListener, doneCh <-chan struct{}) { + handleListener := func(tcpListener *net.TCPListener) { for { tcpConn, err := tcpListener.AcceptTCP() if err != nil { // Returns when send fails. - if !send(acceptResult{nil, err}, doneCh) { + if !send(acceptResult{nil, err}) { return } } else { - go handleConn(tcpConn, doneCh) + go handleConn(tcpConn) } } } // Start separate goroutine for each TCP listener to handle connection. for _, tcpListener := range listener.tcpListeners { - go handleListener(tcpListener, listener.doneCh) + go handleListener(tcpListener) } } // Accept - reads from httpListener.acceptCh for one of previously accepted TCP connection and returns the same. func (listener *httpListener) Accept() (conn net.Conn, err error) { - result, ok := <-listener.acceptCh - if ok { - return result.conn, result.err + select { + case result, ok := <-listener.acceptCh: + if ok { + return result.conn, result.err + } + case <-listener.ctx.Done(): } - return nil, syscall.EINVAL } // Close - closes underneath all TCP listeners. func (listener *httpListener) Close() (err error) { - listener.mutex.Lock() - defer listener.mutex.Unlock() - if listener.doneCh == nil { - return syscall.EINVAL - } + listener.ctxCanceler() for i := range listener.tcpListeners { listener.tcpListeners[i].Close() } - close(listener.doneCh) - listener.doneCh = nil return nil } @@ -163,7 +134,7 @@ func (listener *httpListener) Addrs() (addrs []net.Addr) { // httpListener is capable to // * listen to multiple addresses // * controls incoming connections only doing HTTP protocol -func newHTTPListener(serverAddrs []string) (listener *httpListener, err error) { +func newHTTPListener(ctx context.Context, serverAddrs []string) (listener *httpListener, err error) { var tcpListeners []*net.TCPListener @@ -181,10 +152,8 @@ func newHTTPListener(serverAddrs []string) (listener *httpListener, err error) { for _, serverAddr := range serverAddrs { var l net.Listener - if l, err = listen("tcp", serverAddr); err != nil { - if l, err = fallbackListen("tcp", serverAddr); err != nil { - return nil, err - } + if l, err = listenCfg.Listen(ctx, "tcp", serverAddr); err != nil { + return nil, err } tcpListener, ok := l.(*net.TCPListener) @@ -197,7 +166,9 @@ func newHTTPListener(serverAddrs []string) (listener *httpListener, err error) { listener = &httpListener{ tcpListeners: tcpListeners, + acceptCh: make(chan acceptResult, len(tcpListeners)), } + listener.ctx, listener.ctxCanceler = context.WithCancel(ctx) listener.start() return listener, nil diff --git a/internal/http/listener_test.go b/internal/http/listener_test.go index 727ee67a4..bde323c51 100644 --- a/internal/http/listener_test.go +++ b/internal/http/listener_test.go @@ -18,10 +18,8 @@ package http import ( + "context" "crypto/tls" - "errors" - "fmt" - "io" "net" "strconv" "strings" @@ -150,7 +148,7 @@ func TestNewHTTPListener(t *testing.T) { } for _, testCase := range testCases { - listener, err := newHTTPListener( + listener, err := newHTTPListener(context.Background(), testCase.serverAddrs, ) @@ -183,7 +181,7 @@ func TestHTTPListenerStartClose(t *testing.T) { } for i, testCase := range testCases { - listener, err := newHTTPListener( + listener, err := newHTTPListener(context.Background(), testCase.serverAddrs, ) if err != nil { @@ -226,7 +224,7 @@ func TestHTTPListenerAddr(t *testing.T) { } for i, testCase := range testCases { - listener, err := newHTTPListener( + listener, err := newHTTPListener(context.Background(), testCase.serverAddrs, ) if err != nil { @@ -266,7 +264,7 @@ func TestHTTPListenerAddrs(t *testing.T) { } for i, testCase := range testCases { - listener, err := newHTTPListener( + listener, err := newHTTPListener(context.Background(), testCase.serverAddrs, ) if err != nil { @@ -290,50 +288,3 @@ func TestHTTPListenerAddrs(t *testing.T) { listener.Close() } } - -type myTimeoutErr struct { - timeout bool -} - -func (m *myTimeoutErr) Error() string { return fmt.Sprintf("myTimeoutErr: %v", m.timeout) } -func (m *myTimeoutErr) Timeout() bool { return m.timeout } - -// Test for ignoreErr helper function -func TestIgnoreErr(t *testing.T) { - testCases := []struct { - err error - want bool - }{ - { - err: io.EOF, - want: true, - }, - { - err: &net.OpError{Err: &myTimeoutErr{timeout: true}}, - want: true, - }, - { - err: errors.New("EOF"), - want: true, - }, - { - err: &net.OpError{Err: &myTimeoutErr{timeout: false}}, - want: false, - }, - { - err: io.ErrUnexpectedEOF, - want: false, - }, - { - err: nil, - want: false, - }, - } - - for i, tc := range testCases { - if actual := isRoutineNetErr(tc.err); actual != tc.want { - t.Errorf("Test case %d: Expected %v but got %v for %v", i+1, - tc.want, actual, tc.err) - } - } -} diff --git a/internal/http/server.go b/internal/http/server.go index ad953d2ea..d558d5e9f 100644 --- a/internal/http/server.go +++ b/internal/http/server.go @@ -18,6 +18,7 @@ package http import ( + "context" "crypto/tls" "errors" "io/ioutil" @@ -29,7 +30,6 @@ import ( humanize "github.com/dustin/go-humanize" - "github.com/minio/minio-go/v7/pkg/set" "github.com/minio/minio/internal/config" "github.com/minio/minio/internal/config/api" "github.com/minio/minio/internal/fips" @@ -64,7 +64,7 @@ func (srv *Server) GetRequestCount() int { } // Start - start HTTP server -func (srv *Server) Start() (err error) { +func (srv *Server) Start(ctx context.Context) (err error) { // Take a copy of server fields. var tlsConfig *tls.Config if srv.TLSConfig != nil { @@ -72,12 +72,11 @@ func (srv *Server) Start() (err error) { } handler := srv.Handler // if srv.Handler holds non-synced state -> possible data race - addrs := set.CreateStringSet(srv.Addrs...).ToSlice() // copy and remove duplicates - // Create new HTTP listener. var listener *httpListener listener, err = newHTTPListener( - addrs, + ctx, + srv.Addrs, ) if err != nil { return err