diff --git a/cmd/event-notification.go b/cmd/event-notification.go index 09e8c3237..4cce51fd0 100644 --- a/cmd/event-notification.go +++ b/cmd/event-notification.go @@ -35,20 +35,18 @@ import ( // EventNotifier - notifies external systems about events in MinIO. type EventNotifier struct { sync.RWMutex - targetList *event.TargetList - targetResCh chan event.TargetIDResult - bucketRulesMap map[string]event.RulesMap - bucketRemoteTargetRulesMap map[string]map[event.TargetID]event.RulesMap + targetList *event.TargetList + targetResCh chan event.TargetIDResult + bucketRulesMap map[string]event.RulesMap } // NewEventNotifier - creates new event notification object. func NewEventNotifier() *EventNotifier { // targetList/bucketRulesMap/bucketRemoteTargetRulesMap are populated by NotificationSys.InitBucketTargets() return &EventNotifier{ - targetList: event.NewTargetList(), - targetResCh: make(chan event.TargetIDResult), - bucketRulesMap: make(map[string]event.RulesMap), - bucketRemoteTargetRulesMap: make(map[string]map[event.TargetID]event.RulesMap), + targetList: event.NewTargetList(), + targetResCh: make(chan event.TargetIDResult), + bucketRulesMap: make(map[string]event.RulesMap), } } @@ -122,10 +120,6 @@ func (evnot *EventNotifier) AddRulesMap(bucketName string, rulesMap event.RulesM rulesMap = rulesMap.Clone() - for _, targetRulesMap := range evnot.bucketRemoteTargetRulesMap[bucketName] { - rulesMap.Add(targetRulesMap) - } - // Do not add for an empty rulesMap. if len(rulesMap) == 0 { delete(evnot.bucketRulesMap, bucketName) @@ -174,29 +168,18 @@ func (evnot *EventNotifier) RemoveNotification(bucketName string) { defer evnot.Unlock() delete(evnot.bucketRulesMap, bucketName) - - targetIDSet := event.NewTargetIDSet() - for targetID := range evnot.bucketRemoteTargetRulesMap[bucketName] { - targetIDSet[targetID] = struct{}{} - delete(evnot.bucketRemoteTargetRulesMap[bucketName], targetID) - } - evnot.targetList.Remove(targetIDSet) - - delete(evnot.bucketRemoteTargetRulesMap, bucketName) } -// RemoveAllRemoteTargets - closes and removes all notification targets. -func (evnot *EventNotifier) RemoveAllRemoteTargets() { +// RemoveAllBucketTargets - closes and removes all notification targets. +func (evnot *EventNotifier) RemoveAllBucketTargets() { evnot.Lock() defer evnot.Unlock() - for _, targetMap := range evnot.bucketRemoteTargetRulesMap { - targetIDSet := event.NewTargetIDSet() - for k := range targetMap { - targetIDSet[k] = struct{}{} - } - evnot.targetList.Remove(targetIDSet) + targetIDSet := event.NewTargetIDSet() + for k := range evnot.targetList.TargetMap() { + targetIDSet[k] = struct{}{} } + evnot.targetList.Remove(targetIDSet) } // Send - sends the event to all registered notification targets @@ -209,7 +192,8 @@ func (evnot *EventNotifier) Send(args eventArgs) { return } - evnot.targetList.Send(args.ToEvent(true), targetIDSet, evnot.targetResCh) + // If MINIO_API_SYNC_EVENTS is set, send events synchronously. + evnot.targetList.Send(args.ToEvent(true), targetIDSet, evnot.targetResCh, globalAPIConfig.isSyncEventsEnabled()) } type eventArgs struct { diff --git a/cmd/handler-api.go b/cmd/handler-api.go index 2ae505058..48d5545c8 100644 --- a/cmd/handler-api.go +++ b/cmd/handler-api.go @@ -52,6 +52,7 @@ type apiConfig struct { disableODirect bool gzipObjects bool rootAccess bool + syncEvents bool } const cgroupLimitFile = "/sys/fs/cgroup/memory/memory.limit_in_bytes" @@ -166,6 +167,7 @@ func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) { t.disableODirect = cfg.DisableODirect t.gzipObjects = cfg.GzipObjects t.rootAccess = cfg.RootAccess + t.syncEvents = cfg.SyncEvents } func (t *apiConfig) isDisableODirect() bool { @@ -353,3 +355,10 @@ func (t *apiConfig) getTransitionWorkers() int { return t.transitionWorkers } + +func (t *apiConfig) isSyncEventsEnabled() bool { + t.mu.RLock() + defer t.mu.RUnlock() + + return t.syncEvents +} diff --git a/cmd/notification.go b/cmd/notification.go index 825fd2433..4a0c3970e 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -1032,7 +1032,6 @@ func (sys *NotificationSys) GetPeerOnlineCount() (nodesOnline, nodesOffline int) // NewNotificationSys - creates new notification system object. func NewNotificationSys(endpoints EndpointServerPools) *NotificationSys { - // targetList/bucketRulesMap/bucketRemoteTargetRulesMap are populated by NotificationSys.Init() remote, all := newPeerRestClients(endpoints) return &NotificationSys{ peerClients: remote, diff --git a/cmd/signals.go b/cmd/signals.go index 616a4c26b..902d804ed 100644 --- a/cmd/signals.go +++ b/cmd/signals.go @@ -51,10 +51,6 @@ func handleSignals() { // send signal to various go-routines that they need to quit. cancelGlobalContext() - if globalEventNotifier != nil { - globalEventNotifier.RemoveAllRemoteTargets() - } - if httpServer := newHTTPServerFn(); httpServer != nil { err = httpServer.Shutdown() if !errors.Is(err, http.ErrServerClosed) { @@ -71,6 +67,10 @@ func handleSignals() { logger.LogIf(context.Background(), srv.Shutdown()) } + if globalEventNotifier != nil { + globalEventNotifier.RemoveAllBucketTargets() + } + return (err == nil && oerr == nil) } diff --git a/internal/config/api/api.go b/internal/config/api/api.go index ee0b7550f..9c844334b 100644 --- a/internal/config/api/api.go +++ b/internal/config/api/api.go @@ -45,6 +45,7 @@ const ( apiDisableODirect = "disable_odirect" apiGzipObjects = "gzip_objects" apiRootAccess = "root_access" + apiSyncEvents = "sync_events" EnvAPIRequestsMax = "MINIO_API_REQUESTS_MAX" EnvAPIRequestsDeadline = "MINIO_API_REQUESTS_DEADLINE" @@ -62,6 +63,7 @@ const ( EnvAPIDisableODirect = "MINIO_API_DISABLE_ODIRECT" EnvAPIGzipObjects = "MINIO_API_GZIP_OBJECTS" EnvAPIRootAccess = "MINIO_API_ROOT_ACCESS" // default "on" + EnvAPISyncEvents = "MINIO_API_SYNC_EVENTS" // default "off" ) // Deprecated key and ENVs @@ -135,6 +137,10 @@ var ( Key: apiRootAccess, Value: "on", }, + config.KV{ + Key: apiSyncEvents, + Value: "off", + }, } ) @@ -154,6 +160,7 @@ type Config struct { DisableODirect bool `json:"disable_odirect"` GzipObjects bool `json:"gzip_objects"` RootAccess bool `json:"root_access"` + SyncEvents bool `json:"sync_events"` } // UnmarshalJSON - Validate SS and RRS parity when unmarshalling JSON. @@ -269,5 +276,7 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) { } cfg.StaleUploadsExpiry = staleUploadsExpiry + cfg.SyncEvents = env.Get(EnvAPISyncEvents, kvs.Get(apiSyncEvents)) == config.EnableOn + return cfg, nil } diff --git a/internal/config/api/help.go b/internal/config/api/help.go index db48ed29c..ea4a408cb 100644 --- a/internal/config/api/help.go +++ b/internal/config/api/help.go @@ -104,5 +104,11 @@ var ( Optional: true, Type: "boolean", }, + config.HelpKV{ + Key: apiSyncEvents, + Description: "set to enable synchronous bucket notifications" + defaultHelpPostfix(apiSyncEvents), + Optional: true, + Type: "boolean", + }, } ) diff --git a/internal/event/targetlist.go b/internal/event/targetlist.go index 9da477be1..8f9600087 100644 --- a/internal/event/targetlist.go +++ b/internal/event/targetlist.go @@ -157,7 +157,7 @@ func (list *TargetList) TargetMap() map[TargetID]Target { } // Send - sends events to targets identified by target IDs. -func (list *TargetList) Send(event Event, targetIDset TargetIDSet, resCh chan<- TargetIDResult) { +func (list *TargetList) Send(event Event, targetIDset TargetIDSet, resCh chan<- TargetIDResult, synchronous bool) { if atomic.LoadInt64(&list.currentSendCalls) > maxConcurrentTargetSendCalls { err := fmt.Errorf("concurrent target notifications exceeded %d", maxConcurrentTargetSendCalls) for id := range targetIDset { @@ -165,33 +165,40 @@ func (list *TargetList) Send(event Event, targetIDset TargetIDSet, resCh chan<- } return } - + if synchronous { + list.send(event, targetIDset, resCh) + return + } go func() { - var wg sync.WaitGroup - for id := range targetIDset { - list.RLock() - target, ok := list.targets[id] - list.RUnlock() - if ok { - wg.Add(1) - go func(id TargetID, target Target) { - atomic.AddInt64(&list.currentSendCalls, 1) - defer atomic.AddInt64(&list.currentSendCalls, -1) - defer wg.Done() - tgtRes := TargetIDResult{ID: id} - if err := target.Save(event); err != nil { - tgtRes.Err = err - } - resCh <- tgtRes - }(id, target) - } else { - resCh <- TargetIDResult{ID: id} - } - } - wg.Wait() + list.send(event, targetIDset, resCh) }() } +func (list *TargetList) send(event Event, targetIDset TargetIDSet, resCh chan<- TargetIDResult) { + var wg sync.WaitGroup + for id := range targetIDset { + list.RLock() + target, ok := list.targets[id] + list.RUnlock() + if ok { + wg.Add(1) + go func(id TargetID, target Target) { + atomic.AddInt64(&list.currentSendCalls, 1) + defer atomic.AddInt64(&list.currentSendCalls, -1) + defer wg.Done() + tgtRes := TargetIDResult{ID: id} + if err := target.Save(event); err != nil { + tgtRes.Err = err + } + resCh <- tgtRes + }(id, target) + } else { + resCh <- TargetIDResult{ID: id} + } + } + wg.Wait() +} + // Stats returns stats for targets. func (list *TargetList) Stats() TargetStats { t := TargetStats{} diff --git a/internal/event/targetlist_test.go b/internal/event/targetlist_test.go index 9a3bbda8b..6afd79b76 100644 --- a/internal/event/targetlist_test.go +++ b/internal/event/targetlist_test.go @@ -249,7 +249,7 @@ func TestTargetListSend(t *testing.T) { for i, testCase := range testCases { testCase.targetList.Send(Event{}, map[TargetID]struct{}{ testCase.targetID: {}, - }, resCh) + }, resCh, false) res := <-resCh expectErr := (res.Err != nil)