mirror of
https://github.com/minio/minio.git
synced 2025-01-23 04:33:15 -05:00
Bump default idleConnsPerHost to control conns in time_wait (#10653)
This PR fixes a hang which occurs quite commonly at higher concurrency by allowing following changes - allowing lower connections in time_wait allows faster socket open's - lower idle connection timeout to ensure that we let kernel reclaim the time_wait connections quickly - increase somaxconn to 4096 instead of 2048 to allow larger tcp syn backlogs. fixes #10413
This commit is contained in:
parent
abb14aeec1
commit
2760fc86af
@ -334,7 +334,8 @@ func checkRequestAuthTypeToAccessKey(ctx context.Context, r *http.Request, actio
|
||||
r.Body = ioutil.NopCloser(bytes.NewReader(payload))
|
||||
}
|
||||
|
||||
if cred.AccessKey == "" {
|
||||
if action != policy.ListAllMyBucketsAction && cred.AccessKey == "" {
|
||||
// Anonymous checks are not meant for ListBuckets action
|
||||
if globalPolicySys.IsAllowed(policy.Args{
|
||||
AccountName: cred.AccessKey,
|
||||
Action: action,
|
||||
@ -378,6 +379,7 @@ func checkRequestAuthTypeToAccessKey(ctx context.Context, r *http.Request, actio
|
||||
// Request is allowed return the appropriate access key.
|
||||
return cred.AccessKey, owner, ErrNone
|
||||
}
|
||||
|
||||
if action == policy.ListBucketVersionsAction {
|
||||
// In AWS S3 s3:ListBucket permission is same as s3:ListBucketVersions permission
|
||||
// verify as a fallback.
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"context"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
minio "github.com/minio/minio-go/v7"
|
||||
miniogo "github.com/minio/minio-go/v7"
|
||||
@ -281,8 +282,9 @@ func (sys *BucketTargetSys) getRemoteTargetClient(tcfg *madmin.BucketTarget) (*m
|
||||
creds := credentials.NewStaticV4(config.AccessKey, config.SecretKey, "")
|
||||
|
||||
getRemoteTargetInstanceTransportOnce.Do(func() {
|
||||
getRemoteTargetInstanceTransport = NewGatewayHTTPTransport()
|
||||
getRemoteTargetInstanceTransport = newGatewayHTTPTransport(1 * time.Hour)
|
||||
})
|
||||
|
||||
core, err := miniogo.NewCore(tcfg.Endpoint, &miniogo.Options{
|
||||
Creds: creds,
|
||||
Secure: tcfg.Secure,
|
||||
|
@ -771,11 +771,7 @@ func GetProxyEndpoints(endpointZones EndpointZones) ([]ProxyEndpoint, error) {
|
||||
}
|
||||
|
||||
// allow transport to be HTTP/1.1 for proxying.
|
||||
tr := newCustomHTTP11Transport(tlsConfig, rest.DefaultTimeout)()
|
||||
|
||||
// Allow more requests to be in flight with higher response header timeout.
|
||||
tr.ResponseHeaderTimeout = 30 * time.Minute
|
||||
tr.MaxIdleConnsPerHost = 64
|
||||
tr := newCustomHTTPProxyTransport(tlsConfig, rest.DefaultTimeout)()
|
||||
|
||||
proxyEps = append(proxyEps, ProxyEndpoint{
|
||||
Endpoint: endpoint,
|
||||
|
@ -449,7 +449,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
|
||||
}
|
||||
}
|
||||
|
||||
defer er.deleteObject(ctx, minioMetaTmpBucket, tmpID, len(storageDisks)/2+1)
|
||||
defer er.deleteObject(context.Background(), minioMetaTmpBucket, tmpID, len(storageDisks)/2+1)
|
||||
|
||||
// Generate and write `xl.meta` generated from other disks.
|
||||
outDatedDisks, err = writeUniqueFileInfo(ctx, outDatedDisks, minioMetaTmpBucket, tmpID,
|
||||
|
@ -277,7 +277,7 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string,
|
||||
// Delete the tmp path later in case we fail to commit (ignore
|
||||
// returned errors) - this will be a no-op in case of a commit
|
||||
// success.
|
||||
defer er.deleteObject(ctx, minioMetaTmpBucket, tempUploadIDPath, writeQuorum)
|
||||
defer er.deleteObject(context.Background(), minioMetaTmpBucket, tempUploadIDPath, writeQuorum)
|
||||
|
||||
var partsMetadata = make([]FileInfo, len(onlineDisks))
|
||||
for i := range onlineDisks {
|
||||
@ -396,7 +396,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
||||
tmpPartPath := pathJoin(tmpPart, partSuffix)
|
||||
|
||||
// Delete the temporary object part. If PutObjectPart succeeds there would be nothing to delete.
|
||||
defer er.deleteObject(ctx, minioMetaTmpBucket, tmpPart, writeQuorum)
|
||||
defer er.deleteObject(context.Background(), minioMetaTmpBucket, tmpPart, writeQuorum)
|
||||
|
||||
erasure, err := NewErasure(ctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize)
|
||||
if err != nil {
|
||||
|
@ -271,6 +271,17 @@ func (s *erasureSets) monitorAndConnectEndpoints(ctx context.Context, monitorInt
|
||||
}
|
||||
}
|
||||
|
||||
// GetAllLockers return a list of all lockers for all sets.
|
||||
func (s *erasureSets) GetAllLockers() []dsync.NetLocker {
|
||||
allLockers := make([]dsync.NetLocker, s.setDriveCount*s.setCount)
|
||||
for i, lockers := range s.erasureLockers {
|
||||
for j, locker := range lockers {
|
||||
allLockers[i*s.setDriveCount+j] = locker
|
||||
}
|
||||
}
|
||||
return allLockers
|
||||
}
|
||||
|
||||
func (s *erasureSets) GetLockers(setIndex int) func() ([]dsync.NetLocker, string) {
|
||||
return func() ([]dsync.NetLocker, string) {
|
||||
lockers := make([]dsync.NetLocker, s.setDriveCount)
|
||||
|
@ -33,6 +33,7 @@ import (
|
||||
"github.com/minio/minio/cmd/config/storageclass"
|
||||
xhttp "github.com/minio/minio/cmd/http"
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
"github.com/minio/minio/pkg/dsync"
|
||||
"github.com/minio/minio/pkg/madmin"
|
||||
"github.com/minio/minio/pkg/sync/errgroup"
|
||||
)
|
||||
@ -92,6 +93,10 @@ func (z *erasureZones) NewNSLock(ctx context.Context, bucket string, objects ...
|
||||
return z.zones[0].NewNSLock(ctx, bucket, objects...)
|
||||
}
|
||||
|
||||
func (z *erasureZones) GetAllLockers() []dsync.NetLocker {
|
||||
return z.zones[0].GetAllLockers()
|
||||
}
|
||||
|
||||
func (z *erasureZones) SetDriveCount() int {
|
||||
return z.zones[0].SetDriveCount()
|
||||
}
|
||||
|
@ -289,7 +289,7 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
|
||||
globalHTTPServer = httpServer
|
||||
globalObjLayerMutex.Unlock()
|
||||
|
||||
signal.Notify(globalOSSignalCh, os.Interrupt, syscall.SIGTERM)
|
||||
signal.Notify(globalOSSignalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
|
||||
|
||||
newObject, err := gw.NewGatewayLayer(globalActiveCred)
|
||||
if err != nil {
|
||||
@ -323,8 +323,8 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
|
||||
}
|
||||
|
||||
if enableIAMOps {
|
||||
// Initialize IAM sys.
|
||||
startBackgroundIAMLoad(GlobalContext, newObject)
|
||||
// Initialize users credentials and policies in background.
|
||||
go globalIAMSys.Init(GlobalContext, newObject)
|
||||
}
|
||||
|
||||
if globalCacheConfig.Enabled {
|
||||
|
@ -27,9 +27,9 @@ import (
|
||||
var cfg = &tcplisten.Config{
|
||||
DeferAccept: true,
|
||||
FastOpen: true,
|
||||
// Bump up the soMaxConn value from 128 to 2048 to
|
||||
// Bump up the soMaxConn value from 128 to 4096 to
|
||||
// handle large incoming concurrent requests.
|
||||
Backlog: 2048,
|
||||
Backlog: 4096,
|
||||
}
|
||||
|
||||
// Unix listener with special TCP options.
|
||||
|
@ -408,13 +408,6 @@ func (sys *IAMSys) doIAMConfigMigration(ctx context.Context) error {
|
||||
return sys.store.migrateBackendFormat(ctx)
|
||||
}
|
||||
|
||||
// Loads IAM users and policies in background, any un-handled
|
||||
// error means this code can potentially crash the server
|
||||
// in such a situation manual intervention is necessary.
|
||||
func startBackgroundIAMLoad(ctx context.Context, objAPI ObjectLayer) {
|
||||
go globalIAMSys.Init(ctx, objAPI)
|
||||
}
|
||||
|
||||
// Init - initializes config system by reading entries from config/iam
|
||||
func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
|
||||
if objAPI == nil {
|
||||
|
@ -247,6 +247,11 @@ func lockMaintenance(ctx context.Context, interval time.Duration) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
z, ok := objAPI.(*erasureZones)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
type nlock struct {
|
||||
locks int
|
||||
writer bool
|
||||
@ -265,6 +270,8 @@ func lockMaintenance(ctx context.Context, interval time.Duration) error {
|
||||
}
|
||||
}
|
||||
|
||||
allLockersFn := z.GetAllLockers
|
||||
|
||||
// Validate if long lived locks are indeed clean.
|
||||
// Get list of long lived locks to check for staleness.
|
||||
for lendpoint, nlrips := range getLongLivedLocks(interval) {
|
||||
@ -273,8 +280,7 @@ func lockMaintenance(ctx context.Context, interval time.Duration) error {
|
||||
// Locks are only held on first zone, make sure that
|
||||
// we only look for ownership of locks from endpoints
|
||||
// on first zone.
|
||||
for _, endpoint := range globalEndpoints[0].Endpoints {
|
||||
c := newLockAPI(endpoint)
|
||||
for _, c := range allLockersFn() {
|
||||
if !c.IsOnline() {
|
||||
updateNlocks(nlripsMap, nlrip.name, nlrip.lri.Writer)
|
||||
continue
|
||||
@ -292,16 +298,12 @@ func lockMaintenance(ctx context.Context, interval time.Duration) error {
|
||||
cancel()
|
||||
if err != nil {
|
||||
updateNlocks(nlripsMap, nlrip.name, nlrip.lri.Writer)
|
||||
c.Close()
|
||||
continue
|
||||
}
|
||||
|
||||
if !expired {
|
||||
updateNlocks(nlripsMap, nlrip.name, nlrip.lri.Writer)
|
||||
}
|
||||
|
||||
// Close the connection regardless of the call response.
|
||||
c.Close()
|
||||
}
|
||||
|
||||
// Read locks we assume quorum for be N/2 success
|
||||
|
@ -19,6 +19,7 @@ package logger
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"go/build"
|
||||
"hash"
|
||||
@ -60,11 +61,6 @@ var globalDeploymentID string
|
||||
// TimeFormat - logging time format.
|
||||
const TimeFormat string = "15:04:05 MST 01/02/2006"
|
||||
|
||||
// List of error strings to be ignored by LogIf
|
||||
const (
|
||||
diskNotFoundError = "disk not found"
|
||||
)
|
||||
|
||||
var matchingFuncNames = [...]string{
|
||||
"http.HandlerFunc.ServeHTTP",
|
||||
"cmd.serverMain",
|
||||
@ -303,7 +299,7 @@ func LogIf(ctx context.Context, err error, errKind ...interface{}) {
|
||||
return
|
||||
}
|
||||
|
||||
if err.Error() != diskNotFoundError {
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
logIf(ctx, err, errKind...)
|
||||
}
|
||||
}
|
||||
|
@ -74,11 +74,10 @@ type Client struct {
|
||||
// Should only be modified before any calls are made.
|
||||
MaxErrResponseSize int64
|
||||
|
||||
httpClient *http.Client
|
||||
httpIdleConnsCloser func()
|
||||
url *url.URL
|
||||
newAuthToken func(audience string) string
|
||||
connected int32
|
||||
httpClient *http.Client
|
||||
url *url.URL
|
||||
newAuthToken func(audience string) string
|
||||
connected int32
|
||||
}
|
||||
|
||||
// URL query separator constants
|
||||
@ -157,9 +156,6 @@ func (c *Client) Call(ctx context.Context, method string, values url.Values, bod
|
||||
// Close closes all idle connections of the underlying http client
|
||||
func (c *Client) Close() {
|
||||
atomic.StoreInt32(&c.connected, closed)
|
||||
if c.httpIdleConnsCloser != nil {
|
||||
c.httpIdleConnsCloser()
|
||||
}
|
||||
}
|
||||
|
||||
// NewClient - returns new REST client.
|
||||
@ -169,7 +165,6 @@ func NewClient(url *url.URL, newCustomTransport func() *http.Transport, newAuthT
|
||||
tr := newCustomTransport()
|
||||
return &Client{
|
||||
httpClient: &http.Client{Transport: tr},
|
||||
httpIdleConnsCloser: tr.CloseIdleConnections,
|
||||
url: url,
|
||||
newAuthToken: newAuthToken,
|
||||
connected: online,
|
||||
|
@ -190,7 +190,7 @@ func newAllSubsystems() {
|
||||
globalBucketTargetSys = NewBucketTargetSys()
|
||||
}
|
||||
|
||||
func initServer(ctx context.Context, newObject ObjectLayer) (err error) {
|
||||
func initServer(ctx context.Context, newObject ObjectLayer) error {
|
||||
// Create cancel context to control 'newRetryTimer' go routine.
|
||||
retryCtx, cancel := context.WithCancel(ctx)
|
||||
|
||||
@ -203,39 +203,6 @@ func initServer(ctx context.Context, newObject ObjectLayer) (err error) {
|
||||
// appropriately. This is also true for rotation of encrypted
|
||||
// content.
|
||||
txnLk := newObject.NewNSLock(retryCtx, minioMetaBucket, minioConfigPrefix+"/transaction.lock")
|
||||
defer func() {
|
||||
if err != nil {
|
||||
var cerr config.Err
|
||||
// For any config error, we don't need to drop into safe-mode
|
||||
// instead its a user error and should be fixed by user.
|
||||
if errors.As(err, &cerr) {
|
||||
logger.FatalIf(err, "Unable to initialize the server")
|
||||
return
|
||||
}
|
||||
|
||||
// If context was canceled
|
||||
if errors.Is(err, context.Canceled) {
|
||||
logger.FatalIf(err, "Server startup canceled upon user request")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Prints the formatted startup message, if err is not nil then it prints additional information as well.
|
||||
printStartupMessage(getAPIEndpoints(), err)
|
||||
|
||||
if globalActiveCred.Equal(auth.DefaultCredentials) {
|
||||
msg := fmt.Sprintf("Detected default credentials '%s', please change the credentials immediately using 'MINIO_ACCESS_KEY' and 'MINIO_SECRET_KEY'", globalActiveCred)
|
||||
logger.StartupMessage(color.RedBold(msg))
|
||||
}
|
||||
|
||||
<-globalOSSignalCh
|
||||
}()
|
||||
|
||||
// Enable background operations for erasure coding
|
||||
if globalIsErasure {
|
||||
initAutoHeal(ctx, newObject)
|
||||
initBackgroundReplication(ctx, newObject)
|
||||
}
|
||||
|
||||
// allocate dynamic timeout once before the loop
|
||||
configLockTimeout := newDynamicTimeout(5*time.Second, 3*time.Second)
|
||||
@ -252,7 +219,9 @@ func initServer(ctx context.Context, newObject ObjectLayer) (err error) {
|
||||
// version is needed, migration is needed etc.
|
||||
rquorum := InsufficientReadQuorum{}
|
||||
wquorum := InsufficientWriteQuorum{}
|
||||
for range retry.NewTimerWithJitter(retryCtx, 250*time.Millisecond, 500*time.Millisecond, retry.MaxJitter) {
|
||||
|
||||
var err error
|
||||
for range retry.NewTimerWithJitter(retryCtx, 500*time.Millisecond, time.Second, retry.MaxJitter) {
|
||||
// let one of the server acquire the lock, if not let them timeout.
|
||||
// which shall be retried again by this loop.
|
||||
if err = txnLk.GetLock(configLockTimeout); err != nil {
|
||||
@ -389,7 +358,7 @@ func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) {
|
||||
|
||||
// serverMain handler called for 'minio server' command.
|
||||
func serverMain(ctx *cli.Context) {
|
||||
signal.Notify(globalOSSignalCh, os.Interrupt, syscall.SIGTERM)
|
||||
signal.Notify(globalOSSignalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
|
||||
|
||||
go handleSignals()
|
||||
|
||||
@ -509,10 +478,38 @@ func serverMain(ctx *cli.Context) {
|
||||
|
||||
go initDataCrawler(GlobalContext, newObject)
|
||||
|
||||
// Initialize users credentials and policies in background.
|
||||
go startBackgroundIAMLoad(GlobalContext, newObject)
|
||||
// Enable background operations for erasure coding
|
||||
if globalIsErasure {
|
||||
initAutoHeal(GlobalContext, newObject)
|
||||
initBackgroundReplication(GlobalContext, newObject)
|
||||
}
|
||||
|
||||
initServer(GlobalContext, newObject)
|
||||
if err = initServer(GlobalContext, newObject); err != nil {
|
||||
var cerr config.Err
|
||||
// For any config error, we don't need to drop into safe-mode
|
||||
// instead its a user error and should be fixed by user.
|
||||
if errors.As(err, &cerr) {
|
||||
logger.FatalIf(err, "Unable to initialize the server")
|
||||
}
|
||||
|
||||
// If context was canceled
|
||||
if errors.Is(err, context.Canceled) {
|
||||
logger.FatalIf(err, "Server startup canceled upon user request")
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize users credentials and policies in background right after config has initialized.
|
||||
go globalIAMSys.Init(GlobalContext, newObject)
|
||||
|
||||
// Prints the formatted startup message, if err is not nil then it prints additional information as well.
|
||||
printStartupMessage(getAPIEndpoints(), err)
|
||||
|
||||
if globalActiveCred.Equal(auth.DefaultCredentials) {
|
||||
msg := fmt.Sprintf("Detected default credentials '%s', please change the credentials immediately using 'MINIO_ACCESS_KEY' and 'MINIO_SECRET_KEY'", globalActiveCred)
|
||||
logger.StartupMessage(color.RedBold(msg))
|
||||
}
|
||||
|
||||
<-globalOSSignalCh
|
||||
}
|
||||
|
||||
// Initialize object layer with the supplied disks, objectLayer is nil upon any error.
|
||||
|
20
cmd/utils.go
20
cmd/utils.go
@ -469,8 +469,8 @@ func newInternodeHTTPTransport(tlsConfig *tls.Config, dialTimeout time.Duration)
|
||||
tr := &http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
DialContext: xhttp.NewInternodeDialContext(dialTimeout),
|
||||
MaxIdleConnsPerHost: 16,
|
||||
IdleConnTimeout: 30 * time.Second,
|
||||
MaxIdleConnsPerHost: 1024,
|
||||
IdleConnTimeout: 15 * time.Second,
|
||||
ResponseHeaderTimeout: 3 * time.Minute, // Set conservative timeouts for MinIO internode.
|
||||
TLSHandshakeTimeout: 15 * time.Second,
|
||||
ExpectContinueTimeout: 15 * time.Second,
|
||||
@ -490,15 +490,16 @@ func newInternodeHTTPTransport(tlsConfig *tls.Config, dialTimeout time.Duration)
|
||||
}
|
||||
}
|
||||
|
||||
func newCustomHTTP11Transport(tlsConfig *tls.Config, dialTimeout time.Duration) func() *http.Transport {
|
||||
// Used by only proxied requests, specifically only supports HTTP/1.1
|
||||
func newCustomHTTPProxyTransport(tlsConfig *tls.Config, dialTimeout time.Duration) func() *http.Transport {
|
||||
// For more details about various values used here refer
|
||||
// https://golang.org/pkg/net/http/#Transport documentation
|
||||
tr := &http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
DialContext: xhttp.NewCustomDialContext(dialTimeout),
|
||||
MaxIdleConnsPerHost: 16,
|
||||
IdleConnTimeout: 1 * time.Minute,
|
||||
ResponseHeaderTimeout: 3 * time.Minute, // Set conservative timeouts for MinIO internode.
|
||||
MaxIdleConnsPerHost: 1024,
|
||||
IdleConnTimeout: 15 * time.Second,
|
||||
ResponseHeaderTimeout: 30 * time.Minute, // Set larger timeouts for proxied requests.
|
||||
TLSHandshakeTimeout: 10 * time.Second,
|
||||
ExpectContinueTimeout: 10 * time.Second,
|
||||
TLSClientConfig: tlsConfig,
|
||||
@ -519,8 +520,8 @@ func newCustomHTTPTransport(tlsConfig *tls.Config, dialTimeout time.Duration) fu
|
||||
tr := &http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
DialContext: xhttp.NewCustomDialContext(dialTimeout),
|
||||
MaxIdleConnsPerHost: 16,
|
||||
IdleConnTimeout: 1 * time.Minute,
|
||||
MaxIdleConnsPerHost: 1024,
|
||||
IdleConnTimeout: 15 * time.Second,
|
||||
ResponseHeaderTimeout: 3 * time.Minute, // Set conservative timeouts for MinIO internode.
|
||||
TLSHandshakeTimeout: 10 * time.Second,
|
||||
ExpectContinueTimeout: 10 * time.Second,
|
||||
@ -553,9 +554,8 @@ func newGatewayHTTPTransport(timeout time.Duration) *http.Transport {
|
||||
RootCAs: globalRootCAs,
|
||||
}, defaultDialTimeout)()
|
||||
|
||||
// Allow more requests to be in flight.
|
||||
// Customize response header timeout for gateway transport.
|
||||
tr.ResponseHeaderTimeout = timeout
|
||||
tr.MaxIdleConnsPerHost = 16
|
||||
return tr
|
||||
}
|
||||
|
||||
|
@ -148,21 +148,24 @@ func IsNetworkOrHostDown(err error) bool {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return false
|
||||
}
|
||||
|
||||
// We need to figure if the error either a timeout
|
||||
// or a non-temporary error.
|
||||
e, ok := err.(net.Error)
|
||||
if ok {
|
||||
urlErr, ok := e.(*url.Error)
|
||||
if ok {
|
||||
if urlErr, ok := e.(*url.Error); ok {
|
||||
switch urlErr.Err.(type) {
|
||||
case *net.DNSError, *net.OpError, net.UnknownNetworkError:
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
if e.Timeout() {
|
||||
return true
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
ok = false
|
||||
// Fallback to other mechanisms.
|
||||
if strings.Contains(err.Error(), "Connection closed by foreign host") {
|
||||
|
Loading…
x
Reference in New Issue
Block a user