Factor out external event notification funcs (#15574)

This change moves external event notification functionality into
`event-notification.go`. This simplifies notification related code.
This commit is contained in:
Aditya Manthramurthy 2022-08-24 06:42:36 -07:00 committed by GitHub
parent 8902561f3c
commit afbb63a197
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 354 additions and 308 deletions

View File

@ -845,7 +845,7 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r *
} }
switch fileName { switch fileName {
case bucketNotificationConfig: case bucketNotificationConfig:
config, err := event.ParseConfig(io.LimitReader(reader, sz), globalSite.Region, globalNotificationSys.targetList) config, err := event.ParseConfig(io.LimitReader(reader, sz), globalSite.Region, globalEventNotifier.targetList)
if err != nil { if err != nil {
rpt.SetStatus(bucket, fileName, fmt.Errorf("%s (%s)", errorCodes[ErrMalformedXML].Description, err)) rpt.SetStatus(bucket, fileName, fmt.Errorf("%s (%s)", errorCodes[ErrMalformedXML].Description, err))
continue continue
@ -862,7 +862,7 @@ func (a adminAPIHandlers) ImportBucketMetadataHandler(w http.ResponseWriter, r *
continue continue
} }
rulesMap := config.ToRulesMap() rulesMap := config.ToRulesMap()
globalNotificationSys.AddRulesMap(bucket, rulesMap) globalEventNotifier.AddRulesMap(bucket, rulesMap)
rpt.SetStatus(bucket, fileName, nil) rpt.SetStatus(bucket, fileName, nil)
case bucketPolicyConfig: case bucketPolicyConfig:
// Error out if Content-Length is beyond allowed size. // Error out if Content-Length is beyond allowed size.

View File

@ -1879,7 +1879,7 @@ func getServerInfo(ctx context.Context, r *http.Request) madmin.InfoMessage {
Mode: string(mode), Mode: string(mode),
Domain: domain, Domain: domain,
Region: globalSite.Region, Region: globalSite.Region,
SQSARN: globalNotificationSys.GetARNList(false), SQSARN: globalEventNotifier.GetARNList(false),
DeploymentID: globalDeploymentID, DeploymentID: globalDeploymentID,
Buckets: buckets, Buckets: buckets,
Objects: objects, Objects: objects,

View File

@ -452,7 +452,7 @@ func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []Buck
sys.metadataMap[buckets[index].Name] = meta sys.metadataMap[buckets[index].Name] = meta
sys.Unlock() sys.Unlock()
globalNotificationSys.set(buckets[index], meta) // set notification targets globalEventNotifier.set(buckets[index], meta) // set notification targets
globalBucketTargetSys.set(buckets[index], meta) // set remote replication targets globalBucketTargetSys.set(buckets[index], meta) // set remote replication targets

View File

@ -72,7 +72,7 @@ func (api objectAPIHandlers) GetBucketNotificationHandler(w http.ResponseWriter,
return return
} }
config.SetRegion(globalSite.Region) config.SetRegion(globalSite.Region)
if err = config.Validate(globalSite.Region, globalNotificationSys.targetList); err != nil { if err = config.Validate(globalSite.Region, globalEventNotifier.targetList); err != nil {
arnErr, ok := err.(*event.ErrARNNotFound) arnErr, ok := err.(*event.ErrARNNotFound)
if ok { if ok {
for i, queue := range config.QueueList { for i, queue := range config.QueueList {
@ -144,7 +144,7 @@ func (api objectAPIHandlers) PutBucketNotificationHandler(w http.ResponseWriter,
return return
} }
config, err := event.ParseConfig(io.LimitReader(r.Body, r.ContentLength), globalSite.Region, globalNotificationSys.targetList) config, err := event.ParseConfig(io.LimitReader(r.Body, r.ContentLength), globalSite.Region, globalEventNotifier.targetList)
if err != nil { if err != nil {
apiErr := errorCodes.ToAPIErr(ErrMalformedXML) apiErr := errorCodes.ToAPIErr(ErrMalformedXML)
if event.IsEventError(err) { if event.IsEventError(err) {
@ -166,7 +166,7 @@ func (api objectAPIHandlers) PutBucketNotificationHandler(w http.ResponseWriter,
} }
rulesMap := config.ToRulesMap() rulesMap := config.ToRulesMap()
globalNotificationSys.AddRulesMap(bucketName, rulesMap) globalEventNotifier.AddRulesMap(bucketName, rulesMap)
writeSuccessResponseHeadersOnly(w) writeSuccessResponseHeadersOnly(w)
} }

View File

@ -396,7 +396,7 @@ func validateSubSysConfig(s config.Config, subSys string, objAPI ObjectLayer) er
} }
if config.NotifySubSystems.Contains(subSys) { if config.NotifySubSystems.Contains(subSys) {
if err := notify.TestSubSysNotificationTargets(GlobalContext, s, NewGatewayHTTPTransport(), globalNotificationSys.ConfiguredTargetIDs(), subSys); err != nil { if err := notify.TestSubSysNotificationTargets(GlobalContext, s, NewGatewayHTTPTransport(), globalEventNotifier.ConfiguredTargetIDs(), subSys); err != nil {
return err return err
} }
} }

323
cmd/event-notification.go Normal file
View File

@ -0,0 +1,323 @@
// Copyright (c) 2015-2022 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"context"
"fmt"
"net/url"
"strings"
"sync"
"github.com/minio/minio/internal/crypto"
"github.com/minio/minio/internal/event"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/bucket/policy"
)
// EventNotifier - notifies external systems about events in MinIO.
type EventNotifier struct {
sync.RWMutex
targetList *event.TargetList
targetResCh chan event.TargetIDResult
bucketRulesMap map[string]event.RulesMap
bucketRemoteTargetRulesMap map[string]map[event.TargetID]event.RulesMap
}
// NewEventNotifier - creates new event notification object.
func NewEventNotifier() *EventNotifier {
// targetList/bucketRulesMap/bucketRemoteTargetRulesMap are populated by NotificationSys.Init()
return &EventNotifier{
targetList: event.NewTargetList(),
targetResCh: make(chan event.TargetIDResult),
bucketRulesMap: make(map[string]event.RulesMap),
bucketRemoteTargetRulesMap: make(map[string]map[event.TargetID]event.RulesMap),
}
}
// GetARNList - returns available ARNs.
func (evnot *EventNotifier) GetARNList(onlyActive bool) []string {
arns := []string{}
if evnot == nil {
return arns
}
region := globalSite.Region
for targetID, target := range evnot.targetList.TargetMap() {
// httpclient target is part of ListenNotification
// which doesn't need to be listed as part of the ARN list
// This list is only meant for external targets, filter
// this out pro-actively.
if !strings.HasPrefix(targetID.ID, "httpclient+") {
if onlyActive && !target.HasQueueStore() {
if _, err := target.IsActive(); err != nil {
continue
}
}
arns = append(arns, targetID.ToARN(region).String())
}
}
return arns
}
// Loads notification policies for all buckets into EventNotifier.
func (evnot *EventNotifier) set(bucket BucketInfo, meta BucketMetadata) {
config := meta.notificationConfig
if config == nil {
return
}
config.SetRegion(globalSite.Region)
if err := config.Validate(globalSite.Region, globalEventNotifier.targetList); err != nil {
if _, ok := err.(*event.ErrARNNotFound); !ok {
logger.LogIf(GlobalContext, err)
}
}
evnot.AddRulesMap(bucket.Name, config.ToRulesMap())
}
// InitBucketTargets - initializes notification system from notification.xml of all buckets.
func (evnot *EventNotifier) InitBucketTargets(ctx context.Context, objAPI ObjectLayer) error {
if objAPI == nil {
return errServerNotInitialized
}
// In gateway mode, notifications are not supported - except NAS gateway.
if globalIsGateway && !objAPI.IsNotificationSupported() {
return nil
}
logger.LogIf(ctx, evnot.targetList.Add(globalConfigTargetList.Targets()...))
go func() {
for res := range evnot.targetResCh {
if res.Err != nil {
reqInfo := &logger.ReqInfo{}
reqInfo.AppendTags("targetID", res.ID.Name)
logger.LogOnceIf(logger.SetReqInfo(GlobalContext, reqInfo), res.Err, res.ID.String())
}
}
}()
return nil
}
// AddRulesMap - adds rules map for bucket name.
func (evnot *EventNotifier) AddRulesMap(bucketName string, rulesMap event.RulesMap) {
evnot.Lock()
defer evnot.Unlock()
rulesMap = rulesMap.Clone()
for _, targetRulesMap := range evnot.bucketRemoteTargetRulesMap[bucketName] {
rulesMap.Add(targetRulesMap)
}
// Do not add for an empty rulesMap.
if len(rulesMap) == 0 {
delete(evnot.bucketRulesMap, bucketName)
} else {
evnot.bucketRulesMap[bucketName] = rulesMap
}
}
// RemoveRulesMap - removes rules map for bucket name.
func (evnot *EventNotifier) RemoveRulesMap(bucketName string, rulesMap event.RulesMap) {
evnot.Lock()
defer evnot.Unlock()
evnot.bucketRulesMap[bucketName].Remove(rulesMap)
if len(evnot.bucketRulesMap[bucketName]) == 0 {
delete(evnot.bucketRulesMap, bucketName)
}
}
// ConfiguredTargetIDs - returns list of configured target id's
func (evnot *EventNotifier) ConfiguredTargetIDs() []event.TargetID {
if evnot == nil {
return nil
}
evnot.RLock()
defer evnot.RUnlock()
var targetIDs []event.TargetID
for _, rmap := range evnot.bucketRulesMap {
for _, rules := range rmap {
for _, targetSet := range rules {
for id := range targetSet {
targetIDs = append(targetIDs, id)
}
}
}
}
// Filter out targets configured via env
var tIDs []event.TargetID
for _, targetID := range targetIDs {
if !globalEnvTargetList.Exists(targetID) {
tIDs = append(tIDs, targetID)
}
}
return tIDs
}
// RemoveNotification - removes all notification configuration for bucket name.
func (evnot *EventNotifier) RemoveNotification(bucketName string) {
evnot.Lock()
defer evnot.Unlock()
delete(evnot.bucketRulesMap, bucketName)
targetIDSet := event.NewTargetIDSet()
for targetID := range evnot.bucketRemoteTargetRulesMap[bucketName] {
targetIDSet[targetID] = struct{}{}
delete(evnot.bucketRemoteTargetRulesMap[bucketName], targetID)
}
evnot.targetList.Remove(targetIDSet)
delete(evnot.bucketRemoteTargetRulesMap, bucketName)
}
// RemoveAllRemoteTargets - closes and removes all notification targets.
func (evnot *EventNotifier) RemoveAllRemoteTargets() {
evnot.Lock()
defer evnot.Unlock()
for _, targetMap := range evnot.bucketRemoteTargetRulesMap {
targetIDSet := event.NewTargetIDSet()
for k := range targetMap {
targetIDSet[k] = struct{}{}
}
evnot.targetList.Remove(targetIDSet)
}
}
// Send - sends event data to all matching targets.
func (evnot *EventNotifier) Send(args eventArgs) {
evnot.RLock()
targetIDSet := evnot.bucketRulesMap[args.BucketName].Match(args.EventName, args.Object.Name)
evnot.RUnlock()
if len(targetIDSet) == 0 {
return
}
evnot.targetList.Send(args.ToEvent(true), targetIDSet, evnot.targetResCh)
}
type eventArgs struct {
EventName event.Name
BucketName string
Object ObjectInfo
ReqParams map[string]string
RespElements map[string]string
Host string
UserAgent string
}
// ToEvent - converts to notification event.
func (args eventArgs) ToEvent(escape bool) event.Event {
eventTime := UTCNow()
uniqueID := fmt.Sprintf("%X", eventTime.UnixNano())
respElements := map[string]string{
"x-amz-request-id": args.RespElements["requestId"],
"x-minio-origin-endpoint": func() string {
if globalMinioEndpoint != "" {
return globalMinioEndpoint
}
return getAPIEndpoints()[0]
}(), // MinIO specific custom elements.
}
// Add deployment as part of
if globalDeploymentID != "" {
respElements["x-minio-deployment-id"] = globalDeploymentID
}
if args.RespElements["content-length"] != "" {
respElements["content-length"] = args.RespElements["content-length"]
}
keyName := args.Object.Name
if escape {
keyName = url.QueryEscape(args.Object.Name)
}
newEvent := event.Event{
EventVersion: "2.0",
EventSource: "minio:s3",
AwsRegion: args.ReqParams["region"],
EventTime: eventTime.Format(event.AMZTimeFormat),
EventName: args.EventName,
UserIdentity: event.Identity{PrincipalID: args.ReqParams["principalId"]},
RequestParameters: args.ReqParams,
ResponseElements: respElements,
S3: event.Metadata{
SchemaVersion: "1.0",
ConfigurationID: "Config",
Bucket: event.Bucket{
Name: args.BucketName,
OwnerIdentity: event.Identity{PrincipalID: args.ReqParams["principalId"]},
ARN: policy.ResourceARNPrefix + args.BucketName,
},
Object: event.Object{
Key: keyName,
VersionID: args.Object.VersionID,
Sequencer: uniqueID,
},
},
Source: event.Source{
Host: args.Host,
UserAgent: args.UserAgent,
},
}
if args.EventName != event.ObjectRemovedDelete && args.EventName != event.ObjectRemovedDeleteMarkerCreated {
newEvent.S3.Object.ETag = args.Object.ETag
newEvent.S3.Object.Size = args.Object.Size
newEvent.S3.Object.ContentType = args.Object.ContentType
newEvent.S3.Object.UserMetadata = make(map[string]string, len(args.Object.UserDefined))
for k, v := range args.Object.UserDefined {
if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
continue
}
newEvent.S3.Object.UserMetadata[k] = v
}
}
return newEvent
}
func sendEvent(args eventArgs) {
args.Object.Size, _ = args.Object.GetActualSize()
// avoid generating a notification for REPLICA creation event.
if _, ok := args.ReqParams[xhttp.MinIOSourceReplicationRequest]; ok {
return
}
// remove sensitive encryption entries in metadata.
crypto.RemoveSensitiveEntries(args.Object.UserDefined)
crypto.RemoveInternalEntries(args.Object.UserDefined)
// globalNotificationSys is not initialized in gateway mode.
if globalNotificationSys == nil {
return
}
if globalHTTPListen.NumSubscribers(args.EventName) > 0 {
globalHTTPListen.Publish(args.ToEvent(false))
}
globalEventNotifier.Send(args)
}

View File

@ -316,7 +316,7 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
} }
logger.FatalIf(globalBucketMetadataSys.Init(GlobalContext, buckets, newObject), "Unable to initialize bucket metadata") logger.FatalIf(globalBucketMetadataSys.Init(GlobalContext, buckets, newObject), "Unable to initialize bucket metadata")
logger.FatalIf(globalNotificationSys.InitBucketTargets(GlobalContext, newObject), "Unable to initialize bucket targets for notification system") logger.FatalIf(globalEventNotifier.InitBucketTargets(GlobalContext, newObject), "Unable to initialize bucket targets for notification system")
} }
if globalCacheConfig.Enabled { if globalCacheConfig.Enabled {

View File

@ -188,7 +188,9 @@ var (
// globalConfigSys server config system. // globalConfigSys server config system.
globalConfigSys *ConfigSys globalConfigSys *ConfigSys
globalNotificationSys *NotificationSys globalNotificationSys *NotificationSys
globalEventNotifier *EventNotifier
globalConfigTargetList *event.TargetList globalConfigTargetList *event.TargetList
// globalEnvTargetList has list of targets configured via env. // globalEnvTargetList has list of targets configured via env.
globalEnvTargetList *event.TargetList globalEnvTargetList *event.TargetList

View File

@ -26,7 +26,6 @@ import (
"io" "io"
"net/http" "net/http"
"net/url" "net/url"
"strings"
"sync" "sync"
"time" "time"
@ -35,49 +34,18 @@ import (
"github.com/klauspost/compress/zip" "github.com/klauspost/compress/zip"
"github.com/minio/madmin-go" "github.com/minio/madmin-go"
bucketBandwidth "github.com/minio/minio/internal/bucket/bandwidth" bucketBandwidth "github.com/minio/minio/internal/bucket/bandwidth"
"github.com/minio/minio/internal/crypto"
"github.com/minio/minio/internal/event"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/sync/errgroup" "github.com/minio/minio/internal/sync/errgroup"
"github.com/minio/pkg/bucket/policy"
xnet "github.com/minio/pkg/net" xnet "github.com/minio/pkg/net"
) )
// This file contains peer related notifications. For sending notifications to
// external systems, see event-notification.go
// NotificationSys - notification system. // NotificationSys - notification system.
type NotificationSys struct { type NotificationSys struct {
sync.RWMutex peerClients []*peerRESTClient // Excludes self
targetList *event.TargetList allPeerClients []*peerRESTClient // Includes nil client for self
targetResCh chan event.TargetIDResult
bucketRulesMap map[string]event.RulesMap
bucketRemoteTargetRulesMap map[string]map[event.TargetID]event.RulesMap
peerClients []*peerRESTClient // Excludes self
allPeerClients []*peerRESTClient // Includes nil client for self
}
// GetARNList - returns available ARNs.
func (sys *NotificationSys) GetARNList(onlyActive bool) []string {
arns := []string{}
if sys == nil {
return arns
}
region := globalSite.Region
for targetID, target := range sys.targetList.TargetMap() {
// httpclient target is part of ListenNotification
// which doesn't need to be listed as part of the ARN list
// This list is only meant for external targets, filter
// this out pro-actively.
if !strings.HasPrefix(targetID.ID, "httpclient+") {
if onlyActive && !target.HasQueueStore() {
if _, err := target.IsActive(); err != nil {
continue
}
}
arns = append(arns, targetID.ToARN(region).String())
}
}
return arns
} }
// NotificationPeerErr returns error associated for a remote peer. // NotificationPeerErr returns error associated for a remote peer.
@ -550,7 +518,7 @@ func (sys *NotificationSys) DeleteBucketMetadata(ctx context.Context, bucketName
globalReplicationStats.Delete(bucketName) globalReplicationStats.Delete(bucketName)
globalBucketMetadataSys.Remove(bucketName) globalBucketMetadataSys.Remove(bucketName)
globalBucketTargetSys.Delete(bucketName) globalBucketTargetSys.Delete(bucketName)
globalNotificationSys.RemoveNotification(bucketName) globalEventNotifier.RemoveNotification(bucketName)
globalBucketConnStats.delete(bucketName) globalBucketConnStats.delete(bucketName)
if localMetacacheMgr != nil { if localMetacacheMgr != nil {
localMetacacheMgr.deleteBucketCache(bucketName) localMetacacheMgr.deleteBucketCache(bucketName)
@ -684,150 +652,6 @@ func (sys *NotificationSys) LoadTransitionTierConfig(ctx context.Context) {
} }
} }
// Loads notification policies for all buckets into NotificationSys.
func (sys *NotificationSys) set(bucket BucketInfo, meta BucketMetadata) {
config := meta.notificationConfig
if config == nil {
return
}
config.SetRegion(globalSite.Region)
if err := config.Validate(globalSite.Region, globalNotificationSys.targetList); err != nil {
if _, ok := err.(*event.ErrARNNotFound); !ok {
logger.LogIf(GlobalContext, err)
}
}
sys.AddRulesMap(bucket.Name, config.ToRulesMap())
}
// InitBucketTargets - initializes notification system from notification.xml of all buckets.
func (sys *NotificationSys) InitBucketTargets(ctx context.Context, objAPI ObjectLayer) error {
if objAPI == nil {
return errServerNotInitialized
}
// In gateway mode, notifications are not supported - except NAS gateway.
if globalIsGateway && !objAPI.IsNotificationSupported() {
return nil
}
logger.LogIf(ctx, sys.targetList.Add(globalConfigTargetList.Targets()...))
go func() {
for res := range sys.targetResCh {
if res.Err != nil {
reqInfo := &logger.ReqInfo{}
reqInfo.AppendTags("targetID", res.ID.Name)
logger.LogOnceIf(logger.SetReqInfo(GlobalContext, reqInfo), res.Err, res.ID.String())
}
}
}()
return nil
}
// AddRulesMap - adds rules map for bucket name.
func (sys *NotificationSys) AddRulesMap(bucketName string, rulesMap event.RulesMap) {
sys.Lock()
defer sys.Unlock()
rulesMap = rulesMap.Clone()
for _, targetRulesMap := range sys.bucketRemoteTargetRulesMap[bucketName] {
rulesMap.Add(targetRulesMap)
}
// Do not add for an empty rulesMap.
if len(rulesMap) == 0 {
delete(sys.bucketRulesMap, bucketName)
} else {
sys.bucketRulesMap[bucketName] = rulesMap
}
}
// RemoveRulesMap - removes rules map for bucket name.
func (sys *NotificationSys) RemoveRulesMap(bucketName string, rulesMap event.RulesMap) {
sys.Lock()
defer sys.Unlock()
sys.bucketRulesMap[bucketName].Remove(rulesMap)
if len(sys.bucketRulesMap[bucketName]) == 0 {
delete(sys.bucketRulesMap, bucketName)
}
}
// ConfiguredTargetIDs - returns list of configured target id's
func (sys *NotificationSys) ConfiguredTargetIDs() []event.TargetID {
if sys == nil {
return nil
}
sys.RLock()
defer sys.RUnlock()
var targetIDs []event.TargetID
for _, rmap := range sys.bucketRulesMap {
for _, rules := range rmap {
for _, targetSet := range rules {
for id := range targetSet {
targetIDs = append(targetIDs, id)
}
}
}
}
// Filter out targets configured via env
var tIDs []event.TargetID
for _, targetID := range targetIDs {
if !globalEnvTargetList.Exists(targetID) {
tIDs = append(tIDs, targetID)
}
}
return tIDs
}
// RemoveNotification - removes all notification configuration for bucket name.
func (sys *NotificationSys) RemoveNotification(bucketName string) {
sys.Lock()
defer sys.Unlock()
delete(sys.bucketRulesMap, bucketName)
targetIDSet := event.NewTargetIDSet()
for targetID := range sys.bucketRemoteTargetRulesMap[bucketName] {
targetIDSet[targetID] = struct{}{}
delete(sys.bucketRemoteTargetRulesMap[bucketName], targetID)
}
sys.targetList.Remove(targetIDSet)
delete(sys.bucketRemoteTargetRulesMap, bucketName)
}
// RemoveAllRemoteTargets - closes and removes all notification targets.
func (sys *NotificationSys) RemoveAllRemoteTargets() {
sys.Lock()
defer sys.Unlock()
for _, targetMap := range sys.bucketRemoteTargetRulesMap {
targetIDSet := event.NewTargetIDSet()
for k := range targetMap {
targetIDSet[k] = struct{}{}
}
sys.targetList.Remove(targetIDSet)
}
}
// Send - sends event data to all matching targets.
func (sys *NotificationSys) Send(args eventArgs) {
sys.RLock()
targetIDSet := sys.bucketRulesMap[args.BucketName].Match(args.EventName, args.Object.Name)
sys.RUnlock()
if len(targetIDSet) == 0 {
return
}
sys.targetList.Send(args.ToEvent(true), targetIDSet, sys.targetResCh)
}
// GetCPUs - Get all CPU information. // GetCPUs - Get all CPU information.
func (sys *NotificationSys) GetCPUs(ctx context.Context) []madmin.CPUs { func (sys *NotificationSys) GetCPUs(ctx context.Context) []madmin.CPUs {
reply := make([]madmin.CPUs, len(sys.peerClients)) reply := make([]madmin.CPUs, len(sys.peerClients))
@ -1190,117 +1014,11 @@ func NewNotificationSys(endpoints EndpointServerPools) *NotificationSys {
// targetList/bucketRulesMap/bucketRemoteTargetRulesMap are populated by NotificationSys.Init() // targetList/bucketRulesMap/bucketRemoteTargetRulesMap are populated by NotificationSys.Init()
remote, all := newPeerRestClients(endpoints) remote, all := newPeerRestClients(endpoints)
return &NotificationSys{ return &NotificationSys{
targetList: event.NewTargetList(), peerClients: remote,
targetResCh: make(chan event.TargetIDResult), allPeerClients: all,
bucketRulesMap: make(map[string]event.RulesMap),
bucketRemoteTargetRulesMap: make(map[string]map[event.TargetID]event.RulesMap),
peerClients: remote,
allPeerClients: all,
} }
} }
type eventArgs struct {
EventName event.Name
BucketName string
Object ObjectInfo
ReqParams map[string]string
RespElements map[string]string
Host string
UserAgent string
}
// ToEvent - converts to notification event.
func (args eventArgs) ToEvent(escape bool) event.Event {
eventTime := UTCNow()
uniqueID := fmt.Sprintf("%X", eventTime.UnixNano())
respElements := map[string]string{
"x-amz-request-id": args.RespElements["requestId"],
"x-minio-origin-endpoint": func() string {
if globalMinioEndpoint != "" {
return globalMinioEndpoint
}
return getAPIEndpoints()[0]
}(), // MinIO specific custom elements.
}
// Add deployment as part of
if globalDeploymentID != "" {
respElements["x-minio-deployment-id"] = globalDeploymentID
}
if args.RespElements["content-length"] != "" {
respElements["content-length"] = args.RespElements["content-length"]
}
keyName := args.Object.Name
if escape {
keyName = url.QueryEscape(args.Object.Name)
}
newEvent := event.Event{
EventVersion: "2.0",
EventSource: "minio:s3",
AwsRegion: args.ReqParams["region"],
EventTime: eventTime.Format(event.AMZTimeFormat),
EventName: args.EventName,
UserIdentity: event.Identity{PrincipalID: args.ReqParams["principalId"]},
RequestParameters: args.ReqParams,
ResponseElements: respElements,
S3: event.Metadata{
SchemaVersion: "1.0",
ConfigurationID: "Config",
Bucket: event.Bucket{
Name: args.BucketName,
OwnerIdentity: event.Identity{PrincipalID: args.ReqParams["principalId"]},
ARN: policy.ResourceARNPrefix + args.BucketName,
},
Object: event.Object{
Key: keyName,
VersionID: args.Object.VersionID,
Sequencer: uniqueID,
},
},
Source: event.Source{
Host: args.Host,
UserAgent: args.UserAgent,
},
}
if args.EventName != event.ObjectRemovedDelete && args.EventName != event.ObjectRemovedDeleteMarkerCreated {
newEvent.S3.Object.ETag = args.Object.ETag
newEvent.S3.Object.Size = args.Object.Size
newEvent.S3.Object.ContentType = args.Object.ContentType
newEvent.S3.Object.UserMetadata = make(map[string]string, len(args.Object.UserDefined))
for k, v := range args.Object.UserDefined {
if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
continue
}
newEvent.S3.Object.UserMetadata[k] = v
}
}
return newEvent
}
func sendEvent(args eventArgs) {
args.Object.Size, _ = args.Object.GetActualSize()
// avoid generating a notification for REPLICA creation event.
if _, ok := args.ReqParams[xhttp.MinIOSourceReplicationRequest]; ok {
return
}
// remove sensitive encryption entries in metadata.
crypto.RemoveSensitiveEntries(args.Object.UserDefined)
crypto.RemoveInternalEntries(args.Object.UserDefined)
// globalNotificationSys is not initialized in gateway mode.
if globalNotificationSys == nil {
return
}
if globalHTTPListen.NumSubscribers(args.EventName) > 0 {
globalHTTPListen.Publish(args.ToEvent(false))
}
globalNotificationSys.Send(args)
}
// GetBandwidthReports - gets the bandwidth report from all nodes including self. // GetBandwidthReports - gets the bandwidth report from all nodes including self.
func (sys *NotificationSys) GetBandwidthReports(ctx context.Context, buckets ...string) madmin.BucketBandwidthReport { func (sys *NotificationSys) GetBandwidthReports(ctx context.Context, buckets ...string) madmin.BucketBandwidthReport {
reports := make([]*madmin.BucketBandwidthReport, len(sys.peerClients)) reports := make([]*madmin.BucketBandwidthReport, len(sys.peerClients))

View File

@ -512,7 +512,7 @@ func (s *peerRESTServer) DeleteBucketMetadataHandler(w http.ResponseWriter, r *h
globalReplicationStats.Delete(bucketName) globalReplicationStats.Delete(bucketName)
globalBucketMetadataSys.Remove(bucketName) globalBucketMetadataSys.Remove(bucketName)
globalBucketTargetSys.Delete(bucketName) globalBucketTargetSys.Delete(bucketName)
globalNotificationSys.RemoveNotification(bucketName) globalEventNotifier.RemoveNotification(bucketName)
globalBucketConnStats.delete(bucketName) globalBucketConnStats.delete(bucketName)
if localMetacacheMgr != nil { if localMetacacheMgr != nil {
localMetacacheMgr.deleteBucketCache(bucketName) localMetacacheMgr.deleteBucketCache(bucketName)
@ -605,7 +605,7 @@ func (s *peerRESTServer) LoadBucketMetadataHandler(w http.ResponseWriter, r *htt
globalBucketMetadataSys.Set(bucketName, meta) globalBucketMetadataSys.Set(bucketName, meta)
if meta.notificationConfig != nil { if meta.notificationConfig != nil {
globalNotificationSys.AddRulesMap(bucketName, meta.notificationConfig.ToRulesMap()) globalEventNotifier.AddRulesMap(bucketName, meta.notificationConfig.ToRulesMap())
} }
if meta.bucketTargetConfig != nil { if meta.bucketTargetConfig != nil {
@ -702,7 +702,7 @@ func (s *peerRESTServer) PutBucketNotificationHandler(w http.ResponseWriter, r *
return return
} }
globalNotificationSys.AddRulesMap(bucketName, rulesMap) globalEventNotifier.AddRulesMap(bucketName, rulesMap)
} }
// Return disk IDs of all the local disks. // Return disk IDs of all the local disks.

View File

@ -249,9 +249,12 @@ func initAllSubsystems() {
globalBackgroundHealState = newHealState(false) globalBackgroundHealState = newHealState(false)
globalHealStateLK.Unlock() globalHealStateLK.Unlock()
// Create new notification system and initialize notification peer targets // Initialize notification peer targets
globalNotificationSys = NewNotificationSys(globalEndpoints) globalNotificationSys = NewNotificationSys(globalEndpoints)
// Create new notification system
globalEventNotifier = NewEventNotifier()
// Create new bucket metadata system. // Create new bucket metadata system.
if globalBucketMetadataSys == nil { if globalBucketMetadataSys == nil {
globalBucketMetadataSys = NewBucketMetadataSys() globalBucketMetadataSys = NewBucketMetadataSys()
@ -630,7 +633,7 @@ func serverMain(ctx *cli.Context) {
globalSiteReplicationSys.Init(GlobalContext, newObject) globalSiteReplicationSys.Init(GlobalContext, newObject)
// Initialize bucket notification targets. // Initialize bucket notification targets.
globalNotificationSys.InitBucketTargets(GlobalContext, newObject) globalEventNotifier.InitBucketTargets(GlobalContext, newObject)
// initialize the new disk cache objects. // initialize the new disk cache objects.
if globalCacheConfig.Enabled { if globalCacheConfig.Enabled {

View File

@ -163,7 +163,7 @@ func printEventNotifiers() {
return return
} }
arns := globalNotificationSys.GetARNList(true) arns := globalEventNotifier.GetARNList(true)
if len(arns) == 0 { if len(arns) == 0 {
return return
} }

View File

@ -50,8 +50,8 @@ func handleSignals() {
// send signal to various go-routines that they need to quit. // send signal to various go-routines that they need to quit.
cancelGlobalContext() cancelGlobalContext()
if globalNotificationSys != nil { if globalEventNotifier != nil {
globalNotificationSys.RemoveAllRemoteTargets() globalEventNotifier.RemoveAllRemoteTargets()
} }
if httpServer := newHTTPServerFn(); httpServer != nil { if httpServer := newHTTPServerFn(); httpServer != nil {