diff --git a/internal/deadlineconn/deadlineconn.go b/internal/deadlineconn/deadlineconn.go index 7cb7b766e..a2cd0038a 100644 --- a/internal/deadlineconn/deadlineconn.go +++ b/internal/deadlineconn/deadlineconn.go @@ -23,23 +23,35 @@ import ( "time" ) +const updateInterval = 250 * time.Millisecond + // DeadlineConn - is a generic stream-oriented network connection supporting buffered reader and read/write timeout. type DeadlineConn struct { net.Conn readDeadline time.Duration // sets the read deadline on a connection. + readSetAt time.Time writeDeadline time.Duration // sets the write deadline on a connection. + writeSetAt time.Time } // Sets read deadline func (c *DeadlineConn) setReadDeadline() { if c.readDeadline > 0 { - c.Conn.SetReadDeadline(time.Now().UTC().Add(c.readDeadline)) + now := time.Now() + if now.Sub(c.readSetAt) > updateInterval { + c.Conn.SetReadDeadline(now.Add(c.readDeadline + updateInterval)) + c.readSetAt = now + } } } func (c *DeadlineConn) setWriteDeadline() { if c.writeDeadline > 0 { - c.Conn.SetWriteDeadline(time.Now().UTC().Add(c.writeDeadline)) + now := time.Now() + if now.Sub(c.writeSetAt) > updateInterval { + c.Conn.SetWriteDeadline(now.Add(c.writeDeadline + updateInterval)) + c.writeSetAt = now + } } } diff --git a/internal/grid/connection.go b/internal/grid/connection.go index bdec6ccb8..8bfe682f6 100644 --- a/internal/grid/connection.go +++ b/internal/grid/connection.go @@ -967,7 +967,7 @@ func (c *Connection) readStream(ctx context.Context, conn net.Conn, cancel conte SkipHeaderCheck: false, OnIntermediate: controlHandler, } - readDataInto := func(dst []byte, rw io.ReadWriter, s ws.State, want ws.OpCode) ([]byte, error) { + readDataInto := func(dst []byte, s ws.State, want ws.OpCode) ([]byte, error) { dst = dst[:0] for { hdr, err := wsReader.NextFrame() @@ -1005,7 +1005,7 @@ func (c *Connection) readStream(ctx context.Context, conn net.Conn, cancel conte } var err error - msg, err = readDataInto(msg, conn, c.side, ws.OpBinary) + msg, err = readDataInto(msg, c.side, ws.OpBinary) if err != nil { if !xnet.IsNetworkOrHostDown(err, true) { gridLogIfNot(ctx, fmt.Errorf("ws read: %w", err), net.ErrClosed, io.EOF) @@ -1108,6 +1108,38 @@ func (c *Connection) writeStream(ctx context.Context, conn net.Conn, cancel cont var queueSize int var buf bytes.Buffer var wsw wsWriter + var lastSetDeadline time.Time + + // Helper to write everything in buf. + // Return false if an error occurred and the connection is unusable. + // Buffer will be reset empty when returning successfully. + writeBuffer := func() (ok bool) { + now := time.Now() + // Only set write deadline once every second + if now.Sub(lastSetDeadline) > time.Second { + err := conn.SetWriteDeadline(now.Add(connWriteTimeout + time.Second)) + if err != nil { + gridLogIf(ctx, fmt.Errorf("conn.SetWriteDeadline: %w", err)) + return false + } + lastSetDeadline = now + } + + _, err := buf.WriteTo(conn) + if err != nil { + if !xnet.IsNetworkOrHostDown(err, true) { + gridLogIf(ctx, fmt.Errorf("ws write: %w", err)) + } + return false + } + if buf.Cap() > writeBufferSize*4 { + // Reset buffer if it gets too big, so we don't keep it around. + buf = bytes.Buffer{} + } + buf.Reset() + return true + } + for { var toSend []byte select { @@ -1185,7 +1217,6 @@ func (c *Connection) writeStream(ctx context.Context, conn net.Conn, cancel cont c.connChange.L.Unlock() if len(queue) == 0 { // Send single message without merging. - buf.Reset() err := wsw.writeMessage(&buf, c.side, ws.OpBinary, toSend) if err != nil { if !xnet.IsNetworkOrHostDown(err, true) { @@ -1195,17 +1226,7 @@ func (c *Connection) writeStream(ctx context.Context, conn net.Conn, cancel cont } PutByteBuffer(toSend) - err = conn.SetWriteDeadline(time.Now().Add(connWriteTimeout)) - if err != nil { - gridLogIf(ctx, fmt.Errorf("conn.SetWriteDeadline: %w", err)) - return - } - - _, err = buf.WriteTo(conn) - if err != nil { - if !xnet.IsNetworkOrHostDown(err, true) { - gridLogIf(ctx, fmt.Errorf("ws write: %w", err)) - } + if !writeBuffer() { return } continue @@ -1235,7 +1256,6 @@ func (c *Connection) writeStream(ctx context.Context, conn net.Conn, cancel cont // Combine writes. // Consider avoiding buffer copy. - buf.Reset() err = wsw.writeMessage(&buf, c.side, ws.OpBinary, toSend) if err != nil { if !xnet.IsNetworkOrHostDown(err, true) { @@ -1244,25 +1264,9 @@ func (c *Connection) writeStream(ctx context.Context, conn net.Conn, cancel cont return } - err = conn.SetWriteDeadline(time.Now().Add(connWriteTimeout)) - if err != nil { - gridLogIf(ctx, fmt.Errorf("conn.SetWriteDeadline: %w", err)) + if !writeBuffer() { return } - - // buf is our local buffer, so we can reuse it. - _, err = buf.WriteTo(conn) - if err != nil { - if !xnet.IsNetworkOrHostDown(err, true) { - gridLogIf(ctx, fmt.Errorf("ws write: %w", err)) - } - return - } - - if buf.Cap() > writeBufferSize*4 { - // Reset buffer if it gets too big, so we don't keep it around. - buf = bytes.Buffer{} - } } }