mirror of
https://github.com/minio/minio.git
synced 2025-01-26 22:23:15 -05:00
Make DeadlineConn http.Listener compatible (#20635)
HTTP likes to slap an infinite read deadline on a connection and do a blocking read while the response is being written. This effectively means that a reading deadline becomes the request-response deadline. Instead of enforcing our timeout, we pass it through and keep "infinite deadline" is sticky on connections. However, we still "record" when reads are aborted, so we never overwrite that. The HTTP server should have `ReadTimeout` and `IdleTimeout` set for the deadline to be effective. Use --idle-timeout for incoming connections.
This commit is contained in:
parent
55f5c18fd9
commit
b5177993b3
@ -421,6 +421,7 @@ func serverHandleCmdArgs(ctxt serverCtxt) {
|
|||||||
Interface: ctxt.Interface,
|
Interface: ctxt.Interface,
|
||||||
SendBufSize: ctxt.SendBufSize,
|
SendBufSize: ctxt.SendBufSize,
|
||||||
RecvBufSize: ctxt.RecvBufSize,
|
RecvBufSize: ctxt.RecvBufSize,
|
||||||
|
IdleTimeout: ctxt.IdleTimeout,
|
||||||
}
|
}
|
||||||
|
|
||||||
// allow transport to be HTTP/1.1 for proxying.
|
// allow transport to be HTTP/1.1 for proxying.
|
||||||
@ -878,6 +879,8 @@ func serverMain(ctx *cli.Context) {
|
|||||||
UseHandler(setCriticalErrorHandler(corsHandler(handler))).
|
UseHandler(setCriticalErrorHandler(corsHandler(handler))).
|
||||||
UseTLSConfig(newTLSConfig(getCert)).
|
UseTLSConfig(newTLSConfig(getCert)).
|
||||||
UseIdleTimeout(globalServerCtxt.IdleTimeout).
|
UseIdleTimeout(globalServerCtxt.IdleTimeout).
|
||||||
|
UseReadTimeout(24 * time.Hour). // (overridden by listener.config.IdleTimeout on requests)
|
||||||
|
UseWriteTimeout(24 * time.Hour). // (overridden by listener.config.IdleTimeout on requests)
|
||||||
UseReadHeaderTimeout(globalServerCtxt.ReadHeaderTimeout).
|
UseReadHeaderTimeout(globalServerCtxt.ReadHeaderTimeout).
|
||||||
UseBaseContext(GlobalContext).
|
UseBaseContext(GlobalContext).
|
||||||
UseCustomLogger(log.New(io.Discard, "", 0)). // Turn-off random logging by Go stdlib
|
UseCustomLogger(log.New(io.Discard, "", 0)). // Turn-off random logging by Go stdlib
|
||||||
|
@ -37,13 +37,23 @@ type DeadlineConn struct {
|
|||||||
writeDeadline time.Duration // sets the write deadline on a connection.
|
writeDeadline time.Duration // sets the write deadline on a connection.
|
||||||
writeSetAt time.Time
|
writeSetAt time.Time
|
||||||
abortReads, abortWrites atomic.Bool // A deadline was set to indicate caller wanted the conn to time out.
|
abortReads, abortWrites atomic.Bool // A deadline was set to indicate caller wanted the conn to time out.
|
||||||
|
infReads, infWrites atomic.Bool
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Unwrap will unwrap the connection and remove the deadline if applied.
|
||||||
|
// If not a *DeadlineConn, the unmodified net.Conn is returned.
|
||||||
|
func Unwrap(c net.Conn) net.Conn {
|
||||||
|
if dc, ok := c.(*DeadlineConn); ok {
|
||||||
|
return dc.Conn
|
||||||
|
}
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
// Sets read deadline
|
// Sets read deadline
|
||||||
func (c *DeadlineConn) setReadDeadline() {
|
func (c *DeadlineConn) setReadDeadline() {
|
||||||
// Do not set a Read deadline, if upstream wants to cancel all reads.
|
// Do not set a Read deadline, if upstream wants to cancel all reads.
|
||||||
if c.readDeadline <= 0 || c.abortReads.Load() {
|
if c.readDeadline <= 0 || c.abortReads.Load() || c.infReads.Load() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -62,7 +72,7 @@ func (c *DeadlineConn) setReadDeadline() {
|
|||||||
|
|
||||||
func (c *DeadlineConn) setWriteDeadline() {
|
func (c *DeadlineConn) setWriteDeadline() {
|
||||||
// Do not set a Write deadline, if upstream wants to cancel all reads.
|
// Do not set a Write deadline, if upstream wants to cancel all reads.
|
||||||
if c.writeDeadline <= 0 || c.abortWrites.Load() {
|
if c.writeDeadline <= 0 || c.abortWrites.Load() || c.infWrites.Load() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -103,31 +113,13 @@ func (c *DeadlineConn) Write(b []byte) (n int, err error) {
|
|||||||
func (c *DeadlineConn) SetDeadline(t time.Time) error {
|
func (c *DeadlineConn) SetDeadline(t time.Time) error {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
if t.IsZero() {
|
|
||||||
var err error
|
|
||||||
if c.readDeadline == 0 {
|
|
||||||
err = c.Conn.SetReadDeadline(t)
|
|
||||||
}
|
|
||||||
if c.writeDeadline == 0 {
|
|
||||||
if wErr := c.Conn.SetWriteDeadline(t); wErr != nil {
|
|
||||||
return wErr
|
|
||||||
}
|
|
||||||
}
|
|
||||||
c.abortReads.Store(false)
|
|
||||||
c.abortWrites.Store(false)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// If upstream sets a deadline in the past, assume it wants to abort reads/writes.
|
|
||||||
if time.Until(t) < 0 {
|
|
||||||
c.abortReads.Store(true)
|
|
||||||
c.abortWrites.Store(true)
|
|
||||||
return c.Conn.SetDeadline(t)
|
|
||||||
}
|
|
||||||
|
|
||||||
c.abortReads.Store(false)
|
|
||||||
c.abortWrites.Store(false)
|
|
||||||
c.readSetAt = time.Now()
|
c.readSetAt = time.Now()
|
||||||
c.writeSetAt = time.Now()
|
c.writeSetAt = time.Now()
|
||||||
|
c.abortReads.Store(!t.IsZero() && time.Until(t) < 0)
|
||||||
|
c.abortWrites.Store(!t.IsZero() && time.Until(t) < 0)
|
||||||
|
c.infReads.Store(t.IsZero())
|
||||||
|
c.infWrites.Store(t.IsZero())
|
||||||
return c.Conn.SetDeadline(t)
|
return c.Conn.SetDeadline(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -135,15 +127,10 @@ func (c *DeadlineConn) SetDeadline(t time.Time) error {
|
|||||||
// and any currently-blocked Read call.
|
// and any currently-blocked Read call.
|
||||||
// A zero value for t means Read will not time out.
|
// A zero value for t means Read will not time out.
|
||||||
func (c *DeadlineConn) SetReadDeadline(t time.Time) error {
|
func (c *DeadlineConn) SetReadDeadline(t time.Time) error {
|
||||||
if t.IsZero() && c.readDeadline != 0 {
|
|
||||||
c.abortReads.Store(false)
|
|
||||||
// Keep the deadline we want.
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
c.abortReads.Store(time.Until(t) < 0)
|
c.abortReads.Store(!t.IsZero() && time.Until(t) < 0)
|
||||||
|
c.infReads.Store(t.IsZero())
|
||||||
c.readSetAt = time.Now()
|
c.readSetAt = time.Now()
|
||||||
return c.Conn.SetReadDeadline(t)
|
return c.Conn.SetReadDeadline(t)
|
||||||
}
|
}
|
||||||
@ -154,14 +141,10 @@ func (c *DeadlineConn) SetReadDeadline(t time.Time) error {
|
|||||||
// some of the data was successfully written.
|
// some of the data was successfully written.
|
||||||
// A zero value for t means Write will not time out.
|
// A zero value for t means Write will not time out.
|
||||||
func (c *DeadlineConn) SetWriteDeadline(t time.Time) error {
|
func (c *DeadlineConn) SetWriteDeadline(t time.Time) error {
|
||||||
if t.IsZero() && c.writeDeadline != 0 {
|
|
||||||
c.abortWrites.Store(false)
|
|
||||||
// Keep the deadline we want.
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
c.abortWrites.Store(time.Until(t) < 0)
|
c.abortWrites.Store(!t.IsZero() && time.Until(t) < 0)
|
||||||
|
c.infWrites.Store(t.IsZero())
|
||||||
c.writeSetAt = time.Now()
|
c.writeSetAt = time.Now()
|
||||||
return c.Conn.SetWriteDeadline(t)
|
return c.Conn.SetWriteDeadline(t)
|
||||||
}
|
}
|
||||||
|
@ -32,6 +32,7 @@ import (
|
|||||||
"github.com/gobwas/ws/wsutil"
|
"github.com/gobwas/ws/wsutil"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/minio/madmin-go/v3"
|
"github.com/minio/madmin-go/v3"
|
||||||
|
"github.com/minio/minio/internal/deadlineconn"
|
||||||
"github.com/minio/minio/internal/pubsub"
|
"github.com/minio/minio/internal/pubsub"
|
||||||
"github.com/minio/mux"
|
"github.com/minio/mux"
|
||||||
)
|
)
|
||||||
@ -188,6 +189,8 @@ func (m *Manager) Handler(authReq func(r *http.Request) error) http.HandlerFunc
|
|||||||
// This should be called with the incoming connection after accept.
|
// This should be called with the incoming connection after accept.
|
||||||
// Auth is handled internally, as well as disconnecting any connections from the same host.
|
// Auth is handled internally, as well as disconnecting any connections from the same host.
|
||||||
func (m *Manager) IncomingConn(ctx context.Context, conn net.Conn) {
|
func (m *Manager) IncomingConn(ctx context.Context, conn net.Conn) {
|
||||||
|
// We manage our own deadlines.
|
||||||
|
conn = deadlineconn.Unwrap(conn)
|
||||||
remoteAddr := conn.RemoteAddr().String()
|
remoteAddr := conn.RemoteAddr().String()
|
||||||
// will write an OpConnectResponse message to the remote and log it once locally.
|
// will write an OpConnectResponse message to the remote and log it once locally.
|
||||||
defer conn.Close()
|
defer conn.Close()
|
||||||
|
@ -23,6 +23,8 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/minio/minio/internal/deadlineconn"
|
||||||
)
|
)
|
||||||
|
|
||||||
type acceptResult struct {
|
type acceptResult struct {
|
||||||
@ -73,7 +75,7 @@ func (listener *httpListener) Accept() (conn net.Conn, err error) {
|
|||||||
select {
|
select {
|
||||||
case result, ok := <-listener.acceptCh:
|
case result, ok := <-listener.acceptCh:
|
||||||
if ok {
|
if ok {
|
||||||
return result.conn, result.err
|
return deadlineconn.New(result.conn).WithReadDeadline(listener.opts.IdleTimeout).WithWriteDeadline(listener.opts.IdleTimeout), result.err
|
||||||
}
|
}
|
||||||
case <-listener.ctx.Done():
|
case <-listener.ctx.Done():
|
||||||
}
|
}
|
||||||
@ -128,6 +130,7 @@ type TCPOptions struct {
|
|||||||
NoDelay bool // Indicates callers to enable TCP_NODELAY on the net.Conn
|
NoDelay bool // Indicates callers to enable TCP_NODELAY on the net.Conn
|
||||||
Interface string // This is a VRF device passed via `--interface` flag
|
Interface string // This is a VRF device passed via `--interface` flag
|
||||||
Trace func(msg string) // Trace when starting.
|
Trace func(msg string) // Trace when starting.
|
||||||
|
IdleTimeout time.Duration // Incoming TCP read/write timeout
|
||||||
}
|
}
|
||||||
|
|
||||||
// ForWebsocket returns TCPOptions valid for websocket net.Conn
|
// ForWebsocket returns TCPOptions valid for websocket net.Conn
|
||||||
|
@ -167,12 +167,24 @@ func (srv *Server) UseIdleTimeout(d time.Duration) *Server {
|
|||||||
return srv
|
return srv
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UseReadTimeout configure connection request read timeout.
|
||||||
|
func (srv *Server) UseReadTimeout(d time.Duration) *Server {
|
||||||
|
srv.ReadTimeout = d
|
||||||
|
return srv
|
||||||
|
}
|
||||||
|
|
||||||
// UseReadHeaderTimeout configure read header timeout
|
// UseReadHeaderTimeout configure read header timeout
|
||||||
func (srv *Server) UseReadHeaderTimeout(d time.Duration) *Server {
|
func (srv *Server) UseReadHeaderTimeout(d time.Duration) *Server {
|
||||||
srv.ReadHeaderTimeout = d
|
srv.ReadHeaderTimeout = d
|
||||||
return srv
|
return srv
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UseWriteTimeout configure connection response write timeout.
|
||||||
|
func (srv *Server) UseWriteTimeout(d time.Duration) *Server {
|
||||||
|
srv.WriteTimeout = d
|
||||||
|
return srv
|
||||||
|
}
|
||||||
|
|
||||||
// UseHandler configure final handler for this HTTP *Server
|
// UseHandler configure final handler for this HTTP *Server
|
||||||
func (srv *Server) UseHandler(h http.Handler) *Server {
|
func (srv *Server) UseHandler(h http.Handler) *Server {
|
||||||
srv.Handler = h
|
srv.Handler = h
|
||||||
|
Loading…
x
Reference in New Issue
Block a user