diff --git a/cmd/metrics-v2.go b/cmd/metrics-v2.go index c1793190a..56a71e449 100644 --- a/cmd/metrics-v2.go +++ b/cmd/metrics-v2.go @@ -58,6 +58,7 @@ func init() { getCacheMetrics(), getGoMetrics(), getHTTPMetrics(), + getNotificationMetrics(), getLocalStorageMetrics(), getMinioProcMetrics(), getMinioVersionMetrics(), @@ -83,6 +84,7 @@ func init() { getNetworkMetrics(), getMinioVersionMetrics(), getS3TTFBMetric(), + getNotificationMetrics(), }) clusterCollector = newMinioClusterCollector(allMetricsGroups) } @@ -127,6 +129,7 @@ const ( scannerSubsystem MetricSubsystem = "scanner" iamSubsystem MetricSubsystem = "iam" kmsSubsystem MetricSubsystem = "kms" + notifySubsystem MetricSubsystem = "notify" ) // MetricName are the individual names for the metric. @@ -1536,6 +1539,39 @@ func getCacheMetrics() *MetricsGroup { return mg } +func getNotificationMetrics() *MetricsGroup { + mg := &MetricsGroup{} + mg.RegisterRead(func(ctx context.Context) []Metric { + stats := globalConfigTargetList.Stats() + metrics := make([]Metric, 0, 1+len(stats.TargetStats)) + metrics = append(metrics, Metric{ + Description: MetricDescription{ + Namespace: minioNamespace, + Subsystem: notifySubsystem, + Name: "current_send_in_progress", + Help: "Number of concurrent async Send calls active to all targets", + Type: gaugeMetric, + }, + Value: float64(stats.CurrentSendCalls), + }) + for _, st := range stats.TargetStats { + metrics = append(metrics, Metric{ + Description: MetricDescription{ + Namespace: minioNamespace, + Subsystem: notifySubsystem, + Name: "target_queue_length", + Help: "Number of unsent notifications in queue for target", + Type: gaugeMetric, + }, + VariableLabels: map[string]string{"target_id": st.ID.ID, "target_name": st.ID.Name}, + Value: float64(st.CurrentQueue), + }) + } + return metrics + }) + return mg +} + func getHTTPMetrics() *MetricsGroup { mg := &MetricsGroup{} mg.RegisterRead(func(ctx context.Context) (metrics []Metric) { diff --git a/internal/event/target/amqp.go b/internal/event/target/amqp.go index 357e7a1bc..b9fa9997c 100644 --- a/internal/event/target/amqp.go +++ b/internal/event/target/amqp.go @@ -128,6 +128,11 @@ func (target *AMQPTarget) ID() event.TargetID { return target.id } +// Store returns any underlying store if set. +func (target *AMQPTarget) Store() event.TargetStore { + return target.store +} + // IsActive - Return true if target is up and active func (target *AMQPTarget) IsActive() (bool, error) { if err := target.init(); err != nil { diff --git a/internal/event/target/elasticsearch.go b/internal/event/target/elasticsearch.go index 0a0a31dae..80a9b81e1 100644 --- a/internal/event/target/elasticsearch.go +++ b/internal/event/target/elasticsearch.go @@ -167,6 +167,11 @@ func (target *ElasticsearchTarget) ID() event.TargetID { return target.id } +// Store returns any underlying store if set. +func (target *ElasticsearchTarget) Store() event.TargetStore { + return target.store +} + // IsActive - Return true if target is up and active func (target *ElasticsearchTarget) IsActive() (bool, error) { if err := target.init(); err != nil { diff --git a/internal/event/target/kafka.go b/internal/event/target/kafka.go index 06adbfcb2..4e181b68f 100644 --- a/internal/event/target/kafka.go +++ b/internal/event/target/kafka.go @@ -33,7 +33,7 @@ import ( "github.com/minio/minio/internal/logger" xnet "github.com/minio/pkg/net" - sarama "github.com/Shopify/sarama" + "github.com/Shopify/sarama" saramatls "github.com/Shopify/sarama/tools/tls" ) @@ -139,6 +139,11 @@ func (target *KafkaTarget) ID() event.TargetID { return target.id } +// Store returns any underlying store if set. +func (target *KafkaTarget) Store() event.TargetStore { + return target.store +} + // IsActive - Return true if target is up and active func (target *KafkaTarget) IsActive() (bool, error) { if err := target.init(); err != nil { diff --git a/internal/event/target/mqtt.go b/internal/event/target/mqtt.go index 960c48b77..4f83f55c7 100644 --- a/internal/event/target/mqtt.go +++ b/internal/event/target/mqtt.go @@ -121,6 +121,11 @@ func (target *MQTTTarget) ID() event.TargetID { return target.id } +// Store returns any underlying store if set. +func (target *MQTTTarget) Store() event.TargetStore { + return target.store +} + // IsActive - Return true if target is up and active func (target *MQTTTarget) IsActive() (bool, error) { if err := target.init(); err != nil { diff --git a/internal/event/target/mysql.go b/internal/event/target/mysql.go index 8e2fb31d9..7b985e7d5 100644 --- a/internal/event/target/mysql.go +++ b/internal/event/target/mysql.go @@ -165,6 +165,11 @@ func (target *MySQLTarget) ID() event.TargetID { return target.id } +// Store returns any underlying store if set. +func (target *MySQLTarget) Store() event.TargetStore { + return target.store +} + // IsActive - Return true if target is up and active func (target *MySQLTarget) IsActive() (bool, error) { if err := target.init(); err != nil { diff --git a/internal/event/target/nats.go b/internal/event/target/nats.go index 4c9f0b1db..fef12fe26 100644 --- a/internal/event/target/nats.go +++ b/internal/event/target/nats.go @@ -230,6 +230,11 @@ func (target *NATSTarget) ID() event.TargetID { return target.id } +// Store returns any underlying store if set. +func (target *NATSTarget) Store() event.TargetStore { + return target.store +} + // IsActive - Return true if target is up and active func (target *NATSTarget) IsActive() (bool, error) { if err := target.init(); err != nil { diff --git a/internal/event/target/nsq.go b/internal/event/target/nsq.go index 261958def..395d57a1d 100644 --- a/internal/event/target/nsq.go +++ b/internal/event/target/nsq.go @@ -105,6 +105,11 @@ func (target *NSQTarget) ID() event.TargetID { return target.id } +// Store returns any underlying store if set. +func (target *NSQTarget) Store() event.TargetStore { + return target.store +} + // IsActive - Return true if target is up and active func (target *NSQTarget) IsActive() (bool, error) { if err := target.init(); err != nil { diff --git a/internal/event/target/postgresql.go b/internal/event/target/postgresql.go index a74b52ac1..02d5d98b4 100644 --- a/internal/event/target/postgresql.go +++ b/internal/event/target/postgresql.go @@ -157,6 +157,11 @@ func (target *PostgreSQLTarget) ID() event.TargetID { return target.id } +// Store returns any underlying store if set. +func (target *PostgreSQLTarget) Store() event.TargetStore { + return target.store +} + // IsActive - Return true if target is up and active func (target *PostgreSQLTarget) IsActive() (bool, error) { if err := target.init(); err != nil { diff --git a/internal/event/target/queuestore.go b/internal/event/target/queuestore.go index a2df8504f..38d9f8922 100644 --- a/internal/event/target/queuestore.go +++ b/internal/event/target/queuestore.go @@ -19,11 +19,12 @@ package target import ( "encoding/json" - "math" "os" "path/filepath" "sort" + "strings" "sync" + "time" "github.com/minio/minio/internal/event" ) @@ -36,9 +37,10 @@ const ( // QueueStore - Filestore for persisting events. type QueueStore struct { sync.RWMutex - currentEntries uint64 - entryLimit uint64 - directory string + entryLimit uint64 + directory string + + entries map[string]int64 // key -> modtime as unix nano } // NewQueueStore - Creates an instance for QueueStore. @@ -50,6 +52,7 @@ func NewQueueStore(directory string, limit uint64) Store { return &QueueStore{ directory: directory, entryLimit: limit, + entries: make(map[string]int64, limit), } } @@ -62,17 +65,24 @@ func (store *QueueStore) Open() error { return err } - names, err := store.list() + files, err := store.list() if err != nil { return err } - currentEntries := uint64(len(names)) - if currentEntries >= store.entryLimit { - return errLimitExceeded + // Truncate entries. + if uint64(len(files)) > store.entryLimit { + files = files[:store.entryLimit] + } + for _, file := range files { + if file.IsDir() { + continue + } + key := strings.TrimSuffix(file.Name(), eventExt) + if fi, err := file.Info(); err == nil { + store.entries[key] = fi.ModTime().UnixNano() + } } - - store.currentEntries = currentEntries return nil } @@ -91,7 +101,7 @@ func (store *QueueStore) write(key string, e event.Event) error { } // Increment the event count. - store.currentEntries++ + store.entries[key] = time.Now().UnixNano() return nil } @@ -100,7 +110,7 @@ func (store *QueueStore) write(key string, e event.Event) error { func (store *QueueStore) Put(e event.Event) error { store.Lock() defer store.Unlock() - if store.currentEntries >= store.entryLimit { + if uint64(len(store.entries)) >= store.entryLimit { return errLimitExceeded } key, err := getNewUUID() @@ -146,44 +156,51 @@ func (store *QueueStore) Del(key string) error { return store.del(key) } +// Len returns the entry count. +func (store *QueueStore) Len() int { + store.RLock() + l := len(store.entries) + defer store.RUnlock() + return l +} + // lockless call func (store *QueueStore) del(key string) error { - if err := os.Remove(filepath.Join(store.directory, key+eventExt)); err != nil { - return err - } + err := os.Remove(filepath.Join(store.directory, key+eventExt)) - // Decrement the current entries count. - store.currentEntries-- + // Delete as entry no matter the result + delete(store.entries, key) - // Current entries can underflow, when multiple - // events are being pushed in parallel, this code - // is needed to ensure that we don't underflow. - // - // queueStore replayEvents is not serialized, - // this code is needed to protect us under - // such situations. - if store.currentEntries == math.MaxUint64 { - store.currentEntries = 0 - } - return nil + return err } -// List - lists all files from the directory. +// List - lists all files registered in the store. func (store *QueueStore) List() ([]string, error) { store.RLock() - defer store.RUnlock() - return store.list() -} - -// list lock less. -func (store *QueueStore) list() ([]string, error) { - var names []string - files, err := os.ReadDir(store.directory) - if err != nil { - return names, err + l := make([]string, 0, len(store.entries)) + for k := range store.entries { + l = append(l, k) } - // Sort the dentries. + // Sort entries... + sort.Slice(l, func(i, j int) bool { + return store.entries[l[i]] < store.entries[l[j]] + }) + store.RUnlock() + + return l, nil +} + +// list will read all entries from disk. +// Entries are returned sorted by modtime, oldest first. +// Underlying entry list in store is *not* updated. +func (store *QueueStore) list() ([]os.DirEntry, error) { + files, err := os.ReadDir(store.directory) + if err != nil { + return nil, err + } + + // Sort the entries. sort.Slice(files, func(i, j int) bool { ii, err := files[i].Info() if err != nil { @@ -196,9 +213,5 @@ func (store *QueueStore) list() ([]string, error) { return ii.ModTime().Before(ji.ModTime()) }) - for _, file := range files { - names = append(names, file.Name()) - } - - return names, nil + return files, nil } diff --git a/internal/event/target/queuestore_test.go b/internal/event/target/queuestore_test.go index f82395a71..7c5637d90 100644 --- a/internal/event/target/queuestore_test.go +++ b/internal/event/target/queuestore_test.go @@ -202,12 +202,33 @@ func TestQueueStoreListN(t *testing.T) { t.Fatalf("List() Expected: 10, got %d", len(names)) } - if err = os.RemoveAll(queueDir); err != nil { + // re-open + store, err = setUpStore(queueDir, 10) + if err != nil { + t.Fatal("Failed to create a queue store ", err) + } + names, err = store.List() + if err != nil { t.Fatal(err) } - _, err = store.List() - if !os.IsNotExist(err) { - t.Fatalf("Expected List() to fail with os.ErrNotExist, %s", err) + if len(names) != 10 { + t.Fatalf("List() Expected: 10, got %d", len(names)) + } + if len(names) != store.Len() { + t.Fatalf("List() Expected: 10, got %d", len(names)) + } + + // Delete all + for _, key := range names { + err := store.Del(key) + if err != nil { + t.Fatal(err) + } + } + // Re-list + lst, err := store.List() + if len(lst) > 0 || err != nil { + t.Fatalf("Expected List() to return empty list and no error, got %v err: %v", lst, err) } } diff --git a/internal/event/target/redis.go b/internal/event/target/redis.go index d448d0d9b..cfc15a132 100644 --- a/internal/event/target/redis.go +++ b/internal/event/target/redis.go @@ -133,6 +133,11 @@ func (target *RedisTarget) ID() event.TargetID { return target.id } +// Store returns any underlying store if set. +func (target *RedisTarget) Store() event.TargetStore { + return target.store +} + // IsActive - Return true if target is up and active func (target *RedisTarget) IsActive() (bool, error) { if err := target.init(); err != nil { diff --git a/internal/event/target/store.go b/internal/event/target/store.go index 07733fcea..199c21a9e 100644 --- a/internal/event/target/store.go +++ b/internal/event/target/store.go @@ -41,6 +41,7 @@ var errLimitExceeded = errors.New("the maximum store limit reached") type Store interface { Put(event event.Event) error Get(key string) (event.Event, error) + Len() int List() ([]string, error) Del(key string) error Open() error diff --git a/internal/event/target/webhook.go b/internal/event/target/webhook.go index c2156bf08..9b16893ad 100644 --- a/internal/event/target/webhook.go +++ b/internal/event/target/webhook.go @@ -115,6 +115,11 @@ func (target *WebhookTarget) IsActive() (bool, error) { return target.isActive() } +// Store returns any underlying store if set. +func (target *WebhookTarget) Store() event.TargetStore { + return target.store +} + func (target *WebhookTarget) isActive() (bool, error) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() diff --git a/internal/event/targetlist.go b/internal/event/targetlist.go index b998ee2dc..f9988e897 100644 --- a/internal/event/targetlist.go +++ b/internal/event/targetlist.go @@ -19,6 +19,7 @@ package event import ( "fmt" + "strings" "sync" "sync/atomic" ) @@ -35,6 +36,26 @@ type Target interface { Save(Event) error Send(string) error Close() error + Store() TargetStore +} + +// TargetStore is a shallow version of a target.Store +type TargetStore interface { + Len() int +} + +// TargetStats is a collection of stats for multiple targets. +type TargetStats struct { + // CurrentSendCalls is the number of concurrent async Send calls to all targets + CurrentSendCalls int64 + + TargetStats map[string]TargetStat +} + +// TargetStat is the stats of a single target. +type TargetStat struct { + ID TargetID + CurrentQueue int // Populated if target has a store. } // TargetList - holds list of targets indexed by target ID. @@ -166,6 +187,26 @@ func (list *TargetList) Send(event Event, targetIDset TargetIDSet, resCh chan<- }() } +// Stats returns stats for targets. +func (list *TargetList) Stats() TargetStats { + t := TargetStats{} + if list == nil { + return t + } + t.CurrentSendCalls = atomic.LoadInt64(&list.currentSendCalls) + list.RLock() + defer list.RUnlock() + t.TargetStats = make(map[string]TargetStat, len(list.targets)) + for id, target := range list.targets { + ts := TargetStat{ID: id} + if st := target.Store(); st != nil { + ts.CurrentQueue = st.Len() + } + t.TargetStats[strings.ReplaceAll(id.String(), ":", "_")] = ts + } + return t +} + // NewTargetList - creates TargetList. func NewTargetList() *TargetList { return &TargetList{targets: make(map[TargetID]Target)} diff --git a/internal/event/targetlist_test.go b/internal/event/targetlist_test.go index 1958611ff..07e85594b 100644 --- a/internal/event/targetlist_test.go +++ b/internal/event/targetlist_test.go @@ -40,6 +40,11 @@ func (target ExampleTarget) Save(eventData Event) error { return target.send(eventData) } +// Store - Returns a nil store. +func (target ExampleTarget) Store() TargetStore { + return nil +} + func (target ExampleTarget) send(eventData Event) error { b := make([]byte, 1) if _, err := rand.Read(b); err != nil {