update and use rs/dnscache implementation instead of custom (#13348)

additionally optimize for IP only setups, avoid doing
unnecessary lookups if the Dial addr is an IP.

allow support for multiple listeners on same socket,
this is mainly meant for future purposes.
This commit is contained in:
Harshavardhana 2021-10-05 10:13:04 -07:00 committed by GitHub
parent fabf60bc4c
commit 3d5750f31c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 103 additions and 532 deletions

View File

@ -51,7 +51,6 @@ import (
"github.com/minio/minio/internal/color"
"github.com/minio/minio/internal/config"
"github.com/minio/minio/internal/handlers"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/kms"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/certs"
@ -59,6 +58,7 @@ import (
"github.com/minio/pkg/ellipses"
"github.com/minio/pkg/env"
xnet "github.com/minio/pkg/net"
"github.com/rs/dnscache"
)
// serverDebugLog will enable debug printing
@ -71,17 +71,34 @@ func init() {
logger.Init(GOPATH, GOROOT)
logger.RegisterError(config.FmtError)
if IsKubernetes() || IsDocker() || IsBOSH() || IsDCOS() || IsPCFTile() {
// 30 seconds matches the orchestrator DNS TTLs, have
// a 5 second timeout to lookup from DNS servers.
globalDNSCache = xhttp.NewDNSCache(30*time.Second, 5*time.Second, logger.LogOnceIf)
} else {
// On bare-metals DNS do not change often, so it is
// safe to assume a higher timeout upto 10 minutes.
globalDNSCache = xhttp.NewDNSCache(10*time.Minute, 5*time.Second, logger.LogOnceIf)
}
initGlobalContext()
options := dnscache.ResolverRefreshOptions{
ClearUnused: true,
PersistOnFailure: false,
}
// Call to refresh will refresh names in cache. If you pass true, it will also
// remove cached names not looked up since the last call to Refresh. It is a good idea
// to call this method on a regular interval.
go func() {
var t *time.Ticker
if IsKubernetes() || IsDocker() || IsBOSH() || IsDCOS() || IsPCFTile() {
t = time.NewTicker(1 * time.Minute)
} else {
t = time.NewTicker(10 * time.Minute)
}
defer t.Stop()
for {
select {
case <-t.C:
globalDNSCache.RefreshWithOptions(options)
case <-GlobalContext.Done():
return
}
}
}()
globalForwarder = handlers.NewForwarder(&handlers.Forwarder{
PassHost: true,
RoundTripper: newGatewayHTTPTransport(1 * time.Hour),

View File

@ -154,8 +154,6 @@ func ValidateGatewayArguments(serverAddr, endpointAddr string) error {
// StartGateway - handler for 'minio gateway <name>'.
func StartGateway(ctx *cli.Context, gw Gateway) {
defer globalDNSCache.Stop()
signal.Notify(globalOSSignalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
go handleSignals()
@ -268,7 +266,7 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
return GlobalContext
}
go func() {
globalHTTPServerErrorCh <- httpServer.Start()
globalHTTPServerErrorCh <- httpServer.Start(GlobalContext)
}()
globalObjLayerMutex.Lock()

View File

@ -30,6 +30,7 @@ import (
"github.com/minio/minio/internal/bucket/bandwidth"
"github.com/minio/minio/internal/handlers"
"github.com/minio/minio/internal/kms"
"github.com/rs/dnscache"
"github.com/dustin/go-humanize"
"github.com/minio/minio/internal/auth"
@ -309,7 +310,9 @@ var (
globalProxyTransport http.RoundTripper
globalDNSCache *xhttp.DNSCache
globalDNSCache = &dnscache.Resolver{
Timeout: 5 * time.Second,
}
globalForwarder *handlers.Forwarder

View File

@ -431,8 +431,6 @@ func (lw nullWriter) Write(b []byte) (int, error) {
// serverMain handler called for 'minio server' command.
func serverMain(ctx *cli.Context) {
defer globalDNSCache.Stop()
signal.Notify(globalOSSignalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
go handleSignals()
@ -496,14 +494,15 @@ func serverMain(ctx *cli.Context) {
getCert = globalTLSCerts.GetCertificate
}
httpServer := xhttp.NewServer([]string{globalMinioAddr}, criticalErrorHandler{corsHandler(handler)}, getCert)
httpServer := xhttp.NewServer([]string{globalMinioAddr},
criticalErrorHandler{corsHandler(handler)}, getCert)
httpServer.BaseContext = func(listener net.Listener) context.Context {
return GlobalContext
}
// Turn-off random logging by Go internally
httpServer.ErrorLog = log.New(&nullWriter{}, "", 0)
go func() {
globalHTTPServerErrorCh <- httpServer.Start()
globalHTTPServerErrorCh <- httpServer.Start(GlobalContext)
}()
setHTTPServer(httpServer)

View File

@ -63,7 +63,6 @@ import (
"github.com/minio/minio/internal/config"
"github.com/minio/minio/internal/crypto"
"github.com/minio/minio/internal/hash"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/rest"
"github.com/minio/pkg/bucket/policy"
@ -106,8 +105,6 @@ func TestMain(m *testing.M) {
// Initialize globalConsoleSys system
globalConsoleSys = NewConsoleLogger(context.Background())
globalDNSCache = xhttp.NewDNSCache(3*time.Second, 10*time.Second, logger.LogOnceIf)
globalInternodeTransport = newInternodeHTTPTransport(nil, rest.DefaultTimeout)()
initHelp()

2
go.mod
View File

@ -69,12 +69,12 @@ require (
github.com/prometheus/client_model v0.2.0
github.com/prometheus/procfs v0.7.3
github.com/rs/cors v1.7.0
github.com/rs/dnscache v0.0.0-20210201191234-295bba877686
github.com/secure-io/sio-go v0.3.1
github.com/shirou/gopsutil/v3 v3.21.7
github.com/streadway/amqp v1.0.0
github.com/tinylib/msgp v1.1.6-0.20210521143832-0becd170c402
github.com/valyala/bytebufferpool v1.0.0
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
github.com/yargevad/filepathx v1.0.0
go.etcd.io/etcd/api/v3 v3.5.0-beta.4

3
go.sum
View File

@ -1268,6 +1268,8 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR
github.com/rogpeppe/go-internal v1.5.2/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik=
github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
github.com/rs/dnscache v0.0.0-20210201191234-295bba877686 h1:IJ6Df0uxPDtNoByV0KkzVKNseWvZFCNM/S9UoyOMCSI=
github.com/rs/dnscache v0.0.0-20210201191234-295bba877686/go.mod h1:qe5TWALJ8/a1Lqznoc5BDHpYX/8HU60Hm2AwRmqzxqA=
github.com/rs/xid v1.2.1 h1:mhH9Nq+C1fY2l1XIpgxIiUOfNpRBYH1kKcr+qfKgjRc=
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU=
@ -1423,7 +1425,6 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.2.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk8LWSxF3s=
github.com/valyala/quicktemplate v1.2.0/go.mod h1:EH+4AkTd43SvgIbQHYu59/cJyxDoOVRUAfrukLPuGJ4=
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a h1:0R4NLDRDZX6JcmhJgXi5E4b8Wg84ihbmUKp/GvSPEzc=
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
github.com/vdemeester/k8s-pkg-credentialprovider v1.17.4/go.mod h1:inCTmtUdr5KJbreVojo06krnTgaeAz/Z7lynpPk/Q2c=
github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv3vaXspKw=

View File

@ -19,15 +19,11 @@ package http
import (
"context"
"math/rand"
"net"
"sync"
"time"
)
var randPerm = func(n int) []int {
return rand.Perm(n)
}
"github.com/rs/dnscache"
)
// DialContextWithDNSCache is a helper function which returns `net.DialContext` function.
// It randomly fetches an IP from the DNS cache and dials it by the given dial
@ -39,7 +35,7 @@ var randPerm = func(n int) []int {
//
// In this function, it uses functions from `rand` package. To make it really random,
// you MUST call `rand.Seed` and change the value from the default in your application
func DialContextWithDNSCache(cache *DNSCache, baseDialCtx DialContext) DialContext {
func DialContextWithDNSCache(resolver *dnscache.Resolver, baseDialCtx DialContext) DialContext {
if baseDialCtx == nil {
// This is same as which `http.DefaultTransport` uses.
baseDialCtx = (&net.Dialer{
@ -47,147 +43,29 @@ func DialContextWithDNSCache(cache *DNSCache, baseDialCtx DialContext) DialConte
KeepAlive: 30 * time.Second,
}).DialContext
}
return func(ctx context.Context, network, host string) (net.Conn, error) {
h, p, err := net.SplitHostPort(host)
return func(ctx context.Context, network, addr string) (conn net.Conn, err error) {
host, port, err := net.SplitHostPort(addr)
if err != nil {
return nil, err
}
// Fetch DNS result from cache.
//
// ctxLookup is only used for canceling DNS Lookup.
ctxLookup, cancelF := context.WithTimeout(ctx, cache.lookupTimeout)
defer cancelF()
addrs, err := cache.Fetch(ctxLookup, h)
if net.ParseIP(host) != nil {
// For IP only setups there is no need for DNS lookups.
return baseDialCtx(ctx, "tcp", addr)
}
ips, err := resolver.LookupHost(ctx, host)
if err != nil {
return nil, err
}
var firstErr error
for _, randomIndex := range randPerm(len(addrs)) {
conn, err := baseDialCtx(ctx, "tcp", net.JoinHostPort(addrs[randomIndex], p))
for _, ip := range ips {
conn, err = baseDialCtx(ctx, "tcp", net.JoinHostPort(ip, port))
if err == nil {
return conn, nil
}
if firstErr == nil {
firstErr = err
break
}
}
return nil, firstErr
}
}
// defaultFreq is default frequency a resolver refreshes DNS cache.
var (
defaultFreq = 3 * time.Second
defaultLookupTimeout = 10 * time.Second
)
// DNSCache is DNS cache resolver which cache DNS resolve results in memory.
type DNSCache struct {
resolver *net.Resolver
lookupTimeout time.Duration
loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
cache sync.Map
doneOnce sync.Once
doneCh chan struct{}
}
// NewDNSCache initializes DNS cache resolver and starts auto refreshing
// in a new goroutine. To stop auto refreshing, call `Stop()` function.
// Once `Stop()` is called auto refreshing cannot be resumed.
func NewDNSCache(freq time.Duration, lookupTimeout time.Duration, loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})) *DNSCache {
if freq <= 0 {
freq = defaultFreq
}
if lookupTimeout <= 0 {
lookupTimeout = defaultLookupTimeout
}
// PreferGo controls whether Go's built-in DNS resolver
// is preferred on platforms where it's available, since
// we do not compile with CGO, FIPS builds are CGO based
// enable this to enforce Go resolver.
defaultResolver := &net.Resolver{
PreferGo: true,
}
r := &DNSCache{
resolver: defaultResolver,
lookupTimeout: lookupTimeout,
loggerOnce: loggerOnce,
doneCh: make(chan struct{}),
}
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
timer := time.NewTimer(freq)
go func() {
defer timer.Stop()
for {
select {
case <-timer.C:
// Make sure that refreshes on DNS do not be attempted
// at the same time, allows for reduced load on the
// DNS servers.
timer.Reset(time.Duration(rnd.Float64() * float64(freq)))
r.Refresh()
case <-r.doneCh:
return
}
}
}()
return r
}
// LookupHost lookups address list from DNS server, persist the results
// in-memory cache. `Fetch` is used to obtain the values for a given host.
func (r *DNSCache) LookupHost(ctx context.Context, host string) ([]string, error) {
addrs, err := r.resolver.LookupHost(ctx, host)
if err != nil {
return nil, err
}
r.cache.Store(host, addrs)
return addrs, nil
}
// Fetch fetches IP list from the cache. If IP list of the given addr is not in the cache,
// then it lookups from DNS server by `Lookup` function.
func (r *DNSCache) Fetch(ctx context.Context, host string) ([]string, error) {
addrs, ok := r.cache.Load(host)
if ok {
return addrs.([]string), nil
}
return r.LookupHost(ctx, host)
}
// Refresh refreshes IP list cache, automatically.
func (r *DNSCache) Refresh() {
var hosts []string
r.cache.Range(func(k, v interface{}) bool {
hosts = append(hosts, k.(string))
return true
})
for _, host := range hosts {
ctx, cancelF := context.WithTimeout(context.Background(), r.lookupTimeout)
if _, err := r.LookupHost(ctx, host); err != nil {
r.loggerOnce(ctx, err, host)
}
cancelF()
}
}
// Stop stops auto refreshing.
func (r *DNSCache) Stop() {
r.doneOnce.Do(func() {
close(r.doneCh)
})
}

View File

@ -1,209 +0,0 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package http
import (
"context"
"errors"
"fmt"
"math/rand"
"net"
"runtime"
"testing"
"time"
)
var (
testFreq = 1 * time.Second
testDefaultLookupTimeout = 1 * time.Second
)
func logOnce(ctx context.Context, err error, id interface{}, errKind ...interface{}) {
// no-op
}
func testDNSCache(t *testing.T) *DNSCache {
t.Helper() // skip printing file and line information from this function
return NewDNSCache(testFreq, testDefaultLookupTimeout, logOnce)
}
func TestDialContextWithDNSCache(t *testing.T) {
resolver := &DNSCache{}
resolver.cache.Store("play.min.io", []string{
"127.0.0.1",
"127.0.0.2",
"127.0.0.3",
})
cases := []struct {
permF func(n int) []int
dialF DialContext
}{
{
permF: func(n int) []int {
return []int{0}
},
dialF: func(ctx context.Context, network, addr string) (net.Conn, error) {
if got, want := addr, net.JoinHostPort("127.0.0.1", "443"); got != want {
t.Fatalf("got addr %q, want %q", got, want)
}
return nil, nil
},
},
{
permF: func(n int) []int {
return []int{1}
},
dialF: func(ctx context.Context, network, addr string) (net.Conn, error) {
if got, want := addr, net.JoinHostPort("127.0.0.2", "443"); got != want {
t.Fatalf("got addr %q, want %q", got, want)
}
return nil, nil
},
},
{
permF: func(n int) []int {
return []int{2}
},
dialF: func(ctx context.Context, network, addr string) (net.Conn, error) {
if got, want := addr, net.JoinHostPort("127.0.0.3", "443"); got != want {
t.Fatalf("got addr %q, want %q", got, want)
}
return nil, nil
},
},
}
origFunc := randPerm
defer func() {
randPerm = origFunc
}()
for _, tc := range cases {
t.Run("", func(t *testing.T) {
randPerm = tc.permF
if _, err := DialContextWithDNSCache(resolver, tc.dialF)(context.Background(), "tcp", "play.min.io:443"); err != nil {
t.Fatalf("err: %s", err)
}
})
}
}
func TestDialContextWithDNSCacheRand(t *testing.T) {
rand.Seed(time.Now().UTC().UnixNano())
defer func() {
rand.Seed(1)
}()
resolver := &DNSCache{}
resolver.cache.Store("play.min.io", []string{
"127.0.0.1",
"127.0.0.2",
"127.0.0.3",
})
count := make(map[string]int)
dialF := func(ctx context.Context, network, addr string) (net.Conn, error) {
count[addr]++
return nil, nil
}
for i := 0; i < 100; i++ {
if _, err := DialContextWithDNSCache(resolver, dialF)(context.Background(), "tcp", "play.min.io:443"); err != nil {
t.Fatalf("err: %s", err)
}
}
for _, c := range count {
got := float32(c) / float32(100)
if got < float32(0.1) {
t.Fatalf("expected 0.1 rate got %f", got)
}
}
}
// Verify without port Dial fails, Go stdlib net.Dial expects port
func TestDialContextWithDNSCacheScenario1(t *testing.T) {
resolver := testDNSCache(t)
if _, err := DialContextWithDNSCache(resolver, nil)(context.Background(), "tcp", "play.min.io"); err == nil {
t.Fatalf("expect to fail") // expected port
}
}
// Verify if the host lookup function failed to return addresses
func TestDialContextWithDNSCacheScenario2(t *testing.T) {
if runtime.GOOS == "windows" {
// Windows doesn't use Dial to connect
// so there is no way this test will work
// as expected.
t.Skip()
}
res := testDNSCache(t)
originalResolver := res.resolver
defer func() {
res.resolver = originalResolver
}()
res.resolver = &net.Resolver{
PreferGo: true,
Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
return nil, fmt.Errorf("err")
},
}
if _, err := DialContextWithDNSCache(res, nil)(context.Background(), "tcp", "min.io:443"); err == nil {
t.Fatalf("expect to fail")
}
}
// Verify we always return the first error from net.Dial failure
func TestDialContextWithDNSCacheScenario3(t *testing.T) {
resolver := &DNSCache{}
resolver.cache.Store("min.io", []string{
"1.1.1.1",
"2.2.2.2",
"3.3.3.3",
})
origFunc := randPerm
randPerm = func(n int) []int {
return []int{0, 1, 2}
}
defer func() {
randPerm = origFunc
}()
want := errors.New("error1")
dialF := func(ctx context.Context, network, addr string) (net.Conn, error) {
if addr == net.JoinHostPort("1.1.1.1", "443") {
return nil, want // first error should be returned
}
if addr == net.JoinHostPort("2.2.2.2", "443") {
return nil, fmt.Errorf("error2")
}
if addr == net.JoinHostPort("3.3.3.3", "443") {
return nil, fmt.Errorf("error3")
}
return nil, nil
}
_, got := DialContextWithDNSCache(resolver, dialF)(context.Background(), "tcp", "min.io:443")
if got != want {
t.Fatalf("got error %v, want %v", got, want)
}
}

View File

@ -29,11 +29,19 @@ import (
"golang.org/x/sys/unix"
)
func setInternalTCPParameters(c syscall.RawConn) error {
return c.Control(func(fdPtr uintptr) {
func setTCPParameters(network, address string, c syscall.RawConn) error {
c.Control(func(fdPtr uintptr) {
// got socket file descriptor to set parameters.
fd := int(fdPtr)
_ = unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_REUSEADDR, 1)
_ = unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_REUSEPORT, 1)
// Enable TCP open
// https://lwn.net/Articles/508865/ - 16k queue size.
_ = syscall.SetsockoptInt(fd, syscall.SOL_TCP, unix.TCP_FASTOPEN, 16*1024)
// Enable TCP fast connect
// TCPFastOpenConnect sets the underlying socket to use
// the TCP fast open connect. This feature is supported
@ -44,22 +52,8 @@ func setInternalTCPParameters(c syscall.RawConn) error {
// "Set TCP_QUICKACK. If you find a case where that makes things worse, let me know."
_ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, unix.TCP_QUICKACK, 1)
// The time (in seconds) the connection needs to remain idle before
// TCP starts sending keepalive probes, set this to 5 secs
// system defaults to 7200 secs!!!
_ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_KEEPIDLE, 5)
// Number of probes.
// ~ cat /proc/sys/net/ipv4/tcp_keepalive_probes (defaults to 9, we reduce it to 5)
// 9
_ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_KEEPCNT, 5)
// Wait time after successful probe in seconds.
// ~ cat /proc/sys/net/ipv4/tcp_keepalive_intvl (defaults to 75 secs, we reduce it to 3 secs)
// 75
_ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, syscall.TCP_KEEPINTVL, 3)
})
return nil
}
// DialContext is a function to make custom Dial for internode communications
@ -70,9 +64,7 @@ func NewInternodeDialContext(dialTimeout time.Duration) DialContext {
return func(ctx context.Context, network, addr string) (net.Conn, error) {
dialer := &net.Dialer{
Timeout: dialTimeout,
Control: func(network, address string, c syscall.RawConn) error {
return setInternalTCPParameters(c)
},
Control: setTCPParameters,
}
return dialer.DialContext(ctx, network, addr)
}
@ -83,22 +75,7 @@ func NewCustomDialContext(dialTimeout time.Duration) DialContext {
return func(ctx context.Context, network, addr string) (net.Conn, error) {
dialer := &net.Dialer{
Timeout: dialTimeout,
Control: func(network, address string, c syscall.RawConn) error {
return c.Control(func(fdPtr uintptr) {
// got socket file descriptor to set parameters.
fd := int(fdPtr)
// Enable TCP fast connect
// TCPFastOpenConnect sets the underlying socket to use
// the TCP fast open connect. This feature is supported
// since Linux 4.11.
_ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, unix.TCP_FASTOPEN_CONNECT, 1)
// Enable TCP quick ACK, John Nagle says
// "Set TCP_QUICKACK. If you find a case where that makes things worse, let me know."
_ = syscall.SetsockoptInt(fd, syscall.IPPROTO_TCP, unix.TCP_QUICKACK, 1)
})
},
Control: setTCPParameters,
}
return dialer.DialContext(ctx, network, addr)
}

View File

@ -29,7 +29,7 @@ import (
// TODO: if possible implement for non-linux platforms, not a priority at the moment
//nolint:deadcode
func setInternalTCPParameters(c syscall.RawConn) error {
func setTCPParameters(string, string, syscall.RawConn) error {
return nil
}

View File

@ -22,19 +22,9 @@ package http
import (
"net"
"github.com/valyala/tcplisten"
)
var cfg = &tcplisten.Config{
ReusePort: true,
DeferAccept: true,
FastOpen: true,
// Bump up the soMaxConn value from 128 to 65535 to
// handle large incoming concurrent requests.
Backlog: 65535,
}
// Unix listener with special TCP options.
var listen = cfg.NewListener
var fallbackListen = net.Listen
var listenCfg = net.ListenConfig{
Control: setTCPParameters,
}

View File

@ -23,5 +23,4 @@ package http
import "net"
// Windows, plan9 specific listener.
var listen = net.Listen
var fallbackListen = net.Listen
var listenCfg = net.ListenConfig{}

View File

@ -18,11 +18,9 @@
package http
import (
"context"
"fmt"
"io"
"net"
"os"
"sync"
"syscall"
)
@ -33,45 +31,22 @@ type acceptResult struct {
// httpListener - HTTP listener capable of handling multiple server addresses.
type httpListener struct {
mutex sync.Mutex // to guard Close() method.
tcpListeners []*net.TCPListener // underlaying TCP listeners.
acceptCh chan acceptResult // channel where all TCP listeners write accepted connection.
doneCh chan struct{} // done channel for TCP listener goroutines.
}
// isRoutineNetErr returns true if error is due to a network timeout,
// connect reset or io.EOF and false otherwise
func isRoutineNetErr(err error) bool {
if err == nil {
return false
}
if nErr, ok := err.(*net.OpError); ok {
// Check if the error is a tcp connection reset
if syscallErr, ok := nErr.Err.(*os.SyscallError); ok {
if errno, ok := syscallErr.Err.(syscall.Errno); ok {
return errno == syscall.ECONNRESET
}
}
// Check if the error is a timeout
return nErr.Timeout()
}
// check for io.EOF and also some times io.EOF is wrapped is another error type.
return err == io.EOF || err.Error() == "EOF"
ctx context.Context
ctxCanceler context.CancelFunc
}
// start - starts separate goroutine for each TCP listener. A valid new connection is passed to httpListener.acceptCh.
func (listener *httpListener) start() {
listener.acceptCh = make(chan acceptResult)
listener.doneCh = make(chan struct{})
// Closure to send acceptResult to acceptCh.
// It returns true if the result is sent else false if returns when doneCh is closed.
send := func(result acceptResult, doneCh <-chan struct{}) bool {
send := func(result acceptResult) bool {
select {
case listener.acceptCh <- result:
// Successfully written to acceptCh
return true
case <-doneCh:
case <-listener.ctx.Done():
// As stop signal is received, close accepted connection.
if result.conn != nil {
result.conn.Close()
@ -81,56 +56,52 @@ func (listener *httpListener) start() {
}
// Closure to handle single connection.
handleConn := func(tcpConn *net.TCPConn, doneCh <-chan struct{}) {
handleConn := func(tcpConn *net.TCPConn) {
tcpConn.SetKeepAlive(true)
send(acceptResult{tcpConn, nil}, doneCh)
send(acceptResult{tcpConn, nil})
}
// Closure to handle TCPListener until done channel is closed.
handleListener := func(tcpListener *net.TCPListener, doneCh <-chan struct{}) {
handleListener := func(tcpListener *net.TCPListener) {
for {
tcpConn, err := tcpListener.AcceptTCP()
if err != nil {
// Returns when send fails.
if !send(acceptResult{nil, err}, doneCh) {
if !send(acceptResult{nil, err}) {
return
}
} else {
go handleConn(tcpConn, doneCh)
go handleConn(tcpConn)
}
}
}
// Start separate goroutine for each TCP listener to handle connection.
for _, tcpListener := range listener.tcpListeners {
go handleListener(tcpListener, listener.doneCh)
go handleListener(tcpListener)
}
}
// Accept - reads from httpListener.acceptCh for one of previously accepted TCP connection and returns the same.
func (listener *httpListener) Accept() (conn net.Conn, err error) {
result, ok := <-listener.acceptCh
select {
case result, ok := <-listener.acceptCh:
if ok {
return result.conn, result.err
}
case <-listener.ctx.Done():
}
return nil, syscall.EINVAL
}
// Close - closes underneath all TCP listeners.
func (listener *httpListener) Close() (err error) {
listener.mutex.Lock()
defer listener.mutex.Unlock()
if listener.doneCh == nil {
return syscall.EINVAL
}
listener.ctxCanceler()
for i := range listener.tcpListeners {
listener.tcpListeners[i].Close()
}
close(listener.doneCh)
listener.doneCh = nil
return nil
}
@ -163,7 +134,7 @@ func (listener *httpListener) Addrs() (addrs []net.Addr) {
// httpListener is capable to
// * listen to multiple addresses
// * controls incoming connections only doing HTTP protocol
func newHTTPListener(serverAddrs []string) (listener *httpListener, err error) {
func newHTTPListener(ctx context.Context, serverAddrs []string) (listener *httpListener, err error) {
var tcpListeners []*net.TCPListener
@ -181,11 +152,9 @@ func newHTTPListener(serverAddrs []string) (listener *httpListener, err error) {
for _, serverAddr := range serverAddrs {
var l net.Listener
if l, err = listen("tcp", serverAddr); err != nil {
if l, err = fallbackListen("tcp", serverAddr); err != nil {
if l, err = listenCfg.Listen(ctx, "tcp", serverAddr); err != nil {
return nil, err
}
}
tcpListener, ok := l.(*net.TCPListener)
if !ok {
@ -197,7 +166,9 @@ func newHTTPListener(serverAddrs []string) (listener *httpListener, err error) {
listener = &httpListener{
tcpListeners: tcpListeners,
acceptCh: make(chan acceptResult, len(tcpListeners)),
}
listener.ctx, listener.ctxCanceler = context.WithCancel(ctx)
listener.start()
return listener, nil

View File

@ -18,10 +18,8 @@
package http
import (
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"net"
"strconv"
"strings"
@ -150,7 +148,7 @@ func TestNewHTTPListener(t *testing.T) {
}
for _, testCase := range testCases {
listener, err := newHTTPListener(
listener, err := newHTTPListener(context.Background(),
testCase.serverAddrs,
)
@ -183,7 +181,7 @@ func TestHTTPListenerStartClose(t *testing.T) {
}
for i, testCase := range testCases {
listener, err := newHTTPListener(
listener, err := newHTTPListener(context.Background(),
testCase.serverAddrs,
)
if err != nil {
@ -226,7 +224,7 @@ func TestHTTPListenerAddr(t *testing.T) {
}
for i, testCase := range testCases {
listener, err := newHTTPListener(
listener, err := newHTTPListener(context.Background(),
testCase.serverAddrs,
)
if err != nil {
@ -266,7 +264,7 @@ func TestHTTPListenerAddrs(t *testing.T) {
}
for i, testCase := range testCases {
listener, err := newHTTPListener(
listener, err := newHTTPListener(context.Background(),
testCase.serverAddrs,
)
if err != nil {
@ -290,50 +288,3 @@ func TestHTTPListenerAddrs(t *testing.T) {
listener.Close()
}
}
type myTimeoutErr struct {
timeout bool
}
func (m *myTimeoutErr) Error() string { return fmt.Sprintf("myTimeoutErr: %v", m.timeout) }
func (m *myTimeoutErr) Timeout() bool { return m.timeout }
// Test for ignoreErr helper function
func TestIgnoreErr(t *testing.T) {
testCases := []struct {
err error
want bool
}{
{
err: io.EOF,
want: true,
},
{
err: &net.OpError{Err: &myTimeoutErr{timeout: true}},
want: true,
},
{
err: errors.New("EOF"),
want: true,
},
{
err: &net.OpError{Err: &myTimeoutErr{timeout: false}},
want: false,
},
{
err: io.ErrUnexpectedEOF,
want: false,
},
{
err: nil,
want: false,
},
}
for i, tc := range testCases {
if actual := isRoutineNetErr(tc.err); actual != tc.want {
t.Errorf("Test case %d: Expected %v but got %v for %v", i+1,
tc.want, actual, tc.err)
}
}
}

View File

@ -18,6 +18,7 @@
package http
import (
"context"
"crypto/tls"
"errors"
"io/ioutil"
@ -29,7 +30,6 @@ import (
humanize "github.com/dustin/go-humanize"
"github.com/minio/minio-go/v7/pkg/set"
"github.com/minio/minio/internal/config"
"github.com/minio/minio/internal/config/api"
"github.com/minio/minio/internal/fips"
@ -64,7 +64,7 @@ func (srv *Server) GetRequestCount() int {
}
// Start - start HTTP server
func (srv *Server) Start() (err error) {
func (srv *Server) Start(ctx context.Context) (err error) {
// Take a copy of server fields.
var tlsConfig *tls.Config
if srv.TLSConfig != nil {
@ -72,12 +72,11 @@ func (srv *Server) Start() (err error) {
}
handler := srv.Handler // if srv.Handler holds non-synced state -> possible data race
addrs := set.CreateStringSet(srv.Addrs...).ToSlice() // copy and remove duplicates
// Create new HTTP listener.
var listener *httpListener
listener, err = newHTTPListener(
addrs,
ctx,
srv.Addrs,
)
if err != nil {
return err