From 6d3e0c7db6df1a804e1691041d72e81bfe137cb9 Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Wed, 15 May 2024 08:39:21 -0700 Subject: [PATCH] Tweak one way stream ping (#19743) Do not log errors on oneway streams when sending ping fails. Instead cancel the stream. This also makes sure pings are sent when blocked on sending responses. I will do a separate PR that includes this and adds pings to two-way streams as well as tests for pings. --- internal/grid/muxclient.go | 40 ++++++++++++++++++++++++++++++++++---- 1 file changed, 36 insertions(+), 4 deletions(-) diff --git a/internal/grid/muxclient.go b/internal/grid/muxclient.go index 5a981e513..dd5331a31 100644 --- a/internal/grid/muxclient.go +++ b/internal/grid/muxclient.go @@ -331,6 +331,7 @@ func (m *muxClient) handleOneWayStream(respHandler chan<- Response, respServer < if !ok { return } + sendResp: select { case respHandler <- resp: m.respMu.Lock() @@ -341,18 +342,49 @@ func (m *muxClient) handleOneWayStream(respHandler chan<- Response, respServer < case <-m.ctx.Done(): // Client canceled. Don't block. // Next loop will catch it. + case <-pingTimer: + if !m.doPing(respHandler) { + return + } + goto sendResp } case <-pingTimer: - if time.Since(time.Unix(atomic.LoadInt64(&m.LastPong), 0)) > clientPingInterval*2 { - m.addErrorNonBlockingClose(respHandler, ErrDisconnected) + if !m.doPing(respHandler) { return } - // Send new ping. - gridLogIf(m.ctx, m.send(message{Op: OpPing, MuxID: m.MuxID})) } } } +// doPing checks last ping time and sends another ping. +func (m *muxClient) doPing(respHandler chan<- Response) (ok bool) { + m.respMu.Lock() + if m.closed { + m.respMu.Unlock() + // Already closed. This is not an error state; + // we may just be delivering the last responses. + return true + } + + // Only check ping when not closed. + if got := time.Since(time.Unix(atomic.LoadInt64(&m.LastPong), 0)); got > clientPingInterval*2 { + m.respMu.Unlock() + if debugPrint { + fmt.Printf("Mux %d: last pong %v ago, disconnecting\n", m.MuxID, got) + } + m.addErrorNonBlockingClose(respHandler, ErrDisconnected) + return false + } + + // Send new ping + err := m.sendLocked(message{Op: OpPing, MuxID: m.MuxID}) + m.respMu.Unlock() + if err != nil { + m.addErrorNonBlockingClose(respHandler, err) + } + return err == nil +} + // responseCh is the channel to that goes to the requester. // internalResp is the channel that comes from the server. func (m *muxClient) handleTwowayResponses(responseCh chan<- Response, internalResp <-chan Response) {