Add per connection RPC metrics (#19852)

Provides individual and aggregate stats for each RPC connection.

Example:

```
  "rpc": {
   "collectedAt": "2024-05-31T14:33:29.1373103+02:00",
   "connected": 30,
   "disconnected": 0,
   "outgoingStreams": 69,
   "incomingStreams": 0,
   "outgoingBytes": 174822796,
   "incomingBytes": 175821566,
   "outgoingMessages": 768595,
   "incomingMessages": 768589,
   "outQueue": 0,
   "lastPongTime": "2024-05-31T12:33:28Z",
   "byDestination": {
    "http://127.0.0.1:9001": {
     "collectedAt": "2024-05-31T14:33:29.1373103+02:00",
     "connected": 5,
     "disconnected": 0,
     "outgoingStreams": 2,
     "incomingStreams": 0,
     "outgoingBytes": 38432543,
     "incomingBytes": 66604052,
     "outgoingMessages": 229496,
     "incomingMessages": 229575,
     "outQueue": 0,
     "lastPongTime": "2024-05-31T12:33:27Z"
    },
    "http://127.0.0.1:9002": {
     "collectedAt": "2024-05-31T14:33:29.1373103+02:00",
     "connected": 5,
     "disconnected": 0,
     "outgoingStreams": 6,
     "incomingStreams": 0,
     "outgoingBytes": 38215680,
     "incomingBytes": 66121283,
     "outgoingMessages": 228525,
     "incomingMessages": 228510,
     "outQueue": 0,
     "lastPongTime": "2024-05-31T12:33:27Z"
    },
...
```
This commit is contained in:
Klaus Post 2024-05-31 22:16:24 -07:00 committed by GitHub
parent d3ae0aaad3
commit c5b3f5553f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 68 additions and 33 deletions

View File

@ -132,6 +132,15 @@ func collectLocalMetrics(types madmin.MetricType, opts collectMetricsOpts) (m ma
m.Aggregated.CPU.LoadStat = loadStat
}
}
if types.Contains(madmin.MetricsRPC) {
gr := globalGrid.Load()
if gr == nil {
m.Errors = append(m.Errors, fmt.Sprintf("%s: Grid not initialized", byHostName))
} else {
stats := gr.ConnStats()
m.Aggregated.RPC = &stats
}
}
// Add types...
// ByHost is a shallow reference, so careful about sharing.

2
go.mod
View File

