mirror of
https://github.com/minio/minio.git
synced 2024-12-24 22:25:54 -05:00
debug: introduce support for configuring client connect WRITE deadline (#19170)
just like client-conn-read-deadline, added a new flag that does client-conn-write-deadline as well. Both are not configured by default, since we do not yet know what is the right value. Allow this to be configurable if needed.
This commit is contained in:
parent
c599c11e70
commit
2c2f5d871c
@ -393,6 +393,7 @@ func buildServerCtxt(ctx *cli.Context, ctxt *serverCtxt) (err error) {
|
|||||||
ctxt.ConnReadDeadline = ctx.Duration("conn-read-deadline")
|
ctxt.ConnReadDeadline = ctx.Duration("conn-read-deadline")
|
||||||
ctxt.ConnWriteDeadline = ctx.Duration("conn-write-deadline")
|
ctxt.ConnWriteDeadline = ctx.Duration("conn-write-deadline")
|
||||||
ctxt.ConnClientReadDeadline = ctx.Duration("conn-client-read-deadline")
|
ctxt.ConnClientReadDeadline = ctx.Duration("conn-client-read-deadline")
|
||||||
|
ctxt.ConnClientWriteDeadline = ctx.Duration("conn-client-write-deadline")
|
||||||
|
|
||||||
ctxt.ShutdownTimeout = ctx.Duration("shutdown-timeout")
|
ctxt.ShutdownTimeout = ctx.Duration("shutdown-timeout")
|
||||||
ctxt.IdleTimeout = ctx.Duration("idle-timeout")
|
ctxt.IdleTimeout = ctx.Duration("idle-timeout")
|
||||||
|
@ -160,10 +160,11 @@ type serverCtxt struct {
|
|||||||
FTP []string
|
FTP []string
|
||||||
SFTP []string
|
SFTP []string
|
||||||
|
|
||||||
UserTimeout time.Duration
|
UserTimeout time.Duration
|
||||||
ConnReadDeadline time.Duration
|
ConnReadDeadline time.Duration
|
||||||
ConnWriteDeadline time.Duration
|
ConnWriteDeadline time.Duration
|
||||||
ConnClientReadDeadline time.Duration
|
ConnClientReadDeadline time.Duration
|
||||||
|
ConnClientWriteDeadline time.Duration
|
||||||
|
|
||||||
ShutdownTimeout time.Duration
|
ShutdownTimeout time.Duration
|
||||||
IdleTimeout time.Duration
|
IdleTimeout time.Duration
|
||||||
|
@ -296,27 +296,23 @@ func collectAPIStats(api string, f http.HandlerFunc) http.HandlerFunc {
|
|||||||
|
|
||||||
bucket, _ := path2BucketObject(resource)
|
bucket, _ := path2BucketObject(resource)
|
||||||
|
|
||||||
globalHTTPStats.currentS3Requests.Inc(api)
|
|
||||||
defer globalHTTPStats.currentS3Requests.Dec(api)
|
|
||||||
|
|
||||||
_, err = globalBucketMetadataSys.Get(bucket) // check if this bucket exists.
|
_, err = globalBucketMetadataSys.Get(bucket) // check if this bucket exists.
|
||||||
if bucket != "" && bucket != minioReservedBucket && err == nil {
|
countBktStat := bucket != "" && bucket != minioReservedBucket && err == nil
|
||||||
|
if countBktStat {
|
||||||
globalBucketHTTPStats.updateHTTPStats(bucket, api, nil)
|
globalBucketHTTPStats.updateHTTPStats(bucket, api, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
globalHTTPStats.currentS3Requests.Inc(api)
|
||||||
f.ServeHTTP(w, r)
|
f.ServeHTTP(w, r)
|
||||||
|
globalHTTPStats.currentS3Requests.Dec(api)
|
||||||
|
|
||||||
tc, ok := r.Context().Value(mcontext.ContextTraceKey).(*mcontext.TraceCtxt)
|
tc, _ := r.Context().Value(mcontext.ContextTraceKey).(*mcontext.TraceCtxt)
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if tc != nil {
|
if tc != nil {
|
||||||
globalHTTPStats.updateStats(api, tc.ResponseRecorder)
|
globalHTTPStats.updateStats(api, tc.ResponseRecorder)
|
||||||
globalConnStats.incS3InputBytes(int64(tc.RequestRecorder.Size()))
|
globalConnStats.incS3InputBytes(int64(tc.RequestRecorder.Size()))
|
||||||
globalConnStats.incS3OutputBytes(int64(tc.ResponseRecorder.Size()))
|
globalConnStats.incS3OutputBytes(int64(tc.ResponseRecorder.Size()))
|
||||||
|
|
||||||
if bucket != "" && bucket != minioReservedBucket && err == nil {
|
if countBktStat {
|
||||||
globalBucketConnStats.incS3InputBytes(bucket, int64(tc.RequestRecorder.Size()))
|
globalBucketConnStats.incS3InputBytes(bucket, int64(tc.RequestRecorder.Size()))
|
||||||
globalBucketConnStats.incS3OutputBytes(bucket, int64(tc.ResponseRecorder.Size()))
|
globalBucketConnStats.incS3OutputBytes(bucket, int64(tc.ResponseRecorder.Size()))
|
||||||
globalBucketHTTPStats.updateHTTPStats(bucket, api, tc.ResponseRecorder)
|
globalBucketHTTPStats.updateHTTPStats(bucket, api, tc.ResponseRecorder)
|
||||||
|
@ -106,6 +106,12 @@ var ServerFlags = []cli.Flag{
|
|||||||
Hidden: true,
|
Hidden: true,
|
||||||
EnvVar: "MINIO_CONN_CLIENT_READ_DEADLINE",
|
EnvVar: "MINIO_CONN_CLIENT_READ_DEADLINE",
|
||||||
},
|
},
|
||||||
|
cli.DurationFlag{
|
||||||
|
Name: "conn-client-write-deadline",
|
||||||
|
Usage: "custom connection WRITE deadline for outgoing requests",
|
||||||
|
Hidden: true,
|
||||||
|
EnvVar: "MINIO_CONN_CLIENT_WRITE_DEADLINE",
|
||||||
|
},
|
||||||
cli.DurationFlag{
|
cli.DurationFlag{
|
||||||
Name: "conn-read-deadline",
|
Name: "conn-read-deadline",
|
||||||
Usage: "custom connection READ deadline",
|
Usage: "custom connection READ deadline",
|
||||||
@ -356,9 +362,10 @@ func serverHandleCmdArgs(ctxt serverCtxt) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
globalTCPOptions = xhttp.TCPOptions{
|
globalTCPOptions = xhttp.TCPOptions{
|
||||||
UserTimeout: int(ctxt.UserTimeout.Milliseconds()),
|
UserTimeout: int(ctxt.UserTimeout.Milliseconds()),
|
||||||
ClientReadTimeout: ctxt.ConnClientReadDeadline,
|
ClientReadTimeout: ctxt.ConnClientReadDeadline,
|
||||||
Interface: ctxt.Interface,
|
ClientWriteTimeout: ctxt.ConnClientWriteDeadline,
|
||||||
|
Interface: ctxt.Interface,
|
||||||
}
|
}
|
||||||
|
|
||||||
// On macOS, if a process already listens on LOCALIPADDR:PORT, net.Listen() falls back
|
// On macOS, if a process already listens on LOCALIPADDR:PORT, net.Listen() falls back
|
||||||
|
@ -70,16 +70,16 @@ type Cache[T any] struct {
|
|||||||
updating sync.Mutex
|
updating sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// New allocates a new cached value instance. It must be initialized with
|
// New allocates a new cached value instance. Tt must be initialized with
|
||||||
// `.InitOnce`.
|
// `.TnitOnce`.
|
||||||
func New[I any]() *Cache[I] {
|
func New[T any]() *Cache[T] {
|
||||||
return &Cache[I]{}
|
return &Cache[T]{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewFromFunc allocates a new cached value instance and initializes it with an
|
// NewFromFunc allocates a new cached value instance and initializes it with an
|
||||||
// update function, making it ready for use.
|
// update function, making it ready for use.
|
||||||
func NewFromFunc[I any](ttl time.Duration, opts Opts, update func() (I, error)) *Cache[I] {
|
func NewFromFunc[T any](ttl time.Duration, opts Opts, update func() (T, error)) *Cache[T] {
|
||||||
return &Cache[I]{
|
return &Cache[T]{
|
||||||
ttl: ttl,
|
ttl: ttl,
|
||||||
updateFn: update,
|
updateFn: update,
|
||||||
opts: opts,
|
opts: opts,
|
||||||
@ -88,7 +88,7 @@ func NewFromFunc[I any](ttl time.Duration, opts Opts, update func() (I, error))
|
|||||||
|
|
||||||
// InitOnce initializes the cache with a TTL and an update function. It is
|
// InitOnce initializes the cache with a TTL and an update function. It is
|
||||||
// guaranteed to be called only once.
|
// guaranteed to be called only once.
|
||||||
func (t *Cache[I]) InitOnce(ttl time.Duration, opts Opts, update func() (I, error)) {
|
func (t *Cache[T]) InitOnce(ttl time.Duration, opts Opts, update func() (T, error)) {
|
||||||
t.Once.Do(func() {
|
t.Once.Do(func() {
|
||||||
t.ttl = ttl
|
t.ttl = ttl
|
||||||
t.updateFn = update
|
t.updateFn = update
|
||||||
@ -97,8 +97,8 @@ func (t *Cache[I]) InitOnce(ttl time.Duration, opts Opts, update func() (I, erro
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Get will return a cached value or fetch a new one.
|
// Get will return a cached value or fetch a new one.
|
||||||
// If the Update function returns an error the value is forwarded as is and not cached.
|
// Tf the Update function returns an error the value is forwarded as is and not cached.
|
||||||
func (t *Cache[I]) Get() (I, error) {
|
func (t *Cache[T]) Get() (T, error) {
|
||||||
v := t.valErr.Load()
|
v := t.valErr.Load()
|
||||||
ttl := t.ttl
|
ttl := t.ttl
|
||||||
vTime := t.lastUpdateMs.Load()
|
vTime := t.lastUpdateMs.Load()
|
||||||
|
@ -79,7 +79,8 @@ func (listener *httpListener) Accept() (conn net.Conn, err error) {
|
|||||||
case result, ok := <-listener.acceptCh:
|
case result, ok := <-listener.acceptCh:
|
||||||
if ok {
|
if ok {
|
||||||
return deadlineconn.New(result.conn).
|
return deadlineconn.New(result.conn).
|
||||||
WithReadDeadline(listener.opts.ClientReadTimeout), result.err
|
WithReadDeadline(listener.opts.ClientReadTimeout).
|
||||||
|
WithWriteDeadline(listener.opts.ClientWriteTimeout), result.err
|
||||||
}
|
}
|
||||||
case <-listener.ctx.Done():
|
case <-listener.ctx.Done():
|
||||||
}
|
}
|
||||||
@ -124,10 +125,11 @@ func (listener *httpListener) Addrs() (addrs []net.Addr) {
|
|||||||
|
|
||||||
// TCPOptions specify customizable TCP optimizations on raw socket
|
// TCPOptions specify customizable TCP optimizations on raw socket
|
||||||
type TCPOptions struct {
|
type TCPOptions struct {
|
||||||
UserTimeout int // this value is expected to be in milliseconds
|
UserTimeout int // this value is expected to be in milliseconds
|
||||||
ClientReadTimeout time.Duration // When the net.Conn is idle for more than ReadTimeout duration, we close the connection on the client proactively.
|
ClientReadTimeout time.Duration // When the net.Conn is idle for more than ReadTimeout duration, we close the connection on the client proactively.
|
||||||
Interface string // this is a VRF device passed via `--interface` flag
|
ClientWriteTimeout time.Duration // When the net.Conn is idle for more than WriteTimeout duration, we close the connection on the client proactively.
|
||||||
Trace func(msg string) // Trace when starting.
|
Interface string // this is a VRF device passed via `--interface` flag
|
||||||
|
Trace func(msg string) // Trace when starting.
|
||||||
}
|
}
|
||||||
|
|
||||||
// newHTTPListener - creates new httpListener object which is interface compatible to net.Listener.
|
// newHTTPListener - creates new httpListener object which is interface compatible to net.Listener.
|
||||||
|
Loading…
Reference in New Issue
Block a user