From afbb63a1975cac8a6d4085c84f051b6ac6adbe58 Mon Sep 17 00:00:00 2001 From: Aditya Manthramurthy Date: Wed, 24 Aug 2022 06:42:36 -0700 Subject: [PATCH] Factor out external event notification funcs (#15574) This change moves external event notification functionality into `event-notification.go`. This simplifies notification related code. --- cmd/admin-bucket-handlers.go | 4 +- cmd/admin-handlers.go | 2 +- cmd/bucket-metadata-sys.go | 2 +- cmd/bucket-notification-handlers.go | 6 +- cmd/config-current.go | 2 +- cmd/event-notification.go | 323 ++++++++++++++++++++++++++++ cmd/gateway-main.go | 2 +- cmd/globals.go | 4 +- cmd/notification.go | 298 +------------------------ cmd/peer-rest-server.go | 6 +- cmd/server-main.go | 7 +- cmd/server-startup-msg.go | 2 +- cmd/signals.go | 4 +- 13 files changed, 354 insertions(+), 308 deletions(-) create mode 100644 cmd/event-notification.go diff --git a/cmd/admin-bucket-handlers.go b/cmd/admin-bucket-handlers.go index 3ca303797..0d1ed2c40 100644 --- a/cmd/admin-bucket-handlers.go +++ b/cmd/admin-bucket-handlers.go @@ -845,7 +845,7 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r * } switch fileName { case bucketNotificationConfig: - config, err := event.ParseConfig(io.LimitReader(reader, sz), globalSite.Region, globalNotificationSys.targetList) + config, err := event.ParseConfig(io.LimitReader(reader, sz), globalSite.Region, globalEventNotifier.targetList) if err != nil { rpt.SetStatus(bucket, fileName, fmt.Errorf("%s (%s)", errorCodes[ErrMalformedXML].Description, err)) continue @@ -862,7 +862,7 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r * continue } rulesMap := config.ToRulesMap() - globalNotificationSys.AddRulesMap(bucket, rulesMap) + globalEventNotifier.AddRulesMap(bucket, rulesMap) rpt.SetStatus(bucket, fileName, nil) case bucketPolicyConfig: // Error out if Content-Length is beyond allowed size. diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 2a844841c..e714f945a 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -1879,7 +1879,7 @@ func getServerInfo(ctx context.Context, r *http.Request) madmin.InfoMessage { Mode: string(mode), Domain: domain, Region: globalSite.Region, - SQSARN: globalNotificationSys.GetARNList(false), + SQSARN: globalEventNotifier.GetARNList(false), DeploymentID: globalDeploymentID, Buckets: buckets, Objects: objects, diff --git a/cmd/bucket-metadata-sys.go b/cmd/bucket-metadata-sys.go index 9b6253b2e..08deb0185 100644 --- a/cmd/bucket-metadata-sys.go +++ b/cmd/bucket-metadata-sys.go @@ -452,7 +452,7 @@ func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []Buck sys.metadataMap[buckets[index].Name] = meta sys.Unlock() - globalNotificationSys.set(buckets[index], meta) // set notification targets + globalEventNotifier.set(buckets[index], meta) // set notification targets globalBucketTargetSys.set(buckets[index], meta) // set remote replication targets diff --git a/cmd/bucket-notification-handlers.go b/cmd/bucket-notification-handlers.go index 8d59ebbd8..2e7573d89 100644 --- a/cmd/bucket-notification-handlers.go +++ b/cmd/bucket-notification-handlers.go @@ -72,7 +72,7 @@ func (api objectAPIHandlers) GetBucketNotificationHandler(w http.ResponseWriter, return } config.SetRegion(globalSite.Region) - if err = config.Validate(globalSite.Region, globalNotificationSys.targetList); err != nil { + if err = config.Validate(globalSite.Region, globalEventNotifier.targetList); err != nil { arnErr, ok := err.(*event.ErrARNNotFound) if ok { for i, queue := range config.QueueList { @@ -144,7 +144,7 @@ func (api objectAPIHandlers) PutBucketNotificationHandler(w http.ResponseWriter, return } - config, err := event.ParseConfig(io.LimitReader(r.Body, r.ContentLength), globalSite.Region, globalNotificationSys.targetList) + config, err := event.ParseConfig(io.LimitReader(r.Body, r.ContentLength), globalSite.Region, globalEventNotifier.targetList) if err != nil { apiErr := errorCodes.ToAPIErr(ErrMalformedXML) if event.IsEventError(err) { @@ -166,7 +166,7 @@ func (api objectAPIHandlers) PutBucketNotificationHandler(w http.ResponseWriter, } rulesMap := config.ToRulesMap() - globalNotificationSys.AddRulesMap(bucketName, rulesMap) + globalEventNotifier.AddRulesMap(bucketName, rulesMap) writeSuccessResponseHeadersOnly(w) } diff --git a/cmd/config-current.go b/cmd/config-current.go index 110b7f69d..8344edba3 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -396,7 +396,7 @@ func validateSubSysConfig(s config.Config, subSys string, objAPI ObjectLayer) er } if config.NotifySubSystems.Contains(subSys) { - if err := notify.TestSubSysNotificationTargets(GlobalContext, s, NewGatewayHTTPTransport(), globalNotificationSys.ConfiguredTargetIDs(), subSys); err != nil { + if err := notify.TestSubSysNotificationTargets(GlobalContext, s, NewGatewayHTTPTransport(), globalEventNotifier.ConfiguredTargetIDs(), subSys); err != nil { return err } } diff --git a/cmd/event-notification.go b/cmd/event-notification.go new file mode 100644 index 000000000..fde2cdd8b --- /dev/null +++ b/cmd/event-notification.go @@ -0,0 +1,323 @@ +// Copyright (c) 2015-2022 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cmd + +import ( + "context" + "fmt" + "net/url" + "strings" + "sync" + + "github.com/minio/minio/internal/crypto" + "github.com/minio/minio/internal/event" + xhttp "github.com/minio/minio/internal/http" + "github.com/minio/minio/internal/logger" + "github.com/minio/pkg/bucket/policy" +) + +// EventNotifier - notifies external systems about events in MinIO. +type EventNotifier struct { + sync.RWMutex + targetList *event.TargetList + targetResCh chan event.TargetIDResult + bucketRulesMap map[string]event.RulesMap + bucketRemoteTargetRulesMap map[string]map[event.TargetID]event.RulesMap +} + +// NewEventNotifier - creates new event notification object. +func NewEventNotifier() *EventNotifier { + // targetList/bucketRulesMap/bucketRemoteTargetRulesMap are populated by NotificationSys.Init() + return &EventNotifier{ + targetList: event.NewTargetList(), + targetResCh: make(chan event.TargetIDResult), + bucketRulesMap: make(map[string]event.RulesMap), + bucketRemoteTargetRulesMap: make(map[string]map[event.TargetID]event.RulesMap), + } +} + +// GetARNList - returns available ARNs. +func (evnot *EventNotifier) GetARNList(onlyActive bool) []string { + arns := []string{} + if evnot == nil { + return arns + } + region := globalSite.Region + for targetID, target := range evnot.targetList.TargetMap() { + // httpclient target is part of ListenNotification + // which doesn't need to be listed as part of the ARN list + // This list is only meant for external targets, filter + // this out pro-actively. + if !strings.HasPrefix(targetID.ID, "httpclient+") { + if onlyActive && !target.HasQueueStore() { + if _, err := target.IsActive(); err != nil { + continue + } + } + arns = append(arns, targetID.ToARN(region).String()) + } + } + + return arns +} + +// Loads notification policies for all buckets into EventNotifier. +func (evnot *EventNotifier) set(bucket BucketInfo, meta BucketMetadata) { + config := meta.notificationConfig + if config == nil { + return + } + config.SetRegion(globalSite.Region) + if err := config.Validate(globalSite.Region, globalEventNotifier.targetList); err != nil { + if _, ok := err.(*event.ErrARNNotFound); !ok { + logger.LogIf(GlobalContext, err) + } + } + evnot.AddRulesMap(bucket.Name, config.ToRulesMap()) +} + +// InitBucketTargets - initializes notification system from notification.xml of all buckets. +func (evnot *EventNotifier) InitBucketTargets(ctx context.Context, objAPI ObjectLayer) error { + if objAPI == nil { + return errServerNotInitialized + } + + // In gateway mode, notifications are not supported - except NAS gateway. + if globalIsGateway && !objAPI.IsNotificationSupported() { + return nil + } + + logger.LogIf(ctx, evnot.targetList.Add(globalConfigTargetList.Targets()...)) + + go func() { + for res := range evnot.targetResCh { + if res.Err != nil { + reqInfo := &logger.ReqInfo{} + reqInfo.AppendTags("targetID", res.ID.Name) + logger.LogOnceIf(logger.SetReqInfo(GlobalContext, reqInfo), res.Err, res.ID.String()) + } + } + }() + + return nil +} + +// AddRulesMap - adds rules map for bucket name. +func (evnot *EventNotifier) AddRulesMap(bucketName string, rulesMap event.RulesMap) { + evnot.Lock() + defer evnot.Unlock() + + rulesMap = rulesMap.Clone() + + for _, targetRulesMap := range evnot.bucketRemoteTargetRulesMap[bucketName] { + rulesMap.Add(targetRulesMap) + } + + // Do not add for an empty rulesMap. + if len(rulesMap) == 0 { + delete(evnot.bucketRulesMap, bucketName) + } else { + evnot.bucketRulesMap[bucketName] = rulesMap + } +} + +// RemoveRulesMap - removes rules map for bucket name. +func (evnot *EventNotifier) RemoveRulesMap(bucketName string, rulesMap event.RulesMap) { + evnot.Lock() + defer evnot.Unlock() + + evnot.bucketRulesMap[bucketName].Remove(rulesMap) + if len(evnot.bucketRulesMap[bucketName]) == 0 { + delete(evnot.bucketRulesMap, bucketName) + } +} + +// ConfiguredTargetIDs - returns list of configured target id's +func (evnot *EventNotifier) ConfiguredTargetIDs() []event.TargetID { + if evnot == nil { + return nil + } + + evnot.RLock() + defer evnot.RUnlock() + + var targetIDs []event.TargetID + for _, rmap := range evnot.bucketRulesMap { + for _, rules := range rmap { + for _, targetSet := range rules { + for id := range targetSet { + targetIDs = append(targetIDs, id) + } + } + } + } + // Filter out targets configured via env + var tIDs []event.TargetID + for _, targetID := range targetIDs { + if !globalEnvTargetList.Exists(targetID) { + tIDs = append(tIDs, targetID) + } + } + return tIDs +} + +// RemoveNotification - removes all notification configuration for bucket name. +func (evnot *EventNotifier) RemoveNotification(bucketName string) { + evnot.Lock() + defer evnot.Unlock() + + delete(evnot.bucketRulesMap, bucketName) + + targetIDSet := event.NewTargetIDSet() + for targetID := range evnot.bucketRemoteTargetRulesMap[bucketName] { + targetIDSet[targetID] = struct{}{} + delete(evnot.bucketRemoteTargetRulesMap[bucketName], targetID) + } + evnot.targetList.Remove(targetIDSet) + + delete(evnot.bucketRemoteTargetRulesMap, bucketName) +} + +// RemoveAllRemoteTargets - closes and removes all notification targets. +func (evnot *EventNotifier) RemoveAllRemoteTargets() { + evnot.Lock() + defer evnot.Unlock() + + for _, targetMap := range evnot.bucketRemoteTargetRulesMap { + targetIDSet := event.NewTargetIDSet() + for k := range targetMap { + targetIDSet[k] = struct{}{} + } + evnot.targetList.Remove(targetIDSet) + } +} + +// Send - sends event data to all matching targets. +func (evnot *EventNotifier) Send(args eventArgs) { + evnot.RLock() + targetIDSet := evnot.bucketRulesMap[args.BucketName].Match(args.EventName, args.Object.Name) + evnot.RUnlock() + + if len(targetIDSet) == 0 { + return + } + + evnot.targetList.Send(args.ToEvent(true), targetIDSet, evnot.targetResCh) +} + +type eventArgs struct { + EventName event.Name + BucketName string + Object ObjectInfo + ReqParams map[string]string + RespElements map[string]string + Host string + UserAgent string +} + +// ToEvent - converts to notification event. +func (args eventArgs) ToEvent(escape bool) event.Event { + eventTime := UTCNow() + uniqueID := fmt.Sprintf("%X", eventTime.UnixNano()) + + respElements := map[string]string{ + "x-amz-request-id": args.RespElements["requestId"], + "x-minio-origin-endpoint": func() string { + if globalMinioEndpoint != "" { + return globalMinioEndpoint + } + return getAPIEndpoints()[0] + }(), // MinIO specific custom elements. + } + // Add deployment as part of + if globalDeploymentID != "" { + respElements["x-minio-deployment-id"] = globalDeploymentID + } + if args.RespElements["content-length"] != "" { + respElements["content-length"] = args.RespElements["content-length"] + } + keyName := args.Object.Name + if escape { + keyName = url.QueryEscape(args.Object.Name) + } + newEvent := event.Event{ + EventVersion: "2.0", + EventSource: "minio:s3", + AwsRegion: args.ReqParams["region"], + EventTime: eventTime.Format(event.AMZTimeFormat), + EventName: args.EventName, + UserIdentity: event.Identity{PrincipalID: args.ReqParams["principalId"]}, + RequestParameters: args.ReqParams, + ResponseElements: respElements, + S3: event.Metadata{ + SchemaVersion: "1.0", + ConfigurationID: "Config", + Bucket: event.Bucket{ + Name: args.BucketName, + OwnerIdentity: event.Identity{PrincipalID: args.ReqParams["principalId"]}, + ARN: policy.ResourceARNPrefix + args.BucketName, + }, + Object: event.Object{ + Key: keyName, + VersionID: args.Object.VersionID, + Sequencer: uniqueID, + }, + }, + Source: event.Source{ + Host: args.Host, + UserAgent: args.UserAgent, + }, + } + + if args.EventName != event.ObjectRemovedDelete && args.EventName != event.ObjectRemovedDeleteMarkerCreated { + newEvent.S3.Object.ETag = args.Object.ETag + newEvent.S3.Object.Size = args.Object.Size + newEvent.S3.Object.ContentType = args.Object.ContentType + newEvent.S3.Object.UserMetadata = make(map[string]string, len(args.Object.UserDefined)) + for k, v := range args.Object.UserDefined { + if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) { + continue + } + newEvent.S3.Object.UserMetadata[k] = v + } + } + + return newEvent +} + +func sendEvent(args eventArgs) { + args.Object.Size, _ = args.Object.GetActualSize() + + // avoid generating a notification for REPLICA creation event. + if _, ok := args.ReqParams[xhttp.MinIOSourceReplicationRequest]; ok { + return + } + // remove sensitive encryption entries in metadata. + crypto.RemoveSensitiveEntries(args.Object.UserDefined) + crypto.RemoveInternalEntries(args.Object.UserDefined) + + // globalNotificationSys is not initialized in gateway mode. + if globalNotificationSys == nil { + return + } + if globalHTTPListen.NumSubscribers(args.EventName) > 0 { + globalHTTPListen.Publish(args.ToEvent(false)) + } + + globalEventNotifier.Send(args) +} diff --git a/cmd/gateway-main.go b/cmd/gateway-main.go index 7451bfcd1..5f8da6370 100644 --- a/cmd/gateway-main.go +++ b/cmd/gateway-main.go @@ -316,7 +316,7 @@ func StartGateway(ctx *cli.Context, gw Gateway) { } logger.FatalIf(globalBucketMetadataSys.Init(GlobalContext, buckets, newObject), "Unable to initialize bucket metadata") - logger.FatalIf(globalNotificationSys.InitBucketTargets(GlobalContext, newObject), "Unable to initialize bucket targets for notification system") + logger.FatalIf(globalEventNotifier.InitBucketTargets(GlobalContext, newObject), "Unable to initialize bucket targets for notification system") } if globalCacheConfig.Enabled { diff --git a/cmd/globals.go b/cmd/globals.go index 313e889fd..13afe315b 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -188,7 +188,9 @@ var ( // globalConfigSys server config system. globalConfigSys *ConfigSys - globalNotificationSys *NotificationSys + globalNotificationSys *NotificationSys + + globalEventNotifier *EventNotifier globalConfigTargetList *event.TargetList // globalEnvTargetList has list of targets configured via env. globalEnvTargetList *event.TargetList diff --git a/cmd/notification.go b/cmd/notification.go index 6f412178c..6bffae46c 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -26,7 +26,6 @@ import ( "io" "net/http" "net/url" - "strings" "sync" "time" @@ -35,49 +34,18 @@ import ( "github.com/klauspost/compress/zip" "github.com/minio/madmin-go" bucketBandwidth "github.com/minio/minio/internal/bucket/bandwidth" - "github.com/minio/minio/internal/crypto" - "github.com/minio/minio/internal/event" - xhttp "github.com/minio/minio/internal/http" "github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/sync/errgroup" - "github.com/minio/pkg/bucket/policy" xnet "github.com/minio/pkg/net" ) +// This file contains peer related notifications. For sending notifications to +// external systems, see event-notification.go + // NotificationSys - notification system. type NotificationSys struct { - sync.RWMutex - targetList *event.TargetList - targetResCh chan event.TargetIDResult - bucketRulesMap map[string]event.RulesMap - bucketRemoteTargetRulesMap map[string]map[event.TargetID]event.RulesMap - peerClients []*peerRESTClient // Excludes self - allPeerClients []*peerRESTClient // Includes nil client for self -} - -// GetARNList - returns available ARNs. -func (sys *NotificationSys) GetARNList(onlyActive bool) []string { - arns := []string{} - if sys == nil { - return arns - } - region := globalSite.Region - for targetID, target := range sys.targetList.TargetMap() { - // httpclient target is part of ListenNotification - // which doesn't need to be listed as part of the ARN list - // This list is only meant for external targets, filter - // this out pro-actively. - if !strings.HasPrefix(targetID.ID, "httpclient+") { - if onlyActive && !target.HasQueueStore() { - if _, err := target.IsActive(); err != nil { - continue - } - } - arns = append(arns, targetID.ToARN(region).String()) - } - } - - return arns + peerClients []*peerRESTClient // Excludes self + allPeerClients []*peerRESTClient // Includes nil client for self } // NotificationPeerErr returns error associated for a remote peer. @@ -550,7 +518,7 @@ func (sys *NotificationSys) DeleteBucketMetadata(ctx context.Context, bucketName globalReplicationStats.Delete(bucketName) globalBucketMetadataSys.Remove(bucketName) globalBucketTargetSys.Delete(bucketName) - globalNotificationSys.RemoveNotification(bucketName) + globalEventNotifier.RemoveNotification(bucketName) globalBucketConnStats.delete(bucketName) if localMetacacheMgr != nil { localMetacacheMgr.deleteBucketCache(bucketName) @@ -684,150 +652,6 @@ func (sys *NotificationSys) LoadTransitionTierConfig(ctx context.Context) { } } -// Loads notification policies for all buckets into NotificationSys. -func (sys *NotificationSys) set(bucket BucketInfo, meta BucketMetadata) { - config := meta.notificationConfig - if config == nil { - return - } - config.SetRegion(globalSite.Region) - if err := config.Validate(globalSite.Region, globalNotificationSys.targetList); err != nil { - if _, ok := err.(*event.ErrARNNotFound); !ok { - logger.LogIf(GlobalContext, err) - } - } - sys.AddRulesMap(bucket.Name, config.ToRulesMap()) -} - -// InitBucketTargets - initializes notification system from notification.xml of all buckets. -func (sys *NotificationSys) InitBucketTargets(ctx context.Context, objAPI ObjectLayer) error { - if objAPI == nil { - return errServerNotInitialized - } - - // In gateway mode, notifications are not supported - except NAS gateway. - if globalIsGateway && !objAPI.IsNotificationSupported() { - return nil - } - - logger.LogIf(ctx, sys.targetList.Add(globalConfigTargetList.Targets()...)) - - go func() { - for res := range sys.targetResCh { - if res.Err != nil { - reqInfo := &logger.ReqInfo{} - reqInfo.AppendTags("targetID", res.ID.Name) - logger.LogOnceIf(logger.SetReqInfo(GlobalContext, reqInfo), res.Err, res.ID.String()) - } - } - }() - - return nil -} - -// AddRulesMap - adds rules map for bucket name. -func (sys *NotificationSys) AddRulesMap(bucketName string, rulesMap event.RulesMap) { - sys.Lock() - defer sys.Unlock() - - rulesMap = rulesMap.Clone() - - for _, targetRulesMap := range sys.bucketRemoteTargetRulesMap[bucketName] { - rulesMap.Add(targetRulesMap) - } - - // Do not add for an empty rulesMap. - if len(rulesMap) == 0 { - delete(sys.bucketRulesMap, bucketName) - } else { - sys.bucketRulesMap[bucketName] = rulesMap - } -} - -// RemoveRulesMap - removes rules map for bucket name. -func (sys *NotificationSys) RemoveRulesMap(bucketName string, rulesMap event.RulesMap) { - sys.Lock() - defer sys.Unlock() - - sys.bucketRulesMap[bucketName].Remove(rulesMap) - if len(sys.bucketRulesMap[bucketName]) == 0 { - delete(sys.bucketRulesMap, bucketName) - } -} - -// ConfiguredTargetIDs - returns list of configured target id's -func (sys *NotificationSys) ConfiguredTargetIDs() []event.TargetID { - if sys == nil { - return nil - } - - sys.RLock() - defer sys.RUnlock() - - var targetIDs []event.TargetID - for _, rmap := range sys.bucketRulesMap { - for _, rules := range rmap { - for _, targetSet := range rules { - for id := range targetSet { - targetIDs = append(targetIDs, id) - } - } - } - } - // Filter out targets configured via env - var tIDs []event.TargetID - for _, targetID := range targetIDs { - if !globalEnvTargetList.Exists(targetID) { - tIDs = append(tIDs, targetID) - } - } - return tIDs -} - -// RemoveNotification - removes all notification configuration for bucket name. -func (sys *NotificationSys) RemoveNotification(bucketName string) { - sys.Lock() - defer sys.Unlock() - - delete(sys.bucketRulesMap, bucketName) - - targetIDSet := event.NewTargetIDSet() - for targetID := range sys.bucketRemoteTargetRulesMap[bucketName] { - targetIDSet[targetID] = struct{}{} - delete(sys.bucketRemoteTargetRulesMap[bucketName], targetID) - } - sys.targetList.Remove(targetIDSet) - - delete(sys.bucketRemoteTargetRulesMap, bucketName) -} - -// RemoveAllRemoteTargets - closes and removes all notification targets. -func (sys *NotificationSys) RemoveAllRemoteTargets() { - sys.Lock() - defer sys.Unlock() - - for _, targetMap := range sys.bucketRemoteTargetRulesMap { - targetIDSet := event.NewTargetIDSet() - for k := range targetMap { - targetIDSet[k] = struct{}{} - } - sys.targetList.Remove(targetIDSet) - } -} - -// Send - sends event data to all matching targets. -func (sys *NotificationSys) Send(args eventArgs) { - sys.RLock() - targetIDSet := sys.bucketRulesMap[args.BucketName].Match(args.EventName, args.Object.Name) - sys.RUnlock() - - if len(targetIDSet) == 0 { - return - } - - sys.targetList.Send(args.ToEvent(true), targetIDSet, sys.targetResCh) -} - // GetCPUs - Get all CPU information. func (sys *NotificationSys) GetCPUs(ctx context.Context) []madmin.CPUs { reply := make([]madmin.CPUs, len(sys.peerClients)) @@ -1190,117 +1014,11 @@ func NewNotificationSys(endpoints EndpointServerPools) *NotificationSys { // targetList/bucketRulesMap/bucketRemoteTargetRulesMap are populated by NotificationSys.Init() remote, all := newPeerRestClients(endpoints) return &NotificationSys{ - targetList: event.NewTargetList(), - targetResCh: make(chan event.TargetIDResult), - bucketRulesMap: make(map[string]event.RulesMap), - bucketRemoteTargetRulesMap: make(map[string]map[event.TargetID]event.RulesMap), - peerClients: remote, - allPeerClients: all, + peerClients: remote, + allPeerClients: all, } } -type eventArgs struct { - EventName event.Name - BucketName string - Object ObjectInfo - ReqParams map[string]string - RespElements map[string]string - Host string - UserAgent string -} - -// ToEvent - converts to notification event. -func (args eventArgs) ToEvent(escape bool) event.Event { - eventTime := UTCNow() - uniqueID := fmt.Sprintf("%X", eventTime.UnixNano()) - - respElements := map[string]string{ - "x-amz-request-id": args.RespElements["requestId"], - "x-minio-origin-endpoint": func() string { - if globalMinioEndpoint != "" { - return globalMinioEndpoint - } - return getAPIEndpoints()[0] - }(), // MinIO specific custom elements. - } - // Add deployment as part of - if globalDeploymentID != "" { - respElements["x-minio-deployment-id"] = globalDeploymentID - } - if args.RespElements["content-length"] != "" { - respElements["content-length"] = args.RespElements["content-length"] - } - keyName := args.Object.Name - if escape { - keyName = url.QueryEscape(args.Object.Name) - } - newEvent := event.Event{ - EventVersion: "2.0", - EventSource: "minio:s3", - AwsRegion: args.ReqParams["region"], - EventTime: eventTime.Format(event.AMZTimeFormat), - EventName: args.EventName, - UserIdentity: event.Identity{PrincipalID: args.ReqParams["principalId"]}, - RequestParameters: args.ReqParams, - ResponseElements: respElements, - S3: event.Metadata{ - SchemaVersion: "1.0", - ConfigurationID: "Config", - Bucket: event.Bucket{ - Name: args.BucketName, - OwnerIdentity: event.Identity{PrincipalID: args.ReqParams["principalId"]}, - ARN: policy.ResourceARNPrefix + args.BucketName, - }, - Object: event.Object{ - Key: keyName, - VersionID: args.Object.VersionID, - Sequencer: uniqueID, - }, - }, - Source: event.Source{ - Host: args.Host, - UserAgent: args.UserAgent, - }, - } - - if args.EventName != event.ObjectRemovedDelete && args.EventName != event.ObjectRemovedDeleteMarkerCreated { - newEvent.S3.Object.ETag = args.Object.ETag - newEvent.S3.Object.Size = args.Object.Size - newEvent.S3.Object.ContentType = args.Object.ContentType - newEvent.S3.Object.UserMetadata = make(map[string]string, len(args.Object.UserDefined)) - for k, v := range args.Object.UserDefined { - if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) { - continue - } - newEvent.S3.Object.UserMetadata[k] = v - } - } - - return newEvent -} - -func sendEvent(args eventArgs) { - args.Object.Size, _ = args.Object.GetActualSize() - - // avoid generating a notification for REPLICA creation event. - if _, ok := args.ReqParams[xhttp.MinIOSourceReplicationRequest]; ok { - return - } - // remove sensitive encryption entries in metadata. - crypto.RemoveSensitiveEntries(args.Object.UserDefined) - crypto.RemoveInternalEntries(args.Object.UserDefined) - - // globalNotificationSys is not initialized in gateway mode. - if globalNotificationSys == nil { - return - } - if globalHTTPListen.NumSubscribers(args.EventName) > 0 { - globalHTTPListen.Publish(args.ToEvent(false)) - } - - globalNotificationSys.Send(args) -} - // GetBandwidthReports - gets the bandwidth report from all nodes including self. func (sys *NotificationSys) GetBandwidthReports(ctx context.Context, buckets ...string) madmin.BucketBandwidthReport { reports := make([]*madmin.BucketBandwidthReport, len(sys.peerClients)) diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index d6fdd0336..c0dc80736 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -512,7 +512,7 @@ func (s *peerRESTServer) DeleteBucketMetadataHandler(w http.ResponseWriter, r *h globalReplicationStats.Delete(bucketName) globalBucketMetadataSys.Remove(bucketName) globalBucketTargetSys.Delete(bucketName) - globalNotificationSys.RemoveNotification(bucketName) + globalEventNotifier.RemoveNotification(bucketName) globalBucketConnStats.delete(bucketName) if localMetacacheMgr != nil { localMetacacheMgr.deleteBucketCache(bucketName) @@ -605,7 +605,7 @@ func (s *peerRESTServer) LoadBucketMetadataHandler(w http.ResponseWriter, r *htt globalBucketMetadataSys.Set(bucketName, meta) if meta.notificationConfig != nil { - globalNotificationSys.AddRulesMap(bucketName, meta.notificationConfig.ToRulesMap()) + globalEventNotifier.AddRulesMap(bucketName, meta.notificationConfig.ToRulesMap()) } if meta.bucketTargetConfig != nil { @@ -702,7 +702,7 @@ func (s *peerRESTServer) PutBucketNotificationHandler(w http.ResponseWriter, r * return } - globalNotificationSys.AddRulesMap(bucketName, rulesMap) + globalEventNotifier.AddRulesMap(bucketName, rulesMap) } // Return disk IDs of all the local disks. diff --git a/cmd/server-main.go b/cmd/server-main.go index 90589889d..78a80d2b0 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -249,9 +249,12 @@ func initAllSubsystems() { globalBackgroundHealState = newHealState(false) globalHealStateLK.Unlock() - // Create new notification system and initialize notification peer targets + // Initialize notification peer targets globalNotificationSys = NewNotificationSys(globalEndpoints) + // Create new notification system + globalEventNotifier = NewEventNotifier() + // Create new bucket metadata system. if globalBucketMetadataSys == nil { globalBucketMetadataSys = NewBucketMetadataSys() @@ -630,7 +633,7 @@ func serverMain(ctx *cli.Context) { globalSiteReplicationSys.Init(GlobalContext, newObject) // Initialize bucket notification targets. - globalNotificationSys.InitBucketTargets(GlobalContext, newObject) + globalEventNotifier.InitBucketTargets(GlobalContext, newObject) // initialize the new disk cache objects. if globalCacheConfig.Enabled { diff --git a/cmd/server-startup-msg.go b/cmd/server-startup-msg.go index 1ab5fec8b..0b3185103 100644 --- a/cmd/server-startup-msg.go +++ b/cmd/server-startup-msg.go @@ -163,7 +163,7 @@ func printEventNotifiers() { return } - arns := globalNotificationSys.GetARNList(true) + arns := globalEventNotifier.GetARNList(true) if len(arns) == 0 { return } diff --git a/cmd/signals.go b/cmd/signals.go index 8e43dae5d..9fbb42814 100644 --- a/cmd/signals.go +++ b/cmd/signals.go @@ -50,8 +50,8 @@ func handleSignals() { // send signal to various go-routines that they need to quit. cancelGlobalContext() - if globalNotificationSys != nil { - globalNotificationSys.RemoveAllRemoteTargets() + if globalEventNotifier != nil { + globalEventNotifier.RemoveAllRemoteTargets() } if httpServer := newHTTPServerFn(); httpServer != nil {