fix: crash in ResourceMetrics RPC handling concurrent writers (#19123)

Continuation of #19103 that had fixed the crash in peer metrics for cluster endpoint.
This commit is contained in:
Harshavardhana 2024-02-25 00:51:38 -08:00 committed by GitHub
parent b49ce1713f
commit 8a698fef71
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 39 additions and 31 deletions

View File

@ -242,26 +242,6 @@ func (client *peerRESTClient) GetMetrics(ctx context.Context, t madmin.MetricTyp
return v.ValueOrZero(), err
}
func (client *peerRESTClient) GetResourceMetrics(ctx context.Context) (<-chan Metric, error) {
st, err := getResourceMetricsRPC.Call(ctx, client.gridConn(), grid.NewMSS())
if err != nil {
return nil, err
}
ch := make(chan Metric, 1)
go func(ch chan<- Metric) {
defer close(ch)
st.Results(func(metric *Metric) error {
select {
case <-ctx.Done():
return ctx.Err()
case ch <- *metric:
return nil
}
})
}(ch)
return ch, nil
}
// GetProcInfo - fetch MinIO process information for a remote node.
func (client *peerRESTClient) GetProcInfo(ctx context.Context) (info madmin.ProcInfo, err error) {
resp, err := getProcInfoRPC.Call(ctx, client.gridConn(), grid.NewMSS())
@ -661,6 +641,28 @@ func (client *peerRESTClient) MonitorBandwidth(ctx context.Context, buckets []st
return getBandwidthRPC.Call(ctx, client.gridConn(), values)
}
func (client *peerRESTClient) GetResourceMetrics(ctx context.Context) (<-chan Metric, error) {
resp, err := getResourceMetricsRPC.Call(ctx, client.gridConn(), grid.NewMSS())
if err != nil {
return nil, err
}
ch := make(chan Metric)
go func(ch chan<- Metric) {
defer close(ch)
for _, m := range resp.Value() {
if m == nil {
continue
}
select {
case <-ctx.Done():
return
case ch <- *m:
}
}
}(ch)
return ch, nil
}
func (client *peerRESTClient) GetPeerMetrics(ctx context.Context) (<-chan Metric, error) {
resp, err := getPeerMetricsRPC.Call(ctx, client.gridConn(), grid.NewMSS())
if err != nil {

View File

@ -89,6 +89,7 @@ var (
getPartitionsRPC = grid.NewSingleHandler[*grid.MSS, *grid.JSON[madmin.Partitions]](grid.HandlerGetPartitions, grid.NewMSS, madminPartitions.NewJSON)
getPeerBucketMetricsRPC = grid.NewSingleHandler[*grid.MSS, *grid.Array[*Metric]](grid.HandlerGetPeerBucketMetrics, grid.NewMSS, aoMetricsGroup.New)
getPeerMetricsRPC = grid.NewSingleHandler[*grid.MSS, *grid.Array[*Metric]](grid.HandlerGetPeerMetrics, grid.NewMSS, aoMetricsGroup.New)
getResourceMetricsRPC = grid.NewSingleHandler[*grid.MSS, *grid.Array[*Metric]](grid.HandlerGetResourceMetrics, grid.NewMSS, aoMetricsGroup.New)
getProcInfoRPC = grid.NewSingleHandler[*grid.MSS, *grid.JSON[madmin.ProcInfo]](grid.HandlerGetProcInfo, grid.NewMSS, madminProcInfo.NewJSON)
getSRMetricsRPC = grid.NewSingleHandler[*grid.MSS, *SRMetricsSummary](grid.HandlerGetSRMetrics, grid.NewMSS, func() *SRMetricsSummary { return &SRMetricsSummary{} })
getSysConfigRPC = grid.NewSingleHandler[*grid.MSS, *grid.JSON[madmin.SysConfig]](grid.HandlerGetSysConfig, grid.NewMSS, madminSysConfig.NewJSON)
@ -116,9 +117,8 @@ var (
// STREAMS
// Set an output capacity of 100 for consoleLog and listenRPC
// There is another buffer that will buffer events.
consoleLogRPC = grid.NewStream[*grid.MSS, grid.NoPayload, *grid.Bytes](grid.HandlerConsoleLog, grid.NewMSS, nil, grid.NewBytes).WithOutCapacity(100)
listenRPC = grid.NewStream[*grid.URLValues, grid.NoPayload, *grid.Bytes](grid.HandlerListen, grid.NewURLValues, nil, grid.NewBytes).WithOutCapacity(100)
getResourceMetricsRPC = grid.NewStream[*grid.MSS, grid.NoPayload, *Metric](grid.HandlerGetResourceMetrics, grid.NewMSS, nil, func() *Metric { return &Metric{} })
consoleLogRPC = grid.NewStream[*grid.MSS, grid.NoPayload, *grid.Bytes](grid.HandlerConsoleLog, grid.NewMSS, nil, grid.NewBytes).WithOutCapacity(100)
listenRPC = grid.NewStream[*grid.URLValues, grid.NoPayload, *grid.Bytes](grid.HandlerListen, grid.NewURLValues, nil, grid.NewBytes).WithOutCapacity(100)
)
// GetLocksHandler - returns list of lock from the server.
@ -440,13 +440,6 @@ func (s *peerRESTServer) GetMetricsHandler(v *grid.URLValues) (*grid.JSON[madmin
return madminRealtimeMetrics.NewJSONWith(&info), nil
}
func (s *peerRESTServer) GetResourceMetrics(ctx context.Context, _ *grid.MSS, out chan<- *Metric) *grid.RemoteErr {
for m := range ReportMetrics(ctx, resourceMetricsGroups) {
out <- &m
}
return nil
}
// GetSysConfigHandler - returns system config information.
// (only the config that are of concern to minio)
func (s *peerRESTServer) GetSysConfigHandler(_ *grid.MSS) (*grid.JSON[madmin.SysConfig], *grid.RemoteErr) {
@ -1005,6 +998,19 @@ func (s *peerRESTServer) GetBandwidth(params *grid.URLValues) (*bandwidth.Bucket
return globalBucketMonitor.GetReport(selectBuckets), nil
}
func (s *peerRESTServer) GetResourceMetrics(_ *grid.MSS) (*grid.Array[*Metric], *grid.RemoteErr) {
res := make([]*Metric, 0, len(resourceMetricsGroups))
populateAndPublish(resourceMetricsGroups, func(m Metric) bool {
if m.VariableLabels == nil {
m.VariableLabels = make(map[string]string, 1)
}
m.VariableLabels[serverName] = globalLocalNodeName
res = append(res, &m)
return true
})
return aoMetricsGroup.NewWith(res), nil
}
// GetPeerMetrics gets the metrics to be federated across peers.
func (s *peerRESTServer) GetPeerMetrics(_ *grid.MSS) (*grid.Array[*Metric], *grid.RemoteErr) {
res := make([]*Metric, 0, len(peerMetricsGroups))
@ -1319,7 +1325,7 @@ func registerPeerRESTHandlers(router *mux.Router, gm *grid.Manager) {
logger.FatalIf(getPeerBucketMetricsRPC.Register(gm, server.GetPeerBucketMetrics), "unable to register handler")
logger.FatalIf(getPeerMetricsRPC.Register(gm, server.GetPeerMetrics), "unable to register handler")
logger.FatalIf(getProcInfoRPC.Register(gm, server.GetProcInfoHandler), "unable to register handler")
logger.FatalIf(getResourceMetricsRPC.RegisterNoInput(gm, server.GetResourceMetrics), "unable to register handler")
logger.FatalIf(getResourceMetricsRPC.Register(gm, server.GetResourceMetrics), "unable to register handler")
logger.FatalIf(getSRMetricsRPC.Register(gm, server.GetSRMetricsHandler), "unable to register handler")
logger.FatalIf(getSysConfigRPC.Register(gm, server.GetSysConfigHandler), "unable to register handler")
logger.FatalIf(getSysErrorsRPC.Register(gm, server.GetSysErrorsHandler), "unable to register handler")