@ -52,7 +52,7 @@ require (
github.com/minio/highwayhash v1.0.2
github.com/minio/kms-go/kes v0.3.0
github.com/minio/kms-go/kms v0.4.0
github.com/minio/madmin-go/v3 v3.0.52
github.com/minio/madmin-go/v3 v3.0.54-0.20240531145019-5310b0f9f805
github.com/minio/minio-go/v7 v7.0.70
github.com/minio/mux v1.9.0
github.com/minio/pkg/v3 v3.0.1

4
go.sum
View File

@ -454,8 +454,8 @@ github.com/minio/kms-go/kes v0.3.0 h1:SU8VGVM/Hk9w1OiSby3OatkcojooUqIdDHl6dtM6Nk
github.com/minio/kms-go/kes v0.3.0/go.mod h1:w6DeVT878qEOU3nUrYVy1WOT5H1Ig9hbDIh698NYJKY=
github.com/minio/kms-go/kms v0.4.0 h1:cLPZceEp+05xHotVBaeFJrgL7JcXM4lBy6PU0idkE7I=
github.com/minio/kms-go/kms v0.4.0/go.mod h1:q12CehiIy2qgBnDKq6Q7wmPi2PHSyRVug5DKp0HAVeE=
github.com/minio/madmin-go/v3 v3.0.52 h1:X2zoNfEkrgql9Hlal6Nfw7pAqLAM47yZy4i7yb2bUVk=
github.com/minio/madmin-go/v3 v3.0.52/go.mod h1:IFAwr0XMrdsLovxAdCcuq/eoL4nRuMVQQv0iubJANQw=
github.com/minio/madmin-go/v3 v3.0.54-0.20240531145019-5310b0f9f805 h1:iuHJkYkULcvG+0Q47rd0b4PG8hg+6PgCBfBwaNUEz7Q=
github.com/minio/madmin-go/v3 v3.0.54-0.20240531145019-5310b0f9f805/go.mod h1:IFAwr0XMrdsLovxAdCcuq/eoL4nRuMVQQv0iubJANQw=
github.com/minio/mc v0.0.0-20240524090849-a8fdcbe7cb2f h1:UPxKkqMqyHNb+68hTq+PDIUYAqAbbplUujEyP5Ez9rs=
github.com/minio/mc v0.0.0-20240524090849-a8fdcbe7cb2f/go.mod h1:d/mgZMbu2yRNyYOwVqvRBV25pKMfAz7fijObWSXkqpA=
github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=

View File

@ -122,6 +122,10 @@ type Connection struct {
outgoingBytes func(n int64) // Record outgoing bytes.
trace *tracer // tracer for this connection.
baseFlags Flags
outBytes atomic.Int64
inBytes atomic.Int64
inMessages atomic.Int64
outMessages atomic.Int64
// For testing only
debugInConn net.Conn
@ -232,13 +236,25 @@ func newConnection(o connectionParams) *Connection {
clientPingInterval: clientPingInterval,
connPingInterval: connPingInterval,
tlsConfig: o.tlsConfig,
incomingBytes: o.incomingBytes,
outgoingBytes: o.outgoingBytes,
}
if debugPrint {
// Random Mux ID
c.NextID = rand.Uint64()
}
// Record per connection stats.
c.outgoingBytes = func(n int64) {
if o.outgoingBytes != nil {
o.outgoingBytes(n)
}
c.outBytes.Add(n)
}
c.incomingBytes = func(n int64) {
if o.incomingBytes != nil {
o.incomingBytes(n)
}
c.inBytes.Add(n)
}
if !strings.HasPrefix(o.remote, "https://") && !strings.HasPrefix(o.remote, "wss://") {
c.baseFlags |= FlagCRCxxh3
}
@ -995,11 +1011,13 @@ func (c *Connection) handleMessages(ctx context.Context, conn net.Conn) {
fmt.Printf("%s Got msg: %v\n", c.Local, m)
}
if m.Op != OpMerged {
c.inMessages.Add(1)
c.handleMsg(ctx, m, subID)
continue
}
// Handle merged messages.
messages := int(m.Seq)
c.inMessages.Add(int64(messages))
for i := 0; i < messages; i++ {
if atomic.LoadUint32((*uint32)(&c.state)) != StateConnected {
cancel(ErrDisconnected)
@ -1100,6 +1118,11 @@ func (c *Connection) handleMessages(ctx context.Context, conn net.Conn) {
queueSize += len(toSend)
continue
}
c.outMessages.Add(int64(len(queue) + 1))
if c.outgoingBytes != nil {
c.outgoingBytes(int64(len(toSend) + queueSize))
}
c.connChange.L.Lock()
for {
state := c.State()
@ -1578,11 +1601,28 @@ func (c *Connection) State() State {
}
// Stats returns the current connection stats.
func (c *Connection) Stats() ConnectionStats {
return ConnectionStats{
func (c *Connection) Stats() madmin.RPCMetrics {
conn := 0
if c.State() == StateConnected {
conn++
}
m := madmin.RPCMetrics{
CollectedAt: time.Now(),
Connected: conn,
Disconnected: 1 - conn,
IncomingStreams: c.inStream.Size(),
OutgoingStreams: c.outgoing.Size(),
IncomingBytes: c.inBytes.Load(),
OutgoingBytes: c.outBytes.Load(),
IncomingMessages: c.inMessages.Load(),
OutgoingMessages: c.outMessages.Load(),
OutQueue: len(c.outQueue),
LastPongTime: time.Unix(c.LastPong, 0).UTC(),
}
m.ByDestination = map[string]madmin.RPCMetrics{
c.Remote: m,
}
return m
}
func (c *Connection) debugMsg(d debugMsg, args ...any) {

View File

@ -334,3 +334,13 @@ func (m *Manager) debugMsg(d debugMsg, args ...any) {
c.debugMsg(d, args...)
}
}
// ConnStats returns the connection statistics for all connections.
func (m *Manager) ConnStats() madmin.RPCMetrics {
var res madmin.RPCMetrics
for _, c := range m.targets {
t := c.Stats()
res.Merge(&t)
}
return res
}

View File

@ -1,24 +0,0 @@
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package grid
// ConnectionStats contains connection statistics.
type ConnectionStats struct {
OutgoingStreams int
IncomingStreams int
}