mirror of
https://github.com/minio/minio.git
synced 2025-11-07 12:52:58 -05:00
Add access format support for Redis notification target (#3989)
This change adds `access` format support for notifications to a Redis server, and it refactors `namespace` format support. In the case of `access` format, a list is used to store Minio operations in Redis. Each entry in the list is a JSON encoded list of two items - the first is the Minio server timestamp of the event, and the second is an object describing the operation that created/replaced the object in the server. In the case of `namespace` format, a hash is used. Entries in the hash may be updated or removed if objects in Minio are updated or deleted respectively. The field values in the Redis hash are JSON encoded. Also updates documentation on Redis notification target usage. Towards resolving #3928
This commit is contained in:
committed by
Harshavardhana
parent
1caad902cb
commit
61b08137b0
@@ -17,6 +17,7 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"time"
|
||||
@@ -25,6 +26,16 @@ import (
|
||||
"github.com/garyburd/redigo/redis"
|
||||
)
|
||||
|
||||
func makeRedisError(msg string, a ...interface{}) error {
|
||||
s := fmt.Sprintf(msg, a...)
|
||||
return fmt.Errorf("Redis Notifier Error: %s", s)
|
||||
}
|
||||
|
||||
var (
|
||||
rdNFormatError = makeRedisError(`"format" value is invalid - it must be one of "access" or "namespace".`)
|
||||
rdNKeyError = makeRedisError("Key was not specified in the configuration.")
|
||||
)
|
||||
|
||||
// redisNotify to send logs to Redis server
|
||||
type redisNotify struct {
|
||||
Enable bool `json:"enable"`
|
||||
@@ -38,14 +49,15 @@ func (r *redisNotify) Validate() error {
|
||||
if !r.Enable {
|
||||
return nil
|
||||
}
|
||||
if r.Format != formatNamespace {
|
||||
return fmt.Errorf(
|
||||
"Redis Notifier Error: \"format\" must be \"%s\"",
|
||||
formatNamespace)
|
||||
if r.Format != formatNamespace && r.Format != formatAccess {
|
||||
return rdNFormatError
|
||||
}
|
||||
if _, err := checkURL(r.Addr); err != nil {
|
||||
return err
|
||||
}
|
||||
if r.Key == "" {
|
||||
return rdNKeyError
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -92,7 +104,25 @@ func dialRedis(rNotify redisNotify) (*redis.Pool, error) {
|
||||
// Check connection.
|
||||
_, err := rConn.Do("PING")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, makeRedisError("Error connecting to server: %v", err)
|
||||
}
|
||||
|
||||
// Test that Key is of desired type
|
||||
reply, err := redis.String(rConn.Do("TYPE", rNotify.Key))
|
||||
if err != nil {
|
||||
return nil, makeRedisError("Error getting type of Key=%s: %v",
|
||||
rNotify.Key, err)
|
||||
}
|
||||
if reply != "none" {
|
||||
expectedType := "hash"
|
||||
if rNotify.Format == formatAccess {
|
||||
expectedType = "list"
|
||||
}
|
||||
if reply != expectedType {
|
||||
return nil, makeRedisError(
|
||||
"Key=%s has type %s, but we expect it to be a %s",
|
||||
rNotify.Key, reply, expectedType)
|
||||
}
|
||||
}
|
||||
|
||||
// Return pool.
|
||||
@@ -137,17 +167,51 @@ func (r redisConn) Fire(entry *logrus.Entry) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Match the event if its a delete request, attempt to delete the key
|
||||
if eventMatch(entryStr, []string{"s3:ObjectRemoved:*"}) {
|
||||
if _, err := rConn.Do("DEL", entry.Data["Key"]); err != nil {
|
||||
return err
|
||||
switch r.params.Format {
|
||||
case formatNamespace:
|
||||
// Match the event if its a delete request, attempt to delete the key
|
||||
if eventMatch(entryStr, []string{"s3:ObjectRemoved:*"}) {
|
||||
_, err := rConn.Do("HDEL", r.params.Key, entry.Data["Key"])
|
||||
if err != nil {
|
||||
return makeRedisError("Error deleting entry: %v",
|
||||
err)
|
||||
}
|
||||
return nil
|
||||
} // else save this as new entry or update any existing ones.
|
||||
|
||||
value, err := json.Marshal(map[string]interface{}{
|
||||
"Records": entry.Data["Records"],
|
||||
})
|
||||
if err != nil {
|
||||
return makeRedisError(
|
||||
"Unable to encode event %v to JSON: %v",
|
||||
entry.Data["Records"], err)
|
||||
}
|
||||
_, err = rConn.Do("HSET", r.params.Key, entry.Data["Key"],
|
||||
value)
|
||||
if err != nil {
|
||||
return makeRedisError("Error updating hash entry: %v",
|
||||
err)
|
||||
}
|
||||
case formatAccess:
|
||||
// eventTime is taken from the first entry in the
|
||||
// records.
|
||||
events, ok := entry.Data["Records"].([]NotificationEvent)
|
||||
if !ok {
|
||||
return makeRedisError("unable to extract event time due to conversion error of entry.Data[\"Records\"]=%v", entry.Data["Records"])
|
||||
}
|
||||
eventTime := events[0].EventTime
|
||||
|
||||
listEntry := []interface{}{eventTime, entry.Data["Records"]}
|
||||
jsonValue, err := json.Marshal(listEntry)
|
||||
if err != nil {
|
||||
return makeRedisError("JSON encoding error: %v", err)
|
||||
}
|
||||
_, err = rConn.Do("RPUSH", r.params.Key, jsonValue)
|
||||
if err != nil {
|
||||
return makeRedisError("Error appending to Redis list: %v",
|
||||
err)
|
||||
}
|
||||
return nil
|
||||
} // else save this as new entry or update any existing ones.
|
||||
if _, err := rConn.Do("SET", entry.Data["Key"], map[string]interface{}{
|
||||
"Records": entry.Data["Records"],
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user