mirror of
https://github.com/minio/minio.git
synced 2025-11-07 21:02:58 -05:00
Add access format support for Elasticsearch notification target (#4006)
This change adds `access` format support for notifications to a Elasticsearch server, and it refactors `namespace` format support. In the case of `access` format, for each event in Minio, a JSON document is inserted into Elasticsearch with its timestamp set to the event's timestamp, and with the ID generated automatically by elasticsearch. No events are modified or deleted in this mode. In the case of `namespace` format, for each event in Minio, a JSON document is keyed together by the bucket and object name is updated in Elasticsearch. In the case of an object being created or over-written in Minio, a new document or an existing document is inserted into the Elasticsearch index. If an object is deleted in Minio, the corresponding document is deleted from the Elasticsearch index. Additionally, this change upgrades Elasticsearch support to the 5.x series. This is a breaking change, and users of previous elasticsearch versions should upgrade. Also updates documentation on Elasticsearch notification target usage and has a link to an elasticsearch upgrade guide. This is the last patch that finally resolves #3928.
This commit is contained in:
committed by
Harshavardhana
parent
2040d32ef8
commit
a2a8d54bb6
@@ -18,6 +18,7 @@ package cmd
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/minio/minio/pkg/wildcard"
|
||||
)
|
||||
@@ -221,3 +222,16 @@ func filterRuleMatch(object string, frs []filterRule) bool {
|
||||
}
|
||||
return prefixMatch && suffixMatch
|
||||
}
|
||||
|
||||
// A type to represent dynamic error generation functions for
|
||||
// notifications.
|
||||
type notificationErrorFactoryFunc func(string, ...interface{}) error
|
||||
|
||||
// A function to build dynamic error generation functions for
|
||||
// notifications by setting an error prefix string.
|
||||
func newNotificationErrorFactory(prefix string) notificationErrorFactoryFunc {
|
||||
return func(msg string, a ...interface{}) error {
|
||||
s := fmt.Sprintf(msg, a...)
|
||||
return fmt.Errorf("%s: %s", prefix, s)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,14 +17,20 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/minio/sha256-simd"
|
||||
"gopkg.in/olivere/elastic.v3"
|
||||
"gopkg.in/olivere/elastic.v5"
|
||||
)
|
||||
|
||||
var (
|
||||
esErrFunc = newNotificationErrorFactory("Elasticsearch")
|
||||
|
||||
errESFormat = esErrFunc(`"format" value is invalid - it must be one of "%s" or "%s".`, formatNamespace, formatAccess)
|
||||
errESIndex = esErrFunc("Index name was not specified in the configuration.")
|
||||
)
|
||||
|
||||
// elasticQueue is a elasticsearch event notification queue.
|
||||
@@ -39,14 +45,15 @@ func (e *elasticSearchNotify) Validate() error {
|
||||
if !e.Enable {
|
||||
return nil
|
||||
}
|
||||
if e.Format != formatNamespace {
|
||||
return fmt.Errorf(
|
||||
"Elasticsearch Notifier Error: \"format\" must be \"%s\"",
|
||||
formatNamespace)
|
||||
if e.Format != formatNamespace && e.Format != formatAccess {
|
||||
return errESFormat
|
||||
}
|
||||
if _, err := checkURL(e.URL); err != nil {
|
||||
return err
|
||||
}
|
||||
if e.Index == "" {
|
||||
return errESIndex
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -60,15 +67,11 @@ func dialElastic(esNotify elasticSearchNotify) (*elastic.Client, error) {
|
||||
if !esNotify.Enable {
|
||||
return nil, errNotifyNotEnabled
|
||||
}
|
||||
client, err := elastic.NewClient(
|
||||
return elastic.NewClient(
|
||||
elastic.SetURL(esNotify.URL),
|
||||
elastic.SetSniff(false),
|
||||
elastic.SetMaxRetries(10),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return client, nil
|
||||
}
|
||||
|
||||
func newElasticNotify(accountID string) (*logrus.Logger, error) {
|
||||
@@ -77,23 +80,26 @@ func newElasticNotify(accountID string) (*logrus.Logger, error) {
|
||||
// Dial to elastic search.
|
||||
client, err := dialElastic(esNotify)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, esErrFunc("Error dialing the server: %v", err)
|
||||
}
|
||||
|
||||
// Use the IndexExists service to check if a specified index exists.
|
||||
exists, err := client.IndexExists(esNotify.Index).Do()
|
||||
exists, err := client.IndexExists(esNotify.Index).
|
||||
Do(context.Background())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, esErrFunc("Error checking if index exists: %v", err)
|
||||
}
|
||||
// Index does not exist, attempt to create it.
|
||||
if !exists {
|
||||
var createIndex *elastic.IndicesCreateResult
|
||||
createIndex, err = client.CreateIndex(esNotify.Index).Do()
|
||||
createIndex, err = client.CreateIndex(esNotify.Index).
|
||||
Do(context.Background())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, esErrFunc("Error creating index `%s`: %v",
|
||||
esNotify.Index, err)
|
||||
}
|
||||
if !createIndex.Acknowledged {
|
||||
return nil, errors.New("Index not created")
|
||||
return nil, esErrFunc("Index not created")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -118,7 +124,7 @@ func newElasticNotify(accountID string) (*logrus.Logger, error) {
|
||||
}
|
||||
|
||||
// Fire is required to implement logrus hook
|
||||
func (q elasticClient) Fire(entry *logrus.Entry) error {
|
||||
func (q elasticClient) Fire(entry *logrus.Entry) (err error) {
|
||||
// Reflect on eventType and Key on their native type.
|
||||
entryStr, ok := entry.Data["EventType"].(string)
|
||||
if !ok {
|
||||
@@ -129,25 +135,44 @@ func (q elasticClient) Fire(entry *logrus.Entry) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Calculate a unique key id. Choosing sha256 here.
|
||||
shaKey := sha256.Sum256([]byte(keyStr))
|
||||
keyStr = hex.EncodeToString(shaKey[:])
|
||||
|
||||
// If event matches as delete, we purge the previous index.
|
||||
if eventMatch(entryStr, []string{"s3:ObjectRemoved:*"}) {
|
||||
_, err := q.Client.Delete().Index(q.params.Index).
|
||||
Type("event").Id(keyStr).Do()
|
||||
if err != nil {
|
||||
return err
|
||||
switch q.params.Format {
|
||||
case formatNamespace:
|
||||
// If event matches as delete, we purge the previous index.
|
||||
if eventMatch(entryStr, []string{"s3:ObjectRemoved:*"}) {
|
||||
_, err = q.Client.Delete().Index(q.params.Index).
|
||||
Type("event").Id(keyStr).Do(context.Background())
|
||||
break
|
||||
} // else we update elastic index or create a new one.
|
||||
_, err = q.Client.Index().Index(q.params.Index).
|
||||
Type("event").
|
||||
BodyJson(map[string]interface{}{
|
||||
"Records": entry.Data["Records"],
|
||||
}).Id(keyStr).Do(context.Background())
|
||||
case formatAccess:
|
||||
// eventTime is taken from the first entry in the
|
||||
// records.
|
||||
events, ok := entry.Data["Records"].([]NotificationEvent)
|
||||
if !ok {
|
||||
return esErrFunc("Unable to extract event time due to conversion error of entry.Data[\"Records\"]=%v", entry.Data["Records"])
|
||||
}
|
||||
return nil
|
||||
} // else we update elastic index or create a new one.
|
||||
_, err := q.Client.Index().Index(q.params.Index).
|
||||
Type("event").
|
||||
BodyJson(map[string]interface{}{
|
||||
"Records": entry.Data["Records"],
|
||||
}).Id(keyStr).Do()
|
||||
return err
|
||||
var eventTime time.Time
|
||||
eventTime, err = time.Parse(timeFormatAMZ, events[0].EventTime)
|
||||
if err != nil {
|
||||
return esErrFunc("Unable to parse event time \"%s\": %v",
|
||||
events[0].EventTime, err)
|
||||
}
|
||||
// Extract event time in milliseconds for Elasticsearch.
|
||||
eventTimeStr := fmt.Sprintf("%d", eventTime.UnixNano()/1000000)
|
||||
_, err = q.Client.Index().Index(q.params.Index).Type("event").
|
||||
Timestamp(eventTimeStr).
|
||||
BodyJson(map[string]interface{}{
|
||||
"Records": entry.Data["Records"],
|
||||
}).Do(context.Background())
|
||||
}
|
||||
if err != nil {
|
||||
return esErrFunc("Error inserting/deleting entry: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Required for logrus hook implementation
|
||||
|
||||
@@ -90,14 +90,11 @@ VALUES (?, ?);`
|
||||
tableExistsMySQL = `SELECT 1 FROM %s;`
|
||||
)
|
||||
|
||||
func makeMySQLError(msg string, a ...interface{}) error {
|
||||
s := fmt.Sprintf(msg, a...)
|
||||
return fmt.Errorf("MySQL Notifier Error: %s", s)
|
||||
}
|
||||
|
||||
var (
|
||||
myNFormatError = makeMySQLError(`"format" value is invalid - it must be one of "%s" or "%s".`, formatNamespace, formatAccess)
|
||||
myNTableError = makeMySQLError("Table was not specified in the configuration.")
|
||||
mysqlErrFunc = newNotificationErrorFactory("MySQL")
|
||||
|
||||
errMysqlFormat = mysqlErrFunc(`"format" value is invalid - it must be one of "%s" or "%s".`, formatNamespace, formatAccess)
|
||||
errMysqlTable = mysqlErrFunc("Table was not specified in the configuration.")
|
||||
)
|
||||
|
||||
type mySQLNotify struct {
|
||||
@@ -127,7 +124,7 @@ func (m *mySQLNotify) Validate() error {
|
||||
return nil
|
||||
}
|
||||
if m.Format != formatNamespace && m.Format != formatAccess {
|
||||
return myNFormatError
|
||||
return errMysqlFormat
|
||||
}
|
||||
if m.DsnString == "" {
|
||||
if _, err := checkURL(m.Host); err != nil {
|
||||
@@ -135,7 +132,7 @@ func (m *mySQLNotify) Validate() error {
|
||||
}
|
||||
}
|
||||
if m.Table == "" {
|
||||
return myNTableError
|
||||
return errMysqlTable
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -169,7 +166,7 @@ func dialMySQL(msql mySQLNotify) (mySQLConn, error) {
|
||||
|
||||
db, err := sql.Open("mysql", dsnStr)
|
||||
if err != nil {
|
||||
return mySQLConn{}, makeMySQLError(
|
||||
return mySQLConn{}, mysqlErrFunc(
|
||||
"Connection opening failure (dsnStr=%s): %v",
|
||||
dsnStr, err)
|
||||
}
|
||||
@@ -177,7 +174,7 @@ func dialMySQL(msql mySQLNotify) (mySQLConn, error) {
|
||||
// ping to check that server is actually reachable.
|
||||
err = db.Ping()
|
||||
if err != nil {
|
||||
return mySQLConn{}, makeMySQLError(
|
||||
return mySQLConn{}, mysqlErrFunc(
|
||||
"Ping to server failed with: %v", err)
|
||||
}
|
||||
|
||||
@@ -193,7 +190,7 @@ func dialMySQL(msql mySQLNotify) (mySQLConn, error) {
|
||||
_, errCreate := db.Exec(fmt.Sprintf(createStmt, msql.Table))
|
||||
if errCreate != nil {
|
||||
// failed to create the table. error out.
|
||||
return mySQLConn{}, makeMySQLError(
|
||||
return mySQLConn{}, mysqlErrFunc(
|
||||
"'Select' failed with %v, then 'Create Table' failed with %v",
|
||||
err, errCreate,
|
||||
)
|
||||
@@ -209,21 +206,21 @@ func dialMySQL(msql mySQLNotify) (mySQLConn, error) {
|
||||
msql.Table))
|
||||
if err != nil {
|
||||
return mySQLConn{},
|
||||
makeMySQLError("create UPSERT prepared statement failed with: %v", err)
|
||||
mysqlErrFunc("create UPSERT prepared statement failed with: %v", err)
|
||||
}
|
||||
// delete statement
|
||||
stmts["deleteRow"], err = db.Prepare(fmt.Sprintf(deleteRowForNSMySQL,
|
||||
msql.Table))
|
||||
if err != nil {
|
||||
return mySQLConn{},
|
||||
makeMySQLError("create DELETE prepared statement failed with: %v", err)
|
||||
mysqlErrFunc("create DELETE prepared statement failed with: %v", err)
|
||||
}
|
||||
case formatAccess:
|
||||
// insert statement
|
||||
stmts["insertRow"], err = db.Prepare(fmt.Sprintf(insertRowForAccessMySQL,
|
||||
msql.Table))
|
||||
if err != nil {
|
||||
return mySQLConn{}, makeMySQLError(
|
||||
return mySQLConn{}, mysqlErrFunc(
|
||||
"create INSERT prepared statement failed with: %v", err)
|
||||
}
|
||||
|
||||
@@ -274,7 +271,7 @@ func (myC mySQLConn) Fire(entry *logrus.Entry) error {
|
||||
"Records": d,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, makeMySQLError(
|
||||
return nil, mysqlErrFunc(
|
||||
"Unable to encode event %v to JSON: %v", d, err)
|
||||
}
|
||||
return value, nil
|
||||
@@ -287,7 +284,7 @@ func (myC mySQLConn) Fire(entry *logrus.Entry) error {
|
||||
// delete row from the table
|
||||
_, err := myC.preparedStmts["deleteRow"].Exec(entry.Data["Key"])
|
||||
if err != nil {
|
||||
return makeMySQLError(
|
||||
return mysqlErrFunc(
|
||||
"Error deleting event with key = %v - got mysql error - %v",
|
||||
entry.Data["Key"], err,
|
||||
)
|
||||
@@ -301,7 +298,7 @@ func (myC mySQLConn) Fire(entry *logrus.Entry) error {
|
||||
// upsert row into the table
|
||||
_, err = myC.preparedStmts["upsertRow"].Exec(entry.Data["Key"], value)
|
||||
if err != nil {
|
||||
return makeMySQLError(
|
||||
return mysqlErrFunc(
|
||||
"Unable to upsert event with Key=%v and Value=%v - got mysql error - %v",
|
||||
entry.Data["Key"], entry.Data["Records"], err,
|
||||
)
|
||||
@@ -312,11 +309,11 @@ func (myC mySQLConn) Fire(entry *logrus.Entry) error {
|
||||
// records.
|
||||
events, ok := entry.Data["Records"].([]NotificationEvent)
|
||||
if !ok {
|
||||
return makeMySQLError("unable to extract event time due to conversion error of entry.Data[\"Records\"]=%v", entry.Data["Records"])
|
||||
return mysqlErrFunc("unable to extract event time due to conversion error of entry.Data[\"Records\"]=%v", entry.Data["Records"])
|
||||
}
|
||||
eventTime, err := time.Parse(timeFormatAMZ, events[0].EventTime)
|
||||
if err != nil {
|
||||
return makeMySQLError("unable to parse event time \"%s\": %v",
|
||||
return mysqlErrFunc("unable to parse event time \"%s\": %v",
|
||||
events[0].EventTime, err)
|
||||
}
|
||||
|
||||
@@ -327,7 +324,7 @@ func (myC mySQLConn) Fire(entry *logrus.Entry) error {
|
||||
|
||||
_, err = myC.preparedStmts["insertRow"].Exec(eventTime, value)
|
||||
if err != nil {
|
||||
return makeMySQLError("Unable to insert event with value=%v: %v",
|
||||
return mysqlErrFunc("Unable to insert event with value=%v: %v",
|
||||
value, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -98,14 +98,11 @@ VALUES ($1, $2);`
|
||||
tableExists = `SELECT 1 FROM %s;`
|
||||
)
|
||||
|
||||
func makePGError(msg string, a ...interface{}) error {
|
||||
s := fmt.Sprintf(msg, a...)
|
||||
return fmt.Errorf("PostgreSQL Notifier Error: %s", s)
|
||||
}
|
||||
|
||||
var (
|
||||
pgNFormatError = makePGError(`"format" value is invalid - it must be one of "%s" or "%s".`, formatNamespace, formatAccess)
|
||||
pgNTableError = makePGError("Table was not specified in the configuration.")
|
||||
pgErrFunc = newNotificationErrorFactory("PostgreSQL")
|
||||
|
||||
errPGFormatError = pgErrFunc(`"format" value is invalid - it must be one of "%s" or "%s".`, formatNamespace, formatAccess)
|
||||
errPGTableError = pgErrFunc("Table was not specified in the configuration.")
|
||||
)
|
||||
|
||||
type postgreSQLNotify struct {
|
||||
@@ -135,7 +132,7 @@ func (p *postgreSQLNotify) Validate() error {
|
||||
return nil
|
||||
}
|
||||
if p.Format != formatNamespace && p.Format != formatAccess {
|
||||
return pgNFormatError
|
||||
return errPGFormatError
|
||||
}
|
||||
if p.ConnectionString == "" {
|
||||
if _, err := checkURL(p.Host); err != nil {
|
||||
@@ -143,7 +140,7 @@ func (p *postgreSQLNotify) Validate() error {
|
||||
}
|
||||
}
|
||||
if p.Table == "" {
|
||||
return pgNTableError
|
||||
return errPGTableError
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -182,7 +179,7 @@ func dialPostgreSQL(pgN postgreSQLNotify) (pgConn, error) {
|
||||
|
||||
db, err := sql.Open("postgres", connStr)
|
||||
if err != nil {
|
||||
return pgConn{}, makePGError(
|
||||
return pgConn{}, pgErrFunc(
|
||||
"Connection opening failure (connectionString=%s): %v",
|
||||
connStr, err)
|
||||
}
|
||||
@@ -190,7 +187,7 @@ func dialPostgreSQL(pgN postgreSQLNotify) (pgConn, error) {
|
||||
// ping to check that server is actually reachable.
|
||||
err = db.Ping()
|
||||
if err != nil {
|
||||
return pgConn{}, makePGError("Ping to server failed with: %v",
|
||||
return pgConn{}, pgErrFunc("Ping to server failed with: %v",
|
||||
err)
|
||||
}
|
||||
|
||||
@@ -206,7 +203,7 @@ func dialPostgreSQL(pgN postgreSQLNotify) (pgConn, error) {
|
||||
_, errCreate := db.Exec(fmt.Sprintf(createStmt, pgN.Table))
|
||||
if errCreate != nil {
|
||||
// failed to create the table. error out.
|
||||
return pgConn{}, makePGError(
|
||||
return pgConn{}, pgErrFunc(
|
||||
"'Select' failed with %v, then 'Create Table' failed with %v",
|
||||
err, errCreate,
|
||||
)
|
||||
@@ -221,14 +218,14 @@ func dialPostgreSQL(pgN postgreSQLNotify) (pgConn, error) {
|
||||
stmts["upsertRow"], err = db.Prepare(fmt.Sprintf(upsertRowForNS,
|
||||
pgN.Table))
|
||||
if err != nil {
|
||||
return pgConn{}, makePGError(
|
||||
return pgConn{}, pgErrFunc(
|
||||
"create UPSERT prepared statement failed with: %v", err)
|
||||
}
|
||||
// delete statement
|
||||
stmts["deleteRow"], err = db.Prepare(fmt.Sprintf(deleteRowForNS,
|
||||
pgN.Table))
|
||||
if err != nil {
|
||||
return pgConn{}, makePGError(
|
||||
return pgConn{}, pgErrFunc(
|
||||
"create DELETE prepared statement failed with: %v", err)
|
||||
}
|
||||
case formatAccess:
|
||||
@@ -236,7 +233,7 @@ func dialPostgreSQL(pgN postgreSQLNotify) (pgConn, error) {
|
||||
stmts["insertRow"], err = db.Prepare(fmt.Sprintf(insertRowForAccess,
|
||||
pgN.Table))
|
||||
if err != nil {
|
||||
return pgConn{}, makePGError(
|
||||
return pgConn{}, pgErrFunc(
|
||||
"create INSERT prepared statement failed with: %v", err)
|
||||
}
|
||||
}
|
||||
@@ -279,7 +276,7 @@ func jsonEncodeEventData(d interface{}) ([]byte, error) {
|
||||
"Records": d,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, makePGError(
|
||||
return nil, pgErrFunc(
|
||||
"Unable to encode event %v to JSON: %v", d, err)
|
||||
}
|
||||
return value, nil
|
||||
@@ -301,7 +298,7 @@ func (pgC pgConn) Fire(entry *logrus.Entry) error {
|
||||
// delete row from the table
|
||||
_, err := pgC.preparedStmts["deleteRow"].Exec(entry.Data["Key"])
|
||||
if err != nil {
|
||||
return makePGError(
|
||||
return pgErrFunc(
|
||||
"Error deleting event with key=%v: %v",
|
||||
entry.Data["Key"], err,
|
||||
)
|
||||
@@ -315,7 +312,7 @@ func (pgC pgConn) Fire(entry *logrus.Entry) error {
|
||||
// upsert row into the table
|
||||
_, err = pgC.preparedStmts["upsertRow"].Exec(entry.Data["Key"], value)
|
||||
if err != nil {
|
||||
return makePGError(
|
||||
return pgErrFunc(
|
||||
"Unable to upsert event with key=%v and value=%v: %v",
|
||||
entry.Data["Key"], entry.Data["Records"], err,
|
||||
)
|
||||
@@ -326,11 +323,11 @@ func (pgC pgConn) Fire(entry *logrus.Entry) error {
|
||||
// records.
|
||||
events, ok := entry.Data["Records"].([]NotificationEvent)
|
||||
if !ok {
|
||||
return makePGError("unable to extract event time due to conversion error of entry.Data[\"Records\"]=%v", entry.Data["Records"])
|
||||
return pgErrFunc("unable to extract event time due to conversion error of entry.Data[\"Records\"]=%v", entry.Data["Records"])
|
||||
}
|
||||
eventTime, err := time.Parse(timeFormatAMZ, events[0].EventTime)
|
||||
if err != nil {
|
||||
return makePGError("unable to parse event time \"%s\": %v",
|
||||
return pgErrFunc("unable to parse event time \"%s\": %v",
|
||||
events[0].EventTime, err)
|
||||
}
|
||||
|
||||
@@ -341,7 +338,7 @@ func (pgC pgConn) Fire(entry *logrus.Entry) error {
|
||||
|
||||
_, err = pgC.preparedStmts["insertRow"].Exec(eventTime, value)
|
||||
if err != nil {
|
||||
return makePGError("Unable to insert event with value=%v: %v",
|
||||
return pgErrFunc("Unable to insert event with value=%v: %v",
|
||||
value, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,7 +18,6 @@ package cmd
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"time"
|
||||
@@ -27,14 +26,11 @@ import (
|
||||
"github.com/garyburd/redigo/redis"
|
||||
)
|
||||
|
||||
func makeRedisError(msg string, a ...interface{}) error {
|
||||
s := fmt.Sprintf(msg, a...)
|
||||
return fmt.Errorf("Redis Notifier Error: %s", s)
|
||||
}
|
||||
|
||||
var (
|
||||
rdNFormatError = makeRedisError(`"format" value is invalid - it must be one of "access" or "namespace".`)
|
||||
rdNKeyError = makeRedisError("Key was not specified in the configuration.")
|
||||
redisErrFunc = newNotificationErrorFactory("Redis")
|
||||
|
||||
errRedisFormat = redisErrFunc(`"format" value is invalid - it must be one of "access" or "namespace".`)
|
||||
errRedisKeyError = redisErrFunc("Key was not specified in the configuration.")
|
||||
)
|
||||
|
||||
// redisNotify to send logs to Redis server
|
||||
@@ -51,13 +47,13 @@ func (r *redisNotify) Validate() error {
|
||||
return nil
|
||||
}
|
||||
if r.Format != formatNamespace && r.Format != formatAccess {
|
||||
return rdNFormatError
|
||||
return errRedisFormat
|
||||
}
|
||||
if _, _, err := net.SplitHostPort(r.Addr); err != nil {
|
||||
return err
|
||||
}
|
||||
if r.Key == "" {
|
||||
return rdNKeyError
|
||||
return errRedisKeyError
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -106,13 +102,13 @@ func dialRedis(rNotify redisNotify) (*redis.Pool, error) {
|
||||
// Check connection.
|
||||
_, err := rConn.Do("PING")
|
||||
if err != nil {
|
||||
return nil, makeRedisError("Error connecting to server: %v", err)
|
||||
return nil, redisErrFunc("Error connecting to server: %v", err)
|
||||
}
|
||||
|
||||
// Test that Key is of desired type
|
||||
reply, err := redis.String(rConn.Do("TYPE", rNotify.Key))
|
||||
if err != nil {
|
||||
return nil, makeRedisError("Error getting type of Key=%s: %v",
|
||||
return nil, redisErrFunc("Error getting type of Key=%s: %v",
|
||||
rNotify.Key, err)
|
||||
}
|
||||
if reply != "none" {
|
||||
@@ -121,7 +117,7 @@ func dialRedis(rNotify redisNotify) (*redis.Pool, error) {
|
||||
expectedType = "list"
|
||||
}
|
||||
if reply != expectedType {
|
||||
return nil, makeRedisError(
|
||||
return nil, redisErrFunc(
|
||||
"Key=%s has type %s, but we expect it to be a %s",
|
||||
rNotify.Key, reply, expectedType)
|
||||
}
|
||||
@@ -137,7 +133,7 @@ func newRedisNotify(accountID string) (*logrus.Logger, error) {
|
||||
// Dial redis.
|
||||
rPool, err := dialRedis(rNotify)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, redisErrFunc("Error dialing server: %v", err)
|
||||
}
|
||||
|
||||
rrConn := redisConn{
|
||||
@@ -175,7 +171,7 @@ func (r redisConn) Fire(entry *logrus.Entry) error {
|
||||
if eventMatch(entryStr, []string{"s3:ObjectRemoved:*"}) {
|
||||
_, err := rConn.Do("HDEL", r.params.Key, entry.Data["Key"])
|
||||
if err != nil {
|
||||
return makeRedisError("Error deleting entry: %v",
|
||||
return redisErrFunc("Error deleting entry: %v",
|
||||
err)
|
||||
}
|
||||
return nil
|
||||
@@ -185,14 +181,14 @@ func (r redisConn) Fire(entry *logrus.Entry) error {
|
||||
"Records": entry.Data["Records"],
|
||||
})
|
||||
if err != nil {
|
||||
return makeRedisError(
|
||||
return redisErrFunc(
|
||||
"Unable to encode event %v to JSON: %v",
|
||||
entry.Data["Records"], err)
|
||||
}
|
||||
_, err = rConn.Do("HSET", r.params.Key, entry.Data["Key"],
|
||||
value)
|
||||
if err != nil {
|
||||
return makeRedisError("Error updating hash entry: %v",
|
||||
return redisErrFunc("Error updating hash entry: %v",
|
||||
err)
|
||||
}
|
||||
case formatAccess:
|
||||
@@ -200,18 +196,18 @@ func (r redisConn) Fire(entry *logrus.Entry) error {
|
||||
// records.
|
||||
events, ok := entry.Data["Records"].([]NotificationEvent)
|
||||
if !ok {
|
||||
return makeRedisError("unable to extract event time due to conversion error of entry.Data[\"Records\"]=%v", entry.Data["Records"])
|
||||
return redisErrFunc("unable to extract event time due to conversion error of entry.Data[\"Records\"]=%v", entry.Data["Records"])
|
||||
}
|
||||
eventTime := events[0].EventTime
|
||||
|
||||
listEntry := []interface{}{eventTime, entry.Data["Records"]}
|
||||
jsonValue, err := json.Marshal(listEntry)
|
||||
if err != nil {
|
||||
return makeRedisError("JSON encoding error: %v", err)
|
||||
return redisErrFunc("JSON encoding error: %v", err)
|
||||
}
|
||||
_, err = rConn.Do("RPUSH", r.params.Key, jsonValue)
|
||||
if err != nil {
|
||||
return makeRedisError("Error appending to Redis list: %v",
|
||||
return redisErrFunc("Error appending to Redis list: %v",
|
||||
err)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user