Support access format for database notification targets (#3953)

* Add configuration parameter "format" for db targets and perform
  configuration migration.
* Add PostgreSQL `access` format: This causes Minio to append all events
  to the configured table. Prefix, suffix and event filters continue
  to be supported for this mode too.
* Update documentation for PostgreSQL notification target.
* Add MySQL `access` format: It is very similar to the same format for
  PostgreSQL.
* Update MySQL notification documentation.
This commit is contained in:
Aditya Manthramurthy
2017-03-27 23:57:25 +05:30
committed by Harshavardhana
parent 6e63904048
commit a099319e66
14 changed files with 637 additions and 211 deletions

View File

@@ -85,6 +85,10 @@ func migrateConfig() error {
if err := migrateV15ToV16(); err != nil {
return err
}
// Migration version '16' to '17'.
if err := migrateV16ToV17(); err != nil {
return err
}
return nil
}
@@ -1082,3 +1086,124 @@ func migrateV15ToV16() error {
log.Printf("Migration from version %s to %s completed successfully.\n", cv15.Version, srvConfig.Version)
return nil
}
// Version '16' to '17' migration. Adds "format" configuration
// parameter for database targets.
func migrateV16ToV17() error {
configFile := getConfigFile()
cv16 := &serverConfigV16{}
_, err := quick.Load(configFile, cv16)
if os.IsNotExist(err) {
return nil
} else if err != nil {
return fmt.Errorf("Unable to load config version 16. %v", err)
}
if cv16.Version != "16" {
return nil
}
// Copy over fields from V16 into V17 config struct
srvConfig := &serverConfigV17{
Logger: &loggers{},
Notify: &notifier{},
}
srvConfig.Version = "17"
srvConfig.Credential = cv16.Credential
srvConfig.Region = cv16.Region
if srvConfig.Region == "" {
// Region needs to be set for AWS Signature Version 4.
srvConfig.Region = globalMinioDefaultRegion
}
srvConfig.Logger.Console = cv16.Logger.Console
srvConfig.Logger.File = cv16.Logger.File
// check and set notifiers config
if len(cv16.Notify.AMQP) == 0 {
srvConfig.Notify.AMQP = make(map[string]amqpNotify)
srvConfig.Notify.AMQP["1"] = amqpNotify{}
} else {
srvConfig.Notify.AMQP = cv16.Notify.AMQP
}
if len(cv16.Notify.ElasticSearch) == 0 {
srvConfig.Notify.ElasticSearch = make(map[string]elasticSearchNotify)
srvConfig.Notify.ElasticSearch["1"] = elasticSearchNotify{}
} else {
// IMPORTANT NOTE: Future migrations should remove
// this as existing configuration will already contain
// a value for the "format" parameter.
for k, v := range cv16.Notify.ElasticSearch.Clone() {
v.Format = formatNamespace
cv16.Notify.ElasticSearch[k] = v
}
srvConfig.Notify.ElasticSearch = cv16.Notify.ElasticSearch
}
if len(cv16.Notify.Redis) == 0 {
srvConfig.Notify.Redis = make(map[string]redisNotify)
srvConfig.Notify.Redis["1"] = redisNotify{}
} else {
// IMPORTANT NOTE: Future migrations should remove
// this as existing configuration will already contain
// a value for the "format" parameter.
for k, v := range cv16.Notify.Redis.Clone() {
v.Format = formatNamespace
cv16.Notify.Redis[k] = v
}
srvConfig.Notify.Redis = cv16.Notify.Redis
}
if len(cv16.Notify.PostgreSQL) == 0 {
srvConfig.Notify.PostgreSQL = make(map[string]postgreSQLNotify)
srvConfig.Notify.PostgreSQL["1"] = postgreSQLNotify{}
} else {
// IMPORTANT NOTE: Future migrations should remove
// this as existing configuration will already contain
// a value for the "format" parameter.
for k, v := range cv16.Notify.PostgreSQL.Clone() {
v.Format = formatNamespace
cv16.Notify.PostgreSQL[k] = v
}
srvConfig.Notify.PostgreSQL = cv16.Notify.PostgreSQL
}
if len(cv16.Notify.Kafka) == 0 {
srvConfig.Notify.Kafka = make(map[string]kafkaNotify)
srvConfig.Notify.Kafka["1"] = kafkaNotify{}
} else {
srvConfig.Notify.Kafka = cv16.Notify.Kafka
}
if len(cv16.Notify.NATS) == 0 {
srvConfig.Notify.NATS = make(map[string]natsNotify)
srvConfig.Notify.NATS["1"] = natsNotify{}
} else {
srvConfig.Notify.NATS = cv16.Notify.NATS
}
if len(cv16.Notify.Webhook) == 0 {
srvConfig.Notify.Webhook = make(map[string]webhookNotify)
srvConfig.Notify.Webhook["1"] = webhookNotify{}
} else {
srvConfig.Notify.Webhook = cv16.Notify.Webhook
}
if len(cv16.Notify.MySQL) == 0 {
srvConfig.Notify.MySQL = make(map[string]mySQLNotify)
srvConfig.Notify.MySQL["1"] = mySQLNotify{}
} else {
// IMPORTANT NOTE: Future migrations should remove
// this as existing configuration will already contain
// a value for the "format" parameter.
for k, v := range cv16.Notify.MySQL.Clone() {
v.Format = formatNamespace
cv16.Notify.MySQL[k] = v
}
srvConfig.Notify.MySQL = cv16.Notify.MySQL
}
// Load browser config from existing config in the file.
srvConfig.Browser = cv16.Browser
if err = quick.Save(configFile, srvConfig); err != nil {
return fmt.Errorf("Failed to migrate config from %s to %s. %v", cv16.Version, srvConfig.Version, err)
}
log.Printf("Migration from version %s to %s completed successfully.\n", cv16.Version, srvConfig.Version)
return nil
}

View File

@@ -115,9 +115,13 @@ func TestServerConfigMigrateInexistentConfig(t *testing.T) {
if err := migrateV15ToV16(); err != nil {
t.Fatal("migrate v15 to v16 should succeed when no config file is found")
}
if err := migrateV16ToV17(); err != nil {
t.Fatal("migrate v16 to v17 should succeed when no config file is found")
}
}
// Test if a config migration from v2 to v16 is successfully done
// Test if a config migration from v2 to v17 is successfully done
func TestServerConfigMigrateV2toV16(t *testing.T) {
rootPath, err := newTestConfig(globalMinioDefaultRegion)
if err != nil {
@@ -157,7 +161,7 @@ func TestServerConfigMigrateV2toV16(t *testing.T) {
}
// Check the version number in the upgraded config file
expectedVersion := v16
expectedVersion := v17
if serverConfig.Version != expectedVersion {
t.Fatalf("Expect version "+expectedVersion+", found: %v", serverConfig.Version)
}
@@ -231,4 +235,7 @@ func TestServerConfigMigrateFaultyConfig(t *testing.T) {
if err := migrateV15ToV16(); err == nil {
t.Fatal("migrateConfigV15ToV16() should fail with a corrupted json")
}
if err := migrateV16ToV17(); err == nil {
t.Fatal("migrateConfigV16ToV17() should fail with a corrupted json")
}
}

View File

@@ -413,3 +413,20 @@ type serverConfigV15 struct {
// Notification queue configuration.
Notify *notifier `json:"notify"`
}
// serverConfigV16 server configuration version '16' which is like
// version '15' except it makes a change to logging configuration.
type serverConfigV16 struct {
Version string `json:"version"`
// S3 API configuration.
Credential credential `json:"credential"`
Region string `json:"region"`
Browser BrowserFlag `json:"browser"`
// Additional error logging configuration.
Logger *loggers `json:"logger"`
// Notification queue configuration.
Notify *notifier `json:"notify"`
}

View File

@@ -29,12 +29,14 @@ import (
// Read Write mutex for safe access to ServerConfig.
var serverConfigMu sync.RWMutex
const v16 = "16"
// Config version
const v17 = "17"
// serverConfigV16 server configuration version '16' which is like
// version '15' except it removes log level field and renames `fileName`
// field of File logger to `filename`
type serverConfigV16 struct {
// serverConfigV17 server configuration version '17' which is like
// version '16' except it adds support for "format" parameter in
// database event notification targets: PostgreSQL, MySQL, Redis and
// Elasticsearch.
type serverConfigV17 struct {
Version string `json:"version"`
// S3 API configuration.
@@ -49,9 +51,9 @@ type serverConfigV16 struct {
Notify *notifier `json:"notify"`
}
func newServerConfigV16() *serverConfigV16 {
srvCfg := &serverConfigV16{
Version: v16,
func newServerConfigV17() *serverConfigV17 {
srvCfg := &serverConfigV17{
Version: v17,
Credential: mustGetNewCredential(),
Region: globalMinioDefaultRegion,
Browser: true,
@@ -87,7 +89,7 @@ func newServerConfigV16() *serverConfigV16 {
// found, otherwise use default parameters
func newConfig(envParams envParams) error {
// Initialize server config.
srvCfg := newServerConfigV16()
srvCfg := newServerConfigV17()
// If env is set for a fresh start, save them to config file.
if globalIsEnvCreds {
@@ -117,7 +119,7 @@ func newConfig(envParams envParams) error {
// loadConfig - loads a new config from disk, overrides params from env
// if found and valid
func loadConfig(envParams envParams) error {
srvCfg := &serverConfigV16{
srvCfg := &serverConfigV17{
Region: globalMinioDefaultRegion,
Browser: true,
}
@@ -125,8 +127,8 @@ func loadConfig(envParams envParams) error {
if _, err := quick.Load(getConfigFile(), srvCfg); err != nil {
return err
}
if srvCfg.Version != v16 {
return fmt.Errorf("configuration version mismatch. Expected: %s, Got: %s", srvCfg.Version, v16)
if srvCfg.Version != v17 {
return fmt.Errorf("configuration version mismatch. Expected: %s, Got: %s", srvCfg.Version, v17)
}
// If env is set override the credentials from config file.
@@ -203,7 +205,7 @@ func checkDupJSONKeys(json string) error {
// validateConfig checks for
func validateConfig() error {
srvCfg := &serverConfigV16{
srvCfg := &serverConfigV17{
Region: globalMinioDefaultRegion,
Browser: true,
}
@@ -214,8 +216,8 @@ func validateConfig() error {
}
// Check if config version is valid
if srvCfg.Version != v16 {
return errors.New("bad config version, expected: " + v16)
if srvCfg.Version != v17 {
return errors.New("bad config version, expected: " + v17)
}
// Load config file json and check for duplication json keys
@@ -254,10 +256,10 @@ func validateConfig() error {
}
// serverConfig server config.
var serverConfig *serverConfigV16
var serverConfig *serverConfigV17
// GetVersion get current config version.
func (s serverConfigV16) GetVersion() string {
func (s serverConfigV17) GetVersion() string {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
@@ -265,7 +267,7 @@ func (s serverConfigV16) GetVersion() string {
}
// SetRegion set new region.
func (s *serverConfigV16) SetRegion(region string) {
func (s *serverConfigV17) SetRegion(region string) {
serverConfigMu.Lock()
defer serverConfigMu.Unlock()
@@ -277,7 +279,7 @@ func (s *serverConfigV16) SetRegion(region string) {
}
// GetRegion get current region.
func (s serverConfigV16) GetRegion() string {
func (s serverConfigV17) GetRegion() string {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
@@ -290,7 +292,7 @@ func (s serverConfigV16) GetRegion() string {
}
// SetCredentials set new credentials.
func (s *serverConfigV16) SetCredential(creds credential) {
func (s *serverConfigV17) SetCredential(creds credential) {
serverConfigMu.Lock()
defer serverConfigMu.Unlock()
@@ -299,7 +301,7 @@ func (s *serverConfigV16) SetCredential(creds credential) {
}
// GetCredentials get current credentials.
func (s serverConfigV16) GetCredential() credential {
func (s serverConfigV17) GetCredential() credential {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
@@ -307,16 +309,16 @@ func (s serverConfigV16) GetCredential() credential {
}
// SetBrowser set if browser is enabled.
func (s *serverConfigV16) SetBrowser(b BrowserFlag) {
func (s *serverConfigV17) SetBrowser(v BrowserFlag) {
serverConfigMu.Lock()
defer serverConfigMu.Unlock()
// Set the new value.
s.Browser = b
s.Browser = v
}
// GetCredentials get current credentials.
func (s serverConfigV16) GetBrowser() BrowserFlag {
func (s serverConfigV17) GetBrowser() BrowserFlag {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
@@ -324,7 +326,7 @@ func (s serverConfigV16) GetBrowser() BrowserFlag {
}
// Save config.
func (s serverConfigV16) Save() error {
func (s serverConfigV17) Save() error {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()

View File

@@ -109,8 +109,8 @@ func TestServerConfig(t *testing.T) {
serverConfig.Logger.SetFile(fileLogger)
// Match version.
if serverConfig.GetVersion() != v16 {
t.Errorf("Expecting version %s found %s", serverConfig.GetVersion(), v16)
if serverConfig.GetVersion() != v17 {
t.Errorf("Expecting version %s found %s", serverConfig.GetVersion(), v17)
}
// Attempt to save.
@@ -215,7 +215,7 @@ func TestValidateConfig(t *testing.T) {
configPath := filepath.Join(rootPath, minioConfigFile)
v := v16
v := v17
testCases := []struct {
configData string
@@ -275,8 +275,32 @@ func TestValidateConfig(t *testing.T) {
// Test 18 - Test Webhook
{`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "webhook": { "1": { "enable": true, "endpoint": "" } }}}`, false},
// Test 19 - Test MySQL
// Test 20 - Test MySQL
{`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "mysql": { "1": { "enable": true, "dsnString": "", "table": "", "host": "", "port": "", "user": "", "password": "", "database": "" }}}}`, false},
// Test 21 - Test Format for MySQL
{`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "mysql": { "1": { "enable": true, "dsnString": "", "format": "invalid", "table": "xxx", "host": "10.0.0.1", "port": "3306", "user": "abc", "password": "pqr", "database": "test1" }}}}`, false},
// Test 22 - Test valid Format for MySQL
{`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "mysql": { "1": { "enable": true, "dsnString": "", "format": "namespace", "table": "xxx", "host": "10.0.0.1", "port": "3306", "user": "abc", "password": "pqr", "database": "test1" }}}}`, true},
// Test 23 - Test Format for PostgreSQL
{`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "postgresql": { "1": { "enable": true, "connectionString": "", "format": "invalid", "table": "xxx", "host": "myhost", "port": "5432", "user": "abc", "password": "pqr", "database": "test1" }}}}`, false},
// Test 24 - Test valid Format for PostgreSQL
{`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "postgresql": { "1": { "enable": true, "connectionString": "", "format": "namespace", "table": "xxx", "host": "myhost", "port": "5432", "user": "abc", "password": "pqr", "database": "test1" }}}}`, true},
// Test 25 - Test Format for ElasticSearch
{`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "elasticsearch": { "1": { "enable": true, "format": "invalid", "url": "example.com", "index": "myindex" } }}}`, false},
// Test 26 - Test valid Format for ElasticSearch
{`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "elasticsearch": { "1": { "enable": true, "format": "namespace", "url": "example.com", "index": "myindex" } }}}`, true},
// Test 27 - Test Format for Redis
{`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "redis": { "1": { "enable": true, "format": "invalid", "address": "example.com", "password": "xxx", "key": "key1" } }}}`, false},
// Test 28 - Test valid Format for Redis
{`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "redis": { "1": { "enable": true, "format": "namespace", "address": "example.com", "password": "xxx", "key": "key1" } }}}`, true},
}
for i, testCase := range testCases {

View File

@@ -96,7 +96,7 @@ func newGatewayLayer(backendType, accessKey, secretKey string) (GatewayLayer, er
// only used in memory.
func newGatewayConfig(accessKey, secretKey, region string) error {
// Initialize server config.
srvCfg := newServerConfigV16()
srvCfg := newServerConfigV17()
// If env is set for a fresh start, save them to config file.
srvCfg.SetCredential(credential{

View File

@@ -43,6 +43,10 @@ const (
queueTypeKafka = "kafka"
// Static string for Webhooks
queueTypeWebhook = "webhook"
// Notifier format value constants
formatNamespace = "namespace"
formatAccess = "access"
)
// Topic type.

View File

@@ -19,6 +19,7 @@ package cmd
import (
"encoding/hex"
"errors"
"fmt"
"io/ioutil"
"github.com/Sirupsen/logrus"
@@ -29,6 +30,7 @@ import (
// elasticQueue is a elasticsearch event notification queue.
type elasticSearchNotify struct {
Enable bool `json:"enable"`
Format string `json:"format"`
URL string `json:"url"`
Index string `json:"index"`
}
@@ -37,6 +39,11 @@ func (e *elasticSearchNotify) Validate() error {
if !e.Enable {
return nil
}
if e.Format != formatNamespace {
return fmt.Errorf(
"Elasticsearch Notifier Error: \"format\" must be \"%s\"",
formatNamespace)
}
if _, err := checkURL(e.URL); err != nil {
return err
}

View File

@@ -14,11 +14,19 @@
* limitations under the License.
*/
// MySQL Notifier implementation. A table with a specific
// structure (column names, column types, and primary key/uniqueness
// constraint) is used. The user may set the table name in the
// configuration. A sample SQL command that creates a command with the
// required structure is:
// MySQL Notifier implementation. Two formats, "namespace" and
// "access" are supported.
//
// * Namespace format
//
// On each create or update object event in Minio Object storage
// server, a row is created or updated in the table in MySQL. On each
// object removal, the corresponding row is deleted from the table.
//
// A table with a specific structure (column names, column types, and
// primary key/uniqueness constraint) is used. The user may set the
// table name in the configuration. A sample SQL command that creates
// a command with the required structure is:
//
// CREATE TABLE myminio (
// key_name VARCHAR(2048),
@@ -30,10 +38,18 @@
// here. The implementation has been tested with MySQL Ver 14.14
// Distrib 5.7.17.
//
// On each create or update object event in Minio Object storage
// server, a row is created or updated in the table in MySQL. On
// each object removal, the corresponding row is deleted from the
// table.
// * Access format
//
// On each event, a row is appended to the configured table. There is
// no deletion or modification of existing rows.
//
// A different table schema is used for this format. A sample SQL
// commant that creates a table with the required structure is:
//
// CREATE TABLE myminio (
// event_time TIMESTAMP WITH TIME ZONE NOT NULL,
// event_data JSONB
// );
package cmd
@@ -42,29 +58,53 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"time"
"github.com/Sirupsen/logrus"
"github.com/go-sql-driver/mysql"
)
const (
upsertRowMySQL = `INSERT INTO %s (key_name, value)
// Queries for format=namespace mode.
upsertRowForNSMySQL = `INSERT INTO %s (key_name, value)
VALUES (?, ?)
ON DUPLICATE KEY UPDATE value=VALUES(value);
`
deleteRowMySQL = ` DELETE FROM %s
deleteRowForNSMySQL = ` DELETE FROM %s
WHERE key_name = ?;`
createTableMySQL = `CREATE TABLE %s (
createTableForNSMySQL = `CREATE TABLE %s (
key_name VARCHAR(2048),
value JSON,
PRIMARY KEY (key_name)
);`
// Queries for format=access mode.
insertRowForAccessMySQL = `INSERT INTO %s (event_time, event_data)
VALUES (?, ?);`
createTableForAccessMySQL = `CREATE TABLE %s (
event_time DATETIME NOT NULL,
event_data JSON
);`
// Query to check if a table already exists.
tableExistsMySQL = `SELECT 1 FROM %s;`
)
func makeMySQLError(msg string, a ...interface{}) error {
s := fmt.Sprintf(msg, a...)
return fmt.Errorf("MySQL Notifier Error: %s", s)
}
var (
myNFormatError = makeMySQLError(`"format" value is invalid - it must be one of "%s" or "%s".`, formatNamespace, formatAccess)
myNTableError = makeMySQLError("Table was not specified in the configuration.")
)
type mySQLNotify struct {
Enable bool `json:"enable"`
Format string `json:"format"`
// pass data-source-name connection string in config
// directly. This string is formatted according to
// https://github.com/go-sql-driver/mysql#dsn-data-source-name
@@ -86,14 +126,16 @@ func (m *mySQLNotify) Validate() error {
if !m.Enable {
return nil
}
if m.Format != formatNamespace && m.Format != formatAccess {
return myNFormatError
}
if m.DsnString == "" {
if _, err := checkURL(m.Host); err != nil {
return err
}
}
if m.Table == "" {
return fmt.Errorf(
"MySQL Notifier Error: Table was not specified in configuration")
return myNTableError
}
return nil
}
@@ -101,6 +143,7 @@ func (m *mySQLNotify) Validate() error {
type mySQLConn struct {
dsnStr string
table string
format string
preparedStmts map[string]*sql.Stmt
*sql.DB
}
@@ -126,30 +169,32 @@ func dialMySQL(msql mySQLNotify) (mySQLConn, error) {
db, err := sql.Open("mysql", dsnStr)
if err != nil {
return mySQLConn{}, fmt.Errorf(
"MySQL Notifier Error: Connection opening failure (dsnStr=%s): %v",
dsnStr, err,
)
return mySQLConn{}, makeMySQLError(
"Connection opening failure (dsnStr=%s): %v",
dsnStr, err)
}
// ping to check that server is actually reachable.
err = db.Ping()
if err != nil {
return mySQLConn{}, fmt.Errorf(
"MySQL Notifier Error: Ping to server failed with: %v",
err,
)
return mySQLConn{}, makeMySQLError(
"Ping to server failed with: %v", err)
}
// check that table exists - if not, create it.
_, err = db.Exec(fmt.Sprintf(tableExistsMySQL, msql.Table))
if err != nil {
createStmt := createTableForNSMySQL
if msql.Format == formatAccess {
createStmt = createTableForAccessMySQL
}
// most likely, table does not exist. try to create it:
_, errCreate := db.Exec(fmt.Sprintf(createTableMySQL, msql.Table))
_, errCreate := db.Exec(fmt.Sprintf(createStmt, msql.Table))
if errCreate != nil {
// failed to create the table. error out.
return mySQLConn{}, fmt.Errorf(
"MySQL Notifier Error: 'Select' failed with %v, then 'Create Table' failed with %v",
return mySQLConn{}, makeMySQLError(
"'Select' failed with %v, then 'Create Table' failed with %v",
err, errCreate,
)
}
@@ -157,19 +202,33 @@ func dialMySQL(msql mySQLNotify) (mySQLConn, error) {
// create prepared statements
stmts := make(map[string]*sql.Stmt)
// insert or update statement
stmts["upsertRow"], err = db.Prepare(fmt.Sprintf(upsertRowMySQL, msql.Table))
if err != nil {
return mySQLConn{},
fmt.Errorf("MySQL Notifier Error: create UPSERT prepared statement failed with: %v", err)
}
stmts["deleteRow"], err = db.Prepare(fmt.Sprintf(deleteRowMySQL, msql.Table))
if err != nil {
return mySQLConn{},
fmt.Errorf("MySQL Notifier Error: create DELETE prepared statement failed with: %v", err)
}
switch msql.Format {
case formatNamespace:
// insert or update statement
stmts["upsertRow"], err = db.Prepare(fmt.Sprintf(upsertRowForNSMySQL,
msql.Table))
if err != nil {
return mySQLConn{},
makeMySQLError("create UPSERT prepared statement failed with: %v", err)
}
// delete statement
stmts["deleteRow"], err = db.Prepare(fmt.Sprintf(deleteRowForNSMySQL,
msql.Table))
if err != nil {
return mySQLConn{},
makeMySQLError("create DELETE prepared statement failed with: %v", err)
}
case formatAccess:
// insert statement
stmts["insertRow"], err = db.Prepare(fmt.Sprintf(insertRowForAccessMySQL,
msql.Table))
if err != nil {
return mySQLConn{}, makeMySQLError(
"create INSERT prepared statement failed with: %v", err)
}
return mySQLConn{dsnStr, msql.Table, stmts, db}, nil
}
return mySQLConn{dsnStr, msql.Table, msql.Format, stmts, db}, nil
}
func newMySQLNotify(accountID string) (*logrus.Logger, error) {
@@ -210,35 +269,66 @@ func (myC mySQLConn) Fire(entry *logrus.Entry) error {
return nil
}
// Check for event delete
if eventMatch(entryEventType, []string{"s3:ObjectRemoved:*"}) {
// delete row from the table
_, err := myC.preparedStmts["deleteRow"].Exec(entry.Data["Key"])
if err != nil {
return fmt.Errorf(
"Error deleting event with key = %v - got mysql error - %v",
entry.Data["Key"], err,
)
}
} else {
// json encode the value for the row
jsonEncoder := func(d interface{}) ([]byte, error) {
value, err := json.Marshal(map[string]interface{}{
"Records": entry.Data["Records"],
"Records": d,
})
if err != nil {
return fmt.Errorf(
"Unable to encode event %v to JSON - got error - %v",
entry.Data["Records"], err,
)
return nil, makeMySQLError(
"Unable to encode event %v to JSON: %v", d, err)
}
return value, nil
}
switch myC.format {
case formatNamespace:
// Check for event delete
if eventMatch(entryEventType, []string{"s3:ObjectRemoved:*"}) {
// delete row from the table
_, err := myC.preparedStmts["deleteRow"].Exec(entry.Data["Key"])
if err != nil {
return makeMySQLError(
"Error deleting event with key = %v - got mysql error - %v",
entry.Data["Key"], err,
)
}
} else {
value, err := jsonEncoder(entry.Data["Records"])
if err != nil {
return err
}
// upsert row into the table
_, err = myC.preparedStmts["upsertRow"].Exec(entry.Data["Key"], value)
if err != nil {
return makeMySQLError(
"Unable to upsert event with Key=%v and Value=%v - got mysql error - %v",
entry.Data["Key"], entry.Data["Records"], err,
)
}
}
case formatAccess:
// eventTime is taken from the first entry in the
// records.
events, ok := entry.Data["Records"].([]NotificationEvent)
if !ok {
return makeMySQLError("unable to extract event time due to conversion error of entry.Data[\"Records\"]=%v", entry.Data["Records"])
}
eventTime, err := time.Parse(timeFormatAMZ, events[0].EventTime)
if err != nil {
return makeMySQLError("unable to parse event time \"%s\": %v",
events[0].EventTime, err)
}
// upsert row into the table
_, err = myC.preparedStmts["upsertRow"].Exec(entry.Data["Key"], value)
value, err := jsonEncodeEventData(entry.Data["Records"])
if err != nil {
return fmt.Errorf(
"Unable to upsert event with Key=%v and Value=%v - got mysql error - %v",
entry.Data["Key"], entry.Data["Records"], err,
)
return err
}
_, err = myC.preparedStmts["insertRow"].Exec(eventTime, value)
if err != nil {
return makeMySQLError("Unable to insert event with value=%v: %v",
value, err)
}
}

View File

@@ -14,11 +14,20 @@
* limitations under the License.
*/
// PostgreSQL Notifier implementation. A table with a specific
// structure (column names, column types, and primary key/uniqueness
// constraint) is used. The user may set the table name in the
// configuration. A sample SQL command that creates a command with the
// required structure is:
// PostgreSQL Notifier implementation. Two formats, "namespace" and
// "access" are supported.
//
// * Namespace format
//
// On each create or update object event in Minio Object storage
// server, a row is created or updated in the table in Postgres. On
// each object removal, the corresponding row is deleted from the
// table.
//
// A table with a specific structure (column names, column types, and
// primary key/uniqueness constraint) is used. The user may set the
// table name in the configuration. A sample SQL command that creates
// a table with the required structure is:
//
// CREATE TABLE myminio (
// key VARCHAR PRIMARY KEY,
@@ -29,10 +38,18 @@
// (UPSERT) is used here, so the minimum version of PostgreSQL
// required is 9.5.
//
// On each create or update object event in Minio Object storage
// server, a row is created or updated in the table in Postgres. On
// each object removal, the corresponding row is deleted from the
// table.
// * Access format
//
// On each event, a row is appended to the configured table. There is
// no deletion or modification of existing rows.
//
// A different table schema is used for this format. A sample SQL
// commant that creates a table with the required structure is:
//
// CREATE TABLE myminio (
// event_time TIMESTAMP WITH TIME ZONE NOT NULL,
// event_data JSONB
// );
package cmd
@@ -42,54 +59,91 @@ import (
"fmt"
"io/ioutil"
"strings"
"time"
"github.com/Sirupsen/logrus"
// libpq db driver is usually imported blank - see examples in
// https://godoc.org/github.com/lib/pq
// Register postgres driver
_ "github.com/lib/pq"
)
const (
upsertRow = `INSERT INTO %s (key, value)
// Queries for format=namespace mode. Here the `key` column is
// the bucket and object of the event. When objects are
// deleted, the corresponding row is deleted in the
// table. When objects are created or over-written, rows are
// inserted or updated respectively in the table.
upsertRowForNS = `INSERT INTO %s (key, value)
VALUES ($1, $2)
ON CONFLICT (key)
DO UPDATE SET value = EXCLUDED.value;`
deleteRow = ` DELETE FROM %s
deleteRowForNS = ` DELETE FROM %s
WHERE key = $1;`
createTable = `CREATE TABLE %s (
createTableForNS = `CREATE TABLE %s (
key VARCHAR PRIMARY KEY,
value JSONB
);`
// Queries for format=access mode. Here the `event_time`
// column of the table, stores the time at which the event
// occurred in the Minio server.
insertRowForAccess = `INSERT INTO %s (event_time, event_data)
VALUES ($1, $2);`
createTableForAccess = `CREATE TABLE %s (
event_time TIMESTAMP WITH TIME ZONE NOT NULL,
event_data JSONB
);`
// Query to check if a table already exists.
tableExists = `SELECT 1 FROM %s;`
)
func makePGError(msg string, a ...interface{}) error {
s := fmt.Sprintf(msg, a...)
return fmt.Errorf("PostgreSQL Notifier Error: %s", s)
}
var (
pgNFormatError = makePGError(`"format" value is invalid - it must be one of "%s" or "%s".`, formatNamespace, formatAccess)
pgNTableError = makePGError("Table was not specified in the configuration.")
)
type postgreSQLNotify struct {
Enable bool `json:"enable"`
// pass connection string in config directly. This string is
Format string `json:"format"`
// Pass connection string in config directly. This string is
// formatted according to
// https://godoc.org/github.com/lib/pq#hdr-Connection_String_Parameters
ConnectionString string `json:"connectionString"`
// specifying a table name is required.
Table string `json:"table"`
// uses the values below if no connection string is specified
// - however the connection string method offers more
// flexibility.
Host string `json:"host"`
Port string `json:"port"`
User string `json:"user"`
Password string `json:"password"`
Database string `json:"database"`
// The values below, if non-empty are appended to
// ConnectionString above. Default values are shown in
// comments below (implicitly used by the library).
Host string `json:"host"` // default: localhost
Port string `json:"port"` // default: 5432
User string `json:"user"` // default: user running minio
Password string `json:"password"` // default: no password
Database string `json:"database"` // default: same as user
}
func (p *postgreSQLNotify) Validate() error {
if !p.Enable {
return nil
}
if _, err := checkURL(p.Host); err != nil {
return err
if p.Format != formatNamespace && p.Format != formatAccess {
return pgNFormatError
}
if p.ConnectionString == "" {
if _, err := checkURL(p.Host); err != nil {
return err
}
}
if p.Table == "" {
return pgNTableError
}
return nil
}
@@ -97,6 +151,7 @@ func (p *postgreSQLNotify) Validate() error {
type pgConn struct {
connStr string
table string
format string
preparedStmts map[string]*sql.Stmt
*sql.DB
}
@@ -106,61 +161,53 @@ func dialPostgreSQL(pgN postgreSQLNotify) (pgConn, error) {
return pgConn{}, errNotifyNotEnabled
}
// check that table is specified
if pgN.Table == "" {
return pgConn{}, fmt.Errorf(
"PostgreSQL Notifier Error: Table was not specified in configuration")
// collect connection params
params := []string{pgN.ConnectionString}
if pgN.Host != "" {
params = append(params, "host="+pgN.Host)
}
connStr := pgN.ConnectionString
// check if connection string is specified
if connStr == "" {
// build from other parameters
params := []string{}
if pgN.Host != "" {
params = append(params, "host="+pgN.Host)
}
if pgN.Port != "" {
params = append(params, "port="+pgN.Port)
}
if pgN.User != "" {
params = append(params, "user="+pgN.User)
}
if pgN.Password != "" {
params = append(params, "password="+pgN.Password)
}
if pgN.Database != "" {
params = append(params, "dbname="+pgN.Database)
}
connStr = strings.Join(params, " ")
if pgN.Port != "" {
params = append(params, "port="+pgN.Port)
}
if pgN.User != "" {
params = append(params, "user="+pgN.User)
}
if pgN.Password != "" {
params = append(params, "password="+pgN.Password)
}
if pgN.Database != "" {
params = append(params, "dbname="+pgN.Database)
}
connStr := strings.Join(params, " ")
db, err := sql.Open("postgres", connStr)
if err != nil {
return pgConn{}, fmt.Errorf(
"PostgreSQL Notifier Error: Connection opening failure (connectionString=%s): %v",
connStr, err,
)
return pgConn{}, makePGError(
"Connection opening failure (connectionString=%s): %v",
connStr, err)
}
// ping to check that server is actually reachable.
err = db.Ping()
if err != nil {
return pgConn{}, fmt.Errorf(
"PostgreSQL Notifier Error: Ping to server failed with: %v",
err,
)
return pgConn{}, makePGError("Ping to server failed with: %v",
err)
}
// check that table exists - if not, create it.
_, err = db.Exec(fmt.Sprintf(tableExists, pgN.Table))
if err != nil {
createStmt := createTableForNS
if pgN.Format == formatAccess {
createStmt = createTableForAccess
}
// most likely, table does not exist. try to create it:
_, errCreate := db.Exec(fmt.Sprintf(createTable, pgN.Table))
_, errCreate := db.Exec(fmt.Sprintf(createStmt, pgN.Table))
if errCreate != nil {
// failed to create the table. error out.
return pgConn{}, fmt.Errorf(
"PostgreSQL Notifier Error: 'Select' failed with %v, then 'Create Table' failed with %v",
return pgConn{}, makePGError(
"'Select' failed with %v, then 'Create Table' failed with %v",
err, errCreate,
)
}
@@ -168,19 +215,33 @@ func dialPostgreSQL(pgN postgreSQLNotify) (pgConn, error) {
// create prepared statements
stmts := make(map[string]*sql.Stmt)
// insert or update statement
stmts["upsertRow"], err = db.Prepare(fmt.Sprintf(upsertRow, pgN.Table))
if err != nil {
return pgConn{},
fmt.Errorf("PostgreSQL Notifier Error: create UPSERT prepared statement failed with: %v", err)
}
stmts["deleteRow"], err = db.Prepare(fmt.Sprintf(deleteRow, pgN.Table))
if err != nil {
return pgConn{},
fmt.Errorf("PostgreSQL Notifier Error: create DELETE prepared statement failed with: %v", err)
switch pgN.Format {
case formatNamespace:
// insert or update statement
stmts["upsertRow"], err = db.Prepare(fmt.Sprintf(upsertRowForNS,
pgN.Table))
if err != nil {
return pgConn{}, makePGError(
"create UPSERT prepared statement failed with: %v", err)
}
// delete statement
stmts["deleteRow"], err = db.Prepare(fmt.Sprintf(deleteRowForNS,
pgN.Table))
if err != nil {
return pgConn{}, makePGError(
"create DELETE prepared statement failed with: %v", err)
}
case formatAccess:
// insert statement
stmts["insertRow"], err = db.Prepare(fmt.Sprintf(insertRowForAccess,
pgN.Table))
if err != nil {
return pgConn{}, makePGError(
"create INSERT prepared statement failed with: %v", err)
}
}
return pgConn{connStr, pgN.Table, stmts, db}, nil
return pgConn{connStr, pgN.Table, pgN.Format, stmts, db}, nil
}
func newPostgreSQLNotify(accountID string) (*logrus.Logger, error) {
@@ -212,6 +273,18 @@ func (pgC pgConn) Close() {
_ = pgC.DB.Close()
}
func jsonEncodeEventData(d interface{}) ([]byte, error) {
// json encode the value for the row
value, err := json.Marshal(map[string]interface{}{
"Records": d,
})
if err != nil {
return nil, makePGError(
"Unable to encode event %v to JSON: %v", d, err)
}
return value, nil
}
func (pgC pgConn) Fire(entry *logrus.Entry) error {
// get event type by trying to convert to string
entryEventType, ok := entry.Data["EventType"].(string)
@@ -221,35 +294,55 @@ func (pgC pgConn) Fire(entry *logrus.Entry) error {
return nil
}
// Check for event delete
if eventMatch(entryEventType, []string{"s3:ObjectRemoved:*"}) {
// delete row from the table
_, err := pgC.preparedStmts["deleteRow"].Exec(entry.Data["Key"])
if err != nil {
return fmt.Errorf(
"Error deleting event with key = %v - got postgres error - %v",
entry.Data["Key"], err,
)
switch pgC.format {
case formatNamespace:
// Check for event delete
if eventMatch(entryEventType, []string{"s3:ObjectRemoved:*"}) {
// delete row from the table
_, err := pgC.preparedStmts["deleteRow"].Exec(entry.Data["Key"])
if err != nil {
return makePGError(
"Error deleting event with key=%v: %v",
entry.Data["Key"], err,
)
}
} else {
value, err := jsonEncodeEventData(entry.Data["Records"])
if err != nil {
return err
}
// upsert row into the table
_, err = pgC.preparedStmts["upsertRow"].Exec(entry.Data["Key"], value)
if err != nil {
return makePGError(
"Unable to upsert event with key=%v and value=%v: %v",
entry.Data["Key"], entry.Data["Records"], err,
)
}
}
} else {
// json encode the value for the row
value, err := json.Marshal(map[string]interface{}{
"Records": entry.Data["Records"],
})
case formatAccess:
// eventTime is taken from the first entry in the
// records.
events, ok := entry.Data["Records"].([]NotificationEvent)
if !ok {
return makePGError("unable to extract event time due to conversion error of entry.Data[\"Records\"]=%v", entry.Data["Records"])
}
eventTime, err := time.Parse(timeFormatAMZ, events[0].EventTime)
if err != nil {
return fmt.Errorf(
"Unable to encode event %v to JSON - got error - %v",
entry.Data["Records"], err,
)
return makePGError("unable to parse event time \"%s\": %v",
events[0].EventTime, err)
}
// upsert row into the table
_, err = pgC.preparedStmts["upsertRow"].Exec(entry.Data["Key"], value)
value, err := jsonEncodeEventData(entry.Data["Records"])
if err != nil {
return fmt.Errorf(
"Unable to upsert event with Key=%v and Value=%v - got postgres error - %v",
entry.Data["Key"], entry.Data["Records"], err,
)
return err
}
_, err = pgC.preparedStmts["insertRow"].Exec(eventTime, value)
if err != nil {
return makePGError("Unable to insert event with value=%v: %v",
value, err)
}
}

View File

@@ -17,6 +17,7 @@
package cmd
import (
"fmt"
"io/ioutil"
"time"
@@ -27,6 +28,7 @@ import (
// redisNotify to send logs to Redis server
type redisNotify struct {
Enable bool `json:"enable"`
Format string `json:"format"`
Addr string `json:"address"`
Password string `json:"password"`
Key string `json:"key"`
@@ -36,6 +38,11 @@ func (r *redisNotify) Validate() error {
if !r.Enable {
return nil
}
if r.Format != formatNamespace {
return fmt.Errorf(
"Redis Notifier Error: \"format\" must be \"%s\"",
formatNamespace)
}
if _, err := checkURL(r.Addr); err != nil {
return err
}