Expose RPC reconnections and ping time (#20157)

- Keeps track of reconnection count.
- Keeps track of connection ping roundtrip times. 
  Sends timestamp in ping message.
- Allow ping without payload.
This commit is contained in:
Klaus Post 2024-07-25 14:07:21 -07:00 committed by GitHub
parent 4a1edfd9aa
commit 15b609ecea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 307 additions and 20 deletions

2
go.mod
View File

@ -51,7 +51,7 @@ require (
github.com/minio/highwayhash v1.0.3
github.com/minio/kms-go/kes v0.3.0
github.com/minio/kms-go/kms v0.4.0
github.com/minio/madmin-go/v3 v3.0.58
github.com/minio/madmin-go/v3 v3.0.59-0.20240725120704-3cfbffc45f08
github.com/minio/minio-go/v7 v7.0.73
github.com/minio/mux v1.9.0
github.com/minio/pkg/v3 v3.0.9

4
go.sum
View File

@ -457,8 +457,8 @@ github.com/minio/kms-go/kes v0.3.0 h1:SU8VGVM/Hk9w1OiSby3OatkcojooUqIdDHl6dtM6Nk
github.com/minio/kms-go/kes v0.3.0/go.mod h1:w6DeVT878qEOU3nUrYVy1WOT5H1Ig9hbDIh698NYJKY=
github.com/minio/kms-go/kms v0.4.0 h1:cLPZceEp+05xHotVBaeFJrgL7JcXM4lBy6PU0idkE7I=
github.com/minio/kms-go/kms v0.4.0/go.mod h1:q12CehiIy2qgBnDKq6Q7wmPi2PHSyRVug5DKp0HAVeE=
github.com/minio/madmin-go/v3 v3.0.58 h1:CUhb6FsBvgPfP1iOWvMGqlrB1epYpJw0i/yGXPH12WQ=
github.com/minio/madmin-go/v3 v3.0.58/go.mod h1:IFAwr0XMrdsLovxAdCcuq/eoL4nRuMVQQv0iubJANQw=
github.com/minio/madmin-go/v3 v3.0.59-0.20240725120704-3cfbffc45f08 h1:VqSGPGX5F5els13e9j/IHN7wZQOj0eM5fvJvMS0E0tA=
github.com/minio/madmin-go/v3 v3.0.59-0.20240725120704-3cfbffc45f08/go.mod h1:IFAwr0XMrdsLovxAdCcuq/eoL4nRuMVQQv0iubJANQw=
github.com/minio/mc v0.0.0-20240702213905-74032bc16a3f h1:UN7hxbfLhBssFfoqS4zNIBDMC57qgLpbym6v0XYLe2s=
github.com/minio/mc v0.0.0-20240702213905-74032bc16a3f/go.mod h1:kJaOnJZfmThdTEUR/9GlLbKYiqx+a5oFQac8wWaDogA=
github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=

View File

@ -123,6 +123,9 @@ type Connection struct {
inBytes atomic.Int64
inMessages atomic.Int64
outMessages atomic.Int64
reconnects atomic.Int64
lastConnect atomic.Pointer[time.Time]
lastPingDur atomic.Int64
// For testing only
debugInConn net.Conn
@ -707,6 +710,8 @@ func (c *Connection) connect() {
retry(fmt.Errorf("connection rejected: %s", r.RejectedReason))
continue
}
t := time.Now().UTC()
c.lastConnect.Store(&t)
c.reconnectMu.Lock()
remoteUUID := uuid.UUID(r.ID)
if c.remoteID != nil {
@ -807,6 +812,8 @@ func (c *Connection) handleIncoming(ctx context.Context, conn net.Conn, req conn
if err != nil {
return err
}
t := time.Now().UTC()
c.lastConnect.Store(&t)
// Signal that we are reconnected, update state and handle messages.
// Prevent other connections from connecting while we process.
c.reconnectMu.Lock()
@ -826,6 +833,7 @@ func (c *Connection) handleIncoming(ctx context.Context, conn net.Conn, req conn
// caller *must* hold reconnectMu.
func (c *Connection) reconnected() {
c.updateState(StateConnectionError)
c.reconnects.Add(1)
// Drain the outQueue, so any blocked messages can be sent.
// We keep the queue, but start draining it, if it gets full.
@ -1090,7 +1098,8 @@ func (c *Connection) writeStream(ctx context.Context, conn net.Conn, cancel cont
ping := time.NewTicker(connPingInterval)
pingFrame := message{
Op: OpPing,
DeadlineMS: 5000,
DeadlineMS: uint32(connPingInterval.Milliseconds()),
Payload: make([]byte, pingMsg{}.Msgsize()),
}
defer ping.Stop()
@ -1116,11 +1125,18 @@ func (c *Connection) writeStream(ctx context.Context, conn net.Conn, cancel cont
return
}
}
ping := pingMsg{
T: time.Now(),
}
var err error
if pingFrame.Payload, err = ping.MarshalMsg(pingFrame.Payload[:0]); err != nil {
gridLogIf(ctx, err) // Fake it... Though this should never fail.
atomic.StoreInt64(&c.LastPong, time.Now().UnixNano())
continue
}
toSend, err = pingFrame.MarshalMsg(GetByteBuffer()[:0])
if err != nil {
gridLogIf(ctx, err)
// Fake it...
gridLogIf(ctx, err) // Fake it... Though this should never fail.
atomic.StoreInt64(&c.LastPong, time.Now().UnixNano())
continue
}
@ -1428,13 +1444,16 @@ func (c *Connection) handleRequest(ctx context.Context, m message, subID *subHan
}
func (c *Connection) handlePong(ctx context.Context, m message) {
if m.MuxID == 0 && m.Payload == nil {
atomic.StoreInt64(&c.LastPong, time.Now().UnixNano())
return
}
var pong pongMsg
_, err := pong.UnmarshalMsg(m.Payload)
PutByteBuffer(m.Payload)
m.Payload = nil
if m.MuxID == 0 {
atomic.StoreInt64(&c.LastPong, time.Now().UnixNano())
c.lastPingDur.Store(int64(time.Since(pong.T)))
return
}
gridLogIf(ctx, err)
if m.MuxID == 0 {
atomic.StoreInt64(&c.LastPong, time.Now().UnixNano())
@ -1450,18 +1469,26 @@ func (c *Connection) handlePong(ctx context.Context, m message) {
}
func (c *Connection) handlePing(ctx context.Context, m message) {
var ping pingMsg
if len(m.Payload) > 0 {
_, err := ping.UnmarshalMsg(m.Payload)
if err != nil {
gridLogIf(ctx, err)
}
}
// c.queueMsg will reuse m.Payload
if m.MuxID == 0 {
m.Flags.Clear(FlagPayloadIsZero)
m.Op = OpPong
gridLogIf(ctx, c.queueMsg(m, nil))
gridLogIf(ctx, c.queueMsg(m, &pongMsg{T: ping.T}))
return
}
// Single calls do not support pinging.
if v, ok := c.inStream.Load(m.MuxID); ok {
pong := v.ping(m.Seq)
pong.T = ping.T
gridLogIf(ctx, c.queueMsg(m, &pong))
} else {
pong := pongMsg{NotFound: true}
pong := pongMsg{NotFound: true, T: ping.T}
gridLogIf(ctx, c.queueMsg(m, &pong))
}
return
@ -1640,6 +1667,11 @@ func (c *Connection) Stats() madmin.RPCMetrics {
if c.State() == StateConnected {
conn++
}
var lastConn time.Time
if t := c.lastConnect.Load(); t != nil {
lastConn = *t
}
pingMS := float64(c.lastPingDur.Load()) / float64(time.Millisecond)
m := madmin.RPCMetrics{
CollectedAt: time.Now(),
Connected: conn,
@ -1652,6 +1684,10 @@ func (c *Connection) Stats() madmin.RPCMetrics {
OutgoingMessages: c.outMessages.Load(),
OutQueue: len(c.outQueue),
LastPongTime: time.Unix(0, c.LastPong).UTC(),
LastConnectTime: lastConn,
ReconnectCount: int(c.reconnects.Load()),
LastPingMS: pingMS,
MaxPingDurMS: pingMS,
}
m.ByDestination = map[string]madmin.RPCMetrics{
c.Remote: m,

View File

@ -290,10 +290,19 @@ func (muxConnectError) Op() Op {
}
type pongMsg struct {
NotFound bool `msg:"nf"`
Err *string `msg:"e,allownil"`
NotFound bool `msg:"nf"`
Err *string `msg:"e,allownil"`
T time.Time `msg:"t"`
}
func (pongMsg) Op() Op {
return OpPong
}
type pingMsg struct {
T time.Time `msg:"t"`
}
func (pingMsg) Op() Op {
return OpPing
}

View File

@ -787,6 +787,109 @@ func (z muxConnectError) Msgsize() (s int) {
return
}
// DecodeMsg implements msgp.Decodable
func (z *pingMsg) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "t":
z.T, err = dc.ReadTime()
if err != nil {
err = msgp.WrapError(err, "T")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z pingMsg) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 1
// write "t"
err = en.Append(0x81, 0xa1, 0x74)
if err != nil {
return
}
err = en.WriteTime(z.T)
if err != nil {
err = msgp.WrapError(err, "T")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z pingMsg) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 1
// string "t"
o = append(o, 0x81, 0xa1, 0x74)
o = msgp.AppendTime(o, z.T)
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *pingMsg) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "t":
z.T, bts, err = msgp.ReadTimeBytes(bts)
if err != nil {
err = msgp.WrapError(err, "T")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z pingMsg) Msgsize() (s int) {
s = 1 + 2 + msgp.TimeSize
return
}
// DecodeMsg implements msgp.Decodable
func (z *pongMsg) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
@ -829,6 +932,12 @@ func (z *pongMsg) DecodeMsg(dc *msgp.Reader) (err error) {
return
}
}
case "t":
z.T, err = dc.ReadTime()
if err != nil {
err = msgp.WrapError(err, "T")
return
}
default:
err = dc.Skip()
if err != nil {
@ -842,9 +951,9 @@ func (z *pongMsg) DecodeMsg(dc *msgp.Reader) (err error) {
// EncodeMsg implements msgp.Encodable
func (z *pongMsg) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 2
// map header, size 3
// write "nf"
err = en.Append(0x82, 0xa2, 0x6e, 0x66)
err = en.Append(0x83, 0xa2, 0x6e, 0x66)
if err != nil {
return
}
@ -870,15 +979,25 @@ func (z *pongMsg) EncodeMsg(en *msgp.Writer) (err error) {
return
}
}
// write "t"
err = en.Append(0xa1, 0x74)
if err != nil {
return
}
err = en.WriteTime(z.T)
if err != nil {
err = msgp.WrapError(err, "T")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *pongMsg) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 2
// map header, size 3
// string "nf"
o = append(o, 0x82, 0xa2, 0x6e, 0x66)
o = append(o, 0x83, 0xa2, 0x6e, 0x66)
o = msgp.AppendBool(o, z.NotFound)
// string "e"
o = append(o, 0xa1, 0x65)
@ -887,6 +1006,9 @@ func (z *pongMsg) MarshalMsg(b []byte) (o []byte, err error) {
} else {
o = msgp.AppendString(o, *z.Err)
}
// string "t"
o = append(o, 0xa1, 0x74)
o = msgp.AppendTime(o, z.T)
return
}
@ -931,6 +1053,12 @@ func (z *pongMsg) UnmarshalMsg(bts []byte) (o []byte, err error) {
return
}
}
case "t":
z.T, bts, err = msgp.ReadTimeBytes(bts)
if err != nil {
err = msgp.WrapError(err, "T")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
@ -951,5 +1079,6 @@ func (z *pongMsg) Msgsize() (s int) {
} else {
s += msgp.StringPrefixSize + len(*z.Err)
}
s += 2 + msgp.TimeSize
return
}

View File

@ -461,6 +461,119 @@ func BenchmarkDecodemuxConnectError(b *testing.B) {
}
}
func TestMarshalUnmarshalpingMsg(t *testing.T) {
v := pingMsg{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgpingMsg(b *testing.B) {
v := pingMsg{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgpingMsg(b *testing.B) {
v := pingMsg{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalpingMsg(b *testing.B) {
v := pingMsg{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodepingMsg(t *testing.T) {
v := pingMsg{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodepingMsg Msgsize() is inaccurate")
}
vn := pingMsg{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodepingMsg(b *testing.B) {
v := pingMsg{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodepingMsg(b *testing.B) {
v := pingMsg{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}
func TestMarshalUnmarshalpongMsg(t *testing.T) {
v := pongMsg{}
bts, err := v.MarshalMsg(nil)