mirror of
https://github.com/minio/minio.git
synced 2025-11-20 01:50:24 -05:00
server: Move all the top level files into cmd folder. (#2490)
This change brings a change which was done for the 'mc' package to allow for clean repo and have a cleaner github drop in experience.
This commit is contained in:
committed by
Anand Babu (AB) Periasamy
parent
73d1a46f3e
commit
bccf549463
384
cmd/event-notifier.go
Normal file
384
cmd/event-notifier.go
Normal file
@@ -0,0 +1,384 @@
|
||||
/*
|
||||
* Minio Cloud Storage, (C) 2016 Minio, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/xml"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"path"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
)
|
||||
|
||||
// Global event notification queue. This is the queue that would be used to send all notifications.
|
||||
type eventNotifier struct {
|
||||
rwMutex *sync.RWMutex
|
||||
|
||||
// Collection of 'bucket' and notification config.
|
||||
notificationConfigs map[string]*notificationConfig
|
||||
snsTargets map[string][]chan []NotificationEvent
|
||||
queueTargets map[string]*logrus.Logger
|
||||
}
|
||||
|
||||
// Represents data to be sent with notification event.
|
||||
type eventData struct {
|
||||
Type EventName
|
||||
Bucket string
|
||||
ObjInfo ObjectInfo
|
||||
ReqParams map[string]string
|
||||
}
|
||||
|
||||
// New notification event constructs a new notification event message from
|
||||
// input request metadata which completed successfully.
|
||||
func newNotificationEvent(event eventData) NotificationEvent {
|
||||
/// Construct a new object created event.
|
||||
region := serverConfig.GetRegion()
|
||||
tnow := time.Now().UTC()
|
||||
sequencer := fmt.Sprintf("%X", tnow.UnixNano())
|
||||
// Following blocks fills in all the necessary details of s3 event message structure.
|
||||
// http://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
|
||||
nEvent := NotificationEvent{
|
||||
EventVersion: "2.0",
|
||||
EventSource: "aws:s3",
|
||||
AwsRegion: region,
|
||||
EventTime: tnow.Format(timeFormatAMZ),
|
||||
EventName: event.Type.String(),
|
||||
UserIdentity: defaultIdentity(),
|
||||
RequestParameters: event.ReqParams,
|
||||
ResponseElements: map[string]string{},
|
||||
S3: eventMeta{
|
||||
SchemaVersion: "1.0",
|
||||
ConfigurationID: "Config",
|
||||
Bucket: bucketMeta{
|
||||
Name: event.Bucket,
|
||||
OwnerIdentity: defaultIdentity(),
|
||||
ARN: "arn:aws:s3:::" + event.Bucket,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
escapedObj := url.QueryEscape(event.ObjInfo.Name)
|
||||
// For delete object event type, we do not need to set ETag and Size.
|
||||
if event.Type == ObjectRemovedDelete {
|
||||
nEvent.S3.Object = objectMeta{
|
||||
Key: escapedObj,
|
||||
Sequencer: sequencer,
|
||||
}
|
||||
return nEvent
|
||||
}
|
||||
// For all other events we should set ETag and Size.
|
||||
nEvent.S3.Object = objectMeta{
|
||||
Key: escapedObj,
|
||||
ETag: event.ObjInfo.MD5Sum,
|
||||
Size: event.ObjInfo.Size,
|
||||
Sequencer: sequencer,
|
||||
}
|
||||
// Success.
|
||||
return nEvent
|
||||
}
|
||||
|
||||
// Fetch the saved queue target.
|
||||
func (en eventNotifier) GetQueueTarget(queueARN string) *logrus.Logger {
|
||||
return en.queueTargets[queueARN]
|
||||
}
|
||||
|
||||
func (en eventNotifier) GetSNSTarget(snsARN string) []chan []NotificationEvent {
|
||||
en.rwMutex.RLock()
|
||||
defer en.rwMutex.RUnlock()
|
||||
return en.snsTargets[snsARN]
|
||||
}
|
||||
|
||||
// Set a new sns target for an input sns ARN.
|
||||
func (en *eventNotifier) SetSNSTarget(snsARN string, listenerCh chan []NotificationEvent) error {
|
||||
en.rwMutex.Lock()
|
||||
defer en.rwMutex.Unlock()
|
||||
if listenerCh == nil {
|
||||
return errors.New("invalid argument")
|
||||
}
|
||||
en.snsTargets[snsARN] = append(en.snsTargets[snsARN], listenerCh)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Remove sns target for an input sns ARN.
|
||||
func (en *eventNotifier) RemoveSNSTarget(snsARN string, listenerCh chan []NotificationEvent) {
|
||||
en.rwMutex.Lock()
|
||||
defer en.rwMutex.Unlock()
|
||||
snsTarget, ok := en.snsTargets[snsARN]
|
||||
if ok {
|
||||
for i, savedListenerCh := range snsTarget {
|
||||
if listenerCh == savedListenerCh {
|
||||
snsTarget = append(snsTarget[:i], snsTarget[i+1:]...)
|
||||
if len(snsTarget) == 0 {
|
||||
delete(en.snsTargets, snsARN)
|
||||
break
|
||||
}
|
||||
en.snsTargets[snsARN] = snsTarget
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Returns true if bucket notification is set for the bucket, false otherwise.
|
||||
func (en *eventNotifier) IsBucketNotificationSet(bucket string) bool {
|
||||
if en == nil {
|
||||
return false
|
||||
}
|
||||
en.rwMutex.RLock()
|
||||
defer en.rwMutex.RUnlock()
|
||||
_, ok := en.notificationConfigs[bucket]
|
||||
return ok
|
||||
}
|
||||
|
||||
// Fetch bucket notification config for an input bucket.
|
||||
func (en eventNotifier) GetBucketNotificationConfig(bucket string) *notificationConfig {
|
||||
en.rwMutex.RLock()
|
||||
defer en.rwMutex.RUnlock()
|
||||
return en.notificationConfigs[bucket]
|
||||
}
|
||||
|
||||
// Set a new notification config for a bucket, this operation will overwrite any previous
|
||||
// notification configs for the bucket.
|
||||
func (en *eventNotifier) SetBucketNotificationConfig(bucket string, notificationCfg *notificationConfig) error {
|
||||
en.rwMutex.Lock()
|
||||
defer en.rwMutex.Unlock()
|
||||
if notificationCfg == nil {
|
||||
return errors.New("invalid argument")
|
||||
}
|
||||
en.notificationConfigs[bucket] = notificationCfg
|
||||
return nil
|
||||
}
|
||||
|
||||
// eventNotify notifies an event to relevant targets based on their
|
||||
// bucket notification configs.
|
||||
func eventNotify(event eventData) {
|
||||
// Notifies a new event.
|
||||
// List of events reported through this function are
|
||||
// - s3:ObjectCreated:Put
|
||||
// - s3:ObjectCreated:Post
|
||||
// - s3:ObjectCreated:Copy
|
||||
// - s3:ObjectCreated:CompleteMultipartUpload
|
||||
// - s3:ObjectRemoved:Delete
|
||||
|
||||
nConfig := eventN.GetBucketNotificationConfig(event.Bucket)
|
||||
// No bucket notifications enabled, drop the event notification.
|
||||
if len(nConfig.QueueConfigs) == 0 && len(nConfig.TopicConfigs) == 0 && len(nConfig.LambdaConfigs) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Event type.
|
||||
eventType := event.Type.String()
|
||||
|
||||
// Object name.
|
||||
objectName := event.ObjInfo.Name
|
||||
|
||||
// Save the notification event to be sent.
|
||||
notificationEvent := []NotificationEvent{newNotificationEvent(event)}
|
||||
|
||||
// Validate if the event and object match the queue configs.
|
||||
for _, qConfig := range nConfig.QueueConfigs {
|
||||
eventMatch := eventMatch(eventType, qConfig.Events)
|
||||
ruleMatch := filterRuleMatch(objectName, qConfig.Filter.Key.FilterRules)
|
||||
if eventMatch && ruleMatch {
|
||||
targetLog := eventN.GetQueueTarget(qConfig.QueueARN)
|
||||
if targetLog != nil {
|
||||
targetLog.WithFields(logrus.Fields{
|
||||
"Records": notificationEvent,
|
||||
}).Info()
|
||||
}
|
||||
}
|
||||
}
|
||||
// Validate if the event and object match the sns configs.
|
||||
for _, topicConfig := range nConfig.TopicConfigs {
|
||||
ruleMatch := filterRuleMatch(objectName, topicConfig.Filter.Key.FilterRules)
|
||||
eventMatch := eventMatch(eventType, topicConfig.Events)
|
||||
if eventMatch && ruleMatch {
|
||||
targetListeners := eventN.GetSNSTarget(topicConfig.TopicARN)
|
||||
for _, listener := range targetListeners {
|
||||
listener <- notificationEvent
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// loads notifcation config if any for a given bucket, returns back structured notification config.
|
||||
func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationConfig, error) {
|
||||
// Construct the notification config path.
|
||||
notificationConfigPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig)
|
||||
objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, notificationConfigPath)
|
||||
if err != nil {
|
||||
// 'notification.xml' not found return 'errNoSuchNotifications'.
|
||||
// This is default when no bucket notifications are found on the bucket.
|
||||
switch err.(type) {
|
||||
case ObjectNotFound:
|
||||
return nil, errNoSuchNotifications
|
||||
}
|
||||
// Returns error for other errors.
|
||||
return nil, err
|
||||
}
|
||||
var buffer bytes.Buffer
|
||||
err = objAPI.GetObject(minioMetaBucket, notificationConfigPath, 0, objInfo.Size, &buffer)
|
||||
if err != nil {
|
||||
// 'notification.xml' not found return 'errNoSuchNotifications'.
|
||||
// This is default when no bucket notifications are found on the bucket.
|
||||
switch err.(type) {
|
||||
case ObjectNotFound:
|
||||
return nil, errNoSuchNotifications
|
||||
}
|
||||
// Returns error for other errors.
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Unmarshal notification bytes.
|
||||
notificationConfigBytes := buffer.Bytes()
|
||||
notificationCfg := ¬ificationConfig{}
|
||||
if err = xml.Unmarshal(notificationConfigBytes, ¬ificationCfg); err != nil {
|
||||
return nil, err
|
||||
} // Successfully marshalled notification configuration.
|
||||
|
||||
// Return success.
|
||||
return notificationCfg, nil
|
||||
}
|
||||
|
||||
// loads all bucket notifications if present.
|
||||
func loadAllBucketNotifications(objAPI ObjectLayer) (map[string]*notificationConfig, error) {
|
||||
// List buckets to proceed loading all notification configuration.
|
||||
buckets, err := objAPI.ListBuckets()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
configs := make(map[string]*notificationConfig)
|
||||
|
||||
// Loads all bucket notifications.
|
||||
for _, bucket := range buckets {
|
||||
var nCfg *notificationConfig
|
||||
nCfg, err = loadNotificationConfig(bucket.Name, objAPI)
|
||||
if err != nil {
|
||||
if err == errNoSuchNotifications {
|
||||
continue
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
configs[bucket.Name] = nCfg
|
||||
}
|
||||
|
||||
// Success.
|
||||
return configs, nil
|
||||
}
|
||||
|
||||
// Loads all queue targets, initializes each queueARNs depending on their config.
|
||||
// Each instance of queueARN registers its own logrus to communicate with the
|
||||
// queue service. QueueARN once initialized is not initialized again for the
|
||||
// same queueARN, instead previous connection is used.
|
||||
func loadAllQueueTargets() (map[string]*logrus.Logger, error) {
|
||||
queueTargets := make(map[string]*logrus.Logger)
|
||||
// Load all amqp targets, initialize their respective loggers.
|
||||
for accountID, amqpN := range serverConfig.GetAMQP() {
|
||||
if !amqpN.Enable {
|
||||
continue
|
||||
}
|
||||
// Construct the queue ARN for AMQP.
|
||||
queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueTypeAMQP
|
||||
// Queue target if already initialized we move to the next ARN.
|
||||
_, ok := queueTargets[queueARN]
|
||||
if ok {
|
||||
continue
|
||||
}
|
||||
// Using accountID we can now initialize a new AMQP logrus instance.
|
||||
amqpLog, err := newAMQPNotify(accountID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
queueTargets[queueARN] = amqpLog
|
||||
}
|
||||
// Load redis targets, initialize their respective loggers.
|
||||
for accountID, redisN := range serverConfig.GetRedis() {
|
||||
if !redisN.Enable {
|
||||
continue
|
||||
}
|
||||
// Construct the queue ARN for Redis.
|
||||
queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueTypeRedis
|
||||
// Queue target if already initialized we move to the next ARN.
|
||||
_, ok := queueTargets[queueARN]
|
||||
if ok {
|
||||
continue
|
||||
}
|
||||
// Using accountID we can now initialize a new Redis logrus instance.
|
||||
redisLog, err := newRedisNotify(accountID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
queueTargets[queueARN] = redisLog
|
||||
}
|
||||
// Load elastic targets, initialize their respective loggers.
|
||||
for accountID, elasticN := range serverConfig.GetElasticSearch() {
|
||||
if !elasticN.Enable {
|
||||
continue
|
||||
}
|
||||
// Construct the queue ARN for Elastic.
|
||||
queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueTypeElastic
|
||||
_, ok := queueTargets[queueARN]
|
||||
if ok {
|
||||
continue
|
||||
}
|
||||
// Using accountID we can now initialize a new ElasticSearch logrus instance.
|
||||
elasticLog, err := newElasticNotify(accountID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
queueTargets[queueARN] = elasticLog
|
||||
}
|
||||
// Successfully initialized queue targets.
|
||||
return queueTargets, nil
|
||||
}
|
||||
|
||||
// Global instance of event notification queue.
|
||||
var eventN *eventNotifier
|
||||
|
||||
// Initialize event notifier.
|
||||
func initEventNotifier(objAPI ObjectLayer) error {
|
||||
if objAPI == nil {
|
||||
return errInvalidArgument
|
||||
}
|
||||
|
||||
// Read all saved bucket notifications.
|
||||
configs, err := loadAllBucketNotifications(objAPI)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Initializes all queue targets.
|
||||
queueTargets, err := loadAllQueueTargets()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Inititalize event notifier queue.
|
||||
eventN = &eventNotifier{
|
||||
rwMutex: &sync.RWMutex{},
|
||||
notificationConfigs: configs,
|
||||
queueTargets: queueTargets,
|
||||
snsTargets: make(map[string][]chan []NotificationEvent),
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user