Add PostgreSQL notifier (#2739) (#2824)

* The user is required to specify a table name and database connection
  information in the configuration file.

* INSERTs and DELETEs are done via prepared statements for speed.

* Assumes a table structure, and requires PostgreSQL 9.5 or above due to
  the use of UPSERT.

* Creates the table if it does not exist with the given table name using
  a query like:

    CREATE TABLE myminio (
        key varchar PRIMARY KEY,
        value JSONB
    );

* Vendors some required libraries.
This commit is contained in:
Aditya Manthramurthy
2016-10-03 17:29:55 -07:00
committed by Harshavardhana
parent 4f902d42b2
commit 315e66858c
28 changed files with 6068 additions and 46 deletions

View File

@@ -149,11 +149,12 @@ func isMinioSNSConfigured(topicARN string, topicConfigs []topicConfig) bool {
// Validate if we recognize the queue type.
func isValidQueue(sqsARN arnSQS) bool {
amqpQ := isAMQPQueue(sqsARN) // Is amqp queue?.
natsQ := isNATSQueue(sqsARN) // Is nats queue?.
elasticQ := isElasticQueue(sqsARN) // Is elastic queue?.
redisQ := isRedisQueue(sqsARN) // Is redis queue?.
return amqpQ || natsQ || elasticQ || redisQ
amqpQ := isAMQPQueue(sqsARN) // Is amqp queue?
natsQ := isNATSQueue(sqsARN) // Is nats queue?
elasticQ := isElasticQueue(sqsARN) // Is elastic queue?
redisQ := isRedisQueue(sqsARN) // Is redis queue?
postgresQ := isPostgreSQLQueue(sqsARN) // Is postgres queue?
return amqpQ || natsQ || elasticQ || redisQ || postgresQ
}
// Validate if we recognize the topic type.
@@ -178,6 +179,10 @@ func isValidQueueID(queueARN string) bool {
} else if isRedisQueue(sqsARN) { // Redis queue.
redisN := serverConfig.GetRedisNotifyByID(sqsARN.AccountID)
return redisN.Enable && redisN.Addr != ""
} else if isPostgreSQLQueue(sqsARN) {
pgN := serverConfig.GetPostgreSQLNotifyByID(sqsARN.AccountID)
// Postgres can work with only default conn. info.
return pgN.Enable
}
return false
}
@@ -354,6 +359,7 @@ func unmarshalTopicARN(topicARN string) arnTopic {
// - nats
// - elasticsearch
// - redis
// - postgresql
func unmarshalSqsARN(queueARN string) (mSqs arnSQS) {
mSqs = arnSQS{}
if !strings.HasPrefix(queueARN, minioSqs+serverConfig.GetRegion()+":") {
@@ -369,6 +375,8 @@ func unmarshalSqsARN(queueARN string) (mSqs arnSQS) {
mSqs.Type = queueTypeElastic
case strings.HasSuffix(sqsType, queueTypeRedis):
mSqs.Type = queueTypeRedis
case strings.HasSuffix(sqsType, queueTypePostgreSQL):
mSqs.Type = queueTypePostgreSQL
} // Add more queues here.
mSqs.AccountID = strings.TrimSuffix(sqsType, ":"+mSqs.Type)
return mSqs

View File

@@ -336,7 +336,7 @@ func migrateV6ToV7() error {
// Save only the new fields, ignore the rest.
srvConfig := &serverConfigV7{}
srvConfig.Version = globalMinioConfigVersion
srvConfig.Version = "7"
srvConfig.Credential = cv6.Credential
srvConfig.Region = cv6.Region
if srvConfig.Region == "" {
@@ -400,7 +400,7 @@ func migrateV7ToV8() error {
// Save only the new fields, ignore the rest.
srvConfig := &serverConfigV8{}
srvConfig.Version = globalMinioConfigVersion
srvConfig.Version = "8"
srvConfig.Credential = cv7.Credential
srvConfig.Region = cv7.Region
if srvConfig.Region == "" {
@@ -414,6 +414,7 @@ func migrateV7ToV8() error {
srvConfig.Notify.NATS = make(map[string]natsNotify)
srvConfig.Notify.ElasticSearch = make(map[string]elasticSearchNotify)
srvConfig.Notify.Redis = make(map[string]redisNotify)
srvConfig.Notify.PostgreSQL = make(map[string]postgreSQLNotify)
if len(cv7.Notify.AMQP) == 0 {
srvConfig.Notify.AMQP["1"] = amqpNotify{}
} else {
@@ -452,3 +453,89 @@ func migrateV7ToV8() error {
console.Println("Migration from version " + cv7.Version + " to " + srvConfig.Version + " completed successfully.")
return nil
}
// Version '8' to '9' migration. Adds postgresql notifier
// configuration, but it's otherwise the same as V8.
func migrateV8ToV9() error {
cv8, err := loadConfigV8()
if err != nil {
if os.IsNotExist(err) {
return nil
}
return fmt.Errorf("Unable to load config version 8. %v", err)
}
if cv8.Version != "8" {
return nil
}
// Copy over fields from V8 into V9 config struct
srvConfig := &serverConfigV9{}
srvConfig.Version = "9"
srvConfig.Credential = cv8.Credential
srvConfig.Region = cv8.Region
if srvConfig.Region == "" {
// Region needs to be set for AWS Signature Version 4.
srvConfig.Region = "us-east-1"
}
srvConfig.Logger.Console = cv8.Logger.Console
srvConfig.Logger.File = cv8.Logger.File
srvConfig.Logger.Syslog = cv8.Logger.Syslog
// check and set notifiers config
if len(cv8.Notify.AMQP) == 0 {
srvConfig.Notify.AMQP = make(map[string]amqpNotify)
srvConfig.Notify.AMQP["1"] = amqpNotify{}
} else {
srvConfig.Notify.AMQP = cv8.Notify.AMQP
}
if len(cv8.Notify.NATS) == 0 {
srvConfig.Notify.NATS = make(map[string]natsNotify)
srvConfig.Notify.NATS["1"] = natsNotify{}
} else {
srvConfig.Notify.NATS = cv8.Notify.NATS
}
if len(cv8.Notify.ElasticSearch) == 0 {
srvConfig.Notify.ElasticSearch = make(map[string]elasticSearchNotify)
srvConfig.Notify.ElasticSearch["1"] = elasticSearchNotify{}
} else {
srvConfig.Notify.ElasticSearch = cv8.Notify.ElasticSearch
}
if len(cv8.Notify.Redis) == 0 {
srvConfig.Notify.Redis = make(map[string]redisNotify)
srvConfig.Notify.Redis["1"] = redisNotify{}
} else {
srvConfig.Notify.Redis = cv8.Notify.Redis
}
if len(cv8.Notify.PostgreSQL) == 0 {
srvConfig.Notify.PostgreSQL = make(map[string]postgreSQLNotify)
srvConfig.Notify.PostgreSQL["1"] = postgreSQLNotify{}
} else {
srvConfig.Notify.PostgreSQL = cv8.Notify.PostgreSQL
}
qc, err := quick.New(srvConfig)
if err != nil {
return fmt.Errorf("Unable to initialize the quick config. %v",
err)
}
configFile, err := getConfigFile()
if err != nil {
return fmt.Errorf("Unable to get config file. %v", err)
}
err = qc.Save(configFile)
if err != nil {
return fmt.Errorf(
"Failed to migrate config from "+
cv8.Version+" to "+srvConfig.Version+
" failed. %v", err,
)
}
console.Println(
"Migration from version " +
cv8.Version + " to " + srvConfig.Version +
" completed successfully.",
)
return nil
}

View File

@@ -23,9 +23,9 @@ import (
"testing"
)
const lastConfigVersion = 8
const lastConfigVersion = 9
// TestServerConfigMigrateV1 - tests if a config v1 is purged
// Test if config v1 is purged
func TestServerConfigMigrateV1(t *testing.T) {
rootPath, err := newTestConfig("us-east-1")
if err != nil {
@@ -58,7 +58,8 @@ func TestServerConfigMigrateV1(t *testing.T) {
}
}
// TestServerConfigMigrateV1 - tests if all migrate code return nil when config file is not existent
// Test if all migrate code returns nil when config file does not
// exist
func TestServerConfigMigrateInexistentConfig(t *testing.T) {
rootPath, err := newTestConfig("us-east-1")
if err != nil {
@@ -93,11 +94,13 @@ func TestServerConfigMigrateInexistentConfig(t *testing.T) {
if err := migrateV7ToV8(); err != nil {
t.Fatal("migrate v7 to v8 should succeed when no config file is found")
}
if err := migrateV8ToV9(); err != nil {
t.Fatal("migrate v8 to v9 should succeed when no config file is found")
}
}
// TestServerConfigMigrateV2toV8 - tests if a config from v2 to v8 is successfully done
func TestServerConfigMigrateV2toV8(t *testing.T) {
// Test if a config migration from v2 to v9 is successfully done
func TestServerConfigMigrateV2toV9(t *testing.T) {
rootPath, err := newTestConfig("us-east-1")
if err != nil {
t.Fatalf("Init Test config failed")
@@ -155,7 +158,7 @@ func TestServerConfigMigrateV2toV8(t *testing.T) {
}
}
// TestServerConfigMigrateFaultyConfig - checks if all migrate code return errors with corrupted config files
// Test if all migrate code returns error with corrupted config files
func TestServerConfigMigrateFaultyConfig(t *testing.T) {
rootPath, err := newTestConfig("us-east-1")
if err != nil {
@@ -191,4 +194,7 @@ func TestServerConfigMigrateFaultyConfig(t *testing.T) {
if err := migrateV7ToV8(); err == nil {
t.Fatal("migrateConfigV7ToV8() should fail with a corrupted json")
}
if err := migrateV8ToV9(); err == nil {
t.Fatal("migrateConfigV8ToV9() should fail with a corrupted json")
}
}

View File

@@ -350,3 +350,43 @@ func loadConfigV7() (*serverConfigV7, error) {
}
return c, nil
}
// serverConfigV8 server configuration version '8'. Adds NATS notifier
// configuration.
type serverConfigV8 struct {
Version string `json:"version"`
// S3 API configuration.
Credential credential `json:"credential"`
Region string `json:"region"`
// Additional error logging configuration.
Logger logger `json:"logger"`
// Notification queue configuration.
Notify notifier `json:"notify"`
// Read Write mutex.
rwMutex *sync.RWMutex
}
// loadConfigV8 load config version '8'.
func loadConfigV8() (*serverConfigV8, error) {
configFile, err := getConfigFile()
if err != nil {
return nil, err
}
if _, err = os.Stat(configFile); err != nil {
return nil, err
}
c := &serverConfigV8{}
c.Version = "8"
qc, err := quick.New(c)
if err != nil {
return nil, err
}
if err := qc.Load(configFile); err != nil {
return nil, err
}
return c, nil
}

View File

@@ -23,8 +23,9 @@ import (
"github.com/minio/minio/pkg/quick"
)
// serverConfigV8 server configuration version '8'.
type serverConfigV8 struct {
// serverConfigV9 server configuration version '9'. Adds PostgreSQL
// notifier configuration.
type serverConfigV9 struct {
Version string `json:"version"`
// S3 API configuration.
@@ -45,7 +46,7 @@ type serverConfigV8 struct {
func initConfig() error {
if !isConfigFileExists() {
// Initialize server config.
srvCfg := &serverConfigV8{}
srvCfg := &serverConfigV9{}
srvCfg.Version = globalMinioConfigVersion
srvCfg.Region = "us-east-1"
srvCfg.Credential = mustGenAccessKeys()
@@ -65,7 +66,8 @@ func initConfig() error {
srvCfg.Notify.Redis["1"] = redisNotify{}
srvCfg.Notify.NATS = make(map[string]natsNotify)
srvCfg.Notify.NATS["1"] = natsNotify{}
srvCfg.Notify.PostgreSQL = make(map[string]postgreSQLNotify)
srvCfg.Notify.PostgreSQL["1"] = postgreSQLNotify{}
srvCfg.rwMutex = &sync.RWMutex{}
// Create config path.
@@ -87,7 +89,7 @@ func initConfig() error {
if _, err = os.Stat(configFile); err != nil {
return err
}
srvCfg := &serverConfigV8{}
srvCfg := &serverConfigV9{}
srvCfg.Version = globalMinioConfigVersion
srvCfg.rwMutex = &sync.RWMutex{}
qc, err := quick.New(srvCfg)
@@ -106,10 +108,10 @@ func initConfig() error {
}
// serverConfig server config.
var serverConfig *serverConfigV8
var serverConfig *serverConfigV9
// GetVersion get current config version.
func (s serverConfigV8) GetVersion() string {
func (s serverConfigV9) GetVersion() string {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Version
@@ -117,155 +119,173 @@ func (s serverConfigV8) GetVersion() string {
/// Logger related.
func (s *serverConfigV8) SetAMQPNotifyByID(accountID string, amqpn amqpNotify) {
func (s *serverConfigV9) SetAMQPNotifyByID(accountID string, amqpn amqpNotify) {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
s.Notify.AMQP[accountID] = amqpn
}
func (s serverConfigV8) GetAMQP() map[string]amqpNotify {
func (s serverConfigV9) GetAMQP() map[string]amqpNotify {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Notify.AMQP
}
// GetAMQPNotify get current AMQP logger.
func (s serverConfigV8) GetAMQPNotifyByID(accountID string) amqpNotify {
func (s serverConfigV9) GetAMQPNotifyByID(accountID string) amqpNotify {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Notify.AMQP[accountID]
}
//
func (s *serverConfigV8) SetNATSNotifyByID(accountID string, natsn natsNotify) {
func (s *serverConfigV9) SetNATSNotifyByID(accountID string, natsn natsNotify) {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
s.Notify.NATS[accountID] = natsn
}
func (s serverConfigV8) GetNATS() map[string]natsNotify {
func (s serverConfigV9) GetNATS() map[string]natsNotify {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Notify.NATS
}
// GetNATSNotify get current NATS logger.
func (s serverConfigV8) GetNATSNotifyByID(accountID string) natsNotify {
func (s serverConfigV9) GetNATSNotifyByID(accountID string) natsNotify {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Notify.NATS[accountID]
}
func (s *serverConfigV8) SetElasticSearchNotifyByID(accountID string, esNotify elasticSearchNotify) {
func (s *serverConfigV9) SetElasticSearchNotifyByID(accountID string, esNotify elasticSearchNotify) {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
s.Notify.ElasticSearch[accountID] = esNotify
}
func (s serverConfigV8) GetElasticSearch() map[string]elasticSearchNotify {
func (s serverConfigV9) GetElasticSearch() map[string]elasticSearchNotify {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Notify.ElasticSearch
}
// GetElasticSearchNotify get current ElasicSearch logger.
func (s serverConfigV8) GetElasticSearchNotifyByID(accountID string) elasticSearchNotify {
func (s serverConfigV9) GetElasticSearchNotifyByID(accountID string) elasticSearchNotify {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Notify.ElasticSearch[accountID]
}
func (s *serverConfigV8) SetRedisNotifyByID(accountID string, rNotify redisNotify) {
func (s *serverConfigV9) SetRedisNotifyByID(accountID string, rNotify redisNotify) {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
s.Notify.Redis[accountID] = rNotify
}
func (s serverConfigV8) GetRedis() map[string]redisNotify {
func (s serverConfigV9) GetRedis() map[string]redisNotify {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Notify.Redis
}
// GetRedisNotify get current Redis logger.
func (s serverConfigV8) GetRedisNotifyByID(accountID string) redisNotify {
func (s serverConfigV9) GetRedisNotifyByID(accountID string) redisNotify {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Notify.Redis[accountID]
}
func (s *serverConfigV9) SetPostgreSQLNotifyByID(accountID string, pgn postgreSQLNotify) {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
s.Notify.PostgreSQL[accountID] = pgn
}
func (s serverConfigV9) GetPostgreSQL() map[string]postgreSQLNotify {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Notify.PostgreSQL
}
func (s serverConfigV9) GetPostgreSQLNotifyByID(accountID string) postgreSQLNotify {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Notify.PostgreSQL[accountID]
}
// SetFileLogger set new file logger.
func (s *serverConfigV8) SetFileLogger(flogger fileLogger) {
func (s *serverConfigV9) SetFileLogger(flogger fileLogger) {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
s.Logger.File = flogger
}
// GetFileLogger get current file logger.
func (s serverConfigV8) GetFileLogger() fileLogger {
func (s serverConfigV9) GetFileLogger() fileLogger {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Logger.File
}
// SetConsoleLogger set new console logger.
func (s *serverConfigV8) SetConsoleLogger(clogger consoleLogger) {
func (s *serverConfigV9) SetConsoleLogger(clogger consoleLogger) {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
s.Logger.Console = clogger
}
// GetConsoleLogger get current console logger.
func (s serverConfigV8) GetConsoleLogger() consoleLogger {
func (s serverConfigV9) GetConsoleLogger() consoleLogger {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Logger.Console
}
// SetSyslogLogger set new syslog logger.
func (s *serverConfigV8) SetSyslogLogger(slogger syslogLogger) {
func (s *serverConfigV9) SetSyslogLogger(slogger syslogLogger) {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
s.Logger.Syslog = slogger
}
// GetSyslogLogger get current syslog logger.
func (s *serverConfigV8) GetSyslogLogger() syslogLogger {
func (s *serverConfigV9) GetSyslogLogger() syslogLogger {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Logger.Syslog
}
// SetRegion set new region.
func (s *serverConfigV8) SetRegion(region string) {
func (s *serverConfigV9) SetRegion(region string) {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
s.Region = region
}
// GetRegion get current region.
func (s serverConfigV8) GetRegion() string {
func (s serverConfigV9) GetRegion() string {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Region
}
// SetCredentials set new credentials.
func (s *serverConfigV8) SetCredential(creds credential) {
func (s *serverConfigV9) SetCredential(creds credential) {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
s.Credential = creds
}
// GetCredentials get current credentials.
func (s serverConfigV8) GetCredential() credential {
func (s serverConfigV9) GetCredential() credential {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Credential
}
// Save config.
func (s serverConfigV8) Save() error {
func (s serverConfigV9) Save() error {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()

View File

@@ -412,6 +412,32 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) {
}
queueTargets[queueARN] = elasticLog
}
// Load PostgreSQL targets, initialize their respective loggers.
for accountID, pgN := range serverConfig.GetPostgreSQL() {
if !pgN.Enable {
continue
}
// Construct the queue ARN for Postgres.
queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueTypePostgreSQL
_, ok := queueTargets[queueARN]
if ok {
continue
}
// Using accountID initialize a new Postgresql logrus instance.
pgLog, err := newPostgreSQLNotify(accountID)
if err != nil {
// Encapsulate network error to be more informative.
if _, ok := err.(net.Error); ok {
return nil, &net.OpError{
Op: "Connecting to " + queueARN, Net: "tcp",
Err: err,
}
}
return nil, err
}
queueTargets[queueARN] = pgLog
}
// Successfully initialized queue targets.
return queueTargets, nil
}

View File

@@ -30,7 +30,7 @@ const (
// minio configuration related constants.
const (
globalMinioConfigVersion = "8"
globalMinioConfigVersion = "9"
globalMinioConfigDir = ".minio"
globalMinioCertsDir = "certs"
globalMinioCertFile = "public.crt"

View File

@@ -36,6 +36,8 @@ const (
queueTypeElastic = "elasticsearch"
// Static string indicating queue type 'redis'.
queueTypeRedis = "redis"
// Static string indicating queue type 'postgresql'.
queueTypePostgreSQL = "postgresql"
)
// Topic type.
@@ -55,6 +57,7 @@ type notifier struct {
NATS map[string]natsNotify `json:"nats"`
ElasticSearch map[string]elasticSearchNotify `json:"elasticsearch"`
Redis map[string]redisNotify `json:"redis"`
PostgreSQL map[string]postgreSQLNotify `json:"postgresql"`
// Add new notification queues.
}
@@ -133,6 +136,24 @@ func isElasticQueue(sqsArn arnSQS) bool {
return true
}
// Returns true if queueArn is for PostgreSQL.
func isPostgreSQLQueue(sqsArn arnSQS) bool {
if sqsArn.Type != queueTypePostgreSQL {
return false
}
pgNotify := serverConfig.GetPostgreSQLNotifyByID(sqsArn.AccountID)
if !pgNotify.Enable {
return false
}
pgC, err := dialPostgreSQL(pgNotify)
if err != nil {
errorIf(err, "Unable to connect to PostgreSQL server %#v", pgNotify)
return false
}
defer pgC.Close()
return true
}
// Match function matches wild cards in 'pattern' for events.
func eventMatch(eventType string, events []string) (ok bool) {
for _, event := range events {

253
cmd/notify-postgresql.go Normal file
View File

@@ -0,0 +1,253 @@
/*
* Minio Cloud Storage, (C) 2014-2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// PostgreSQL Notifier implementation. A table with a specific
// structure (column names, column types, and primary key/uniqueness
// constraint) is used. The user may set the table name in the
// configuration. A sample SQL command that creates a command with the
// required structure is:
//
// CREATE TABLE myminio (
// key VARCHAR PRIMARY KEY,
// value JSONB
// );
//
// PostgreSQL's "INSERT ... ON CONFLICT ... DO UPDATE ..." feature
// (UPSERT) is used here, so the minimum version of PostgreSQL
// required is 9.5.
//
// On each create or update object event in Minio Object storage
// server, a row is created or updated in the table in Postgres. On
// each object removal, the corresponding row is deleted from the
// table.
package cmd
import (
"database/sql"
"encoding/json"
"fmt"
"io/ioutil"
"strings"
"github.com/Sirupsen/logrus"
// libpq db driver is usually imported blank - see examples in
// https://godoc.org/github.com/lib/pq
_ "github.com/lib/pq"
)
const (
upsertRow = `INSERT INTO %s (key, value)
VALUES ($1, $2)
ON CONFLICT (key)
DO UPDATE SET value = EXCLUDED.value;`
deleteRow = ` DELETE FROM %s
WHERE key = $1;`
createTable = `CREATE TABLE %s (
key VARCHAR PRIMARY KEY,
value JSONB
);`
tableExists = `SELECT 1 FROM %s;`
)
type postgreSQLNotify struct {
Enable bool `json:"enable"`
// pass connection string in config directly. This string is
// formatted according to
// https://godoc.org/github.com/lib/pq#hdr-Connection_String_Parameters
ConnectionString string `json:"connectionString"`
// specifying a table name is required.
Table string `json:"table"`
// uses the values below if no connection string is specified
// - however the connection string method offers more
// flexibility.
Host string `json:"host"`
Port string `json:"port"`
User string `json:"user"`
Password string `json:"password"`
Database string `json:"database"`
}
type pgConn struct {
connStr string
table string
preparedStmts map[string]*sql.Stmt
*sql.DB
}
func dialPostgreSQL(pgN postgreSQLNotify) (pgConn, error) {
if !pgN.Enable {
return pgConn{}, errNotifyNotEnabled
}
// check that table is specified
if pgN.Table == "" {
return pgConn{}, fmt.Errorf(
"PostgreSQL Notifier Error: Table was not specified in configuration")
}
connStr := pgN.ConnectionString
// check if connection string is specified
if connStr == "" {
// build from other parameters
params := []string{}
if pgN.Host != "" {
params = append(params, "host="+pgN.Host)
}
if pgN.Port != "" {
params = append(params, "port="+pgN.Port)
}
if pgN.User != "" {
params = append(params, "user="+pgN.User)
}
if pgN.Password != "" {
params = append(params, "password="+pgN.Password)
}
if pgN.Database != "" {
params = append(params, "dbname="+pgN.Database)
}
connStr = strings.Join(params, " ")
}
db, err := sql.Open("postgres", connStr)
if err != nil {
return pgConn{}, fmt.Errorf(
"PostgreSQL Notifier Error: Connection opening failure (connectionString=%s): %v",
connStr, err,
)
}
// ping to check that server is actually reachable.
err = db.Ping()
if err != nil {
return pgConn{}, fmt.Errorf(
"PostgreSQL Notifier Error: Ping to server failed with: %v",
err,
)
}
// check that table exists - if not, create it.
_, err = db.Exec(fmt.Sprintf(tableExists, pgN.Table))
if err != nil {
// most likely, table does not exist. try to create it:
_, errCreate := db.Exec(fmt.Sprintf(createTable, pgN.Table))
if errCreate != nil {
// failed to create the table. error out.
return pgConn{}, fmt.Errorf(
"PostgreSQL Notifier Error: 'Select' failed with %v, then 'Create Table' failed with %v",
err, errCreate,
)
}
}
// create prepared statements
stmts := make(map[string]*sql.Stmt)
// insert or update statement
stmts["upsertRow"], err = db.Prepare(fmt.Sprintf(upsertRow, pgN.Table))
if err != nil {
return pgConn{},
fmt.Errorf("PostgreSQL Notifier Error: create UPSERT prepared statement failed with: %v", err)
}
stmts["deleteRow"], err = db.Prepare(fmt.Sprintf(deleteRow, pgN.Table))
if err != nil {
return pgConn{},
fmt.Errorf("PostgreSQL Notifier Error: create DELETE prepared statement failed with: %v", err)
}
return pgConn{connStr, pgN.Table, stmts, db}, nil
}
func newPostgreSQLNotify(accountID string) (*logrus.Logger, error) {
pgNotify := serverConfig.GetPostgreSQLNotifyByID(accountID)
// Dial postgres
pgC, err := dialPostgreSQL(pgNotify)
if err != nil {
return nil, err
}
pgLog := logrus.New()
pgLog.Out = ioutil.Discard
pgLog.Formatter = new(logrus.JSONFormatter)
pgLog.Hooks.Add(pgC)
return pgLog, nil
}
func (pgC pgConn) Close() {
// first close all prepared statements
for _, v := range pgC.preparedStmts {
_ = v.Close()
}
// close db connection
_ = pgC.DB.Close()
}
func (pgC pgConn) Fire(entry *logrus.Entry) error {
// get event type by trying to convert to string
entryEventType, ok := entry.Data["EventType"].(string)
if !ok {
// ignore event if converting EventType to string
// fails.
return nil
}
// Check for event delete
if eventMatch(entryEventType, []string{"s3:ObjectRemoved:*"}) {
// delete row from the table
_, err := pgC.preparedStmts["deleteRow"].Exec(entry.Data["Key"])
if err != nil {
return fmt.Errorf(
"Error deleting event with key = %v - got postgres error - %v",
entry.Data["Key"], err,
)
}
} else {
// json encode the value for the row
value, err := json.Marshal(map[string]interface{}{
"Records": entry.Data["Records"],
})
if err != nil {
return fmt.Errorf(
"Unable to encode event %v to JSON - got error - %v",
entry.Data["Records"], err,
)
}
// upsert row into the table
_, err = pgC.preparedStmts["upsertRow"].Exec(entry.Data["Key"], value)
if err != nil {
return fmt.Errorf(
"Unable to upsert event with Key=%v and Value=%v - got postgres error - %v",
entry.Data["Key"], entry.Data["Records"], err,
)
}
}
return nil
}
func (pgC pgConn) Levels() []logrus.Level {
return []logrus.Level{
logrus.InfoLevel,
}
}

View File

@@ -37,7 +37,8 @@ type redisConn struct {
params redisNotify
}
// Dial a new connection to redis instance at addr, optionally with a password if any.
// Dial a new connection to redis instance at addr, optionally with a
// password if any.
func dialRedis(rNotify redisNotify) (*redis.Pool, error) {
// Return error if redis not enabled.
if !rNotify.Enable {