Tweak grid for less writes (#20129)

Use `runtime.Gosched()` if we have less than maxMergeMessages and the 
queue is empty.  Up maxMergeMessages to 50 to merge more messages into 
a single write.

Add length check for an early bailout on readAllInto when we know packet length.
This commit is contained in:
Klaus Post 2024-07-23 03:28:14 -07:00 committed by GitHub
parent 4f5dded4d4
commit c0e2886e37
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 27 additions and 8 deletions

View File

@ -27,6 +27,7 @@ import (
"math"
"math/rand"
"net"
"runtime"
"runtime/debug"
"strings"
"sync"
@ -980,7 +981,10 @@ func (c *Connection) readStream(ctx context.Context, conn net.Conn, cancel conte
if int64(cap(dst)) < hdr.Length+1 {
dst = make([]byte, 0, hdr.Length+hdr.Length>>3)
}
return readAllInto(dst[:0], &wsReader)
if !hdr.Fin {
hdr.Length = -1
}
return readAllInto(dst[:0], &wsReader, hdr.Length)
}
}
@ -1125,10 +1129,16 @@ func (c *Connection) writeStream(ctx context.Context, conn net.Conn, cancel cont
continue
}
}
if len(queue) < maxMergeMessages && queueSize+len(toSend) < writeBufferSize-1024 && len(c.outQueue) > 0 {
queue = append(queue, toSend)
queueSize += len(toSend)
continue
if len(queue) < maxMergeMessages && queueSize+len(toSend) < writeBufferSize-1024 {
if len(c.outQueue) == 0 {
// Yield to allow more messages to fill.
runtime.Gosched()
}
if len(c.outQueue) > 0 {
queue = append(queue, toSend)
queueSize += len(toSend)
continue
}
}
c.outMessages.Add(int64(len(queue) + 1))
if c.outgoingBytes != nil {
@ -1158,7 +1168,7 @@ func (c *Connection) writeStream(ctx context.Context, conn net.Conn, cancel cont
}
c.connChange.L.Unlock()
if len(queue) == 0 {
// Combine writes.
// Send single message without merging.
buf.Reset()
err := wsw.writeMessage(&buf, c.side, ws.OpBinary, toSend)
if err != nil {

View File

@ -57,7 +57,7 @@ const (
biggerBufMax = maxBufferSize
// If there is a queue, merge up to this many messages.
maxMergeMessages = 30
maxMergeMessages = 50
// clientPingInterval will ping the remote handler every 15 seconds.
// Clients disconnect when we exceed 2 intervals.
@ -126,7 +126,8 @@ var PutByteBuffer = func(b []byte) {
// A successful call returns err == nil, not err == EOF. Because readAllInto is
// defined to read from src until EOF, it does not treat an EOF from Read
// as an error to be reported.
func readAllInto(b []byte, r *wsutil.Reader) ([]byte, error) {
func readAllInto(b []byte, r *wsutil.Reader, want int64) ([]byte, error) {
read := int64(0)
for {
if len(b) == cap(b) {
// Add more capacity (let append pick how much).
@ -136,10 +137,18 @@ func readAllInto(b []byte, r *wsutil.Reader) ([]byte, error) {
b = b[:len(b)+n]
if err != nil {
if errors.Is(err, io.EOF) {
if want >= 0 && read+int64(n) != want {
return nil, io.ErrUnexpectedEOF
}
err = nil
}
return b, err
}
read += int64(n)
if want >= 0 && read == want {
// No need to read more...
return b, nil
}
}
}