fix: prevent queries from starting before initialization (#18766)

This commit is contained in:
jiuker 2024-01-11 07:21:52 +08:00 committed by GitHub
parent 39f9350697
commit c1a78224cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -50,32 +50,32 @@ var (
func init() {
clusterMetricsGroups := []*MetricsGroup{
getNodeHealthMetrics(),
getClusterStorageMetrics(),
getClusterTierMetrics(),
getClusterUsageMetrics(),
getKMSMetrics(),
getClusterHealthMetrics(),
getIAMNodeMetrics(),
getReplicationSiteMetrics(),
getNodeHealthMetrics(MetricsGroupOpts{dependGlobalNotificationSys: true}),
getClusterStorageMetrics(MetricsGroupOpts{dependGlobalObjectAPI: true}),
getClusterTierMetrics(MetricsGroupOpts{dependGlobalObjectAPI: true}),
getClusterUsageMetrics(MetricsGroupOpts{dependGlobalObjectAPI: true}),
getKMSMetrics(MetricsGroupOpts{dependGlobalObjectAPI: true, dependGlobalKMS: true}),
getClusterHealthMetrics(MetricsGroupOpts{dependGlobalObjectAPI: true}),
getIAMNodeMetrics(MetricsGroupOpts{dependGlobalAuthNPlugin: true, dependGlobalIAMSys: true}),
getReplicationSiteMetrics(MetricsGroupOpts{dependGlobalSiteReplicationSys: true}),
}
peerMetricsGroups = []*MetricsGroup{
getGoMetrics(),
getHTTPMetrics(false),
getNotificationMetrics(),
getLocalStorageMetrics(),
getHTTPMetrics(MetricsGroupOpts{}),
getNotificationMetrics(MetricsGroupOpts{dependGlobalLambdaTargetList: true}),
getLocalStorageMetrics(MetricsGroupOpts{dependGlobalObjectAPI: true}),
getMinioProcMetrics(),
getMinioVersionMetrics(),
getNetworkMetrics(),
getS3TTFBMetric(),
getILMNodeMetrics(),
getScannerNodeMetrics(),
getIAMNodeMetrics(),
getKMSNodeMetrics(),
getMinioHealingMetrics(),
getIAMNodeMetrics(MetricsGroupOpts{dependGlobalAuthNPlugin: true, dependGlobalIAMSys: true}),
getKMSNodeMetrics(MetricsGroupOpts{dependGlobalObjectAPI: true, dependGlobalKMS: true}),
getMinioHealingMetrics(MetricsGroupOpts{dependGlobalBackgroundHealState: true}),
getWebhookMetrics(),
getReplicationClusterMetrics(),
getReplicationClusterMetrics(MetricsGroupOpts{dependGlobalObjectAPI: true, dependBucketTargetSys: true}),
}
allMetricsGroups := func() (allMetrics []*MetricsGroup) {
@ -85,27 +85,27 @@ func init() {
}()
nodeGroups := []*MetricsGroup{
getNodeHealthMetrics(),
getHTTPMetrics(false),
getNodeHealthMetrics(MetricsGroupOpts{dependGlobalNotificationSys: true}),
getHTTPMetrics(MetricsGroupOpts{}),
getNetworkMetrics(),
getMinioVersionMetrics(),
getS3TTFBMetric(),
getTierMetrics(),
getNotificationMetrics(),
getDistLockMetrics(),
getIAMNodeMetrics(),
getLocalStorageMetrics(),
getNotificationMetrics(MetricsGroupOpts{dependGlobalLambdaTargetList: true}),
getDistLockMetrics(MetricsGroupOpts{dependGlobalIsDistErasure: true, dependGlobalLockServer: true}),
getIAMNodeMetrics(MetricsGroupOpts{dependGlobalAuthNPlugin: true, dependGlobalIAMSys: true}),
getLocalStorageMetrics(MetricsGroupOpts{dependGlobalObjectAPI: true}),
}
bucketMetricsGroups := []*MetricsGroup{
getBucketUsageMetrics(),
getHTTPMetrics(true),
getBucketUsageMetrics(MetricsGroupOpts{dependGlobalObjectAPI: true}),
getHTTPMetrics(MetricsGroupOpts{bucketOnly: true}),
getBucketTTFBMetric(),
getBatchJobsMetrics(),
getBatchJobsMetrics(MetricsGroupOpts{dependGlobalObjectAPI: true}),
}
bucketPeerMetricsGroups = []*MetricsGroup{
getHTTPMetrics(true),
getHTTPMetrics(MetricsGroupOpts{bucketOnly: true}),
getBucketTTFBMetric(),
}
@ -317,6 +317,23 @@ type Metric struct {
type MetricsGroup struct {
metricsCache timedValue
cacheInterval time.Duration
metricsGroupOpts MetricsGroupOpts
}
// MetricsGroupOpts are a group of metrics opts to be used to initialize the metrics group.
type MetricsGroupOpts struct {
dependGlobalObjectAPI bool
dependGlobalAuthNPlugin bool
dependGlobalSiteReplicationSys bool
dependGlobalNotificationSys bool
dependGlobalKMS bool
bucketOnly bool
dependGlobalLambdaTargetList bool
dependGlobalIAMSys bool
dependGlobalLockServer bool
dependGlobalIsDistErasure bool
dependGlobalBackgroundHealState bool
dependBucketTargetSys bool
}
// RegisterRead register the metrics populator function to be used
@ -326,6 +343,63 @@ func (g *MetricsGroup) RegisterRead(read func(ctx context.Context) []Metric) {
g.metricsCache.Relax = true
g.metricsCache.TTL = g.cacheInterval
g.metricsCache.Update = func() (interface{}, error) {
if g.metricsGroupOpts.dependGlobalObjectAPI {
objLayer := newObjectLayerFn()
// Service not initialized yet
if objLayer == nil {
return []Metric{}, nil
}
}
if g.metricsGroupOpts.dependGlobalAuthNPlugin {
if globalAuthNPlugin == nil {
return []Metric{}, nil
}
}
if g.metricsGroupOpts.dependGlobalSiteReplicationSys {
if !globalSiteReplicationSys.isEnabled() {
return []Metric{}, nil
}
}
if g.metricsGroupOpts.dependGlobalNotificationSys {
if globalNotificationSys == nil {
return []Metric{}, nil
}
}
if g.metricsGroupOpts.dependGlobalKMS {
if GlobalKMS == nil {
return []Metric{}, nil
}
}
if g.metricsGroupOpts.dependGlobalLambdaTargetList {
if globalLambdaTargetList == nil {
return []Metric{}, nil
}
}
if g.metricsGroupOpts.dependGlobalIAMSys {
if globalIAMSys == nil {
return []Metric{}, nil
}
}
if g.metricsGroupOpts.dependGlobalLockServer {
if globalLockServer == nil {
return []Metric{}, nil
}
}
if g.metricsGroupOpts.dependGlobalIsDistErasure {
if !globalIsDistErasure {
return []Metric{}, nil
}
}
if g.metricsGroupOpts.dependGlobalBackgroundHealState {
if globalBackgroundHealState == nil {
return []Metric{}, nil
}
}
if g.metricsGroupOpts.dependBucketTargetSys {
if globalBucketTargetSys == nil {
return []Metric{}, nil
}
}
return read(GlobalContext), nil
}
})
@ -1889,9 +1963,10 @@ func getScannerNodeMetrics() *MetricsGroup {
return mg
}
func getIAMNodeMetrics() *MetricsGroup {
func getIAMNodeMetrics(opts MetricsGroupOpts) *MetricsGroup {
mg := &MetricsGroup{
cacheInterval: 10 * time.Second,
metricsGroupOpts: opts,
}
mg.RegisterRead(func(_ context.Context) (metrics []Metric) {
lastSyncTime := atomic.LoadUint64(&globalIAMSys.LastRefreshTimeUnixNano)
@ -2010,9 +2085,10 @@ func getIAMNodeMetrics() *MetricsGroup {
}
// replication metrics for each node - published to the cluster endpoint with nodename as label
func getReplicationClusterMetrics() *MetricsGroup {
func getReplicationClusterMetrics(opts MetricsGroupOpts) *MetricsGroup {
mg := &MetricsGroup{
cacheInterval: 1 * time.Minute,
metricsGroupOpts: opts,
}
const (
Online = 1
@ -2021,11 +2097,6 @@ func getReplicationClusterMetrics() *MetricsGroup {
mg.RegisterRead(func(_ context.Context) []Metric {
var ml []Metric
objLayer := newObjectLayerFn()
// Service not initialized yet
if objLayer == nil {
return ml
}
// common operational metrics for bucket replication and site replication - published
// at cluster level
if globalReplicationStats != nil {
@ -2202,9 +2273,10 @@ func getReplicationClusterMetrics() *MetricsGroup {
}
// replication metrics for site replication
func getReplicationSiteMetrics() *MetricsGroup {
func getReplicationSiteMetrics(opts MetricsGroupOpts) *MetricsGroup {
mg := &MetricsGroup{
cacheInterval: 1 * time.Minute,
metricsGroupOpts: opts,
}
mg.RegisterRead(func(_ context.Context) []Metric {
ml := []Metric{}
@ -2301,9 +2373,10 @@ func getMinioVersionMetrics() *MetricsGroup {
return mg
}
func getNodeHealthMetrics() *MetricsGroup {
func getNodeHealthMetrics(opts MetricsGroupOpts) *MetricsGroup {
mg := &MetricsGroup{
cacheInterval: 1 * time.Minute,
metricsGroupOpts: opts,
}
mg.RegisterRead(func(_ context.Context) (metrics []Metric) {
metrics = make([]Metric, 0, 16)
@ -2321,9 +2394,10 @@ func getNodeHealthMetrics() *MetricsGroup {
return mg
}
func getMinioHealingMetrics() *MetricsGroup {
func getMinioHealingMetrics(opts MetricsGroupOpts) *MetricsGroup {
mg := &MetricsGroup{
cacheInterval: 10 * time.Second,
metricsGroupOpts: opts,
}
mg.RegisterRead(func(_ context.Context) (metrics []Metric) {
bgSeq, exists := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID)
@ -2391,9 +2465,10 @@ func getObjectsScanned(seq *healSequence) (m []Metric) {
return
}
func getDistLockMetrics() *MetricsGroup {
func getDistLockMetrics(opts MetricsGroupOpts) *MetricsGroup {
mg := &MetricsGroup{
cacheInterval: 1 * time.Second,
metricsGroupOpts: opts,
}
mg.RegisterRead(func(ctx context.Context) []Metric {
if !globalIsDistErasure {
@ -2438,9 +2513,10 @@ func getDistLockMetrics() *MetricsGroup {
return mg
}
func getNotificationMetrics() *MetricsGroup {
func getNotificationMetrics(opts MetricsGroupOpts) *MetricsGroup {
mg := &MetricsGroup{
cacheInterval: 10 * time.Second,
metricsGroupOpts: opts,
}
mg.RegisterRead(func(ctx context.Context) []Metric {
metrics := make([]Metric, 0, 3)
@ -2613,12 +2689,13 @@ func getNotificationMetrics() *MetricsGroup {
return mg
}
func getHTTPMetrics(bucketOnly bool) *MetricsGroup {
func getHTTPMetrics(opts MetricsGroupOpts) *MetricsGroup {
mg := &MetricsGroup{
cacheInterval: 10 * time.Second,
metricsGroupOpts: opts,
}
mg.RegisterRead(func(ctx context.Context) (metrics []Metric) {
if !bucketOnly {
if !mg.metricsGroupOpts.bucketOnly {
httpStats := globalHTTPStats.toServerHTTPStats()
metrics = make([]Metric, 0, 3+
len(httpStats.CurrentS3Requests.APIStats)+
@ -2804,16 +2881,13 @@ func getNetworkMetrics() *MetricsGroup {
return mg
}
func getClusterUsageMetrics() *MetricsGroup {
func getClusterUsageMetrics(opts MetricsGroupOpts) *MetricsGroup {
mg := &MetricsGroup{
cacheInterval: 1 * time.Minute,
metricsGroupOpts: opts,
}
mg.RegisterRead(func(ctx context.Context) (metrics []Metric) {
objLayer := newObjectLayerFn()
// Service not initialized yet
if objLayer == nil {
return
}
metrics = make([]Metric, 0, 50)
dataUsageInfo, err := loadDataUsageFromBackend(ctx, objLayer)
@ -2910,16 +2984,13 @@ func getClusterUsageMetrics() *MetricsGroup {
return mg
}
func getBucketUsageMetrics() *MetricsGroup {
func getBucketUsageMetrics(opts MetricsGroupOpts) *MetricsGroup {
mg := &MetricsGroup{
cacheInterval: 1 * time.Minute,
metricsGroupOpts: opts,
}
mg.RegisterRead(func(ctx context.Context) (metrics []Metric) {
objLayer := newObjectLayerFn()
// Service not initialized yet
if objLayer == nil {
return
}
metrics = make([]Metric, 0, 50)
dataUsageInfo, err := loadDataUsageFromBackend(ctx, objLayer)
@ -3099,15 +3170,14 @@ func getClusterTransitionedVersionsMD() MetricDescription {
}
}
func getClusterTierMetrics() *MetricsGroup {
func getClusterTierMetrics(opts MetricsGroupOpts) *MetricsGroup {
mg := &MetricsGroup{
cacheInterval: 1 * time.Minute,
metricsGroupOpts: opts,
}
mg.RegisterRead(func(ctx context.Context) (metrics []Metric) {
objLayer := newObjectLayerFn()
if objLayer == nil {
return
}
if globalTierConfigMgr.Empty() {
return
}
@ -3127,16 +3197,13 @@ func getClusterTierMetrics() *MetricsGroup {
return mg
}
func getLocalStorageMetrics() *MetricsGroup {
func getLocalStorageMetrics(opts MetricsGroupOpts) *MetricsGroup {
mg := &MetricsGroup{
cacheInterval: 1 * time.Minute,
metricsGroupOpts: opts,
}
mg.RegisterRead(func(ctx context.Context) (metrics []Metric) {
objLayer := newObjectLayerFn()
// Service not initialized yet
if objLayer == nil {
return
}
metrics = make([]Metric, 0, 50)
storageInfo := objLayer.LocalStorageInfo(ctx, true)
@ -3281,16 +3348,13 @@ func getClusterErasureSetHealingDrivesMD() MetricDescription {
}
}
func getClusterHealthMetrics() *MetricsGroup {
func getClusterHealthMetrics(opts MetricsGroupOpts) *MetricsGroup {
mg := &MetricsGroup{
cacheInterval: 10 * time.Second,
metricsGroupOpts: opts,
}
mg.RegisterRead(func(ctx context.Context) (metrics []Metric) {
objLayer := newObjectLayerFn()
// Service not initialized yet
if objLayer == nil {
return
}
opts := HealthOptions{}
result := objLayer.Health(ctx, opts)
@ -3345,18 +3409,13 @@ func getClusterHealthMetrics() *MetricsGroup {
return mg
}
func getBatchJobsMetrics() *MetricsGroup {
func getBatchJobsMetrics(opts MetricsGroupOpts) *MetricsGroup {
mg := &MetricsGroup{
cacheInterval: 10 * time.Second,
metricsGroupOpts: opts,
}
mg.RegisterRead(func(ctx context.Context) (metrics []Metric) {
objLayer := newObjectLayerFn()
// Service not initialized yet
if objLayer == nil {
return
}
var m madmin.RealtimeMetrics
mLocal := collectLocalMetrics(madmin.MetricsBatchJobs, collectMetricsOpts{})
m.Merge(&mLocal)
@ -3416,16 +3475,13 @@ func getBatchJobsMetrics() *MetricsGroup {
return mg
}
func getClusterStorageMetrics() *MetricsGroup {
func getClusterStorageMetrics(opts MetricsGroupOpts) *MetricsGroup {
mg := &MetricsGroup{
cacheInterval: 1 * time.Minute,
metricsGroupOpts: opts,
}
mg.RegisterRead(func(ctx context.Context) (metrics []Metric) {
objLayer := newObjectLayerFn()
// Service not initialized yet
if objLayer == nil {
return
}
// Fetch disk space info, ignore errors
metrics = make([]Metric, 0, 10)
@ -3472,18 +3528,13 @@ func getClusterStorageMetrics() *MetricsGroup {
return mg
}
func getKMSNodeMetrics() *MetricsGroup {
func getKMSNodeMetrics(opts MetricsGroupOpts) *MetricsGroup {
mg := &MetricsGroup{
cacheInterval: 10 * time.Second,
metricsGroupOpts: opts,
}
mg.RegisterRead(func(ctx context.Context) (metrics []Metric) {
objLayer := newObjectLayerFn()
// Service not initialized yet
if objLayer == nil || GlobalKMS == nil {
return
}
const (
Online = 1
Offline = 0
@ -3577,18 +3628,13 @@ func getWebhookMetrics() *MetricsGroup {
return mg
}
func getKMSMetrics() *MetricsGroup {
func getKMSMetrics(opts MetricsGroupOpts) *MetricsGroup {
mg := &MetricsGroup{
cacheInterval: 10 * time.Second,
metricsGroupOpts: opts,
}
mg.RegisterRead(func(ctx context.Context) []Metric {
objLayer := newObjectLayerFn()
// Service not initialized yet
if objLayer == nil || GlobalKMS == nil {
return []Metric{}
}
metrics := make([]Metric, 0, 4)
metric, err := GlobalKMS.Metrics(ctx)
if err != nil {