Fix reconnected deadlock with full queue (#19964)

When a reconnection happens, `handleMessages` must be able to complete and exit. 

This can be prevented in a full queue.

Deadlock chain (May 10th release)

```
1 @ 0x44110e 0x453125 0x109f88c 0x109f7d5 0x10a472c 0x10a3f72 0x10a34ed 0x4795e1
#	0x109f88b	github.com/minio/minio/internal/grid.(*Connection).send+0x3eb			github.com/minio/minio/internal/grid/connection.go:548
#	0x109f7d4	github.com/minio/minio/internal/grid.(*Connection).queueMsg+0x334		github.com/minio/minio/internal/grid/connection.go:586
#	0x10a472b	github.com/minio/minio/internal/grid.(*Connection).handleAckMux+0xab		github.com/minio/minio/internal/grid/connection.go:1284
#	0x10a3f71	github.com/minio/minio/internal/grid.(*Connection).handleMsg+0x231		github.com/minio/minio/internal/grid/connection.go:1211
#	0x10a34ec	github.com/minio/minio/internal/grid.(*Connection).handleMessages.func1+0x6cc	github.com/minio/minio/internal/grid/connection.go:1019

---> blocks ---> via (Connection).handleMsgWg

1 @ 0x44110e 0x454165 0x454134 0x475325 0x486b08 0x10a161a 0x10a1465 0x2470e67 0x7395a9 0x20e61af 0x20e5f1f 0x7395a9 0x22f781c 0x7395a9 0x22f89a5 0x7395a9 0x22f6e82 0x7395a9 0x22f49a2 0x7395a9 0x2206e45 0x7395a9 0x22f4d9c 0x7395a9 0x210ba06 0x7395a9 0x23089c2 0x7395a9 0x22f86e9 0x7395a9 0xd42582 0x2106c04
#	0x475324	sync.runtime_Semacquire+0x24								runtime/sema.go:62
#	0x486b07	sync.(*WaitGroup).Wait+0x47								sync/waitgroup.go:116
#	0x10a1619	github.com/minio/minio/internal/grid.(*Connection).reconnected+0xb9			github.com/minio/minio/internal/grid/connection.go:857
#	0x10a1464	github.com/minio/minio/internal/grid.(*Connection).handleIncoming+0x384			github.com/minio/minio/internal/grid/connection.go:825
```

Add a queue cleaner in reconnected that will pop old messages so `handleMessages` can 
send messages without blocking and exit appropriately for the connection to be re-established.

Messages are likely dropped by the remote, but we may have some that can succeed, 
so we only drop when running out of space.
This commit is contained in:
Klaus Post 2024-06-20 16:11:40 -07:00 committed by GitHub
parent e200808ab7
commit 3415c4dd1e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -854,6 +854,37 @@ func (c *Connection) handleIncoming(ctx context.Context, conn net.Conn, req conn
// caller *must* hold reconnectMu.
func (c *Connection) reconnected() {
c.updateState(StateConnectionError)
// Drain the outQueue, so any blocked messages can be sent.
// We keep the queue, but start draining it, if it gets full.
stopDraining := make(chan struct{})
var wg sync.WaitGroup
wg.Add(1)
defer func() {
close(stopDraining)
wg.Wait()
}()
go func() {
defer wg.Done()
for {
select {
case <-stopDraining:
return
default:
if cap(c.outQueue)-len(c.outQueue) > 100 {
// Queue is not full, wait a bit.
time.Sleep(1 * time.Millisecond)
continue
}
select {
case v := <-c.outQueue:
PutByteBuffer(v)
case <-stopDraining:
return
}
}
}
}()
// Close all active requests.
if debugReqs {
fmt.Println(c.String(), "Reconnected. Clearing outgoing.")