From 55f5c18fd97d7cd1484407ed1fb91808137ac6e6 Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Mon, 11 Nov 2024 09:15:17 -0800 Subject: [PATCH] Harden internode DeadlineConn (#20631) Since DeadlineConn would send deadline updates directly upstream, it would race with Read/Write operations. The stdlib will start a read, but then asynchronously call `SetReadDeadline(time.Unix(1, 0))` to cancel that Read in `abortPendingRead`. In this case, the deadline update issued by our Read may override the deadline that was intended to cancel the read. Stop updating deadlines if a deadline in the past is seen and when Close is called. A mutex now protects all upstream deadline calls to avoid races. This should fix the short-term buildup of goroutines blocked in `abortPendingRead`, such as: ``` 365 @ 0x44112e 0x4756b9 0x475699 0x483525 0x732286 0x737407 0x73816b 0x479601 # 0x475698 sync.runtime_notifyListWait+0x138 runtime/sema.go:569 # 0x483524 sync.(*Cond).Wait+0x84 sync/cond.go:70 # 0x732285 net/http.(*connReader).abortPendingRead+0xa5 net/http/server.go:729 # 0x737406 net/http.(*response).finishRequest+0x86 net/http/server.go:1676 # 0x73816a net/http.(*conn).serve+0x62a net/http/server.go:2050 ``` AFAICT this only affects internode calls that create a connection (non-grid). --- internal/deadlineconn/deadlineconn.go | 136 ++++++++++++++++++--- internal/deadlineconn/deadlineconn_test.go | 75 ++++++++++++ 2 files changed, 195 insertions(+), 16 deletions(-) diff --git a/internal/deadlineconn/deadlineconn.go b/internal/deadlineconn/deadlineconn.go index a2cd0038a..9665f2c79 100644 --- a/internal/deadlineconn/deadlineconn.go +++ b/internal/deadlineconn/deadlineconn.go @@ -19,44 +19,70 @@ package deadlineconn import ( + "context" "net" + "sync" + "sync/atomic" "time" ) +// updateInterval is the minimum time between deadline updates. const updateInterval = 250 * time.Millisecond // DeadlineConn - is a generic stream-oriented network connection supporting buffered reader and read/write timeout. 
type DeadlineConn struct { net.Conn - readDeadline time.Duration // sets the read deadline on a connection. - readSetAt time.Time - writeDeadline time.Duration // sets the write deadline on a connection. - writeSetAt time.Time + readDeadline time.Duration // sets the read deadline on a connection. + readSetAt time.Time + writeDeadline time.Duration // sets the write deadline on a connection. + writeSetAt time.Time + abortReads, abortWrites atomic.Bool // A deadline was set to indicate caller wanted the conn to time out. + mu sync.Mutex } // Sets read deadline func (c *DeadlineConn) setReadDeadline() { - if c.readDeadline > 0 { - now := time.Now() - if now.Sub(c.readSetAt) > updateInterval { - c.Conn.SetReadDeadline(now.Add(c.readDeadline + updateInterval)) - c.readSetAt = now - } + // Do not set a Read deadline, if upstream wants to cancel all reads. + if c.readDeadline <= 0 || c.abortReads.Load() { + return + } + + c.mu.Lock() + defer c.mu.Unlock() + if c.abortReads.Load() { + return + } + + now := time.Now() + if now.Sub(c.readSetAt) > updateInterval { + c.Conn.SetReadDeadline(now.Add(c.readDeadline + updateInterval)) + c.readSetAt = now } } func (c *DeadlineConn) setWriteDeadline() { - if c.writeDeadline > 0 { - now := time.Now() - if now.Sub(c.writeSetAt) > updateInterval { - c.Conn.SetWriteDeadline(now.Add(c.writeDeadline + updateInterval)) - c.writeSetAt = now - } + // Do not set a Write deadline, if upstream wants to cancel all writes. + if c.writeDeadline <= 0 || c.abortWrites.Load() { + return + } + + c.mu.Lock() + defer c.mu.Unlock() + if c.abortWrites.Load() { + return + } + now := time.Now() + if now.Sub(c.writeSetAt) > updateInterval { + c.Conn.SetWriteDeadline(now.Add(c.writeDeadline + updateInterval)) + c.writeSetAt = now } } // Read - reads data from the connection using wrapped buffered reader. 
func (c *DeadlineConn) Read(b []byte) (n int, err error) { + if c.abortReads.Load() { + return 0, context.DeadlineExceeded + } c.setReadDeadline() n, err = c.Conn.Read(b) return n, err @@ -64,11 +90,89 @@ func (c *DeadlineConn) Read(b []byte) (n int, err error) { // Write - writes data to the connection. func (c *DeadlineConn) Write(b []byte) (n int, err error) { + if c.abortWrites.Load() { + return 0, context.DeadlineExceeded + } c.setWriteDeadline() n, err = c.Conn.Write(b) return n, err } +// SetDeadline will set the deadline for reads and writes. +// A zero value for t means I/O operations will not time out. +func (c *DeadlineConn) SetDeadline(t time.Time) error { + c.mu.Lock() + defer c.mu.Unlock() + if t.IsZero() { + var err error + if c.readDeadline == 0 { + err = c.Conn.SetReadDeadline(t) + } + if c.writeDeadline == 0 { + if wErr := c.Conn.SetWriteDeadline(t); wErr != nil { + return wErr + } + } + c.abortReads.Store(false) + c.abortWrites.Store(false) + return err + } + // If upstream sets a deadline in the past, assume it wants to abort reads/writes. + if time.Until(t) < 0 { + c.abortReads.Store(true) + c.abortWrites.Store(true) + return c.Conn.SetDeadline(t) + } + + c.abortReads.Store(false) + c.abortWrites.Store(false) + c.readSetAt = time.Now() + c.writeSetAt = time.Now() + return c.Conn.SetDeadline(t) +} + +// SetReadDeadline sets the deadline for future Read calls +// and any currently-blocked Read call. +// A zero value for t means Read will not time out. +func (c *DeadlineConn) SetReadDeadline(t time.Time) error { + if t.IsZero() && c.readDeadline != 0 { + c.abortReads.Store(false) + // Keep the deadline we want. + return nil + } + + c.mu.Lock() + defer c.mu.Unlock() + c.abortReads.Store(time.Until(t) < 0) + c.readSetAt = time.Now() + return c.Conn.SetReadDeadline(t) +} + +// SetWriteDeadline sets the deadline for future Write calls +// and any currently-blocked Write call. 
+// Even if write times out, it may return n > 0, indicating that +// some of the data was successfully written. +// A zero value for t means Write will not time out. +func (c *DeadlineConn) SetWriteDeadline(t time.Time) error { + if t.IsZero() && c.writeDeadline != 0 { + c.abortWrites.Store(false) + // Keep the deadline we want. + return nil + } + c.mu.Lock() + defer c.mu.Unlock() + c.abortWrites.Store(time.Until(t) < 0) + c.writeSetAt = time.Now() + return c.Conn.SetWriteDeadline(t) +} + +// Close wraps conn.Close and stops sending deadline updates. +func (c *DeadlineConn) Close() error { + c.abortReads.Store(true) + c.abortWrites.Store(true) + return c.Conn.Close() +} + // WithReadDeadline sets a new read side net.Conn deadline. func (c *DeadlineConn) WithReadDeadline(d time.Duration) *DeadlineConn { c.readDeadline = d diff --git a/internal/deadlineconn/deadlineconn_test.go b/internal/deadlineconn/deadlineconn_test.go index c8269f97b..6921e47b1 100644 --- a/internal/deadlineconn/deadlineconn_test.go +++ b/internal/deadlineconn/deadlineconn_test.go @@ -19,6 +19,7 @@ package deadlineconn import ( "bufio" + "fmt" "io" "net" "sync" @@ -115,3 +116,77 @@ func TestBuffConnReadTimeout(t *testing.T) { wg.Wait() } + +// Test deadlineconn handles read timeout properly by reading two messages beyond deadline. +func TestBuffConnReadCheckTimeout(t *testing.T) { + l, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("unable to create listener. %v", err) + } + defer l.Close() + serverAddr := l.Addr().String() + + tcpListener, ok := l.(*net.TCPListener) + if !ok { + t.Fatalf("failed to assert to net.TCPListener") + } + var cerr error + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + + tcpConn, terr := tcpListener.AcceptTCP() + if terr != nil { + cerr = fmt.Errorf("failed to accept new connection. 
%v", terr) + return + } + deadlineconn := New(tcpConn) + deadlineconn.WithReadDeadline(time.Second) + deadlineconn.WithWriteDeadline(time.Second) + defer deadlineconn.Close() + + // Read a line + b := make([]byte, 12) + _, terr = deadlineconn.Read(b) + if terr != nil { + cerr = fmt.Errorf("failed to read from client. %v", terr) + return + } + received := string(b) + if received != "message one\n" { + cerr = fmt.Errorf(`server: expected: "message one\n", got: %v`, received) + return + } + + // Set a deadline in the past to indicate we want the next read to fail. + // Ensure we don't override it on read. + deadlineconn.SetReadDeadline(time.Unix(1, 0)) + + // Be sure to exceed update interval + time.Sleep(updateInterval * 2) + + _, terr = deadlineconn.Read(b) + if terr == nil { + cerr = fmt.Errorf("could read from client, expected error, got %v", terr) + return + } + }() + + c, err := net.Dial("tcp", serverAddr) + if err != nil { + t.Fatalf("unable to connect to server. %v", err) + } + defer c.Close() + + _, err = io.WriteString(c, "message one\n") + if err != nil { + t.Fatalf("failed to write to server. %v", err) + } + _, _ = io.WriteString(c, "message two\n") + + wg.Wait() + if cerr != nil { + t.Fatal(cerr) + } +}