2021-04-18 12:41:13 -07:00
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
2018-03-16 01:33:41 +05:30
package event
import (
2023-12-05 02:16:33 -08:00
"context"
2018-03-16 01:33:41 +05:30
"fmt"
2023-12-05 02:16:33 -08:00
"runtime"
2018-03-16 01:33:41 +05:30
"sync"
2022-07-30 20:12:33 +01:00
"sync/atomic"
2023-10-07 20:37:38 +05:30
2023-12-05 02:16:33 -08:00
"github.com/minio/minio/internal/logger"
2023-10-07 20:37:38 +05:30
"github.com/minio/minio/internal/store"
2023-12-06 01:09:22 -08:00
"github.com/minio/pkg/v2/workers"
2022-07-30 20:12:33 +01:00
)
const (
	// maxConcurrentAsyncSend is the capacity of the buffered queue that holds
	// events submitted for asynchronous delivery. When the queue is full,
	// further asynchronous sends are counted as skipped rather than blocking
	// the caller.
	maxConcurrentAsyncSend = 50000
)
// Target - event target interface
type Target interface {
ID ( ) TargetID
2019-12-12 03:57:03 +05:30
IsActive ( ) ( bool , error )
2019-04-10 18:16:01 +05:30
Save ( Event ) error
2023-10-07 20:37:38 +05:30
SendFromStore ( store . Key ) error
2018-03-16 01:33:41 +05:30
Close ( ) error
2022-11-09 01:36:47 +01:00
Store ( ) TargetStore
}
// TargetStore is a reduced view of a target.Store that exposes nothing but
// the number of entries it currently holds.
type TargetStore interface{ Len() int }
2023-12-14 22:39:26 +05:30
// Stats is a collection of stats for multiple targets.
type Stats struct {
TotalEvents int64 // Deprecated
2023-12-05 02:16:33 -08:00
EventsSkipped int64
2023-12-14 22:39:26 +05:30
CurrentQueuedCalls int64 // Deprecated
EventsErrorsTotal int64 // Deprecated
CurrentSendCalls int64 // Deprecated
2022-11-09 01:36:47 +01:00
2023-12-14 22:39:26 +05:30
TargetStats map [ TargetID ] TargetStat
2022-11-09 01:36:47 +01:00
}
// TargetStat is the stats of a single target.
type TargetStat struct {
	// CurrentSendCalls is the number of send calls currently in flight for
	// this target.
	CurrentSendCalls int64
	// CurrentQueue is the length of the target's store. It stays zero when
	// the target has no store.
	CurrentQueue int
	// TotalEvents is the number of events dispatched to this target.
	TotalEvents int64
	// FailedEvents is the number of events that failed for this target.
	FailedEvents int64
}
// TargetList - holds list of targets indexed by target ID.
type TargetList struct {
2022-07-30 20:12:33 +01:00
// The number of concurrent async Send calls to all targets
2023-12-08 08:21:17 +08:00
currentSendCalls atomic . Int64
totalEvents atomic . Int64
eventsSkipped atomic . Int64
eventsErrorsTotal atomic . Int64
2022-07-30 20:12:33 +01:00
2018-03-16 01:33:41 +05:30
sync . RWMutex
targets map [ TargetID ] Target
2023-12-05 02:16:33 -08:00
queue chan asyncEvent
ctx context . Context
2023-12-14 22:39:26 +05:30
statLock sync . RWMutex
targetStats map [ TargetID ] targetStat
}
// targetStat holds the counters tracked for one target.
type targetStat struct {
	currentSendCalls int64 // send calls currently in flight for the target
	totalEvents      int64 // events dispatched to the target
	failedEvents     int64 // events that failed for the target
}
// getStatsByTargetID returns a copy of the counters recorded for id, read
// under the stats read lock. An id with no recorded counters yields the zero
// targetStat.
func (list *TargetList) getStatsByTargetID(id TargetID) (stat targetStat) {
	list.statLock.RLock()
	stat = list.targetStats[id]
	list.statLock.RUnlock()
	return stat
}
// incCurrentSendCalls adds one to the in-flight send counter of the target
// identified by id, creating the counter entry if none exists yet.
func (list *TargetList) incCurrentSendCalls(id TargetID) {
	list.statLock.Lock()
	defer list.statLock.Unlock()

	// A missing key reads as the zero targetStat, so first use needs no
	// special case.
	entry := list.targetStats[id]
	entry.currentSendCalls++
	list.targetStats[id] = entry
}
// decCurrentSendCalls subtracts one from the in-flight send counter of the
// target identified by id. An id without a counter entry is left alone; no
// entry is created for it.
func (list *TargetList) decCurrentSendCalls(id TargetID) {
	list.statLock.Lock()
	defer list.statLock.Unlock()

	// An absent entry is not expected here, since every decrement follows an
	// increment for the same id.
	if entry, found := list.targetStats[id]; found {
		entry.currentSendCalls--
		list.targetStats[id] = entry
	}
}
// incFailedEvents adds one to the failed-event counter of the target
// identified by id, creating the counter entry if none exists yet.
func (list *TargetList) incFailedEvents(id TargetID) {
	list.statLock.Lock()
	defer list.statLock.Unlock()

	// Reading a missing key gives the zero targetStat to start from.
	current := list.targetStats[id]
	current.failedEvents++
	list.targetStats[id] = current
}
func ( list * TargetList ) incTotalEvents ( id TargetID ) {
list . statLock . Lock ( )
defer list . statLock . Unlock ( )
stats , ok := list . targetStats [ id ]
if ! ok {
stats = targetStat { }
}
stats . totalEvents ++
list . targetStats [ id ] = stats
return
2023-12-05 02:16:33 -08:00
}
type asyncEvent struct {
ev Event
targetSet TargetIDSet
2018-03-16 01:33:41 +05:30
}
// Add - adds unique target to target list.
2019-11-09 09:27:23 -08:00
func ( list * TargetList ) Add ( targets ... Target ) error {
2018-03-16 01:33:41 +05:30
list . Lock ( )
defer list . Unlock ( )
2019-11-09 09:27:23 -08:00
for _ , target := range targets {
if _ , ok := list . targets [ target . ID ( ) ] ; ok {
return fmt . Errorf ( "target %v already exists" , target . ID ( ) )
}
list . targets [ target . ID ( ) ] = target
2018-03-16 01:33:41 +05:30
}
return nil
}
// Exists - reports whether a target is registered under the given target ID.
func (list *TargetList) Exists(id TargetID) bool {
	list.RLock()
	_, present := list.targets[id]
	list.RUnlock()
	return present
}
2020-04-27 06:25:05 -07:00
// TargetIDResult returns result of Remove/Send operation, sets err if
// any for the associated TargetID
type TargetIDResult struct {
2018-05-09 15:11:51 -07:00
// ID where the remove or send were initiated.
ID TargetID
// Stores any error while removing a target or while sending an event.
Err error
}
2018-03-16 01:33:41 +05:30
// Remove - closes and removes targets by given target IDs.
2020-04-27 06:25:05 -07:00
func ( list * TargetList ) Remove ( targetIDSet TargetIDSet ) {
list . Lock ( )
defer list . Unlock ( )
2018-03-16 01:33:41 +05:30
2020-04-27 06:25:05 -07:00
for id := range targetIDSet {
target , ok := list . targets [ id ]
if ok {
target . Close ( )
2018-05-09 15:11:51 -07:00
delete ( list . targets , id )
}
2020-04-27 06:25:05 -07:00
}
2018-03-16 01:33:41 +05:30
}
2019-11-09 09:27:23 -08:00
// Targets - list all targets
func ( list * TargetList ) Targets ( ) [ ] Target {
2020-10-09 09:59:52 -07:00
if list == nil {
return [ ] Target { }
}
2019-11-09 09:27:23 -08:00
list . RLock ( )
defer list . RUnlock ( )
targets := [ ] Target { }
for _ , tgt := range list . targets {
targets = append ( targets , tgt )
}
return targets
}
2018-03-16 01:33:41 +05:30
// List - returns available target IDs.
//
// The result is a non-nil slice even when no targets are registered. Its
// order follows map iteration and is therefore unspecified.
func (list *TargetList) List() []TargetID {
	list.RLock()
	defer list.RUnlock()

	// The final length is known while the lock is held.
	keys := make([]TargetID, 0, len(list.targets))
	for k := range list.targets {
		keys = append(keys, k)
	}
	return keys
}
2023-12-05 02:16:33 -08:00
// get looks up the target registered under id while holding the read lock.
// The boolean result is false when no such target is registered.
func (list *TargetList) get(id TargetID) (Target, bool) {
	list.RLock()
	tgt, found := list.targets[id]
	list.RUnlock()
	return tgt, found
}
2019-12-12 03:57:03 +05:30
// TargetMap - returns available targets.
func ( list * TargetList ) TargetMap ( ) map [ TargetID ] Target {
list . RLock ( )
defer list . RUnlock ( )
2023-03-07 08:12:41 -08:00
ntargets := make ( map [ TargetID ] Target , len ( list . targets ) )
for k , v := range list . targets {
ntargets [ k ] = v
}
return ntargets
2019-12-12 03:57:03 +05:30
}
2018-03-16 01:33:41 +05:30
// Send - sends events to targets identified by target IDs.
2023-12-05 02:16:33 -08:00
func ( list * TargetList ) Send ( event Event , targetIDset TargetIDSet , sync bool ) {
if sync {
list . sendSync ( event , targetIDset )
} else {
list . sendAsync ( event , targetIDset )
2023-06-21 06:08:59 +05:30
}
2018-03-16 01:33:41 +05:30
}
2023-12-05 02:16:33 -08:00
func ( list * TargetList ) sendSync ( event Event , targetIDset TargetIDSet ) {
2023-06-21 06:08:59 +05:30
var wg sync . WaitGroup
for id := range targetIDset {
2023-12-05 02:16:33 -08:00
target , ok := list . get ( id )
if ! ok {
continue
2023-06-21 06:08:59 +05:30
}
2023-12-05 02:16:33 -08:00
wg . Add ( 1 )
go func ( id TargetID , target Target ) {
list . currentSendCalls . Add ( 1 )
2023-12-14 22:39:26 +05:30
list . incCurrentSendCalls ( id )
list . incTotalEvents ( id )
defer list . decCurrentSendCalls ( id )
2023-12-05 02:16:33 -08:00
defer list . currentSendCalls . Add ( - 1 )
defer wg . Done ( )
if err := target . Save ( event ) ; err != nil {
2023-12-08 08:21:17 +08:00
list . eventsErrorsTotal . Add ( 1 )
2023-12-14 22:39:26 +05:30
list . incFailedEvents ( id )
2023-12-05 02:16:33 -08:00
reqInfo := & logger . ReqInfo { }
reqInfo . AppendTags ( "targetID" , id . String ( ) )
logger . LogOnceIf ( logger . SetReqInfo ( context . Background ( ) , reqInfo ) , err , id . String ( ) )
}
} ( id , target )
2023-06-21 06:08:59 +05:30
}
wg . Wait ( )
2023-12-05 02:16:33 -08:00
list . totalEvents . Add ( 1 )
}
func ( list * TargetList ) sendAsync ( event Event , targetIDset TargetIDSet ) {
select {
case list . queue <- asyncEvent {
ev : event ,
targetSet : targetIDset . Clone ( ) ,
} :
case <- list . ctx . Done ( ) :
list . eventsSkipped . Add ( int64 ( len ( list . queue ) ) )
return
default :
list . eventsSkipped . Add ( 1 )
err := fmt . Errorf ( "concurrent target notifications exceeded %d, notification endpoint is too slow to accept events on incoming requests" , maxConcurrentAsyncSend )
for id := range targetIDset {
reqInfo := & logger . ReqInfo { }
reqInfo . AppendTags ( "targetID" , id . String ( ) )
logger . LogOnceIf ( logger . SetReqInfo ( context . Background ( ) , reqInfo ) , err , id . String ( ) )
}
return
}
2023-06-21 06:08:59 +05:30
}
2022-11-09 01:36:47 +01:00
// Stats returns stats for targets.
2023-12-14 22:39:26 +05:30
func ( list * TargetList ) Stats ( ) Stats {
t := Stats { }
2022-11-09 01:36:47 +01:00
if list == nil {
return t
}
2023-12-05 02:16:33 -08:00
t . CurrentSendCalls = list . currentSendCalls . Load ( )
t . EventsSkipped = list . eventsSkipped . Load ( )
t . TotalEvents = list . totalEvents . Load ( )
t . CurrentQueuedCalls = int64 ( len ( list . queue ) )
2023-12-08 08:21:17 +08:00
t . EventsErrorsTotal = list . eventsErrorsTotal . Load ( )
2023-10-12 15:39:22 -07:00
2022-11-09 01:36:47 +01:00
list . RLock ( )
defer list . RUnlock ( )
2023-12-14 22:39:26 +05:30
t . TargetStats = make ( map [ TargetID ] TargetStat , len ( list . targets ) )
2022-11-09 01:36:47 +01:00
for id , target := range list . targets {
2023-12-14 22:39:26 +05:30
var currentQueue int
2022-11-09 01:36:47 +01:00
if st := target . Store ( ) ; st != nil {
2023-12-14 22:39:26 +05:30
currentQueue = st . Len ( )
}
stats := list . getStatsByTargetID ( id )
t . TargetStats [ id ] = TargetStat {
CurrentSendCalls : stats . currentSendCalls ,
CurrentQueue : currentQueue ,
FailedEvents : stats . failedEvents ,
TotalEvents : stats . totalEvents ,
2022-11-09 01:36:47 +01:00
}
}
2023-12-14 22:39:26 +05:30
2022-11-09 01:36:47 +01:00
return t
}
2023-12-05 02:16:33 -08:00
// startSendWorkers starts workerCount goroutines that take events off the
// async queue and deliver them with sendSync. It blocks until every worker
// has exited, which happens once the list context is done.
//
// A workerCount of zero or less selects runtime.GOMAXPROCS(0) workers. The
// function panics if the worker pool cannot be created.
func (list *TargetList) startSendWorkers(workerCount int) {
	// Non-positive counts fall back to the default instead of reaching
	// workers.New, which would turn a negative count into a panic below.
	if workerCount <= 0 {
		workerCount = runtime.GOMAXPROCS(0)
	}
	wk, err := workers.New(workerCount)
	if err != nil {
		panic(err)
	}
	for i := 0; i < workerCount; i++ {
		// Take a slot before starting the worker; the worker gives it back
		// on exit, which is what wk.Wait below waits for.
		wk.Take()
		go func() {
			defer wk.Give()

			for {
				select {
				case av := <-list.queue:
					list.sendSync(av.ev, av.targetSet)
				case <-list.ctx.Done():
					return
				}
			}
		}()
	}
	wk.Wait()
}
// startOnce guards the launch of the send workers. It is declared at package
// level, so it is shared by every TargetList in the process.
var startOnce sync.Once

// Init launches the send workers in a background goroutine and returns the
// list so that calls can be chained. Because startOnce is shared, only the
// first Init call in the process launches workers, for the list it was
// called on; every later call just returns its receiver.
func (list *TargetList) Init(workers int) *TargetList {
	launch := func() {
		go list.startSendWorkers(workers)
	}
	startOnce.Do(launch)
	return list
}
2018-03-16 01:33:41 +05:30
// NewTargetList - creates TargetList.
2023-12-05 02:16:33 -08:00
func NewTargetList ( ctx context . Context ) * TargetList {
list := & TargetList {
2023-12-14 22:39:26 +05:30
targets : make ( map [ TargetID ] Target ) ,
queue : make ( chan asyncEvent , maxConcurrentAsyncSend ) ,
targetStats : make ( map [ TargetID ] targetStat ) ,
ctx : ctx ,
2023-12-05 02:16:33 -08:00
}
return list
2018-03-16 01:33:41 +05:30
}