mirror of
https://github.com/minio/minio.git
synced 2025-03-25 23:10:57 -04:00
Fix grid reconnection deadlock (#19101)
If network conditions have filled the output queue before a reconnect happens blocked sends could stop reconnects from happening. In short `respMu` would be held for a mux client while sending - if the queue is full this will never get released and closing the mux client will hang. A) Use the mux client context instead of connection context for sends, so sends are unblocked when the mux client is canceled. B) Use a `TryLock` on "close" and cancel the request if we cannot get the lock at once. This will unblock any attempts to send.
This commit is contained in:
parent
526b829a09
commit
22aa16ab12
@ -531,10 +531,11 @@ func (c *Connection) shouldConnect() bool {
|
||||
return h0 < h1
|
||||
}
|
||||
|
||||
func (c *Connection) send(msg []byte) error {
|
||||
func (c *Connection) send(ctx context.Context, msg []byte) error {
|
||||
select {
|
||||
case <-c.ctx.Done():
|
||||
return context.Cause(c.ctx)
|
||||
case <-ctx.Done():
|
||||
// Returning error here is too noisy.
|
||||
return nil
|
||||
case c.outQueue <- msg:
|
||||
return nil
|
||||
}
|
||||
@ -570,7 +571,7 @@ func (c *Connection) queueMsg(msg message, payload sender) error {
|
||||
h := xxh3.Hash(dst)
|
||||
dst = binary.LittleEndian.AppendUint32(dst, uint32(h))
|
||||
}
|
||||
return c.send(dst)
|
||||
return c.send(c.ctx, dst)
|
||||
}
|
||||
|
||||
// sendMsg will send
|
||||
|
@ -184,12 +184,12 @@ func (m *lockedClientMap) Delete(id uint64) {
|
||||
|
||||
func (m *lockedClientMap) Range(fn func(key uint64, value *muxClient) bool) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
for k, v := range m.m {
|
||||
if !fn(k, v) {
|
||||
break
|
||||
}
|
||||
}
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
func (m *lockedClientMap) Clear() {
|
||||
|
@ -173,7 +173,7 @@ func (m *muxClient) sendLocked(msg message) error {
|
||||
h := xxh3.Hash(dst)
|
||||
dst = binary.LittleEndian.AppendUint32(dst, uint32(h))
|
||||
}
|
||||
return m.parent.send(dst)
|
||||
return m.parent.send(m.ctx, dst)
|
||||
}
|
||||
|
||||
// RequestStateless will send a single payload request and stream back results.
|
||||
@ -552,7 +552,15 @@ func (m *muxClient) close() {
|
||||
if debugPrint {
|
||||
fmt.Println("closing outgoing mux", m.MuxID)
|
||||
}
|
||||
m.respMu.Lock()
|
||||
if !m.respMu.TryLock() {
|
||||
// Cancel before locking - will unblock any pending sends.
|
||||
if m.cancelFn != nil {
|
||||
m.cancelFn(context.Canceled)
|
||||
}
|
||||
// Wait for senders to release.
|
||||
m.respMu.Lock()
|
||||
}
|
||||
|
||||
defer m.respMu.Unlock()
|
||||
m.closeLocked()
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user