events: Change event notifiers to delete and update keys. (#2742)

ElasticSearch and Redis are both treated like a database.
Each indexs are based on the object names uniquely indentifying
the event. Upon each delete event of the named object deletes
the index on elasticsearch and redis respectively.
This commit is contained in:
Harshavardhana 2016-09-20 02:11:17 -07:00 committed by GitHub
parent c4964232eb
commit 0a3448c8b6
3 changed files with 40 additions and 11 deletions

View File

@ -204,7 +204,9 @@ func eventNotify(event eventData) {
targetLog := globalEventNotifier.GetQueueTarget(qConfig.QueueARN)
if targetLog != nil {
targetLog.WithFields(logrus.Fields{
"Records": notificationEvent,
"Key": objectName,
"EventType": eventType,
"Records": notificationEvent,
}).Info()
}
}

View File

@ -96,11 +96,29 @@ func newElasticNotify(accountID string) (*logrus.Logger, error) {
// Fire is required to implement logrus hook
func (q elasticClient) Fire(entry *logrus.Entry) error {
_, err := q.Client.Index().Index(q.params.Index).
Type("event").
BodyJson(entry.Data).
Do()
// Reflect on eventType and Key on their native type.
entryStr, ok := entry.Data["EventType"].(string)
if !ok {
return nil
}
keyStr, ok := entry.Data["Key"].(string)
if !ok {
return nil
}
// If event matches as delete, we purge the previous index.
if eventMatch(entryStr, []string{"s3:ObjectRemoved:*"}) {
_, err := q.Client.DeleteIndex(keyStr).Do()
if err != nil {
return err
}
return nil
} // else we update elastic index or create a new one.
_, err := q.Client.Index().Index(keyStr).
Type("event").
BodyJson(map[string]interface{}{
"Records": entry.Data["Records"],
}).Do()
return err
}

View File

@ -47,7 +47,7 @@ func dialRedis(rNotify redisNotify) (*redis.Pool, error) {
password := rNotify.Password
rPool := &redis.Pool{
MaxIdle: 3,
IdleTimeout: 240 * time.Second,
IdleTimeout: 240 * time.Second, // Time 2minutes.
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", addr)
if err != nil {
@ -113,13 +113,22 @@ func (r redisConn) Fire(entry *logrus.Entry) error {
rConn := r.Pool.Get()
defer rConn.Close()
data, err := entry.String()
if err != nil {
return err
// Fetch event type upon reflecting on its original type.
entryStr, ok := entry.Data["EventType"].(string)
if !ok {
return nil
}
_, err = rConn.Do("RPUSH", r.params.Key, data)
if err != nil {
// Match the event if its a delete request, attempt to delete the key
if eventMatch(entryStr, []string{"s3:ObjectRemoved:*"}) {
if _, err := rConn.Do("DEL", entry.Data["Key"]); err != nil {
return err
}
return nil
} // else save this as new entry or update any existing ones.
if _, err := rConn.Do("SET", entry.Data["Key"], map[string]interface{}{
"Records": entry.Data["Records"],
}); err != nil {
return err
}
return nil