Peer RPCs for bucket notifications (#2877)

* Implements a Peer RPC router that sends info to all Minio servers in the cluster.
* Bucket notifications are propagated to all nodes via this RPC router.
* Bucket listener configuration is persisted to separate object layer
  file (`listener.json`) and peer RPCs are used to communicate changes
  throughout the cluster.
* When events are generated, RPC calls to send them to other servers
  where bucket listeners may be connected is implemented.
* Some bucket notification tests are now disabled as they cannot work in
  the new design.
* Minor fix in `funcFromPC` to use `path.Join`
This commit is contained in:
Aditya Manthramurthy
2016-10-12 01:03:50 -07:00
committed by Harshavardhana
parent a5921b5743
commit 6199aa0707
24 changed files with 1365 additions and 1113 deletions

View File

@@ -18,6 +18,7 @@ package cmd
import (
"bytes"
"encoding/json"
"encoding/xml"
"fmt"
"net"
@@ -29,14 +30,51 @@ import (
"github.com/Sirupsen/logrus"
)
// Global event notification queue. This is the queue that would be used to send all notifications.
type eventNotifier struct {
rwMutex *sync.RWMutex
// Collection of 'bucket' and notification config.
type externalNotifier struct {
// Per-bucket notification config. This is updated via
// PutBucketNotification API.
notificationConfigs map[string]*notificationConfig
snsTargets map[string][]chan []NotificationEvent
queueTargets map[string]*logrus.Logger
// An external target keeps a connection to an external
// service to which events are to be sent. It is a mapping
// from an ARN to a log object
targets map[string]*logrus.Logger
rwMutex *sync.RWMutex
}
type internalNotifier struct {
// per-bucket listener configuration. This is updated
// when listeners connect or disconnect.
listenerConfigs map[string][]listenerConfig
// An internal target is a peer Minio server, that is
// connected to a listening client. Here, targets is a map of
// listener ARN to log object.
targets map[string]*listenerLogger
// Connected listeners is a map of listener ARNs to channels
// on which the ListenBucket API handler go routine is waiting
// for events to send to a client.
connectedListeners map[string]chan []NotificationEvent
rwMutex *sync.RWMutex
}
// Global event notification configuration. This structure has state
// about configured external notifications, and run-time configuration
// for listener notifications.
type eventNotifier struct {
// `external` here refers to notification configuration to
// send events to supported external systems
external externalNotifier
// `internal` refers to notification configuration for live
// listening clients. Events for a client are send from all
// servers, internally to a particular server that is
// connected to the client.
internal internalNotifier
}
// Represents data to be sent with notification event.
@@ -54,7 +92,8 @@ func newNotificationEvent(event eventData) NotificationEvent {
region := serverConfig.GetRegion()
tnow := time.Now().UTC()
sequencer := fmt.Sprintf("%X", tnow.UnixNano())
// Following blocks fills in all the necessary details of s3 event message structure.
// Following blocks fills in all the necessary details of s3
// event message structure.
// http://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
nEvent := NotificationEvent{
EventVersion: "2.0",
@@ -96,85 +135,147 @@ func newNotificationEvent(event eventData) NotificationEvent {
return nEvent
}
// Fetch the saved queue target.
func (en eventNotifier) GetQueueTarget(queueARN string) *logrus.Logger {
return en.queueTargets[queueARN]
// Fetch the external target. No locking needed here since this map is
// never written after initial startup.
func (en eventNotifier) GetExternalTarget(queueARN string) *logrus.Logger {
return en.external.targets[queueARN]
}
func (en eventNotifier) GetSNSTarget(snsARN string) []chan []NotificationEvent {
en.rwMutex.RLock()
defer en.rwMutex.RUnlock()
return en.snsTargets[snsARN]
func (en eventNotifier) GetInternalTarget(arn string) *listenerLogger {
en.internal.rwMutex.RLock()
defer en.internal.rwMutex.RUnlock()
return en.internal.targets[arn]
}
// Set a new sns target for an input sns ARN.
func (en *eventNotifier) SetSNSTarget(snsARN string, listenerCh chan []NotificationEvent) error {
en.rwMutex.Lock()
defer en.rwMutex.Unlock()
func (en *eventNotifier) AddListenerChan(snsARN string, listenerCh chan []NotificationEvent) error {
if listenerCh == nil {
return errInvalidArgument
}
en.snsTargets[snsARN] = append(en.snsTargets[snsARN], listenerCh)
en.internal.rwMutex.Lock()
defer en.internal.rwMutex.Unlock()
en.internal.connectedListeners[snsARN] = listenerCh
return nil
}
// Remove sns target for an input sns ARN.
func (en *eventNotifier) RemoveSNSTarget(snsARN string, listenerCh chan []NotificationEvent) {
en.rwMutex.Lock()
defer en.rwMutex.Unlock()
snsTarget, ok := en.snsTargets[snsARN]
func (en *eventNotifier) RemoveListenerChan(snsARN string) {
en.internal.rwMutex.Lock()
defer en.internal.rwMutex.Unlock()
if en.internal.connectedListeners != nil {
delete(en.internal.connectedListeners, snsARN)
}
}
func (en *eventNotifier) SendListenerEvent(arn string, event []NotificationEvent) error {
en.internal.rwMutex.Lock()
defer en.internal.rwMutex.Unlock()
ch, ok := en.internal.connectedListeners[arn]
if ok {
for i, savedListenerCh := range snsTarget {
if listenerCh == savedListenerCh {
snsTarget = append(snsTarget[:i], snsTarget[i+1:]...)
if len(snsTarget) == 0 {
delete(en.snsTargets, snsARN)
break
}
en.snsTargets[snsARN] = snsTarget
ch <- event
}
// If the channel is not present we ignore the event.
return nil
}
// Fetch bucket notification config for an input bucket.
func (en eventNotifier) GetBucketNotificationConfig(bucket string) *notificationConfig {
en.external.rwMutex.RLock()
defer en.external.rwMutex.RUnlock()
return en.external.notificationConfigs[bucket]
}
func (en *eventNotifier) SetBucketNotificationConfig(bucket string, ncfg *notificationConfig) {
en.external.rwMutex.Lock()
if ncfg == nil {
delete(en.external.notificationConfigs, bucket)
} else {
en.external.notificationConfigs[bucket] = ncfg
}
en.external.rwMutex.Unlock()
}
func (en *eventNotifier) GetBucketListenerConfig(bucket string) []listenerConfig {
en.internal.rwMutex.RLock()
defer en.internal.rwMutex.RUnlock()
return en.internal.listenerConfigs[bucket]
}
func (en *eventNotifier) SetBucketListenerConfig(bucket string, lcfg []listenerConfig) error {
en.internal.rwMutex.Lock()
defer en.internal.rwMutex.Unlock()
if lcfg == nil {
delete(en.internal.listenerConfigs, bucket)
} else {
en.internal.listenerConfigs[bucket] = lcfg
}
// close all existing loggers and initialize again.
for _, v := range en.internal.targets {
v.lconn.Close()
}
en.internal.targets = make(map[string]*listenerLogger)
for _, lc := range lcfg {
logger, err := newListenerLogger(lc.TopicConfig.TopicARN,
lc.TargetServer)
if err != nil {
return err
}
en.internal.targets[lc.TopicConfig.TopicARN] = logger
}
return nil
}
func eventNotifyForBucketNotifications(eventType, objectName, bucketName string,
nEvent []NotificationEvent) {
nConfig := globalEventNotifier.GetBucketNotificationConfig(bucketName)
if nConfig == nil {
return
}
// Validate if the event and object match the queue configs.
for _, qConfig := range nConfig.QueueConfigs {
eventMatch := eventMatch(eventType, qConfig.Events)
ruleMatch := filterRuleMatch(objectName, qConfig.Filter.Key.FilterRules)
if eventMatch && ruleMatch {
targetLog := globalEventNotifier.GetExternalTarget(qConfig.QueueARN)
if targetLog != nil {
targetLog.WithFields(logrus.Fields{
"Key": path.Join(bucketName, objectName),
"EventType": eventType,
"Records": nEvent,
}).Info()
}
}
}
}
// Fetch bucket notification config for an input bucket.
func (en eventNotifier) GetBucketNotificationConfig(bucket string) *notificationConfig {
en.rwMutex.RLock()
defer en.rwMutex.RUnlock()
return en.notificationConfigs[bucket]
}
// Set a new notification config for a bucket, this operation will overwrite any previous
// notification configs for the bucket.
func (en *eventNotifier) SetBucketNotificationConfig(bucket string, notificationCfg *notificationConfig) error {
en.rwMutex.Lock()
defer en.rwMutex.Unlock()
if notificationCfg == nil {
return errInvalidArgument
func eventNotifyForBucketListeners(eventType, objectName, bucketName string,
nEvent []NotificationEvent) {
lCfgs := globalEventNotifier.GetBucketListenerConfig(bucketName)
if lCfgs == nil {
return
}
en.notificationConfigs[bucket] = notificationCfg
return nil
}
func (en *eventNotifier) AddTopicConfig(bucket string, topicCfg *topicConfig) error {
en.rwMutex.Lock()
defer en.rwMutex.Unlock()
if topicCfg == nil {
return errInvalidArgument
}
notificationCfg := en.notificationConfigs[bucket]
if notificationCfg == nil {
en.notificationConfigs[bucket] = &notificationConfig{
TopicConfigs: []topicConfig{*topicCfg},
// Validate if the event and object match listener configs
for _, lcfg := range lCfgs {
ruleMatch := filterRuleMatch(objectName, lcfg.TopicConfig.Filter.Key.FilterRules)
eventMatch := eventMatch(eventType, lcfg.TopicConfig.Events)
if eventMatch && ruleMatch {
targetLog := globalEventNotifier.GetInternalTarget(
lcfg.TopicConfig.TopicARN)
if targetLog != nil && targetLog.log != nil {
targetLog.log.WithFields(logrus.Fields{
"Key": path.Join(bucketName, objectName),
"EventType": eventType,
"Records": nEvent,
}).Info()
}
}
return nil
}
notificationCfg.TopicConfigs = append(notificationCfg.TopicConfigs, *topicCfg)
return nil
}
// eventNotify notifies an event to relevant targets based on their
// bucket notification configs.
// bucket configuration (notifications and listeners).
func eventNotify(event eventData) {
// Notifies a new event.
// List of events reported through this function are
@@ -184,15 +285,6 @@ func eventNotify(event eventData) {
// - s3:ObjectCreated:CompleteMultipartUpload
// - s3:ObjectRemoved:Delete
nConfig := globalEventNotifier.GetBucketNotificationConfig(event.Bucket)
// No bucket notifications enabled, drop the event notification.
if nConfig == nil {
return
}
if len(nConfig.QueueConfigs) == 0 && len(nConfig.TopicConfigs) == 0 && len(nConfig.LambdaConfigs) == 0 {
return
}
// Event type.
eventType := event.Type.String()
@@ -202,43 +294,26 @@ func eventNotify(event eventData) {
// Save the notification event to be sent.
notificationEvent := []NotificationEvent{newNotificationEvent(event)}
// Validate if the event and object match the queue configs.
for _, qConfig := range nConfig.QueueConfigs {
eventMatch := eventMatch(eventType, qConfig.Events)
ruleMatch := filterRuleMatch(objectName, qConfig.Filter.Key.FilterRules)
if eventMatch && ruleMatch {
targetLog := globalEventNotifier.GetQueueTarget(qConfig.QueueARN)
if targetLog != nil {
targetLog.WithFields(logrus.Fields{
"Key": path.Join(event.Bucket, objectName),
"EventType": eventType,
"Records": notificationEvent,
}).Info()
}
}
}
// Validate if the event and object match the sns configs.
for _, topicConfig := range nConfig.TopicConfigs {
ruleMatch := filterRuleMatch(objectName, topicConfig.Filter.Key.FilterRules)
eventMatch := eventMatch(eventType, topicConfig.Events)
if eventMatch && ruleMatch {
targetListeners := globalEventNotifier.GetSNSTarget(topicConfig.TopicARN)
for _, listener := range targetListeners {
listener <- notificationEvent
}
}
}
// Notify external targets.
eventNotifyForBucketNotifications(eventType, objectName, event.Bucket,
notificationEvent)
// Notify internal targets.
eventNotifyForBucketListeners(eventType, objectName, event.Bucket,
notificationEvent)
}
// loads notifcation config if any for a given bucket, returns back structured notification config.
// loads notification config if any for a given bucket, returns
// structured notification config.
func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationConfig, error) {
// Construct the notification config path.
notificationConfigPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig)
objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, notificationConfigPath)
err = errorCause(err)
if err != nil {
// 'notification.xml' not found return 'errNoSuchNotifications'.
// This is default when no bucket notifications are found on the bucket.
// 'notification.xml' not found return
// 'errNoSuchNotifications'. This is default when no
// bucket notifications are found on the bucket.
switch err.(type) {
case ObjectNotFound:
return nil, errNoSuchNotifications
@@ -251,8 +326,9 @@ func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationCon
err = objAPI.GetObject(minioMetaBucket, notificationConfigPath, 0, objInfo.Size, &buffer)
err = errorCause(err)
if err != nil {
// 'notification.xml' not found return 'errNoSuchNotifications'.
// This is default when no bucket notifications are found on the bucket.
// 'notification.xml' not found return
// 'errNoSuchNotifications'. This is default when no
// bucket notifications are found on the bucket.
switch err.(type) {
case ObjectNotFound:
return nil, errNoSuchNotifications
@@ -267,36 +343,144 @@ func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationCon
notificationCfg := &notificationConfig{}
if err = xml.Unmarshal(notificationConfigBytes, &notificationCfg); err != nil {
return nil, err
} // Successfully marshalled notification configuration.
}
// Return success.
return notificationCfg, nil
}
// loads all bucket notifications if present.
func loadAllBucketNotifications(objAPI ObjectLayer) (map[string]*notificationConfig, error) {
// List buckets to proceed loading all notification configuration.
buckets, err := objAPI.ListBuckets()
// loads notification config if any for a given bucket, returns
// structured notification config.
func loadListenerConfig(bucket string, objAPI ObjectLayer) ([]listenerConfig, error) {
// Construct the notification config path.
listenerConfigPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig)
objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, listenerConfigPath)
err = errorCause(err)
if err != nil {
// 'listener.json' not found return
// 'errNoSuchNotifications'. This is default when no
// bucket notifications are found on the bucket.
switch err.(type) {
case ObjectNotFound:
return nil, errNoSuchNotifications
}
errorIf(err, "Unable to load bucket-listeners for bucket %s", bucket)
// Returns error for other errors.
return nil, err
}
var buffer bytes.Buffer
err = objAPI.GetObject(minioMetaBucket, listenerConfigPath, 0, objInfo.Size, &buffer)
err = errorCause(err)
if err != nil {
// 'notification.xml' not found return
// 'errNoSuchNotifications'. This is default when no
// bucket listners are found on the bucket.
switch err.(type) {
case ObjectNotFound:
return nil, errNoSuchNotifications
}
errorIf(err, "Unable to load bucket-listeners for bucket %s", bucket)
// Returns error for other errors.
return nil, err
}
configs := make(map[string]*notificationConfig)
// Unmarshal notification bytes.
var lCfg []listenerConfig
lConfigBytes := buffer.Bytes()
if err = json.Unmarshal(lConfigBytes, &lCfg); err != nil {
errorIf(err, "Unable to unmarshal listener config from JSON.")
return nil, err
}
// Return success.
return lCfg, nil
}
func persistNotificationConfig(bucket string, ncfg *notificationConfig, obj ObjectLayer) error {
// marshal to xml
buf, err := xml.Marshal(ncfg)
if err != nil {
errorIf(err, "Unable to marshal notification configuration into XML")
return err
}
// verify bucket exists
// FIXME: There is a race between this check and PutObject
if err = isBucketExist(bucket, obj); err != nil {
return err
}
// build path
ncPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig)
// write object to path
_, err = obj.PutObject(minioMetaBucket, ncPath, int64(len(buf)),
bytes.NewReader(buf), nil, "")
if err != nil {
errorIf(err, "Unable to write bucket notification configuration.")
return err
}
return nil
}
// Persists validated listener config to object layer.
func persistListenerConfig(bucket string, lcfg []listenerConfig, obj ObjectLayer) error {
buf, err := json.Marshal(lcfg)
if err != nil {
errorIf(err, "Unable to marshal listener config to JSON.")
return err
}
// verify bucket exists
// FIXME: There is a race between this check and PutObject
if err = isBucketExist(bucket, obj); err != nil {
return err
}
// build path
lcPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig)
// write object to path
_, err = obj.PutObject(minioMetaBucket, lcPath, int64(len(buf)),
bytes.NewReader(buf), nil, "")
if err != nil {
errorIf(err, "Unable to write bucket listener configuration to object layer.")
}
return err
}
// loads all bucket notifications if present.
func loadAllBucketNotifications(objAPI ObjectLayer) (map[string]*notificationConfig, map[string][]listenerConfig, error) {
// List buckets to proceed loading all notification configuration.
buckets, err := objAPI.ListBuckets()
if err != nil {
return nil, nil, err
}
nConfigs := make(map[string]*notificationConfig)
lConfigs := make(map[string][]listenerConfig)
// Loads all bucket notifications.
for _, bucket := range buckets {
nCfg, nErr := loadNotificationConfig(bucket.Name, objAPI)
if nErr != nil {
if nErr == errNoSuchNotifications {
continue
if nErr != errNoSuchNotifications {
return nil, nil, nErr
}
return nil, nErr
} else {
nConfigs[bucket.Name] = nCfg
}
lCfg, lErr := loadListenerConfig(bucket.Name, objAPI)
if lErr != nil {
if lErr != errNoSuchNotifications {
return nil, nil, lErr
}
} else {
lConfigs[bucket.Name] = lCfg
}
configs[bucket.Name] = nCfg
}
// Success.
return configs, nil
return nConfigs, lConfigs, nil
}
// Loads all queue targets, initializes each queueARNs depending on their config.
@@ -452,8 +636,9 @@ func initEventNotifier(objAPI ObjectLayer) error {
}
// Read all saved bucket notifications.
configs, err := loadAllBucketNotifications(objAPI)
nConfigs, lConfigs, err := loadAllBucketNotifications(objAPI)
if err != nil {
errorIf(err, "Error loading bucket notifications - %v", err)
return err
}
@@ -463,12 +648,36 @@ func initEventNotifier(objAPI ObjectLayer) error {
return err
}
// Inititalize event notifier queue.
// Initialize internal listener targets
listenTargets := make(map[string]*listenerLogger)
for _, listeners := range lConfigs {
for _, listener := range listeners {
ln, err := newListenerLogger(
listener.TopicConfig.TopicARN,
listener.TargetServer,
)
if err != nil {
errorIf(err, "Unable to initialize listener target logger.")
//TODO: improve error
return fmt.Errorf("Error initializing listner target logger - %v", err)
}
listenTargets[listener.TopicConfig.TopicARN] = ln
}
}
// Initialize event notifier queue.
globalEventNotifier = &eventNotifier{
rwMutex: &sync.RWMutex{},
notificationConfigs: configs,
queueTargets: queueTargets,
snsTargets: make(map[string][]chan []NotificationEvent),
external: externalNotifier{
notificationConfigs: nConfigs,
targets: queueTargets,
rwMutex: &sync.RWMutex{},
},
internal: internalNotifier{
rwMutex: &sync.RWMutex{},
targets: listenTargets,
listenerConfigs: lConfigs,
connectedListeners: make(map[string]chan []NotificationEvent),
},
}
return nil