diff --git a/go.mod b/go.mod index f5e796b22..cfbf5a422 100644 --- a/go.mod +++ b/go.mod @@ -51,7 +51,7 @@ require ( github.com/minio/highwayhash v1.0.3 github.com/minio/kms-go/kes v0.3.0 github.com/minio/kms-go/kms v0.4.0 - github.com/minio/madmin-go/v3 v3.0.58 + github.com/minio/madmin-go/v3 v3.0.59-0.20240725120704-3cfbffc45f08 github.com/minio/minio-go/v7 v7.0.73 github.com/minio/mux v1.9.0 github.com/minio/pkg/v3 v3.0.9 diff --git a/go.sum b/go.sum index 0a0b221f9..28a1fc984 100644 --- a/go.sum +++ b/go.sum @@ -457,8 +457,8 @@ github.com/minio/kms-go/kes v0.3.0 h1:SU8VGVM/Hk9w1OiSby3OatkcojooUqIdDHl6dtM6Nk github.com/minio/kms-go/kes v0.3.0/go.mod h1:w6DeVT878qEOU3nUrYVy1WOT5H1Ig9hbDIh698NYJKY= github.com/minio/kms-go/kms v0.4.0 h1:cLPZceEp+05xHotVBaeFJrgL7JcXM4lBy6PU0idkE7I= github.com/minio/kms-go/kms v0.4.0/go.mod h1:q12CehiIy2qgBnDKq6Q7wmPi2PHSyRVug5DKp0HAVeE= -github.com/minio/madmin-go/v3 v3.0.58 h1:CUhb6FsBvgPfP1iOWvMGqlrB1epYpJw0i/yGXPH12WQ= -github.com/minio/madmin-go/v3 v3.0.58/go.mod h1:IFAwr0XMrdsLovxAdCcuq/eoL4nRuMVQQv0iubJANQw= +github.com/minio/madmin-go/v3 v3.0.59-0.20240725120704-3cfbffc45f08 h1:VqSGPGX5F5els13e9j/IHN7wZQOj0eM5fvJvMS0E0tA= +github.com/minio/madmin-go/v3 v3.0.59-0.20240725120704-3cfbffc45f08/go.mod h1:IFAwr0XMrdsLovxAdCcuq/eoL4nRuMVQQv0iubJANQw= github.com/minio/mc v0.0.0-20240702213905-74032bc16a3f h1:UN7hxbfLhBssFfoqS4zNIBDMC57qgLpbym6v0XYLe2s= github.com/minio/mc v0.0.0-20240702213905-74032bc16a3f/go.mod h1:kJaOnJZfmThdTEUR/9GlLbKYiqx+a5oFQac8wWaDogA= github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= diff --git a/internal/grid/connection.go b/internal/grid/connection.go index 508126e56..bdec6ccb8 100644 --- a/internal/grid/connection.go +++ b/internal/grid/connection.go @@ -123,6 +123,9 @@ type Connection struct { inBytes atomic.Int64 inMessages atomic.Int64 outMessages atomic.Int64 + reconnects atomic.Int64 + lastConnect atomic.Pointer[time.Time] + lastPingDur atomic.Int64 // For testing only debugInConn net.Conn @@ -707,6 +710,8 @@ func (c *Connection) connect() { retry(fmt.Errorf("connection rejected: %s", r.RejectedReason)) continue } + t := time.Now().UTC() + c.lastConnect.Store(&t) c.reconnectMu.Lock() remoteUUID := uuid.UUID(r.ID) if c.remoteID != nil { @@ -807,6 +812,8 @@ func (c *Connection) handleIncoming(ctx context.Context, conn net.Conn, req conn if err != nil { return err } + t := time.Now().UTC() + c.lastConnect.Store(&t) // Signal that we are reconnected, update state and handle messages. // Prevent other connections from connecting while we process. c.reconnectMu.Lock() @@ -826,6 +833,7 @@ func (c *Connection) handleIncoming(ctx context.Context, conn net.Conn, req conn // caller *must* hold reconnectMu. func (c *Connection) reconnected() { c.updateState(StateConnectionError) + c.reconnects.Add(1) // Drain the outQueue, so any blocked messages can be sent. // We keep the queue, but start draining it, if it gets full. @@ -1090,7 +1098,8 @@ func (c *Connection) writeStream(ctx context.Context, conn net.Conn, cancel cont ping := time.NewTicker(connPingInterval) pingFrame := message{ Op: OpPing, - DeadlineMS: 5000, + DeadlineMS: uint32(connPingInterval.Milliseconds()), + Payload: make([]byte, pingMsg{}.Msgsize()), } defer ping.Stop() @@ -1116,11 +1125,18 @@ func (c *Connection) writeStream(ctx context.Context, conn net.Conn, cancel cont return } } + ping := pingMsg{ + T: time.Now(), + } var err error + if pingFrame.Payload, err = ping.MarshalMsg(pingFrame.Payload[:0]); err != nil { + gridLogIf(ctx, err) // Fake it... Though this should never fail. + atomic.StoreInt64(&c.LastPong, time.Now().UnixNano()) + continue + } toSend, err = pingFrame.MarshalMsg(GetByteBuffer()[:0]) if err != nil { - gridLogIf(ctx, err) - // Fake it... + gridLogIf(ctx, err) // Fake it... Though this should never fail. atomic.StoreInt64(&c.LastPong, time.Now().UnixNano()) continue } @@ -1428,13 +1444,16 @@ func (c *Connection) handleRequest(ctx context.Context, m message, subID *subHan } func (c *Connection) handlePong(ctx context.Context, m message) { - if m.MuxID == 0 && m.Payload == nil { - atomic.StoreInt64(&c.LastPong, time.Now().UnixNano()) - return - } var pong pongMsg _, err := pong.UnmarshalMsg(m.Payload) PutByteBuffer(m.Payload) + m.Payload = nil + + if m.MuxID == 0 { + atomic.StoreInt64(&c.LastPong, time.Now().UnixNano()) + c.lastPingDur.Store(int64(time.Since(pong.T))) + return + } gridLogIf(ctx, err) if m.MuxID == 0 { atomic.StoreInt64(&c.LastPong, time.Now().UnixNano()) @@ -1450,18 +1469,26 @@ func (c *Connection) handlePong(ctx context.Context, m message) { } func (c *Connection) handlePing(ctx context.Context, m message) { + var ping pingMsg + if len(m.Payload) > 0 { + _, err := ping.UnmarshalMsg(m.Payload) + if err != nil { + gridLogIf(ctx, err) + } + } + // c.queueMsg will reuse m.Payload + if m.MuxID == 0 { - m.Flags.Clear(FlagPayloadIsZero) - m.Op = OpPong - gridLogIf(ctx, c.queueMsg(m, nil)) + gridLogIf(ctx, c.queueMsg(m, &pongMsg{T: ping.T})) return } // Single calls do not support pinging. if v, ok := c.inStream.Load(m.MuxID); ok { pong := v.ping(m.Seq) + pong.T = ping.T gridLogIf(ctx, c.queueMsg(m, &pong)) } else { - pong := pongMsg{NotFound: true} + pong := pongMsg{NotFound: true, T: ping.T} gridLogIf(ctx, c.queueMsg(m, &pong)) } return @@ -1640,6 +1667,11 @@ func (c *Connection) Stats() madmin.RPCMetrics { if c.State() == StateConnected { conn++ } + var lastConn time.Time + if t := c.lastConnect.Load(); t != nil { + lastConn = *t + } + pingMS := float64(c.lastPingDur.Load()) / float64(time.Millisecond) m := madmin.RPCMetrics{ CollectedAt: time.Now(), Connected: conn, @@ -1652,6 +1684,10 @@ func (c *Connection) Stats() madmin.RPCMetrics { OutgoingMessages: c.outMessages.Load(), OutQueue: len(c.outQueue), LastPongTime: time.Unix(0, c.LastPong).UTC(), + LastConnectTime: lastConn, + ReconnectCount: int(c.reconnects.Load()), + LastPingMS: pingMS, + MaxPingDurMS: pingMS, } m.ByDestination = map[string]madmin.RPCMetrics{ c.Remote: m, diff --git a/internal/grid/msg.go b/internal/grid/msg.go index 355078b6c..b72520f08 100644 --- a/internal/grid/msg.go +++ b/internal/grid/msg.go @@ -290,10 +290,19 @@ func (muxConnectError) Op() Op { } type pongMsg struct { - NotFound bool `msg:"nf"` - Err *string `msg:"e,allownil"` + NotFound bool `msg:"nf"` + Err *string `msg:"e,allownil"` + T time.Time `msg:"t"` } func (pongMsg) Op() Op { return OpPong } + +type pingMsg struct { + T time.Time `msg:"t"` +} + +func (pingMsg) Op() Op { + return OpPing +} diff --git a/internal/grid/msg_gen.go b/internal/grid/msg_gen.go index 14e88c740..c51e92969 100644 --- a/internal/grid/msg_gen.go +++ b/internal/grid/msg_gen.go @@ -787,6 +787,109 @@ func (z muxConnectError) Msgsize() (s int) { return } +// DecodeMsg implements msgp.Decodable +func (z *pingMsg) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "t": + z.T, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "T") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z pingMsg) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 1 + // write "t" + err = en.Append(0x81, 0xa1, 0x74) + if err != nil { + return + } + err = en.WriteTime(z.T) + if err != nil { + err = msgp.WrapError(err, "T") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z pingMsg) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 1 + // string "t" + o = append(o, 0x81, 0xa1, 0x74) + o = msgp.AppendTime(o, z.T) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *pingMsg) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "t": + z.T, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "T") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z pingMsg) Msgsize() (s int) { + s = 1 + 2 + msgp.TimeSize + return +} + // DecodeMsg implements msgp.Decodable func (z *pongMsg) DecodeMsg(dc *msgp.Reader) (err error) { var field []byte @@ -829,6 +932,12 @@ func (z *pongMsg) DecodeMsg(dc *msgp.Reader) (err error) { return } } + case "t": + z.T, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "T") + return + } default: err = dc.Skip() if err != nil { @@ -842,9 +951,9 @@ func (z *pongMsg) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *pongMsg) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 2 + // map header, size 3 // write "nf" - err = en.Append(0x82, 0xa2, 0x6e, 0x66) + err = en.Append(0x83, 0xa2, 0x6e, 0x66) if err != nil { return } @@ -870,15 +979,25 @@ func (z *pongMsg) EncodeMsg(en *msgp.Writer) (err error) { return } } + // write "t" + err = en.Append(0xa1, 0x74) + if err != nil { + return + } + err = en.WriteTime(z.T) + if err != nil { + err = msgp.WrapError(err, "T") + return + } return } // MarshalMsg implements msgp.Marshaler func (z *pongMsg) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 2 + // map header, size 3 // string "nf" - o = append(o, 0x82, 0xa2, 0x6e, 0x66) + o = append(o, 0x83, 0xa2, 0x6e, 0x66) o = msgp.AppendBool(o, z.NotFound) // string "e" o = append(o, 0xa1, 0x65) @@ -887,6 +1006,9 @@ func (z *pongMsg) MarshalMsg(b []byte) (o []byte, err error) { } else { o = msgp.AppendString(o, *z.Err) } + // string "t" + o = append(o, 0xa1, 0x74) + o = msgp.AppendTime(o, z.T) return } @@ -931,6 +1053,12 @@ func (z *pongMsg) UnmarshalMsg(bts []byte) (o []byte, err error) { return } } + case "t": + z.T, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "T") + return + } default: bts, err = msgp.Skip(bts) if err != nil { @@ -951,5 +1079,6 @@ func (z *pongMsg) Msgsize() (s int) { } else { s += msgp.StringPrefixSize + len(*z.Err) } + s += 2 + msgp.TimeSize return } diff --git a/internal/grid/msg_gen_test.go b/internal/grid/msg_gen_test.go index a3170c811..6bade39e0 100644 --- a/internal/grid/msg_gen_test.go +++ b/internal/grid/msg_gen_test.go @@ -461,6 +461,119 @@ func BenchmarkDecodemuxConnectError(b *testing.B) { } } +func TestMarshalUnmarshalpingMsg(t *testing.T) { + v := pingMsg{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgpingMsg(b *testing.B) { + v := pingMsg{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgpingMsg(b *testing.B) { + v := pingMsg{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalpingMsg(b *testing.B) { + v := pingMsg{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodepingMsg(t *testing.T) { + v := pingMsg{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodepingMsg Msgsize() is inaccurate") + } + + vn := pingMsg{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodepingMsg(b *testing.B) { + v := pingMsg{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodepingMsg(b *testing.B) { + v := pingMsg{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + func TestMarshalUnmarshalpongMsg(t *testing.T) { v := pongMsg{} bts, err := v.MarshalMsg(nil)