Update connection deadlines less frequently (#20166)

Only set write deadline on connections every second. Combine the 2 write locations into 1.
This commit is contained in:
Klaus Post 2024-07-26 10:40:11 -07:00 committed by GitHub
parent a16193bb50
commit 59788e25c7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 50 additions and 34 deletions

View File

@ -23,23 +23,35 @@ import (
"time" "time"
) )
const updateInterval = 250 * time.Millisecond
// DeadlineConn - is a generic stream-oriented network connection supporting buffered reader and read/write timeout. // DeadlineConn - is a generic stream-oriented network connection supporting buffered reader and read/write timeout.
type DeadlineConn struct { type DeadlineConn struct {
net.Conn net.Conn
readDeadline time.Duration // sets the read deadline on a connection. readDeadline time.Duration // sets the read deadline on a connection.
readSetAt time.Time
writeDeadline time.Duration // sets the write deadline on a connection. writeDeadline time.Duration // sets the write deadline on a connection.
writeSetAt time.Time
} }
// Sets read deadline // Sets read deadline
func (c *DeadlineConn) setReadDeadline() { func (c *DeadlineConn) setReadDeadline() {
if c.readDeadline > 0 { if c.readDeadline > 0 {
c.Conn.SetReadDeadline(time.Now().UTC().Add(c.readDeadline)) now := time.Now()
if now.Sub(c.readSetAt) > updateInterval {
c.Conn.SetReadDeadline(now.Add(c.readDeadline + updateInterval))
c.readSetAt = now
}
} }
} }
func (c *DeadlineConn) setWriteDeadline() { func (c *DeadlineConn) setWriteDeadline() {
if c.writeDeadline > 0 { if c.writeDeadline > 0 {
c.Conn.SetWriteDeadline(time.Now().UTC().Add(c.writeDeadline)) now := time.Now()
if now.Sub(c.writeSetAt) > updateInterval {
c.Conn.SetWriteDeadline(now.Add(c.writeDeadline + updateInterval))
c.writeSetAt = now
}
} }
} }

View File

@ -967,7 +967,7 @@ func (c *Connection) readStream(ctx context.Context, conn net.Conn, cancel conte
SkipHeaderCheck: false, SkipHeaderCheck: false,
OnIntermediate: controlHandler, OnIntermediate: controlHandler,
} }
readDataInto := func(dst []byte, rw io.ReadWriter, s ws.State, want ws.OpCode) ([]byte, error) { readDataInto := func(dst []byte, s ws.State, want ws.OpCode) ([]byte, error) {
dst = dst[:0] dst = dst[:0]
for { for {
hdr, err := wsReader.NextFrame() hdr, err := wsReader.NextFrame()
@ -1005,7 +1005,7 @@ func (c *Connection) readStream(ctx context.Context, conn net.Conn, cancel conte
} }
var err error var err error
msg, err = readDataInto(msg, conn, c.side, ws.OpBinary) msg, err = readDataInto(msg, c.side, ws.OpBinary)
if err != nil { if err != nil {
if !xnet.IsNetworkOrHostDown(err, true) { if !xnet.IsNetworkOrHostDown(err, true) {
gridLogIfNot(ctx, fmt.Errorf("ws read: %w", err), net.ErrClosed, io.EOF) gridLogIfNot(ctx, fmt.Errorf("ws read: %w", err), net.ErrClosed, io.EOF)
@ -1108,6 +1108,38 @@ func (c *Connection) writeStream(ctx context.Context, conn net.Conn, cancel cont
var queueSize int var queueSize int
var buf bytes.Buffer var buf bytes.Buffer
var wsw wsWriter var wsw wsWriter
var lastSetDeadline time.Time
// Helper to write everything in buf.
// Return false if an error occurred and the connection is unusable.
// Buffer will be reset empty when returning successfully.
writeBuffer := func() (ok bool) {
now := time.Now()
// Only set write deadline once every second
if now.Sub(lastSetDeadline) > time.Second {
err := conn.SetWriteDeadline(now.Add(connWriteTimeout + time.Second))
if err != nil {
gridLogIf(ctx, fmt.Errorf("conn.SetWriteDeadline: %w", err))
return false
}
lastSetDeadline = now
}
_, err := buf.WriteTo(conn)
if err != nil {
if !xnet.IsNetworkOrHostDown(err, true) {
gridLogIf(ctx, fmt.Errorf("ws write: %w", err))
}
return false
}
if buf.Cap() > writeBufferSize*4 {
// Reset buffer if it gets too big, so we don't keep it around.
buf = bytes.Buffer{}
}
buf.Reset()
return true
}
for { for {
var toSend []byte var toSend []byte
select { select {
@ -1185,7 +1217,6 @@ func (c *Connection) writeStream(ctx context.Context, conn net.Conn, cancel cont
c.connChange.L.Unlock() c.connChange.L.Unlock()
if len(queue) == 0 { if len(queue) == 0 {
// Send single message without merging. // Send single message without merging.
buf.Reset()
err := wsw.writeMessage(&buf, c.side, ws.OpBinary, toSend) err := wsw.writeMessage(&buf, c.side, ws.OpBinary, toSend)
if err != nil { if err != nil {
if !xnet.IsNetworkOrHostDown(err, true) { if !xnet.IsNetworkOrHostDown(err, true) {
@ -1195,17 +1226,7 @@ func (c *Connection) writeStream(ctx context.Context, conn net.Conn, cancel cont
} }
PutByteBuffer(toSend) PutByteBuffer(toSend)
err = conn.SetWriteDeadline(time.Now().Add(connWriteTimeout)) if !writeBuffer() {
if err != nil {
gridLogIf(ctx, fmt.Errorf("conn.SetWriteDeadline: %w", err))
return
}
_, err = buf.WriteTo(conn)
if err != nil {
if !xnet.IsNetworkOrHostDown(err, true) {
gridLogIf(ctx, fmt.Errorf("ws write: %w", err))
}
return return
} }
continue continue
@ -1235,7 +1256,6 @@ func (c *Connection) writeStream(ctx context.Context, conn net.Conn, cancel cont
// Combine writes. // Combine writes.
// Consider avoiding buffer copy. // Consider avoiding buffer copy.
buf.Reset()
err = wsw.writeMessage(&buf, c.side, ws.OpBinary, toSend) err = wsw.writeMessage(&buf, c.side, ws.OpBinary, toSend)
if err != nil { if err != nil {
if !xnet.IsNetworkOrHostDown(err, true) { if !xnet.IsNetworkOrHostDown(err, true) {
@ -1244,25 +1264,9 @@ func (c *Connection) writeStream(ctx context.Context, conn net.Conn, cancel cont
return return
} }
err = conn.SetWriteDeadline(time.Now().Add(connWriteTimeout)) if !writeBuffer() {
if err != nil {
gridLogIf(ctx, fmt.Errorf("conn.SetWriteDeadline: %w", err))
return return
} }
// buf is our local buffer, so we can reuse it.
_, err = buf.WriteTo(conn)
if err != nil {
if !xnet.IsNetworkOrHostDown(err, true) {
gridLogIf(ctx, fmt.Errorf("ws write: %w", err))
}
return
}
if buf.Cap() > writeBufferSize*4 {
// Reset buffer if it gets too big, so we don't keep it around.
buf = bytes.Buffer{}
}
} }
} }