mirror of
https://github.com/minio/minio.git
synced 2025-04-19 02:05:24 -04:00
grid: Simpler reconnect logic (#18889)
Do not rely on `connChange` to do reconnects. Instead, block while the connection is running and reconnect when handleMessages returns. Add fully asynchronous state monitoring in place of monitoring on the main goroutine; the monitoring is kept to avoid a full network lockup.
This commit is contained in:
parent
6347fb6636
commit
38de8e6936
@ -707,26 +707,15 @@ func (c *Connection) connect() {
|
|||||||
if debugPrint {
|
if debugPrint {
|
||||||
fmt.Println(c.Local, "Connected Waiting for Messages")
|
fmt.Println(c.Local, "Connected Waiting for Messages")
|
||||||
}
|
}
|
||||||
c.updateState(StateConnected)
|
// Handle messages...
|
||||||
go c.handleMessages(c.ctx, conn)
|
c.handleMessages(c.ctx, conn)
|
||||||
// Monitor state changes and reconnect if needed.
|
// Reconnect unless we are shutting down (debug only).
|
||||||
c.connChange.L.Lock()
|
if c.State() == StateShutdown {
|
||||||
for {
|
conn.Close()
|
||||||
newState := c.State()
|
return
|
||||||
if newState != StateConnected {
|
}
|
||||||
c.connChange.L.Unlock()
|
if debugPrint {
|
||||||
if newState == StateShutdown {
|
fmt.Println(c.Local, "Disconnected. Attempting to reconnect.")
|
||||||
conn.Close()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if debugPrint {
|
|
||||||
fmt.Println(c.Local, "Disconnected")
|
|
||||||
}
|
|
||||||
// Reconnect
|
|
||||||
break
|
|
||||||
}
|
|
||||||
// Unlock and wait for state change.
|
|
||||||
c.connChange.Wait()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -818,7 +807,7 @@ func (c *Connection) handleIncoming(ctx context.Context, conn net.Conn, req conn
|
|||||||
rid := uuid.UUID(req.ID)
|
rid := uuid.UUID(req.ID)
|
||||||
c.remoteID = &rid
|
c.remoteID = &rid
|
||||||
|
|
||||||
c.updateState(StateConnected)
|
// Handle incoming messages until disconnect.
|
||||||
c.handleMessages(ctx, conn)
|
c.handleMessages(ctx, conn)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -867,12 +856,36 @@ func (c *Connection) updateState(s State) {
|
|||||||
c.connChange.Broadcast()
|
c.connChange.Broadcast()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// monitorState will monitor the state of the connection and close the net.Conn if it changes.
|
||||||
|
func (c *Connection) monitorState(conn net.Conn, cancel context.CancelCauseFunc) {
|
||||||
|
c.connChange.L.Lock()
|
||||||
|
defer c.connChange.L.Unlock()
|
||||||
|
for {
|
||||||
|
newState := c.State()
|
||||||
|
if newState != StateConnected {
|
||||||
|
conn.Close()
|
||||||
|
cancel(ErrDisconnected)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Unlock and wait for state change.
|
||||||
|
c.connChange.Wait()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// handleMessages will handle incoming messages on conn.
|
// handleMessages will handle incoming messages on conn.
|
||||||
// caller *must* hold reconnectMu.
|
// caller *must* hold reconnectMu.
|
||||||
func (c *Connection) handleMessages(ctx context.Context, conn net.Conn) {
|
func (c *Connection) handleMessages(ctx context.Context, conn net.Conn) {
|
||||||
|
c.updateState(StateConnected)
|
||||||
|
ctx, cancel := context.WithCancelCause(ctx)
|
||||||
|
defer cancel(ErrDisconnected)
|
||||||
|
|
||||||
|
// This will ensure that if something asks to disconnect and we are blocked on reads/writes
|
||||||
|
// the connection will be closed and readers/writers will unblock.
|
||||||
|
go c.monitorState(conn, cancel)
|
||||||
|
|
||||||
c.handleMsgWg.Add(2)
|
c.handleMsgWg.Add(2)
|
||||||
c.reconnectMu.Unlock()
|
c.reconnectMu.Unlock()
|
||||||
ctx, cancel := context.WithCancelCause(ctx)
|
|
||||||
// Read goroutine
|
// Read goroutine
|
||||||
go func() {
|
go func() {
|
||||||
defer func() {
|
defer func() {
|
||||||
@ -1034,7 +1047,6 @@ func (c *Connection) handleMessages(ctx context.Context, conn net.Conn) {
|
|||||||
lastPongTime := time.Unix(lastPong, 0)
|
lastPongTime := time.Unix(lastPong, 0)
|
||||||
if d := time.Since(lastPongTime); d > connPingInterval*2 {
|
if d := time.Since(lastPongTime); d > connPingInterval*2 {
|
||||||
logger.LogIf(ctx, fmt.Errorf("host %s last pong too old (%v); disconnecting", c.Remote, d.Round(time.Millisecond)))
|
logger.LogIf(ctx, fmt.Errorf("host %s last pong too old (%v); disconnecting", c.Remote, d.Round(time.Millisecond)))
|
||||||
cancel(ErrDisconnected)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1084,14 +1096,12 @@ func (c *Connection) handleMessages(ctx context.Context, conn net.Conn) {
|
|||||||
err := wsw.writeMessage(&buf, c.side, ws.OpBinary, toSend)
|
err := wsw.writeMessage(&buf, c.side, ws.OpBinary, toSend)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.LogIf(ctx, fmt.Errorf("ws writeMessage: %w", err))
|
logger.LogIf(ctx, fmt.Errorf("ws writeMessage: %w", err))
|
||||||
cancel(ErrDisconnected)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
PutByteBuffer(toSend)
|
PutByteBuffer(toSend)
|
||||||
_, err = buf.WriteTo(conn)
|
_, err = buf.WriteTo(conn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.LogIf(ctx, fmt.Errorf("ws write: %w", err))
|
logger.LogIf(ctx, fmt.Errorf("ws write: %w", err))
|
||||||
cancel(ErrDisconnected)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
@ -1109,7 +1119,6 @@ func (c *Connection) handleMessages(ctx context.Context, conn net.Conn) {
|
|||||||
toSend, err = m.MarshalMsg(toSend)
|
toSend, err = m.MarshalMsg(toSend)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.LogIf(ctx, fmt.Errorf("msg.MarshalMsg: %w", err))
|
logger.LogIf(ctx, fmt.Errorf("msg.MarshalMsg: %w", err))
|
||||||
cancel(ErrDisconnected)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Append as byte slices.
|
// Append as byte slices.
|
||||||
@ -1126,14 +1135,12 @@ func (c *Connection) handleMessages(ctx context.Context, conn net.Conn) {
|
|||||||
err = wsw.writeMessage(&buf, c.side, ws.OpBinary, toSend)
|
err = wsw.writeMessage(&buf, c.side, ws.OpBinary, toSend)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.LogIf(ctx, fmt.Errorf("ws writeMessage: %w", err))
|
logger.LogIf(ctx, fmt.Errorf("ws writeMessage: %w", err))
|
||||||
cancel(ErrDisconnected)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Tosend is our local buffer, so we can reuse it.
|
// buf is our local buffer, so we can reuse it.
|
||||||
_, err = buf.WriteTo(conn)
|
_, err = buf.WriteTo(conn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.LogIf(ctx, fmt.Errorf("ws write: %w", err))
|
logger.LogIf(ctx, fmt.Errorf("ws write: %w", err))
|
||||||
cancel(ErrDisconnected)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -691,6 +691,9 @@ func (h *StreamTypeHandler[Payload, Req, Resp]) Call(ctx context.Context, c Stre
|
|||||||
if h.InCapacity > 0 {
|
if h.InCapacity > 0 {
|
||||||
reqT = make(chan Req)
|
reqT = make(chan Req)
|
||||||
// Request handler
|
// Request handler
|
||||||
|
if stream.Requests == nil {
|
||||||
|
return nil, fmt.Errorf("internal error: stream request channel nil")
|
||||||
|
}
|
||||||
go func() {
|
go func() {
|
||||||
defer close(stream.Requests)
|
defer close(stream.Requests)
|
||||||
for req := range reqT {
|
for req := range reqT {
|
||||||
|
@ -533,7 +533,9 @@ func (m *muxClient) closeLocked() {
|
|||||||
if m.closed {
|
if m.closed {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
close(m.respWait)
|
if m.respWait != nil {
|
||||||
m.respWait = nil
|
close(m.respWait)
|
||||||
|
m.respWait = nil
|
||||||
|
}
|
||||||
m.closed = true
|
m.closed = true
|
||||||
}
|
}
|
||||||
|
@ -21,6 +21,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"runtime/debug"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
@ -138,7 +139,8 @@ func newMuxStream(ctx context.Context, msg message, c *Connection, handler Strea
|
|||||||
}
|
}
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
logger.LogIf(ctx, fmt.Errorf("grid handler (%v) panic: %v", msg.Handler, r))
|
logger.LogIf(ctx, fmt.Errorf("grid handler (%v) panic: %v", msg.Handler, r))
|
||||||
err := RemoteErr(fmt.Sprintf("panic: %v", r))
|
debug.PrintStack()
|
||||||
|
err := RemoteErr(fmt.Sprintf("remote call panic: %v", r))
|
||||||
handlerErr = &err
|
handlerErr = &err
|
||||||
}
|
}
|
||||||
if debugPrint {
|
if debugPrint {
|
||||||
@ -244,8 +246,10 @@ func (m *muxServer) message(msg message) {
|
|||||||
if len(msg.Payload) > 0 {
|
if len(msg.Payload) > 0 {
|
||||||
logger.LogIf(m.ctx, fmt.Errorf("muxServer: EOF message with payload"))
|
logger.LogIf(m.ctx, fmt.Errorf("muxServer: EOF message with payload"))
|
||||||
}
|
}
|
||||||
close(m.inbound)
|
if m.inbound != nil {
|
||||||
m.inbound = nil
|
close(m.inbound)
|
||||||
|
m.inbound = nil
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user