2021-04-18 15:41:13 -04:00
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
2018-03-15 16:03:41 -04:00
package event
import (
2023-12-05 05:16:33 -05:00
"context"
2018-03-15 16:03:41 -04:00
"fmt"
2023-12-05 05:16:33 -05:00
"runtime"
2022-11-08 19:36:47 -05:00
"strings"
2018-03-15 16:03:41 -04:00
"sync"
2022-07-30 15:12:33 -04:00
"sync/atomic"
2023-10-07 11:07:38 -04:00
2023-12-05 05:16:33 -05:00
"github.com/minio/minio/internal/logger"
2023-10-07 11:07:38 -04:00
"github.com/minio/minio/internal/store"
2023-12-06 04:09:22 -05:00
"github.com/minio/pkg/v2/workers"
2022-07-30 15:12:33 -04:00
)
// maxConcurrentAsyncSend bounds how many asynchronously submitted events may
// be pending at once: it is the buffer capacity of the TargetList queue and
// the limit quoted in the error logged when an event is dropped because that
// queue is full.
const maxConcurrentAsyncSend = 50000
// Target - event target interface
type Target interface {
ID ( ) TargetID
2019-12-11 17:27:03 -05:00
IsActive ( ) ( bool , error )
2019-04-10 08:46:01 -04:00
Save ( Event ) error
2023-10-07 11:07:38 -04:00
SendFromStore ( store . Key ) error
2018-03-15 16:03:41 -04:00
Close ( ) error
2022-11-08 19:36:47 -05:00
Store ( ) TargetStore
}
// TargetStore is the minimal view of a target's store needed for reporting:
// only its current length is exposed.
type TargetStore interface {
	// Len returns the number of entries currently held by the store.
	Len() int
}
// TargetStats is a collection of stats for multiple targets.
type TargetStats struct {
// CurrentSendCalls is the number of concurrent async Send calls to all targets
2023-12-05 05:16:33 -05:00
CurrentSendCalls int64
TotalEvents int64
EventsSkipped int64
CurrentQueuedCalls int64
2022-11-08 19:36:47 -05:00
TargetStats map [ string ] TargetStat
}
// TargetStat is the stats of a single target.
type TargetStat struct {
ID TargetID
CurrentQueue int // Populated if target has a store.
2018-03-15 16:03:41 -04:00
}
// TargetList - holds list of targets indexed by target ID.
type TargetList struct {
2022-07-30 15:12:33 -04:00
// The number of concurrent async Send calls to all targets
2023-12-05 05:16:33 -05:00
currentSendCalls atomic . Int64
totalEvents atomic . Int64
eventsSkipped atomic . Int64
2022-07-30 15:12:33 -04:00
2018-03-15 16:03:41 -04:00
sync . RWMutex
targets map [ TargetID ] Target
2023-12-05 05:16:33 -05:00
queue chan asyncEvent
ctx context . Context
}
type asyncEvent struct {
ev Event
targetSet TargetIDSet
2018-03-15 16:03:41 -04:00
}
// Add - adds unique target to target list.
2019-11-09 12:27:23 -05:00
func ( list * TargetList ) Add ( targets ... Target ) error {
2018-03-15 16:03:41 -04:00
list . Lock ( )
defer list . Unlock ( )
2019-11-09 12:27:23 -05:00
for _ , target := range targets {
if _ , ok := list . targets [ target . ID ( ) ] ; ok {
return fmt . Errorf ( "target %v already exists" , target . ID ( ) )
}
list . targets [ target . ID ( ) ] = target
2018-03-15 16:03:41 -04:00
}
return nil
}
// Exists reports whether a target is registered under the given ID.
func (list *TargetList) Exists(id TargetID) bool {
	list.RLock()
	_, registered := list.targets[id]
	list.RUnlock()

	return registered
}
2020-04-27 09:25:05 -04:00
// TargetIDResult returns result of Remove/Send operation, sets err if
// any for the associated TargetID
type TargetIDResult struct {
2018-05-09 18:11:51 -04:00
// ID where the remove or send were initiated.
ID TargetID
// Stores any error while removing a target or while sending an event.
Err error
}
2018-03-15 16:03:41 -04:00
// Remove - closes and removes targets by given target IDs.
2020-04-27 09:25:05 -04:00
func ( list * TargetList ) Remove ( targetIDSet TargetIDSet ) {
list . Lock ( )
defer list . Unlock ( )
2018-03-15 16:03:41 -04:00
2020-04-27 09:25:05 -04:00
for id := range targetIDSet {
target , ok := list . targets [ id ]
if ok {
target . Close ( )
2018-05-09 18:11:51 -04:00
delete ( list . targets , id )
}
2020-04-27 09:25:05 -04:00
}
2018-03-15 16:03:41 -04:00
}
2019-11-09 12:27:23 -05:00
// Targets - list all targets
func ( list * TargetList ) Targets ( ) [ ] Target {
2020-10-09 12:59:52 -04:00
if list == nil {
return [ ] Target { }
}
2019-11-09 12:27:23 -05:00
list . RLock ( )
defer list . RUnlock ( )
targets := [ ] Target { }
for _ , tgt := range list . targets {
targets = append ( targets , tgt )
}
return targets
}
2018-03-15 16:03:41 -04:00
// List returns the IDs of all registered targets in unspecified order. The
// result is never nil.
func (list *TargetList) List() []TargetID {
	list.RLock()
	defer list.RUnlock()

	ids := make([]TargetID, 0, len(list.targets))
	for id := range list.targets {
		ids = append(ids, id)
	}
	return ids
}
2023-12-05 05:16:33 -05:00
// get looks up the target registered under id. The boolean result is false
// when no such target exists.
func (list *TargetList) get(id TargetID) (Target, bool) {
	list.RLock()
	tgt, found := list.targets[id]
	list.RUnlock()

	return tgt, found
}
2019-12-11 17:27:03 -05:00
// TargetMap - returns available targets.
func ( list * TargetList ) TargetMap ( ) map [ TargetID ] Target {
list . RLock ( )
defer list . RUnlock ( )
2023-03-07 11:12:41 -05:00
ntargets := make ( map [ TargetID ] Target , len ( list . targets ) )
for k , v := range list . targets {
ntargets [ k ] = v
}
return ntargets
2019-12-11 17:27:03 -05:00
}
2018-03-15 16:03:41 -04:00
// Send - sends events to targets identified by target IDs.
2023-12-05 05:16:33 -05:00
func ( list * TargetList ) Send ( event Event , targetIDset TargetIDSet , sync bool ) {
if sync {
list . sendSync ( event , targetIDset )
} else {
list . sendAsync ( event , targetIDset )
2023-06-20 20:38:59 -04:00
}
2018-03-15 16:03:41 -04:00
}
2023-12-05 05:16:33 -05:00
func ( list * TargetList ) sendSync ( event Event , targetIDset TargetIDSet ) {
2023-06-20 20:38:59 -04:00
var wg sync . WaitGroup
for id := range targetIDset {
2023-12-05 05:16:33 -05:00
target , ok := list . get ( id )
if ! ok {
continue
2023-06-20 20:38:59 -04:00
}
2023-12-05 05:16:33 -05:00
wg . Add ( 1 )
go func ( id TargetID , target Target ) {
list . currentSendCalls . Add ( 1 )
defer list . currentSendCalls . Add ( - 1 )
defer wg . Done ( )
if err := target . Save ( event ) ; err != nil {
reqInfo := & logger . ReqInfo { }
reqInfo . AppendTags ( "targetID" , id . String ( ) )
logger . LogOnceIf ( logger . SetReqInfo ( context . Background ( ) , reqInfo ) , err , id . String ( ) )
}
} ( id , target )
2023-06-20 20:38:59 -04:00
}
wg . Wait ( )
2023-12-05 05:16:33 -05:00
list . totalEvents . Add ( 1 )
}
func ( list * TargetList ) sendAsync ( event Event , targetIDset TargetIDSet ) {
select {
case list . queue <- asyncEvent {
ev : event ,
targetSet : targetIDset . Clone ( ) ,
} :
case <- list . ctx . Done ( ) :
list . eventsSkipped . Add ( int64 ( len ( list . queue ) ) )
return
default :
list . eventsSkipped . Add ( 1 )
err := fmt . Errorf ( "concurrent target notifications exceeded %d, notification endpoint is too slow to accept events on incoming requests" , maxConcurrentAsyncSend )
for id := range targetIDset {
reqInfo := & logger . ReqInfo { }
reqInfo . AppendTags ( "targetID" , id . String ( ) )
logger . LogOnceIf ( logger . SetReqInfo ( context . Background ( ) , reqInfo ) , err , id . String ( ) )
}
return
}
2023-06-20 20:38:59 -04:00
}
2022-11-08 19:36:47 -05:00
// Stats returns stats for targets.
func ( list * TargetList ) Stats ( ) TargetStats {
t := TargetStats { }
if list == nil {
return t
}
2023-12-05 05:16:33 -05:00
t . CurrentSendCalls = list . currentSendCalls . Load ( )
t . EventsSkipped = list . eventsSkipped . Load ( )
t . TotalEvents = list . totalEvents . Load ( )
t . CurrentQueuedCalls = int64 ( len ( list . queue ) )
2023-10-12 18:39:22 -04:00
2022-11-08 19:36:47 -05:00
list . RLock ( )
defer list . RUnlock ( )
t . TargetStats = make ( map [ string ] TargetStat , len ( list . targets ) )
for id , target := range list . targets {
ts := TargetStat { ID : id }
if st := target . Store ( ) ; st != nil {
ts . CurrentQueue = st . Len ( )
}
t . TargetStats [ strings . ReplaceAll ( id . String ( ) , ":" , "_" ) ] = ts
}
return t
}
2023-12-05 05:16:33 -05:00
// startSendWorkers runs workerCount goroutines that drain the async queue,
// delivering each queued event with sendSync, and blocks until all of them
// have exited. Workers exit when the list's context is done.
//
// A non-positive workerCount selects runtime.GOMAXPROCS(0) workers. Previously
// only zero was treated this way, so a negative count reached workers.New,
// which rejects it, and the resulting panic fired on the background goroutine
// started by Init.
func (list *TargetList) startSendWorkers(workerCount int) {
	if workerCount <= 0 {
		workerCount = runtime.GOMAXPROCS(0)
	}

	wk, err := workers.New(workerCount)
	if err != nil {
		// Not expected for a positive count; kept as an invariant guard.
		panic(err)
	}

	for i := 0; i < workerCount; i++ {
		// Take a slot before starting the goroutine so that Wait below
		// cannot return before every worker has been accounted for.
		wk.Take()
		go func() {
			defer wk.Give()

			for {
				select {
				case av := <-list.queue:
					list.sendSync(av.ev, av.targetSet)
				case <-list.ctx.Done():
					return
				}
			}
		}()
	}

	wk.Wait()
}
// startOnce guards the launch of the send workers. It is package-level, so it
// is shared by every TargetList in the process.
var startOnce sync.Once

// Init starts the background send workers for list and returns list so the
// call can be chained.
//
// The launch is guarded by the package-level startOnce: only the first Init
// call in the process starts workers, and later calls (on this or any other
// list) are no-ops. A workers value of 0 makes the worker count default to
// runtime.GOMAXPROCS(0).
func (list *TargetList) Init(workers int) *TargetList {
	launch := func() {
		go list.startSendWorkers(workers)
	}
	startOnce.Do(launch)

	return list
}
2018-03-15 16:03:41 -04:00
// NewTargetList - creates TargetList.
2023-12-05 05:16:33 -05:00
func NewTargetList ( ctx context . Context ) * TargetList {
list := & TargetList {
targets : make ( map [ TargetID ] Target ) ,
queue : make ( chan asyncEvent , maxConcurrentAsyncSend ) ,
ctx : ctx ,
}
return list
2018-03-15 16:03:41 -04:00
}