mirror of
https://github.com/minio/minio.git
synced 2025-05-22 18:11:50 -04:00
Harden internode DeadlineConn (#20631)
Since DeadlineConn would send deadline updates directly upstream, it would race with Read/Write operations. The stdlib will perform a read, but do an async SetReadDeadLine(unix(1)) to cancel the Read in `abortPendingRead`. In this case, the Read may override the deadline intended to cancel the read. Stop updating deadlines if a deadline in the past is seen and when Close is called. A mutex now protects all upstream deadline calls to avoid races. This should fix the short-term buildup of... ``` 365 @ 0x44112e 0x4756b9 0x475699 0x483525 0x732286 0x737407 0x73816b 0x479601 # 0x475698 sync.runtime_notifyListWait+0x138 runtime/sema.go:569 # 0x483524 sync.(*Cond).Wait+0x84 sync/cond.go:70 # 0x732285 net/http.(*connReader).abortPendingRead+0xa5 net/http/server.go:729 # 0x737406 net/http.(*response).finishRequest+0x86 net/http/server.go:1676 # 0x73816a net/http.(*conn).serve+0x62a net/http/server.go:2050 ``` AFAICT Only affects internode calls that create a connection (non-grid).
This commit is contained in:
parent
8ce101c174
commit
55f5c18fd9
@ -19,44 +19,70 @@
|
|||||||
package deadlineconn
|
package deadlineconn
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"net"
|
"net"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// updateInterval is the minimum time between deadline updates.
|
||||||
const updateInterval = 250 * time.Millisecond
|
const updateInterval = 250 * time.Millisecond
|
||||||
|
|
||||||
// DeadlineConn - is a generic stream-oriented network connection supporting buffered reader and read/write timeout.
|
// DeadlineConn - is a generic stream-oriented network connection supporting buffered reader and read/write timeout.
|
||||||
type DeadlineConn struct {
|
type DeadlineConn struct {
|
||||||
net.Conn
|
net.Conn
|
||||||
readDeadline time.Duration // sets the read deadline on a connection.
|
readDeadline time.Duration // sets the read deadline on a connection.
|
||||||
readSetAt time.Time
|
readSetAt time.Time
|
||||||
writeDeadline time.Duration // sets the write deadline on a connection.
|
writeDeadline time.Duration // sets the write deadline on a connection.
|
||||||
writeSetAt time.Time
|
writeSetAt time.Time
|
||||||
|
abortReads, abortWrites atomic.Bool // A deadline was set to indicate caller wanted the conn to time out.
|
||||||
|
mu sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sets read deadline
|
// Sets read deadline
|
||||||
func (c *DeadlineConn) setReadDeadline() {
|
func (c *DeadlineConn) setReadDeadline() {
|
||||||
if c.readDeadline > 0 {
|
// Do not set a Read deadline, if upstream wants to cancel all reads.
|
||||||
now := time.Now()
|
if c.readDeadline <= 0 || c.abortReads.Load() {
|
||||||
if now.Sub(c.readSetAt) > updateInterval {
|
return
|
||||||
c.Conn.SetReadDeadline(now.Add(c.readDeadline + updateInterval))
|
}
|
||||||
c.readSetAt = now
|
|
||||||
}
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
if c.abortReads.Load() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
if now.Sub(c.readSetAt) > updateInterval {
|
||||||
|
c.Conn.SetReadDeadline(now.Add(c.readDeadline + updateInterval))
|
||||||
|
c.readSetAt = now
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *DeadlineConn) setWriteDeadline() {
|
func (c *DeadlineConn) setWriteDeadline() {
|
||||||
if c.writeDeadline > 0 {
|
// Do not set a Write deadline, if upstream wants to cancel all reads.
|
||||||
now := time.Now()
|
if c.writeDeadline <= 0 || c.abortWrites.Load() {
|
||||||
if now.Sub(c.writeSetAt) > updateInterval {
|
return
|
||||||
c.Conn.SetWriteDeadline(now.Add(c.writeDeadline + updateInterval))
|
}
|
||||||
c.writeSetAt = now
|
|
||||||
}
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
if c.abortWrites.Load() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
now := time.Now()
|
||||||
|
if now.Sub(c.writeSetAt) > updateInterval {
|
||||||
|
c.Conn.SetWriteDeadline(now.Add(c.writeDeadline + updateInterval))
|
||||||
|
c.writeSetAt = now
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read - reads data from the connection using wrapped buffered reader.
|
// Read - reads data from the connection using wrapped buffered reader.
|
||||||
func (c *DeadlineConn) Read(b []byte) (n int, err error) {
|
func (c *DeadlineConn) Read(b []byte) (n int, err error) {
|
||||||
|
if c.abortReads.Load() {
|
||||||
|
return 0, context.DeadlineExceeded
|
||||||
|
}
|
||||||
c.setReadDeadline()
|
c.setReadDeadline()
|
||||||
n, err = c.Conn.Read(b)
|
n, err = c.Conn.Read(b)
|
||||||
return n, err
|
return n, err
|
||||||
@ -64,11 +90,89 @@ func (c *DeadlineConn) Read(b []byte) (n int, err error) {
|
|||||||
|
|
||||||
// Write - writes data to the connection.
|
// Write - writes data to the connection.
|
||||||
func (c *DeadlineConn) Write(b []byte) (n int, err error) {
|
func (c *DeadlineConn) Write(b []byte) (n int, err error) {
|
||||||
|
if c.abortWrites.Load() {
|
||||||
|
return 0, context.DeadlineExceeded
|
||||||
|
}
|
||||||
c.setWriteDeadline()
|
c.setWriteDeadline()
|
||||||
n, err = c.Conn.Write(b)
|
n, err = c.Conn.Write(b)
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetDeadline will set the deadline for reads and writes.
|
||||||
|
// A zero value for t means I/O operations will not time out.
|
||||||
|
func (c *DeadlineConn) SetDeadline(t time.Time) error {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
if t.IsZero() {
|
||||||
|
var err error
|
||||||
|
if c.readDeadline == 0 {
|
||||||
|
err = c.Conn.SetReadDeadline(t)
|
||||||
|
}
|
||||||
|
if c.writeDeadline == 0 {
|
||||||
|
if wErr := c.Conn.SetWriteDeadline(t); wErr != nil {
|
||||||
|
return wErr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
c.abortReads.Store(false)
|
||||||
|
c.abortWrites.Store(false)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// If upstream sets a deadline in the past, assume it wants to abort reads/writes.
|
||||||
|
if time.Until(t) < 0 {
|
||||||
|
c.abortReads.Store(true)
|
||||||
|
c.abortWrites.Store(true)
|
||||||
|
return c.Conn.SetDeadline(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.abortReads.Store(false)
|
||||||
|
c.abortWrites.Store(false)
|
||||||
|
c.readSetAt = time.Now()
|
||||||
|
c.writeSetAt = time.Now()
|
||||||
|
return c.Conn.SetDeadline(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetReadDeadline sets the deadline for future Read calls
|
||||||
|
// and any currently-blocked Read call.
|
||||||
|
// A zero value for t means Read will not time out.
|
||||||
|
func (c *DeadlineConn) SetReadDeadline(t time.Time) error {
|
||||||
|
if t.IsZero() && c.readDeadline != 0 {
|
||||||
|
c.abortReads.Store(false)
|
||||||
|
// Keep the deadline we want.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
c.abortReads.Store(time.Until(t) < 0)
|
||||||
|
c.readSetAt = time.Now()
|
||||||
|
return c.Conn.SetReadDeadline(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetWriteDeadline sets the deadline for future Write calls
|
||||||
|
// and any currently-blocked Write call.
|
||||||
|
// Even if write times out, it may return n > 0, indicating that
|
||||||
|
// some of the data was successfully written.
|
||||||
|
// A zero value for t means Write will not time out.
|
||||||
|
func (c *DeadlineConn) SetWriteDeadline(t time.Time) error {
|
||||||
|
if t.IsZero() && c.writeDeadline != 0 {
|
||||||
|
c.abortWrites.Store(false)
|
||||||
|
// Keep the deadline we want.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
c.abortWrites.Store(time.Until(t) < 0)
|
||||||
|
c.writeSetAt = time.Now()
|
||||||
|
return c.Conn.SetWriteDeadline(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close wraps conn.Close and stops sending deadline updates.
|
||||||
|
func (c *DeadlineConn) Close() error {
|
||||||
|
c.abortReads.Store(true)
|
||||||
|
c.abortWrites.Store(true)
|
||||||
|
return c.Conn.Close()
|
||||||
|
}
|
||||||
|
|
||||||
// WithReadDeadline sets a new read side net.Conn deadline.
|
// WithReadDeadline sets a new read side net.Conn deadline.
|
||||||
func (c *DeadlineConn) WithReadDeadline(d time.Duration) *DeadlineConn {
|
func (c *DeadlineConn) WithReadDeadline(d time.Duration) *DeadlineConn {
|
||||||
c.readDeadline = d
|
c.readDeadline = d
|
||||||
|
@ -19,6 +19,7 @@ package deadlineconn
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
@ -115,3 +116,77 @@ func TestBuffConnReadTimeout(t *testing.T) {
|
|||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test deadlineconn handles read timeout properly by reading two messages beyond deadline.
|
||||||
|
func TestBuffConnReadCheckTimeout(t *testing.T) {
|
||||||
|
l, err := net.Listen("tcp", "localhost:0")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to create listener. %v", err)
|
||||||
|
}
|
||||||
|
defer l.Close()
|
||||||
|
serverAddr := l.Addr().String()
|
||||||
|
|
||||||
|
tcpListener, ok := l.(*net.TCPListener)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("failed to assert to net.TCPListener")
|
||||||
|
}
|
||||||
|
var cerr error
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
tcpConn, terr := tcpListener.AcceptTCP()
|
||||||
|
if terr != nil {
|
||||||
|
cerr = fmt.Errorf("failed to accept new connection. %v", terr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
deadlineconn := New(tcpConn)
|
||||||
|
deadlineconn.WithReadDeadline(time.Second)
|
||||||
|
deadlineconn.WithWriteDeadline(time.Second)
|
||||||
|
defer deadlineconn.Close()
|
||||||
|
|
||||||
|
// Read a line
|
||||||
|
b := make([]byte, 12)
|
||||||
|
_, terr = deadlineconn.Read(b)
|
||||||
|
if terr != nil {
|
||||||
|
cerr = fmt.Errorf("failed to read from client. %v", terr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
received := string(b)
|
||||||
|
if received != "message one\n" {
|
||||||
|
cerr = fmt.Errorf(`server: expected: "message one\n", got: %v`, received)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set a deadline in the past to indicate we want the next read to fail.
|
||||||
|
// Ensure we don't override it on read.
|
||||||
|
deadlineconn.SetReadDeadline(time.Unix(1, 0))
|
||||||
|
|
||||||
|
// Be sure to exceed update interval
|
||||||
|
time.Sleep(updateInterval * 2)
|
||||||
|
|
||||||
|
_, terr = deadlineconn.Read(b)
|
||||||
|
if terr == nil {
|
||||||
|
cerr = fmt.Errorf("could read from client, expected error, got %v", terr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
c, err := net.Dial("tcp", serverAddr)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to connect to server. %v", err)
|
||||||
|
}
|
||||||
|
defer c.Close()
|
||||||
|
|
||||||
|
_, err = io.WriteString(c, "message one\n")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to write to server. %v", err)
|
||||||
|
}
|
||||||
|
_, _ = io.WriteString(c, "message two\n")
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
if cerr != nil {
|
||||||
|
t.Fatal(cerr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user