mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
Add an option to make bucket notifications synchronous (#17406)
With the current asynchronous behaviour in sending notification events to the targets, we can't provide guaranteed delivery as the systems might go for restarts. For such event-driven use-cases, we can provide an option to enable synchronous events where the APIs wait until the event is successfully sent or persisted. This commit adds 'MINIO_API_SYNC_EVENTS' env which when set to 'on' will enable sending/persisting events to targets synchronously.
This commit is contained in:
parent
02c2ec3027
commit
7c72b25ef0
@ -35,20 +35,18 @@ import (
|
|||||||
// EventNotifier - notifies external systems about events in MinIO.
|
// EventNotifier - notifies external systems about events in MinIO.
|
||||||
type EventNotifier struct {
|
type EventNotifier struct {
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
targetList *event.TargetList
|
targetList *event.TargetList
|
||||||
targetResCh chan event.TargetIDResult
|
targetResCh chan event.TargetIDResult
|
||||||
bucketRulesMap map[string]event.RulesMap
|
bucketRulesMap map[string]event.RulesMap
|
||||||
bucketRemoteTargetRulesMap map[string]map[event.TargetID]event.RulesMap
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewEventNotifier - creates new event notification object.
|
// NewEventNotifier - creates new event notification object.
|
||||||
func NewEventNotifier() *EventNotifier {
|
func NewEventNotifier() *EventNotifier {
|
||||||
// targetList/bucketRulesMap/bucketRemoteTargetRulesMap are populated by NotificationSys.InitBucketTargets()
|
// targetList/bucketRulesMap/bucketRemoteTargetRulesMap are populated by NotificationSys.InitBucketTargets()
|
||||||
return &EventNotifier{
|
return &EventNotifier{
|
||||||
targetList: event.NewTargetList(),
|
targetList: event.NewTargetList(),
|
||||||
targetResCh: make(chan event.TargetIDResult),
|
targetResCh: make(chan event.TargetIDResult),
|
||||||
bucketRulesMap: make(map[string]event.RulesMap),
|
bucketRulesMap: make(map[string]event.RulesMap),
|
||||||
bucketRemoteTargetRulesMap: make(map[string]map[event.TargetID]event.RulesMap),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -122,10 +120,6 @@ func (evnot *EventNotifier) AddRulesMap(bucketName string, rulesMap event.RulesM
|
|||||||
|
|
||||||
rulesMap = rulesMap.Clone()
|
rulesMap = rulesMap.Clone()
|
||||||
|
|
||||||
for _, targetRulesMap := range evnot.bucketRemoteTargetRulesMap[bucketName] {
|
|
||||||
rulesMap.Add(targetRulesMap)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Do not add for an empty rulesMap.
|
// Do not add for an empty rulesMap.
|
||||||
if len(rulesMap) == 0 {
|
if len(rulesMap) == 0 {
|
||||||
delete(evnot.bucketRulesMap, bucketName)
|
delete(evnot.bucketRulesMap, bucketName)
|
||||||
@ -174,29 +168,18 @@ func (evnot *EventNotifier) RemoveNotification(bucketName string) {
|
|||||||
defer evnot.Unlock()
|
defer evnot.Unlock()
|
||||||
|
|
||||||
delete(evnot.bucketRulesMap, bucketName)
|
delete(evnot.bucketRulesMap, bucketName)
|
||||||
|
|
||||||
targetIDSet := event.NewTargetIDSet()
|
|
||||||
for targetID := range evnot.bucketRemoteTargetRulesMap[bucketName] {
|
|
||||||
targetIDSet[targetID] = struct{}{}
|
|
||||||
delete(evnot.bucketRemoteTargetRulesMap[bucketName], targetID)
|
|
||||||
}
|
|
||||||
evnot.targetList.Remove(targetIDSet)
|
|
||||||
|
|
||||||
delete(evnot.bucketRemoteTargetRulesMap, bucketName)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// RemoveAllRemoteTargets - closes and removes all notification targets.
|
// RemoveAllBucketTargets - closes and removes all notification targets.
|
||||||
func (evnot *EventNotifier) RemoveAllRemoteTargets() {
|
func (evnot *EventNotifier) RemoveAllBucketTargets() {
|
||||||
evnot.Lock()
|
evnot.Lock()
|
||||||
defer evnot.Unlock()
|
defer evnot.Unlock()
|
||||||
|
|
||||||
for _, targetMap := range evnot.bucketRemoteTargetRulesMap {
|
targetIDSet := event.NewTargetIDSet()
|
||||||
targetIDSet := event.NewTargetIDSet()
|
for k := range evnot.targetList.TargetMap() {
|
||||||
for k := range targetMap {
|
targetIDSet[k] = struct{}{}
|
||||||
targetIDSet[k] = struct{}{}
|
|
||||||
}
|
|
||||||
evnot.targetList.Remove(targetIDSet)
|
|
||||||
}
|
}
|
||||||
|
evnot.targetList.Remove(targetIDSet)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send - sends the event to all registered notification targets
|
// Send - sends the event to all registered notification targets
|
||||||
@ -209,7 +192,8 @@ func (evnot *EventNotifier) Send(args eventArgs) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
evnot.targetList.Send(args.ToEvent(true), targetIDSet, evnot.targetResCh)
|
// If MINIO_API_SYNC_EVENTS is set, send events synchronously.
|
||||||
|
evnot.targetList.Send(args.ToEvent(true), targetIDSet, evnot.targetResCh, globalAPIConfig.isSyncEventsEnabled())
|
||||||
}
|
}
|
||||||
|
|
||||||
type eventArgs struct {
|
type eventArgs struct {
|
||||||
|
@ -52,6 +52,7 @@ type apiConfig struct {
|
|||||||
disableODirect bool
|
disableODirect bool
|
||||||
gzipObjects bool
|
gzipObjects bool
|
||||||
rootAccess bool
|
rootAccess bool
|
||||||
|
syncEvents bool
|
||||||
}
|
}
|
||||||
|
|
||||||
const cgroupLimitFile = "/sys/fs/cgroup/memory/memory.limit_in_bytes"
|
const cgroupLimitFile = "/sys/fs/cgroup/memory/memory.limit_in_bytes"
|
||||||
@ -166,6 +167,7 @@ func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) {
|
|||||||
t.disableODirect = cfg.DisableODirect
|
t.disableODirect = cfg.DisableODirect
|
||||||
t.gzipObjects = cfg.GzipObjects
|
t.gzipObjects = cfg.GzipObjects
|
||||||
t.rootAccess = cfg.RootAccess
|
t.rootAccess = cfg.RootAccess
|
||||||
|
t.syncEvents = cfg.SyncEvents
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *apiConfig) isDisableODirect() bool {
|
func (t *apiConfig) isDisableODirect() bool {
|
||||||
@ -353,3 +355,10 @@ func (t *apiConfig) getTransitionWorkers() int {
|
|||||||
|
|
||||||
return t.transitionWorkers
|
return t.transitionWorkers
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *apiConfig) isSyncEventsEnabled() bool {
|
||||||
|
t.mu.RLock()
|
||||||
|
defer t.mu.RUnlock()
|
||||||
|
|
||||||
|
return t.syncEvents
|
||||||
|
}
|
||||||
|
@ -1032,7 +1032,6 @@ func (sys *NotificationSys) GetPeerOnlineCount() (nodesOnline, nodesOffline int)
|
|||||||
|
|
||||||
// NewNotificationSys - creates new notification system object.
|
// NewNotificationSys - creates new notification system object.
|
||||||
func NewNotificationSys(endpoints EndpointServerPools) *NotificationSys {
|
func NewNotificationSys(endpoints EndpointServerPools) *NotificationSys {
|
||||||
// targetList/bucketRulesMap/bucketRemoteTargetRulesMap are populated by NotificationSys.Init()
|
|
||||||
remote, all := newPeerRestClients(endpoints)
|
remote, all := newPeerRestClients(endpoints)
|
||||||
return &NotificationSys{
|
return &NotificationSys{
|
||||||
peerClients: remote,
|
peerClients: remote,
|
||||||
|
@ -51,10 +51,6 @@ func handleSignals() {
|
|||||||
// send signal to various go-routines that they need to quit.
|
// send signal to various go-routines that they need to quit.
|
||||||
cancelGlobalContext()
|
cancelGlobalContext()
|
||||||
|
|
||||||
if globalEventNotifier != nil {
|
|
||||||
globalEventNotifier.RemoveAllRemoteTargets()
|
|
||||||
}
|
|
||||||
|
|
||||||
if httpServer := newHTTPServerFn(); httpServer != nil {
|
if httpServer := newHTTPServerFn(); httpServer != nil {
|
||||||
err = httpServer.Shutdown()
|
err = httpServer.Shutdown()
|
||||||
if !errors.Is(err, http.ErrServerClosed) {
|
if !errors.Is(err, http.ErrServerClosed) {
|
||||||
@ -71,6 +67,10 @@ func handleSignals() {
|
|||||||
logger.LogIf(context.Background(), srv.Shutdown())
|
logger.LogIf(context.Background(), srv.Shutdown())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if globalEventNotifier != nil {
|
||||||
|
globalEventNotifier.RemoveAllBucketTargets()
|
||||||
|
}
|
||||||
|
|
||||||
return (err == nil && oerr == nil)
|
return (err == nil && oerr == nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -45,6 +45,7 @@ const (
|
|||||||
apiDisableODirect = "disable_odirect"
|
apiDisableODirect = "disable_odirect"
|
||||||
apiGzipObjects = "gzip_objects"
|
apiGzipObjects = "gzip_objects"
|
||||||
apiRootAccess = "root_access"
|
apiRootAccess = "root_access"
|
||||||
|
apiSyncEvents = "sync_events"
|
||||||
|
|
||||||
EnvAPIRequestsMax = "MINIO_API_REQUESTS_MAX"
|
EnvAPIRequestsMax = "MINIO_API_REQUESTS_MAX"
|
||||||
EnvAPIRequestsDeadline = "MINIO_API_REQUESTS_DEADLINE"
|
EnvAPIRequestsDeadline = "MINIO_API_REQUESTS_DEADLINE"
|
||||||
@ -62,6 +63,7 @@ const (
|
|||||||
EnvAPIDisableODirect = "MINIO_API_DISABLE_ODIRECT"
|
EnvAPIDisableODirect = "MINIO_API_DISABLE_ODIRECT"
|
||||||
EnvAPIGzipObjects = "MINIO_API_GZIP_OBJECTS"
|
EnvAPIGzipObjects = "MINIO_API_GZIP_OBJECTS"
|
||||||
EnvAPIRootAccess = "MINIO_API_ROOT_ACCESS" // default "on"
|
EnvAPIRootAccess = "MINIO_API_ROOT_ACCESS" // default "on"
|
||||||
|
EnvAPISyncEvents = "MINIO_API_SYNC_EVENTS" // default "off"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Deprecated key and ENVs
|
// Deprecated key and ENVs
|
||||||
@ -135,6 +137,10 @@ var (
|
|||||||
Key: apiRootAccess,
|
Key: apiRootAccess,
|
||||||
Value: "on",
|
Value: "on",
|
||||||
},
|
},
|
||||||
|
config.KV{
|
||||||
|
Key: apiSyncEvents,
|
||||||
|
Value: "off",
|
||||||
|
},
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -154,6 +160,7 @@ type Config struct {
|
|||||||
DisableODirect bool `json:"disable_odirect"`
|
DisableODirect bool `json:"disable_odirect"`
|
||||||
GzipObjects bool `json:"gzip_objects"`
|
GzipObjects bool `json:"gzip_objects"`
|
||||||
RootAccess bool `json:"root_access"`
|
RootAccess bool `json:"root_access"`
|
||||||
|
SyncEvents bool `json:"sync_events"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnmarshalJSON - Validate SS and RRS parity when unmarshalling JSON.
|
// UnmarshalJSON - Validate SS and RRS parity when unmarshalling JSON.
|
||||||
@ -269,5 +276,7 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) {
|
|||||||
}
|
}
|
||||||
cfg.StaleUploadsExpiry = staleUploadsExpiry
|
cfg.StaleUploadsExpiry = staleUploadsExpiry
|
||||||
|
|
||||||
|
cfg.SyncEvents = env.Get(EnvAPISyncEvents, kvs.Get(apiSyncEvents)) == config.EnableOn
|
||||||
|
|
||||||
return cfg, nil
|
return cfg, nil
|
||||||
}
|
}
|
||||||
|
@ -104,5 +104,11 @@ var (
|
|||||||
Optional: true,
|
Optional: true,
|
||||||
Type: "boolean",
|
Type: "boolean",
|
||||||
},
|
},
|
||||||
|
config.HelpKV{
|
||||||
|
Key: apiSyncEvents,
|
||||||
|
Description: "set to enable synchronous bucket notifications" + defaultHelpPostfix(apiSyncEvents),
|
||||||
|
Optional: true,
|
||||||
|
Type: "boolean",
|
||||||
|
},
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
@ -157,7 +157,7 @@ func (list *TargetList) TargetMap() map[TargetID]Target {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Send - sends events to targets identified by target IDs.
|
// Send - sends events to targets identified by target IDs.
|
||||||
func (list *TargetList) Send(event Event, targetIDset TargetIDSet, resCh chan<- TargetIDResult) {
|
func (list *TargetList) Send(event Event, targetIDset TargetIDSet, resCh chan<- TargetIDResult, synchronous bool) {
|
||||||
if atomic.LoadInt64(&list.currentSendCalls) > maxConcurrentTargetSendCalls {
|
if atomic.LoadInt64(&list.currentSendCalls) > maxConcurrentTargetSendCalls {
|
||||||
err := fmt.Errorf("concurrent target notifications exceeded %d", maxConcurrentTargetSendCalls)
|
err := fmt.Errorf("concurrent target notifications exceeded %d", maxConcurrentTargetSendCalls)
|
||||||
for id := range targetIDset {
|
for id := range targetIDset {
|
||||||
@ -165,33 +165,40 @@ func (list *TargetList) Send(event Event, targetIDset TargetIDSet, resCh chan<-
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if synchronous {
|
||||||
|
list.send(event, targetIDset, resCh)
|
||||||
|
return
|
||||||
|
}
|
||||||
go func() {
|
go func() {
|
||||||
var wg sync.WaitGroup
|
list.send(event, targetIDset, resCh)
|
||||||
for id := range targetIDset {
|
|
||||||
list.RLock()
|
|
||||||
target, ok := list.targets[id]
|
|
||||||
list.RUnlock()
|
|
||||||
if ok {
|
|
||||||
wg.Add(1)
|
|
||||||
go func(id TargetID, target Target) {
|
|
||||||
atomic.AddInt64(&list.currentSendCalls, 1)
|
|
||||||
defer atomic.AddInt64(&list.currentSendCalls, -1)
|
|
||||||
defer wg.Done()
|
|
||||||
tgtRes := TargetIDResult{ID: id}
|
|
||||||
if err := target.Save(event); err != nil {
|
|
||||||
tgtRes.Err = err
|
|
||||||
}
|
|
||||||
resCh <- tgtRes
|
|
||||||
}(id, target)
|
|
||||||
} else {
|
|
||||||
resCh <- TargetIDResult{ID: id}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (list *TargetList) send(event Event, targetIDset TargetIDSet, resCh chan<- TargetIDResult) {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for id := range targetIDset {
|
||||||
|
list.RLock()
|
||||||
|
target, ok := list.targets[id]
|
||||||
|
list.RUnlock()
|
||||||
|
if ok {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(id TargetID, target Target) {
|
||||||
|
atomic.AddInt64(&list.currentSendCalls, 1)
|
||||||
|
defer atomic.AddInt64(&list.currentSendCalls, -1)
|
||||||
|
defer wg.Done()
|
||||||
|
tgtRes := TargetIDResult{ID: id}
|
||||||
|
if err := target.Save(event); err != nil {
|
||||||
|
tgtRes.Err = err
|
||||||
|
}
|
||||||
|
resCh <- tgtRes
|
||||||
|
}(id, target)
|
||||||
|
} else {
|
||||||
|
resCh <- TargetIDResult{ID: id}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
// Stats returns stats for targets.
|
// Stats returns stats for targets.
|
||||||
func (list *TargetList) Stats() TargetStats {
|
func (list *TargetList) Stats() TargetStats {
|
||||||
t := TargetStats{}
|
t := TargetStats{}
|
||||||
|
@ -249,7 +249,7 @@ func TestTargetListSend(t *testing.T) {
|
|||||||
for i, testCase := range testCases {
|
for i, testCase := range testCases {
|
||||||
testCase.targetList.Send(Event{}, map[TargetID]struct{}{
|
testCase.targetList.Send(Event{}, map[TargetID]struct{}{
|
||||||
testCase.targetID: {},
|
testCase.targetID: {},
|
||||||
}, resCh)
|
}, resCh, false)
|
||||||
res := <-resCh
|
res := <-resCh
|
||||||
expectErr := (res.Err != nil)
|
expectErr := (res.Err != nil)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user