diff --git a/cmd/metrics-realtime.go b/cmd/metrics-realtime.go index 41fb2d1bb..3aa2e7d78 100644 --- a/cmd/metrics-realtime.go +++ b/cmd/metrics-realtime.go @@ -132,6 +132,15 @@ func collectLocalMetrics(types madmin.MetricType, opts collectMetricsOpts) (m ma m.Aggregated.CPU.LoadStat = loadStat } } + if types.Contains(madmin.MetricsRPC) { + gr := globalGrid.Load() + if gr == nil { + m.Errors = append(m.Errors, fmt.Sprintf("%s: Grid not initialized", byHostName)) + } else { + stats := gr.ConnStats() + m.Aggregated.RPC = &stats + } + } // Add types... // ByHost is a shallow reference, so careful about sharing. diff --git a/go.mod b/go.mod index 084d25a31..07e1de754 100644 --- a/go.mod +++ b/go.mod @@ -52,7 +52,7 @@ require ( github.com/minio/highwayhash v1.0.2 github.com/minio/kms-go/kes v0.3.0 github.com/minio/kms-go/kms v0.4.0 - github.com/minio/madmin-go/v3 v3.0.52 + github.com/minio/madmin-go/v3 v3.0.54-0.20240531145019-5310b0f9f805 github.com/minio/minio-go/v7 v7.0.70 github.com/minio/mux v1.9.0 github.com/minio/pkg/v3 v3.0.1 diff --git a/go.sum b/go.sum index f0ffaaa50..3a83a88e2 100644 --- a/go.sum +++ b/go.sum @@ -454,8 +454,8 @@ github.com/minio/kms-go/kes v0.3.0 h1:SU8VGVM/Hk9w1OiSby3OatkcojooUqIdDHl6dtM6Nk github.com/minio/kms-go/kes v0.3.0/go.mod h1:w6DeVT878qEOU3nUrYVy1WOT5H1Ig9hbDIh698NYJKY= github.com/minio/kms-go/kms v0.4.0 h1:cLPZceEp+05xHotVBaeFJrgL7JcXM4lBy6PU0idkE7I= github.com/minio/kms-go/kms v0.4.0/go.mod h1:q12CehiIy2qgBnDKq6Q7wmPi2PHSyRVug5DKp0HAVeE= -github.com/minio/madmin-go/v3 v3.0.52 h1:X2zoNfEkrgql9Hlal6Nfw7pAqLAM47yZy4i7yb2bUVk= -github.com/minio/madmin-go/v3 v3.0.52/go.mod h1:IFAwr0XMrdsLovxAdCcuq/eoL4nRuMVQQv0iubJANQw= +github.com/minio/madmin-go/v3 v3.0.54-0.20240531145019-5310b0f9f805 h1:iuHJkYkULcvG+0Q47rd0b4PG8hg+6PgCBfBwaNUEz7Q= +github.com/minio/madmin-go/v3 v3.0.54-0.20240531145019-5310b0f9f805/go.mod h1:IFAwr0XMrdsLovxAdCcuq/eoL4nRuMVQQv0iubJANQw= github.com/minio/mc v0.0.0-20240524090849-a8fdcbe7cb2f h1:UPxKkqMqyHNb+68hTq+PDIUYAqAbbplUujEyP5Ez9rs= github.com/minio/mc v0.0.0-20240524090849-a8fdcbe7cb2f/go.mod h1:d/mgZMbu2yRNyYOwVqvRBV25pKMfAz7fijObWSXkqpA= github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= diff --git a/internal/grid/connection.go b/internal/grid/connection.go index 6492d4625..a6b867e1b 100644 --- a/internal/grid/connection.go +++ b/internal/grid/connection.go @@ -122,6 +122,10 @@ type Connection struct { outgoingBytes func(n int64) // Record outgoing bytes. trace *tracer // tracer for this connection. baseFlags Flags + outBytes atomic.Int64 + inBytes atomic.Int64 + inMessages atomic.Int64 + outMessages atomic.Int64 // For testing only debugInConn net.Conn @@ -232,13 +236,25 @@ func newConnection(o connectionParams) *Connection { clientPingInterval: clientPingInterval, connPingInterval: connPingInterval, tlsConfig: o.tlsConfig, - incomingBytes: o.incomingBytes, - outgoingBytes: o.outgoingBytes, } if debugPrint { // Random Mux ID c.NextID = rand.Uint64() } + + // Record per connection stats. + c.outgoingBytes = func(n int64) { + if o.outgoingBytes != nil { + o.outgoingBytes(n) + } + c.outBytes.Add(n) + } + c.incomingBytes = func(n int64) { + if o.incomingBytes != nil { + o.incomingBytes(n) + } + c.inBytes.Add(n) + } if !strings.HasPrefix(o.remote, "https://") && !strings.HasPrefix(o.remote, "wss://") { c.baseFlags |= FlagCRCxxh3 } @@ -995,11 +1011,13 @@ func (c *Connection) handleMessages(ctx context.Context, conn net.Conn) { fmt.Printf("%s Got msg: %v\n", c.Local, m) } if m.Op != OpMerged { + c.inMessages.Add(1) c.handleMsg(ctx, m, subID) continue } // Handle merged messages. messages := int(m.Seq) + c.inMessages.Add(int64(messages)) for i := 0; i < messages; i++ { if atomic.LoadUint32((*uint32)(&c.state)) != StateConnected { cancel(ErrDisconnected) @@ -1100,6 +1118,11 @@ func (c *Connection) handleMessages(ctx context.Context, conn net.Conn) { queueSize += len(toSend) continue } + c.outMessages.Add(int64(len(queue) + 1)) + if c.outgoingBytes != nil { + c.outgoingBytes(int64(len(toSend) + queueSize)) + } + c.connChange.L.Lock() for { state := c.State() @@ -1578,11 +1601,28 @@ func (c *Connection) State() State { } // Stats returns the current connection stats. -func (c *Connection) Stats() ConnectionStats { - return ConnectionStats{ - IncomingStreams: c.inStream.Size(), - OutgoingStreams: c.outgoing.Size(), +func (c *Connection) Stats() madmin.RPCMetrics { + conn := 0 + if c.State() == StateConnected { + conn++ } + m := madmin.RPCMetrics{ + CollectedAt: time.Now(), + Connected: conn, + Disconnected: 1 - conn, + IncomingStreams: c.inStream.Size(), + OutgoingStreams: c.outgoing.Size(), + IncomingBytes: c.inBytes.Load(), + OutgoingBytes: c.outBytes.Load(), + IncomingMessages: c.inMessages.Load(), + OutgoingMessages: c.outMessages.Load(), + OutQueue: len(c.outQueue), + LastPongTime: time.Unix(c.LastPong, 0).UTC(), + } + m.ByDestination = map[string]madmin.RPCMetrics{ + c.Remote: m, + } + return m } func (c *Connection) debugMsg(d debugMsg, args ...any) { diff --git a/internal/grid/manager.go b/internal/grid/manager.go index 58cc82429..d47784a2a 100644 --- a/internal/grid/manager.go +++ b/internal/grid/manager.go @@ -334,3 +334,13 @@ func (m *Manager) debugMsg(d debugMsg, args ...any) { c.debugMsg(d, args...) } } + +// ConnStats returns the connection statistics for all connections. +func (m *Manager) ConnStats() madmin.RPCMetrics { + var res madmin.RPCMetrics + for _, c := range m.targets { + t := c.Stats() + res.Merge(&t) + } + return res +} diff --git a/internal/grid/stats.go b/internal/grid/stats.go deleted file mode 100644 index 2d21d4bdc..000000000 --- a/internal/grid/stats.go +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright (c) 2015-2023 MinIO, Inc. -// -// This file is part of MinIO Object Storage stack -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package grid - -// ConnectionStats contains connection statistics. -type ConnectionStats struct { - OutgoingStreams int - IncomingStreams int -}