From 8a698fef7161a276a71bb063f16ef9663f2551e7 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Sun, 25 Feb 2024 00:51:38 -0800 Subject: [PATCH] fix: crash in ResourceMetrics RPC handling concurrent writers (#19123) Continuation of #19103 that had fixed the crash in peer metrics for cluster endpoint. --- cmd/peer-rest-client.go | 42 +++++++++++++++++++++-------------------- cmd/peer-rest-server.go | 28 ++++++++++++++++----------- 2 files changed, 39 insertions(+), 31 deletions(-) diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index 156ff3beb..16c22935a 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -242,26 +242,6 @@ func (client *peerRESTClient) GetMetrics(ctx context.Context, t madmin.MetricTyp return v.ValueOrZero(), err } -func (client *peerRESTClient) GetResourceMetrics(ctx context.Context) (<-chan Metric, error) { - st, err := getResourceMetricsRPC.Call(ctx, client.gridConn(), grid.NewMSS()) - if err != nil { - return nil, err - } - ch := make(chan Metric, 1) - go func(ch chan<- Metric) { - defer close(ch) - st.Results(func(metric *Metric) error { - select { - case <-ctx.Done(): - return ctx.Err() - case ch <- *metric: - return nil - } - }) - }(ch) - return ch, nil -} - // GetProcInfo - fetch MinIO process information for a remote node. func (client *peerRESTClient) GetProcInfo(ctx context.Context) (info madmin.ProcInfo, err error) { resp, err := getProcInfoRPC.Call(ctx, client.gridConn(), grid.NewMSS()) @@ -661,6 +641,28 @@ func (client *peerRESTClient) MonitorBandwidth(ctx context.Context, buckets []st return getBandwidthRPC.Call(ctx, client.gridConn(), values) } +func (client *peerRESTClient) GetResourceMetrics(ctx context.Context) (<-chan Metric, error) { + resp, err := getResourceMetricsRPC.Call(ctx, client.gridConn(), grid.NewMSS()) + if err != nil { + return nil, err + } + ch := make(chan Metric) + go func(ch chan<- Metric) { + defer close(ch) + for _, m := range resp.Value() { + if m == nil { + continue + } + select { + case <-ctx.Done(): + return + case ch <- *m: + } + } + }(ch) + return ch, nil +} + func (client *peerRESTClient) GetPeerMetrics(ctx context.Context) (<-chan Metric, error) { resp, err := getPeerMetricsRPC.Call(ctx, client.gridConn(), grid.NewMSS()) if err != nil { diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index 6738d9237..e41d97f73 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -89,6 +89,7 @@ var ( getPartitionsRPC = grid.NewSingleHandler[*grid.MSS, *grid.JSON[madmin.Partitions]](grid.HandlerGetPartitions, grid.NewMSS, madminPartitions.NewJSON) getPeerBucketMetricsRPC = grid.NewSingleHandler[*grid.MSS, *grid.Array[*Metric]](grid.HandlerGetPeerBucketMetrics, grid.NewMSS, aoMetricsGroup.New) getPeerMetricsRPC = grid.NewSingleHandler[*grid.MSS, *grid.Array[*Metric]](grid.HandlerGetPeerMetrics, grid.NewMSS, aoMetricsGroup.New) + getResourceMetricsRPC = grid.NewSingleHandler[*grid.MSS, *grid.Array[*Metric]](grid.HandlerGetResourceMetrics, grid.NewMSS, aoMetricsGroup.New) getProcInfoRPC = grid.NewSingleHandler[*grid.MSS, *grid.JSON[madmin.ProcInfo]](grid.HandlerGetProcInfo, grid.NewMSS, madminProcInfo.NewJSON) getSRMetricsRPC = grid.NewSingleHandler[*grid.MSS, *SRMetricsSummary](grid.HandlerGetSRMetrics, grid.NewMSS, func() *SRMetricsSummary { return &SRMetricsSummary{} }) getSysConfigRPC = grid.NewSingleHandler[*grid.MSS, *grid.JSON[madmin.SysConfig]](grid.HandlerGetSysConfig, grid.NewMSS, madminSysConfig.NewJSON) @@ -116,9 +117,8 @@ var ( // STREAMS // Set an output capacity of 100 for consoleLog and listenRPC // There is another buffer that will buffer events. - consoleLogRPC = grid.NewStream[*grid.MSS, grid.NoPayload, *grid.Bytes](grid.HandlerConsoleLog, grid.NewMSS, nil, grid.NewBytes).WithOutCapacity(100) - listenRPC = grid.NewStream[*grid.URLValues, grid.NoPayload, *grid.Bytes](grid.HandlerListen, grid.NewURLValues, nil, grid.NewBytes).WithOutCapacity(100) - getResourceMetricsRPC = grid.NewStream[*grid.MSS, grid.NoPayload, *Metric](grid.HandlerGetResourceMetrics, grid.NewMSS, nil, func() *Metric { return &Metric{} }) + consoleLogRPC = grid.NewStream[*grid.MSS, grid.NoPayload, *grid.Bytes](grid.HandlerConsoleLog, grid.NewMSS, nil, grid.NewBytes).WithOutCapacity(100) + listenRPC = grid.NewStream[*grid.URLValues, grid.NoPayload, *grid.Bytes](grid.HandlerListen, grid.NewURLValues, nil, grid.NewBytes).WithOutCapacity(100) ) // GetLocksHandler - returns list of lock from the server. @@ -440,13 +440,6 @@ func (s *peerRESTServer) GetMetricsHandler(v *grid.URLValues) (*grid.JSON[madmin return madminRealtimeMetrics.NewJSONWith(&info), nil } -func (s *peerRESTServer) GetResourceMetrics(ctx context.Context, _ *grid.MSS, out chan<- *Metric) *grid.RemoteErr { - for m := range ReportMetrics(ctx, resourceMetricsGroups) { - out <- &m - } - return nil -} - // GetSysConfigHandler - returns system config information. // (only the config that are of concern to minio) func (s *peerRESTServer) GetSysConfigHandler(_ *grid.MSS) (*grid.JSON[madmin.SysConfig], *grid.RemoteErr) { @@ -1005,6 +998,19 @@ func (s *peerRESTServer) GetBandwidth(params *grid.URLValues) (*bandwidth.Bucket return globalBucketMonitor.GetReport(selectBuckets), nil } +func (s *peerRESTServer) GetResourceMetrics(_ *grid.MSS) (*grid.Array[*Metric], *grid.RemoteErr) { + res := make([]*Metric, 0, len(resourceMetricsGroups)) + populateAndPublish(resourceMetricsGroups, func(m Metric) bool { + if m.VariableLabels == nil { + m.VariableLabels = make(map[string]string, 1) + } + m.VariableLabels[serverName] = globalLocalNodeName + res = append(res, &m) + return true + }) + return aoMetricsGroup.NewWith(res), nil +} + // GetPeerMetrics gets the metrics to be federated across peers. func (s *peerRESTServer) GetPeerMetrics(_ *grid.MSS) (*grid.Array[*Metric], *grid.RemoteErr) { res := make([]*Metric, 0, len(peerMetricsGroups)) @@ -1319,7 +1325,7 @@ func registerPeerRESTHandlers(router *mux.Router, gm *grid.Manager) { logger.FatalIf(getPeerBucketMetricsRPC.Register(gm, server.GetPeerBucketMetrics), "unable to register handler") logger.FatalIf(getPeerMetricsRPC.Register(gm, server.GetPeerMetrics), "unable to register handler") logger.FatalIf(getProcInfoRPC.Register(gm, server.GetProcInfoHandler), "unable to register handler") - logger.FatalIf(getResourceMetricsRPC.RegisterNoInput(gm, server.GetResourceMetrics), "unable to register handler") + logger.FatalIf(getResourceMetricsRPC.Register(gm, server.GetResourceMetrics), "unable to register handler") logger.FatalIf(getSRMetricsRPC.Register(gm, server.GetSRMetricsHandler), "unable to register handler") logger.FatalIf(getSysConfigRPC.Register(gm, server.GetSysConfigHandler), "unable to register handler") logger.FatalIf(getSysErrorsRPC.Register(gm, server.GetSysErrorsHandler), "unable to register handler")