From 0a3448c8b6f896fa1920a2e79917b06d0866d2a7 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 20 Sep 2016 02:11:17 -0700 Subject: [PATCH] events: Change event notifiers to delete and update keys. (#2742) ElasticSearch and Redis are both treated like a database. Each indexs are based on the object names uniquely indentifying the event. Upon each delete event of the named object deletes the index on elasticsearch and redis respectively. --- cmd/event-notifier.go | 4 +++- cmd/notify-elasticsearch.go | 26 ++++++++++++++++++++++---- cmd/notify-redis.go | 21 +++++++++++++++------ 3 files changed, 40 insertions(+), 11 deletions(-) diff --git a/cmd/event-notifier.go b/cmd/event-notifier.go index c872d1891..286894f70 100644 --- a/cmd/event-notifier.go +++ b/cmd/event-notifier.go @@ -204,7 +204,9 @@ func eventNotify(event eventData) { targetLog := globalEventNotifier.GetQueueTarget(qConfig.QueueARN) if targetLog != nil { targetLog.WithFields(logrus.Fields{ - "Records": notificationEvent, + "Key": objectName, + "EventType": eventType, + "Records": notificationEvent, }).Info() } } diff --git a/cmd/notify-elasticsearch.go b/cmd/notify-elasticsearch.go index a69818928..92f7cb3db 100644 --- a/cmd/notify-elasticsearch.go +++ b/cmd/notify-elasticsearch.go @@ -96,11 +96,29 @@ func newElasticNotify(accountID string) (*logrus.Logger, error) { // Fire is required to implement logrus hook func (q elasticClient) Fire(entry *logrus.Entry) error { - _, err := q.Client.Index().Index(q.params.Index). - Type("event"). - BodyJson(entry.Data). - Do() + // Reflect on eventType and Key on their native type. + entryStr, ok := entry.Data["EventType"].(string) + if !ok { + return nil + } + keyStr, ok := entry.Data["Key"].(string) + if !ok { + return nil + } + // If event matches as delete, we purge the previous index. + if eventMatch(entryStr, []string{"s3:ObjectRemoved:*"}) { + _, err := q.Client.DeleteIndex(keyStr).Do() + if err != nil { + return err + } + return nil + } // else we update elastic index or create a new one. + _, err := q.Client.Index().Index(keyStr). + Type("event"). + BodyJson(map[string]interface{}{ + "Records": entry.Data["Records"], + }).Do() return err } diff --git a/cmd/notify-redis.go b/cmd/notify-redis.go index a59ea51a9..6a2d2ddf0 100644 --- a/cmd/notify-redis.go +++ b/cmd/notify-redis.go @@ -47,7 +47,7 @@ func dialRedis(rNotify redisNotify) (*redis.Pool, error) { password := rNotify.Password rPool := &redis.Pool{ MaxIdle: 3, - IdleTimeout: 240 * time.Second, + IdleTimeout: 240 * time.Second, // Time 2minutes. Dial: func() (redis.Conn, error) { c, err := redis.Dial("tcp", addr) if err != nil { @@ -113,13 +113,22 @@ func (r redisConn) Fire(entry *logrus.Entry) error { rConn := r.Pool.Get() defer rConn.Close() - data, err := entry.String() - if err != nil { - return err + // Fetch event type upon reflecting on its original type. + entryStr, ok := entry.Data["EventType"].(string) + if !ok { + return nil } - _, err = rConn.Do("RPUSH", r.params.Key, data) - if err != nil { + // Match the event if its a delete request, attempt to delete the key + if eventMatch(entryStr, []string{"s3:ObjectRemoved:*"}) { + if _, err := rConn.Do("DEL", entry.Data["Key"]); err != nil { + return err + } + return nil + } // else save this as new entry or update any existing ones. + if _, err := rConn.Do("SET", entry.Data["Key"], map[string]interface{}{ + "Records": entry.Data["Records"], + }); err != nil { return err } return nil