From a099319e66b55f98d85f648f71e54c142c65fb24 Mon Sep 17 00:00:00 2001 From: Aditya Manthramurthy Date: Mon, 27 Mar 2017 23:57:25 +0530 Subject: [PATCH] Support `access` format for database notification targets (#3953) * Add configuration parameter "format" for db targets and perform configuration migration. * Add PostgreSQL `access` format: This causes Minio to append all events to the configured table. Prefix, suffix and event filters continue to be supported for this mode too. * Update documentation for PostgreSQL notification target. * Add MySQL `access` format: It is very similar to the same format for PostgreSQL. * Update MySQL notification documentation. --- cmd/config-migrate.go | 125 ++++++++ cmd/config-migrate_test.go | 11 +- cmd/config-old.go | 17 ++ cmd/{config-v16.go => config-v17.go} | 52 ++-- ...{config-v16_test.go => config-v17_test.go} | 32 +- cmd/gateway-main.go | 2 +- cmd/notifiers.go | 4 + cmd/notify-elasticsearch.go | 7 + cmd/notify-mysql.go | 210 +++++++++---- cmd/notify-postgresql.go | 287 ++++++++++++------ cmd/notify-redis.go | 7 + docs/bucket/notifications/README.md | 88 ++++-- docs/config/README.md | 2 +- docs/config/config.sample.json | 4 + 14 files changed, 637 insertions(+), 211 deletions(-) rename cmd/{config-v16.go => config-v17.go} (86%) rename cmd/{config-v16_test.go => config-v17_test.go} (79%) diff --git a/cmd/config-migrate.go b/cmd/config-migrate.go index 9fbd5b3cd..cc09661f7 100644 --- a/cmd/config-migrate.go +++ b/cmd/config-migrate.go @@ -85,6 +85,10 @@ func migrateConfig() error { if err := migrateV15ToV16(); err != nil { return err } + // Migration version '16' to '17'. + if err := migrateV16ToV17(); err != nil { + return err + } return nil } @@ -1082,3 +1086,124 @@ func migrateV15ToV16() error { log.Printf("Migration from version ‘%s’ to ‘%s’ completed successfully.\n", cv15.Version, srvConfig.Version) return nil } + +// Version '16' to '17' migration. Adds "format" configuration +// parameter for database targets. +func migrateV16ToV17() error { + configFile := getConfigFile() + + cv16 := &serverConfigV16{} + _, err := quick.Load(configFile, cv16) + if os.IsNotExist(err) { + return nil + } else if err != nil { + return fmt.Errorf("Unable to load config version ‘16’. %v", err) + } + if cv16.Version != "16" { + return nil + } + + // Copy over fields from V16 into V17 config struct + srvConfig := &serverConfigV17{ + Logger: &loggers{}, + Notify: ¬ifier{}, + } + srvConfig.Version = "17" + srvConfig.Credential = cv16.Credential + srvConfig.Region = cv16.Region + if srvConfig.Region == "" { + // Region needs to be set for AWS Signature Version 4. + srvConfig.Region = globalMinioDefaultRegion + } + + srvConfig.Logger.Console = cv16.Logger.Console + srvConfig.Logger.File = cv16.Logger.File + + // check and set notifiers config + if len(cv16.Notify.AMQP) == 0 { + srvConfig.Notify.AMQP = make(map[string]amqpNotify) + srvConfig.Notify.AMQP["1"] = amqpNotify{} + } else { + srvConfig.Notify.AMQP = cv16.Notify.AMQP + } + if len(cv16.Notify.ElasticSearch) == 0 { + srvConfig.Notify.ElasticSearch = make(map[string]elasticSearchNotify) + srvConfig.Notify.ElasticSearch["1"] = elasticSearchNotify{} + } else { + // IMPORTANT NOTE: Future migrations should remove + // this as existing configuration will already contain + // a value for the "format" parameter. + for k, v := range cv16.Notify.ElasticSearch.Clone() { + v.Format = formatNamespace + cv16.Notify.ElasticSearch[k] = v + } + srvConfig.Notify.ElasticSearch = cv16.Notify.ElasticSearch + } + if len(cv16.Notify.Redis) == 0 { + srvConfig.Notify.Redis = make(map[string]redisNotify) + srvConfig.Notify.Redis["1"] = redisNotify{} + } else { + // IMPORTANT NOTE: Future migrations should remove + // this as existing configuration will already contain + // a value for the "format" parameter. + for k, v := range cv16.Notify.Redis.Clone() { + v.Format = formatNamespace + cv16.Notify.Redis[k] = v + } + srvConfig.Notify.Redis = cv16.Notify.Redis + } + if len(cv16.Notify.PostgreSQL) == 0 { + srvConfig.Notify.PostgreSQL = make(map[string]postgreSQLNotify) + srvConfig.Notify.PostgreSQL["1"] = postgreSQLNotify{} + } else { + // IMPORTANT NOTE: Future migrations should remove + // this as existing configuration will already contain + // a value for the "format" parameter. + for k, v := range cv16.Notify.PostgreSQL.Clone() { + v.Format = formatNamespace + cv16.Notify.PostgreSQL[k] = v + } + srvConfig.Notify.PostgreSQL = cv16.Notify.PostgreSQL + } + if len(cv16.Notify.Kafka) == 0 { + srvConfig.Notify.Kafka = make(map[string]kafkaNotify) + srvConfig.Notify.Kafka["1"] = kafkaNotify{} + } else { + srvConfig.Notify.Kafka = cv16.Notify.Kafka + } + if len(cv16.Notify.NATS) == 0 { + srvConfig.Notify.NATS = make(map[string]natsNotify) + srvConfig.Notify.NATS["1"] = natsNotify{} + } else { + srvConfig.Notify.NATS = cv16.Notify.NATS + } + if len(cv16.Notify.Webhook) == 0 { + srvConfig.Notify.Webhook = make(map[string]webhookNotify) + srvConfig.Notify.Webhook["1"] = webhookNotify{} + } else { + srvConfig.Notify.Webhook = cv16.Notify.Webhook + } + if len(cv16.Notify.MySQL) == 0 { + srvConfig.Notify.MySQL = make(map[string]mySQLNotify) + srvConfig.Notify.MySQL["1"] = mySQLNotify{} + } else { + // IMPORTANT NOTE: Future migrations should remove + // this as existing configuration will already contain + // a value for the "format" parameter. + for k, v := range cv16.Notify.MySQL.Clone() { + v.Format = formatNamespace + cv16.Notify.MySQL[k] = v + } + srvConfig.Notify.MySQL = cv16.Notify.MySQL + } + + // Load browser config from existing config in the file. + srvConfig.Browser = cv16.Browser + + if err = quick.Save(configFile, srvConfig); err != nil { + return fmt.Errorf("Failed to migrate config from ‘%s’ to ‘%s’. %v", cv16.Version, srvConfig.Version, err) + } + + log.Printf("Migration from version ‘%s’ to ‘%s’ completed successfully.\n", cv16.Version, srvConfig.Version) + return nil +} diff --git a/cmd/config-migrate_test.go b/cmd/config-migrate_test.go index 9774bc412..5e7223e12 100644 --- a/cmd/config-migrate_test.go +++ b/cmd/config-migrate_test.go @@ -115,9 +115,13 @@ func TestServerConfigMigrateInexistentConfig(t *testing.T) { if err := migrateV15ToV16(); err != nil { t.Fatal("migrate v15 to v16 should succeed when no config file is found") } + if err := migrateV16ToV17(); err != nil { + t.Fatal("migrate v16 to v17 should succeed when no config file is found") + } + } -// Test if a config migration from v2 to v16 is successfully done +// Test if a config migration from v2 to v17 is successfully done func TestServerConfigMigrateV2toV16(t *testing.T) { rootPath, err := newTestConfig(globalMinioDefaultRegion) if err != nil { @@ -157,7 +161,7 @@ func TestServerConfigMigrateV2toV16(t *testing.T) { } // Check the version number in the upgraded config file - expectedVersion := v16 + expectedVersion := v17 if serverConfig.Version != expectedVersion { t.Fatalf("Expect version "+expectedVersion+", found: %v", serverConfig.Version) } @@ -231,4 +235,7 @@ func TestServerConfigMigrateFaultyConfig(t *testing.T) { if err := migrateV15ToV16(); err == nil { t.Fatal("migrateConfigV15ToV16() should fail with a corrupted json") } + if err := migrateV16ToV17(); err == nil { + t.Fatal("migrateConfigV16ToV17() should fail with a corrupted json") + } } diff --git a/cmd/config-old.go b/cmd/config-old.go index 41cb64440..438812275 100644 --- a/cmd/config-old.go +++ b/cmd/config-old.go @@ -413,3 +413,20 @@ type serverConfigV15 struct { // Notification queue configuration. Notify *notifier `json:"notify"` } + +// serverConfigV16 server configuration version '16' which is like +// version '15' except it makes a change to logging configuration. +type serverConfigV16 struct { + Version string `json:"version"` + + // S3 API configuration. + Credential credential `json:"credential"` + Region string `json:"region"` + Browser BrowserFlag `json:"browser"` + + // Additional error logging configuration. + Logger *loggers `json:"logger"` + + // Notification queue configuration. + Notify *notifier `json:"notify"` +} diff --git a/cmd/config-v16.go b/cmd/config-v17.go similarity index 86% rename from cmd/config-v16.go rename to cmd/config-v17.go index aee62356a..838353e60 100644 --- a/cmd/config-v16.go +++ b/cmd/config-v17.go @@ -29,12 +29,14 @@ import ( // Read Write mutex for safe access to ServerConfig. var serverConfigMu sync.RWMutex -const v16 = "16" +// Config version +const v17 = "17" -// serverConfigV16 server configuration version '16' which is like -// version '15' except it removes log level field and renames `fileName` -// field of File logger to `filename` -type serverConfigV16 struct { +// serverConfigV17 server configuration version '17' which is like +// version '16' except it adds support for "format" parameter in +// database event notification targets: PostgreSQL, MySQL, Redis and +// Elasticsearch. +type serverConfigV17 struct { Version string `json:"version"` // S3 API configuration. @@ -49,9 +51,9 @@ type serverConfigV16 struct { Notify *notifier `json:"notify"` } -func newServerConfigV16() *serverConfigV16 { - srvCfg := &serverConfigV16{ - Version: v16, +func newServerConfigV17() *serverConfigV17 { + srvCfg := &serverConfigV17{ + Version: v17, Credential: mustGetNewCredential(), Region: globalMinioDefaultRegion, Browser: true, @@ -87,7 +89,7 @@ func newServerConfigV16() *serverConfigV16 { // found, otherwise use default parameters func newConfig(envParams envParams) error { // Initialize server config. - srvCfg := newServerConfigV16() + srvCfg := newServerConfigV17() // If env is set for a fresh start, save them to config file. if globalIsEnvCreds { @@ -117,7 +119,7 @@ func newConfig(envParams envParams) error { // loadConfig - loads a new config from disk, overrides params from env // if found and valid func loadConfig(envParams envParams) error { - srvCfg := &serverConfigV16{ + srvCfg := &serverConfigV17{ Region: globalMinioDefaultRegion, Browser: true, } @@ -125,8 +127,8 @@ func loadConfig(envParams envParams) error { if _, err := quick.Load(getConfigFile(), srvCfg); err != nil { return err } - if srvCfg.Version != v16 { - return fmt.Errorf("configuration version mismatch. Expected: ‘%s’, Got: ‘%s’", srvCfg.Version, v16) + if srvCfg.Version != v17 { + return fmt.Errorf("configuration version mismatch. Expected: ‘%s’, Got: ‘%s’", srvCfg.Version, v17) } // If env is set override the credentials from config file. @@ -203,7 +205,7 @@ func checkDupJSONKeys(json string) error { // validateConfig checks for func validateConfig() error { - srvCfg := &serverConfigV16{ + srvCfg := &serverConfigV17{ Region: globalMinioDefaultRegion, Browser: true, } @@ -214,8 +216,8 @@ func validateConfig() error { } // Check if config version is valid - if srvCfg.Version != v16 { - return errors.New("bad config version, expected: " + v16) + if srvCfg.Version != v17 { + return errors.New("bad config version, expected: " + v17) } // Load config file json and check for duplication json keys @@ -254,10 +256,10 @@ func validateConfig() error { } // serverConfig server config. -var serverConfig *serverConfigV16 +var serverConfig *serverConfigV17 // GetVersion get current config version. -func (s serverConfigV16) GetVersion() string { +func (s serverConfigV17) GetVersion() string { serverConfigMu.RLock() defer serverConfigMu.RUnlock() @@ -265,7 +267,7 @@ func (s serverConfigV16) GetVersion() string { } // SetRegion set new region. -func (s *serverConfigV16) SetRegion(region string) { +func (s *serverConfigV17) SetRegion(region string) { serverConfigMu.Lock() defer serverConfigMu.Unlock() @@ -277,7 +279,7 @@ func (s *serverConfigV16) SetRegion(region string) { } // GetRegion get current region. -func (s serverConfigV16) GetRegion() string { +func (s serverConfigV17) GetRegion() string { serverConfigMu.RLock() defer serverConfigMu.RUnlock() @@ -290,7 +292,7 @@ func (s serverConfigV16) GetRegion() string { } // SetCredentials set new credentials. -func (s *serverConfigV16) SetCredential(creds credential) { +func (s *serverConfigV17) SetCredential(creds credential) { serverConfigMu.Lock() defer serverConfigMu.Unlock() @@ -299,7 +301,7 @@ func (s *serverConfigV16) SetCredential(creds credential) { } // GetCredentials get current credentials. -func (s serverConfigV16) GetCredential() credential { +func (s serverConfigV17) GetCredential() credential { serverConfigMu.RLock() defer serverConfigMu.RUnlock() @@ -307,16 +309,16 @@ func (s serverConfigV16) GetCredential() credential { } // SetBrowser set if browser is enabled. -func (s *serverConfigV16) SetBrowser(b BrowserFlag) { +func (s *serverConfigV17) SetBrowser(v BrowserFlag) { serverConfigMu.Lock() defer serverConfigMu.Unlock() // Set the new value. - s.Browser = b + s.Browser = v } // GetCredentials get current credentials. -func (s serverConfigV16) GetBrowser() BrowserFlag { +func (s serverConfigV17) GetBrowser() BrowserFlag { serverConfigMu.RLock() defer serverConfigMu.RUnlock() @@ -324,7 +326,7 @@ func (s serverConfigV16) GetBrowser() BrowserFlag { } // Save config. -func (s serverConfigV16) Save() error { +func (s serverConfigV17) Save() error { serverConfigMu.RLock() defer serverConfigMu.RUnlock() diff --git a/cmd/config-v16_test.go b/cmd/config-v17_test.go similarity index 79% rename from cmd/config-v16_test.go rename to cmd/config-v17_test.go index a3c9612a2..f0acd0406 100644 --- a/cmd/config-v16_test.go +++ b/cmd/config-v17_test.go @@ -109,8 +109,8 @@ func TestServerConfig(t *testing.T) { serverConfig.Logger.SetFile(fileLogger) // Match version. - if serverConfig.GetVersion() != v16 { - t.Errorf("Expecting version %s found %s", serverConfig.GetVersion(), v16) + if serverConfig.GetVersion() != v17 { + t.Errorf("Expecting version %s found %s", serverConfig.GetVersion(), v17) } // Attempt to save. @@ -215,7 +215,7 @@ func TestValidateConfig(t *testing.T) { configPath := filepath.Join(rootPath, minioConfigFile) - v := v16 + v := v17 testCases := []struct { configData string @@ -275,8 +275,32 @@ func TestValidateConfig(t *testing.T) { // Test 18 - Test Webhook {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "webhook": { "1": { "enable": true, "endpoint": "" } }}}`, false}, - // Test 19 - Test MySQL + // Test 20 - Test MySQL {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "mysql": { "1": { "enable": true, "dsnString": "", "table": "", "host": "", "port": "", "user": "", "password": "", "database": "" }}}}`, false}, + + // Test 21 - Test Format for MySQL + {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "mysql": { "1": { "enable": true, "dsnString": "", "format": "invalid", "table": "xxx", "host": "10.0.0.1", "port": "3306", "user": "abc", "password": "pqr", "database": "test1" }}}}`, false}, + + // Test 22 - Test valid Format for MySQL + {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "mysql": { "1": { "enable": true, "dsnString": "", "format": "namespace", "table": "xxx", "host": "10.0.0.1", "port": "3306", "user": "abc", "password": "pqr", "database": "test1" }}}}`, true}, + + // Test 23 - Test Format for PostgreSQL + {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "postgresql": { "1": { "enable": true, "connectionString": "", "format": "invalid", "table": "xxx", "host": "myhost", "port": "5432", "user": "abc", "password": "pqr", "database": "test1" }}}}`, false}, + + // Test 24 - Test valid Format for PostgreSQL + {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "postgresql": { "1": { "enable": true, "connectionString": "", "format": "namespace", "table": "xxx", "host": "myhost", "port": "5432", "user": "abc", "password": "pqr", "database": "test1" }}}}`, true}, + + // Test 25 - Test Format for ElasticSearch + {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "elasticsearch": { "1": { "enable": true, "format": "invalid", "url": "example.com", "index": "myindex" } }}}`, false}, + + // Test 26 - Test valid Format for ElasticSearch + {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "elasticsearch": { "1": { "enable": true, "format": "namespace", "url": "example.com", "index": "myindex" } }}}`, true}, + + // Test 27 - Test Format for Redis + {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "redis": { "1": { "enable": true, "format": "invalid", "address": "example.com", "password": "xxx", "key": "key1" } }}}`, false}, + + // Test 28 - Test valid Format for Redis + {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "redis": { "1": { "enable": true, "format": "namespace", "address": "example.com", "password": "xxx", "key": "key1" } }}}`, true}, } for i, testCase := range testCases { diff --git a/cmd/gateway-main.go b/cmd/gateway-main.go index 23d499f3e..024d6e951 100644 --- a/cmd/gateway-main.go +++ b/cmd/gateway-main.go @@ -96,7 +96,7 @@ func newGatewayLayer(backendType, accessKey, secretKey string) (GatewayLayer, er // only used in memory. func newGatewayConfig(accessKey, secretKey, region string) error { // Initialize server config. - srvCfg := newServerConfigV16() + srvCfg := newServerConfigV17() // If env is set for a fresh start, save them to config file. srvCfg.SetCredential(credential{ diff --git a/cmd/notifiers.go b/cmd/notifiers.go index 35cccea4a..5da9e086e 100644 --- a/cmd/notifiers.go +++ b/cmd/notifiers.go @@ -43,6 +43,10 @@ const ( queueTypeKafka = "kafka" // Static string for Webhooks queueTypeWebhook = "webhook" + + // Notifier format value constants + formatNamespace = "namespace" + formatAccess = "access" ) // Topic type. diff --git a/cmd/notify-elasticsearch.go b/cmd/notify-elasticsearch.go index e35eef0cd..e879ccff0 100644 --- a/cmd/notify-elasticsearch.go +++ b/cmd/notify-elasticsearch.go @@ -19,6 +19,7 @@ package cmd import ( "encoding/hex" "errors" + "fmt" "io/ioutil" "github.com/Sirupsen/logrus" @@ -29,6 +30,7 @@ import ( // elasticQueue is a elasticsearch event notification queue. type elasticSearchNotify struct { Enable bool `json:"enable"` + Format string `json:"format"` URL string `json:"url"` Index string `json:"index"` } @@ -37,6 +39,11 @@ func (e *elasticSearchNotify) Validate() error { if !e.Enable { return nil } + if e.Format != formatNamespace { + return fmt.Errorf( + "Elasticsearch Notifier Error: \"format\" must be \"%s\"", + formatNamespace) + } if _, err := checkURL(e.URL); err != nil { return err } diff --git a/cmd/notify-mysql.go b/cmd/notify-mysql.go index 63932b265..9d1bb17f6 100644 --- a/cmd/notify-mysql.go +++ b/cmd/notify-mysql.go @@ -14,11 +14,19 @@ * limitations under the License. */ -// MySQL Notifier implementation. A table with a specific -// structure (column names, column types, and primary key/uniqueness -// constraint) is used. The user may set the table name in the -// configuration. A sample SQL command that creates a command with the -// required structure is: +// MySQL Notifier implementation. Two formats, "namespace" and +// "access" are supported. +// +// * Namespace format +// +// On each create or update object event in Minio Object storage +// server, a row is created or updated in the table in MySQL. On each +// object removal, the corresponding row is deleted from the table. +// +// A table with a specific structure (column names, column types, and +// primary key/uniqueness constraint) is used. The user may set the +// table name in the configuration. A sample SQL command that creates +// a command with the required structure is: // // CREATE TABLE myminio ( // key_name VARCHAR(2048), @@ -30,10 +38,18 @@ // here. The implementation has been tested with MySQL Ver 14.14 // Distrib 5.7.17. // -// On each create or update object event in Minio Object storage -// server, a row is created or updated in the table in MySQL. On -// each object removal, the corresponding row is deleted from the -// table. +// * Access format +// +// On each event, a row is appended to the configured table. There is +// no deletion or modification of existing rows. +// +// A different table schema is used for this format. A sample SQL +// commant that creates a table with the required structure is: +// +// CREATE TABLE myminio ( +// event_time TIMESTAMP WITH TIME ZONE NOT NULL, +// event_data JSONB +// ); package cmd @@ -42,29 +58,53 @@ import ( "encoding/json" "fmt" "io/ioutil" + "time" "github.com/Sirupsen/logrus" "github.com/go-sql-driver/mysql" ) const ( - upsertRowMySQL = `INSERT INTO %s (key_name, value) + // Queries for format=namespace mode. + upsertRowForNSMySQL = `INSERT INTO %s (key_name, value) VALUES (?, ?) ON DUPLICATE KEY UPDATE value=VALUES(value); ` - deleteRowMySQL = ` DELETE FROM %s + deleteRowForNSMySQL = ` DELETE FROM %s WHERE key_name = ?;` - createTableMySQL = `CREATE TABLE %s ( + createTableForNSMySQL = `CREATE TABLE %s ( key_name VARCHAR(2048), value JSON, PRIMARY KEY (key_name) );` + + // Queries for format=access mode. + insertRowForAccessMySQL = `INSERT INTO %s (event_time, event_data) +VALUES (?, ?);` + createTableForAccessMySQL = `CREATE TABLE %s ( + event_time DATETIME NOT NULL, + event_data JSON +);` + + // Query to check if a table already exists. tableExistsMySQL = `SELECT 1 FROM %s;` ) +func makeMySQLError(msg string, a ...interface{}) error { + s := fmt.Sprintf(msg, a...) + return fmt.Errorf("MySQL Notifier Error: %s", s) +} + +var ( + myNFormatError = makeMySQLError(`"format" value is invalid - it must be one of "%s" or "%s".`, formatNamespace, formatAccess) + myNTableError = makeMySQLError("Table was not specified in the configuration.") +) + type mySQLNotify struct { Enable bool `json:"enable"` + Format string `json:"format"` + // pass data-source-name connection string in config // directly. This string is formatted according to // https://github.com/go-sql-driver/mysql#dsn-data-source-name @@ -86,14 +126,16 @@ func (m *mySQLNotify) Validate() error { if !m.Enable { return nil } + if m.Format != formatNamespace && m.Format != formatAccess { + return myNFormatError + } if m.DsnString == "" { if _, err := checkURL(m.Host); err != nil { return err } } if m.Table == "" { - return fmt.Errorf( - "MySQL Notifier Error: Table was not specified in configuration") + return myNTableError } return nil } @@ -101,6 +143,7 @@ func (m *mySQLNotify) Validate() error { type mySQLConn struct { dsnStr string table string + format string preparedStmts map[string]*sql.Stmt *sql.DB } @@ -126,30 +169,32 @@ func dialMySQL(msql mySQLNotify) (mySQLConn, error) { db, err := sql.Open("mysql", dsnStr) if err != nil { - return mySQLConn{}, fmt.Errorf( - "MySQL Notifier Error: Connection opening failure (dsnStr=%s): %v", - dsnStr, err, - ) + return mySQLConn{}, makeMySQLError( + "Connection opening failure (dsnStr=%s): %v", + dsnStr, err) } // ping to check that server is actually reachable. err = db.Ping() if err != nil { - return mySQLConn{}, fmt.Errorf( - "MySQL Notifier Error: Ping to server failed with: %v", - err, - ) + return mySQLConn{}, makeMySQLError( + "Ping to server failed with: %v", err) } // check that table exists - if not, create it. _, err = db.Exec(fmt.Sprintf(tableExistsMySQL, msql.Table)) if err != nil { + createStmt := createTableForNSMySQL + if msql.Format == formatAccess { + createStmt = createTableForAccessMySQL + } + // most likely, table does not exist. try to create it: - _, errCreate := db.Exec(fmt.Sprintf(createTableMySQL, msql.Table)) + _, errCreate := db.Exec(fmt.Sprintf(createStmt, msql.Table)) if errCreate != nil { // failed to create the table. error out. - return mySQLConn{}, fmt.Errorf( - "MySQL Notifier Error: 'Select' failed with %v, then 'Create Table' failed with %v", + return mySQLConn{}, makeMySQLError( + "'Select' failed with %v, then 'Create Table' failed with %v", err, errCreate, ) } @@ -157,19 +202,33 @@ func dialMySQL(msql mySQLNotify) (mySQLConn, error) { // create prepared statements stmts := make(map[string]*sql.Stmt) - // insert or update statement - stmts["upsertRow"], err = db.Prepare(fmt.Sprintf(upsertRowMySQL, msql.Table)) - if err != nil { - return mySQLConn{}, - fmt.Errorf("MySQL Notifier Error: create UPSERT prepared statement failed with: %v", err) - } - stmts["deleteRow"], err = db.Prepare(fmt.Sprintf(deleteRowMySQL, msql.Table)) - if err != nil { - return mySQLConn{}, - fmt.Errorf("MySQL Notifier Error: create DELETE prepared statement failed with: %v", err) - } + switch msql.Format { + case formatNamespace: + // insert or update statement + stmts["upsertRow"], err = db.Prepare(fmt.Sprintf(upsertRowForNSMySQL, + msql.Table)) + if err != nil { + return mySQLConn{}, + makeMySQLError("create UPSERT prepared statement failed with: %v", err) + } + // delete statement + stmts["deleteRow"], err = db.Prepare(fmt.Sprintf(deleteRowForNSMySQL, + msql.Table)) + if err != nil { + return mySQLConn{}, + makeMySQLError("create DELETE prepared statement failed with: %v", err) + } + case formatAccess: + // insert statement + stmts["insertRow"], err = db.Prepare(fmt.Sprintf(insertRowForAccessMySQL, + msql.Table)) + if err != nil { + return mySQLConn{}, makeMySQLError( + "create INSERT prepared statement failed with: %v", err) + } - return mySQLConn{dsnStr, msql.Table, stmts, db}, nil + } + return mySQLConn{dsnStr, msql.Table, msql.Format, stmts, db}, nil } func newMySQLNotify(accountID string) (*logrus.Logger, error) { @@ -210,35 +269,66 @@ func (myC mySQLConn) Fire(entry *logrus.Entry) error { return nil } - // Check for event delete - if eventMatch(entryEventType, []string{"s3:ObjectRemoved:*"}) { - // delete row from the table - _, err := myC.preparedStmts["deleteRow"].Exec(entry.Data["Key"]) - if err != nil { - return fmt.Errorf( - "Error deleting event with key = %v - got mysql error - %v", - entry.Data["Key"], err, - ) - } - } else { - // json encode the value for the row + jsonEncoder := func(d interface{}) ([]byte, error) { value, err := json.Marshal(map[string]interface{}{ - "Records": entry.Data["Records"], + "Records": d, }) if err != nil { - return fmt.Errorf( - "Unable to encode event %v to JSON - got error - %v", - entry.Data["Records"], err, - ) + return nil, makeMySQLError( + "Unable to encode event %v to JSON: %v", d, err) + } + return value, nil + } + + switch myC.format { + case formatNamespace: + // Check for event delete + if eventMatch(entryEventType, []string{"s3:ObjectRemoved:*"}) { + // delete row from the table + _, err := myC.preparedStmts["deleteRow"].Exec(entry.Data["Key"]) + if err != nil { + return makeMySQLError( + "Error deleting event with key = %v - got mysql error - %v", + entry.Data["Key"], err, + ) + } + } else { + value, err := jsonEncoder(entry.Data["Records"]) + if err != nil { + return err + } + + // upsert row into the table + _, err = myC.preparedStmts["upsertRow"].Exec(entry.Data["Key"], value) + if err != nil { + return makeMySQLError( + "Unable to upsert event with Key=%v and Value=%v - got mysql error - %v", + entry.Data["Key"], entry.Data["Records"], err, + ) + } + } + case formatAccess: + // eventTime is taken from the first entry in the + // records. + events, ok := entry.Data["Records"].([]NotificationEvent) + if !ok { + return makeMySQLError("unable to extract event time due to conversion error of entry.Data[\"Records\"]=%v", entry.Data["Records"]) + } + eventTime, err := time.Parse(timeFormatAMZ, events[0].EventTime) + if err != nil { + return makeMySQLError("unable to parse event time \"%s\": %v", + events[0].EventTime, err) } - // upsert row into the table - _, err = myC.preparedStmts["upsertRow"].Exec(entry.Data["Key"], value) + value, err := jsonEncodeEventData(entry.Data["Records"]) if err != nil { - return fmt.Errorf( - "Unable to upsert event with Key=%v and Value=%v - got mysql error - %v", - entry.Data["Key"], entry.Data["Records"], err, - ) + return err + } + + _, err = myC.preparedStmts["insertRow"].Exec(eventTime, value) + if err != nil { + return makeMySQLError("Unable to insert event with value=%v: %v", + value, err) } } diff --git a/cmd/notify-postgresql.go b/cmd/notify-postgresql.go index fadecd332..2b3cec9f9 100644 --- a/cmd/notify-postgresql.go +++ b/cmd/notify-postgresql.go @@ -14,11 +14,20 @@ * limitations under the License. */ -// PostgreSQL Notifier implementation. A table with a specific -// structure (column names, column types, and primary key/uniqueness -// constraint) is used. The user may set the table name in the -// configuration. A sample SQL command that creates a command with the -// required structure is: +// PostgreSQL Notifier implementation. Two formats, "namespace" and +// "access" are supported. +// +// * Namespace format +// +// On each create or update object event in Minio Object storage +// server, a row is created or updated in the table in Postgres. On +// each object removal, the corresponding row is deleted from the +// table. +// +// A table with a specific structure (column names, column types, and +// primary key/uniqueness constraint) is used. The user may set the +// table name in the configuration. A sample SQL command that creates +// a table with the required structure is: // // CREATE TABLE myminio ( // key VARCHAR PRIMARY KEY, @@ -29,10 +38,18 @@ // (UPSERT) is used here, so the minimum version of PostgreSQL // required is 9.5. // -// On each create or update object event in Minio Object storage -// server, a row is created or updated in the table in Postgres. On -// each object removal, the corresponding row is deleted from the -// table. +// * Access format +// +// On each event, a row is appended to the configured table. There is +// no deletion or modification of existing rows. +// +// A different table schema is used for this format. A sample SQL +// commant that creates a table with the required structure is: +// +// CREATE TABLE myminio ( +// event_time TIMESTAMP WITH TIME ZONE NOT NULL, +// event_data JSONB +// ); package cmd @@ -42,54 +59,91 @@ import ( "fmt" "io/ioutil" "strings" + "time" "github.com/Sirupsen/logrus" - // libpq db driver is usually imported blank - see examples in - // https://godoc.org/github.com/lib/pq + // Register postgres driver _ "github.com/lib/pq" ) const ( - upsertRow = `INSERT INTO %s (key, value) + // Queries for format=namespace mode. Here the `key` column is + // the bucket and object of the event. When objects are + // deleted, the corresponding row is deleted in the + // table. When objects are created or over-written, rows are + // inserted or updated respectively in the table. + upsertRowForNS = `INSERT INTO %s (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value;` - deleteRow = ` DELETE FROM %s + deleteRowForNS = ` DELETE FROM %s WHERE key = $1;` - createTable = `CREATE TABLE %s ( + createTableForNS = `CREATE TABLE %s ( key VARCHAR PRIMARY KEY, value JSONB );` + + // Queries for format=access mode. Here the `event_time` + // column of the table, stores the time at which the event + // occurred in the Minio server. + insertRowForAccess = `INSERT INTO %s (event_time, event_data) +VALUES ($1, $2);` + createTableForAccess = `CREATE TABLE %s ( + event_time TIMESTAMP WITH TIME ZONE NOT NULL, + event_data JSONB +);` + + // Query to check if a table already exists. tableExists = `SELECT 1 FROM %s;` ) +func makePGError(msg string, a ...interface{}) error { + s := fmt.Sprintf(msg, a...) + return fmt.Errorf("PostgreSQL Notifier Error: %s", s) +} + +var ( + pgNFormatError = makePGError(`"format" value is invalid - it must be one of "%s" or "%s".`, formatNamespace, formatAccess) + pgNTableError = makePGError("Table was not specified in the configuration.") +) + type postgreSQLNotify struct { Enable bool `json:"enable"` - // pass connection string in config directly. This string is + Format string `json:"format"` + + // Pass connection string in config directly. This string is // formatted according to // https://godoc.org/github.com/lib/pq#hdr-Connection_String_Parameters ConnectionString string `json:"connectionString"` // specifying a table name is required. Table string `json:"table"` - // uses the values below if no connection string is specified - // - however the connection string method offers more - // flexibility. - Host string `json:"host"` - Port string `json:"port"` - User string `json:"user"` - Password string `json:"password"` - Database string `json:"database"` + // The values below, if non-empty are appended to + // ConnectionString above. Default values are shown in + // comments below (implicitly used by the library). + Host string `json:"host"` // default: localhost + Port string `json:"port"` // default: 5432 + User string `json:"user"` // default: user running minio + Password string `json:"password"` // default: no password + Database string `json:"database"` // default: same as user } func (p *postgreSQLNotify) Validate() error { if !p.Enable { return nil } - if _, err := checkURL(p.Host); err != nil { - return err + if p.Format != formatNamespace && p.Format != formatAccess { + return pgNFormatError + } + if p.ConnectionString == "" { + if _, err := checkURL(p.Host); err != nil { + return err + } + } + if p.Table == "" { + return pgNTableError } return nil } @@ -97,6 +151,7 @@ func (p *postgreSQLNotify) Validate() error { type pgConn struct { connStr string table string + format string preparedStmts map[string]*sql.Stmt *sql.DB } @@ -106,61 +161,53 @@ func dialPostgreSQL(pgN postgreSQLNotify) (pgConn, error) { return pgConn{}, errNotifyNotEnabled } - // check that table is specified - if pgN.Table == "" { - return pgConn{}, fmt.Errorf( - "PostgreSQL Notifier Error: Table was not specified in configuration") + // collect connection params + params := []string{pgN.ConnectionString} + if pgN.Host != "" { + params = append(params, "host="+pgN.Host) } - - connStr := pgN.ConnectionString - // check if connection string is specified - if connStr == "" { - // build from other parameters - params := []string{} - if pgN.Host != "" { - params = append(params, "host="+pgN.Host) - } - if pgN.Port != "" { - params = append(params, "port="+pgN.Port) - } - if pgN.User != "" { - params = append(params, "user="+pgN.User) - } - if pgN.Password != "" { - params = append(params, "password="+pgN.Password) - } - if pgN.Database != "" { - params = append(params, "dbname="+pgN.Database) - } - connStr = strings.Join(params, " ") + if pgN.Port != "" { + params = append(params, "port="+pgN.Port) } + if pgN.User != "" { + params = append(params, "user="+pgN.User) + } + if pgN.Password != "" { + params = append(params, "password="+pgN.Password) + } + if pgN.Database != "" { + params = append(params, "dbname="+pgN.Database) + } + connStr := strings.Join(params, " ") db, err := sql.Open("postgres", connStr) if err != nil { - return pgConn{}, fmt.Errorf( - "PostgreSQL Notifier Error: Connection opening failure (connectionString=%s): %v", - connStr, err, - ) + return pgConn{}, makePGError( + "Connection opening failure (connectionString=%s): %v", + connStr, err) } // ping to check that server is actually reachable. err = db.Ping() if err != nil { - return pgConn{}, fmt.Errorf( - "PostgreSQL Notifier Error: Ping to server failed with: %v", - err, - ) + return pgConn{}, makePGError("Ping to server failed with: %v", + err) } // check that table exists - if not, create it. _, err = db.Exec(fmt.Sprintf(tableExists, pgN.Table)) if err != nil { + createStmt := createTableForNS + if pgN.Format == formatAccess { + createStmt = createTableForAccess + } + // most likely, table does not exist. try to create it: - _, errCreate := db.Exec(fmt.Sprintf(createTable, pgN.Table)) + _, errCreate := db.Exec(fmt.Sprintf(createStmt, pgN.Table)) if errCreate != nil { // failed to create the table. error out. - return pgConn{}, fmt.Errorf( - "PostgreSQL Notifier Error: 'Select' failed with %v, then 'Create Table' failed with %v", + return pgConn{}, makePGError( + "'Select' failed with %v, then 'Create Table' failed with %v", err, errCreate, ) } @@ -168,19 +215,33 @@ func dialPostgreSQL(pgN postgreSQLNotify) (pgConn, error) { // create prepared statements stmts := make(map[string]*sql.Stmt) - // insert or update statement - stmts["upsertRow"], err = db.Prepare(fmt.Sprintf(upsertRow, pgN.Table)) - if err != nil { - return pgConn{}, - fmt.Errorf("PostgreSQL Notifier Error: create UPSERT prepared statement failed with: %v", err) - } - stmts["deleteRow"], err = db.Prepare(fmt.Sprintf(deleteRow, pgN.Table)) - if err != nil { - return pgConn{}, - fmt.Errorf("PostgreSQL Notifier Error: create DELETE prepared statement failed with: %v", err) + switch pgN.Format { + case formatNamespace: + // insert or update statement + stmts["upsertRow"], err = db.Prepare(fmt.Sprintf(upsertRowForNS, + pgN.Table)) + if err != nil { + return pgConn{}, makePGError( + "create UPSERT prepared statement failed with: %v", err) + } + // delete statement + stmts["deleteRow"], err = db.Prepare(fmt.Sprintf(deleteRowForNS, + pgN.Table)) + if err != nil { + return pgConn{}, makePGError( + "create DELETE prepared statement failed with: %v", err) + } + case formatAccess: + // insert statement + stmts["insertRow"], err = db.Prepare(fmt.Sprintf(insertRowForAccess, + pgN.Table)) + if err != nil { + return pgConn{}, makePGError( + "create INSERT prepared statement failed with: %v", err) + } } - return pgConn{connStr, pgN.Table, stmts, db}, nil + return pgConn{connStr, pgN.Table, pgN.Format, stmts, db}, nil } func newPostgreSQLNotify(accountID string) (*logrus.Logger, error) { @@ -212,6 +273,18 @@ func (pgC pgConn) Close() { _ = pgC.DB.Close() } +func jsonEncodeEventData(d interface{}) ([]byte, error) { + // json encode the value for the row + value, err := json.Marshal(map[string]interface{}{ + "Records": d, + }) + if err != nil { + return nil, makePGError( + "Unable to encode event %v to JSON: %v", d, err) + } + return value, nil +} + func (pgC pgConn) Fire(entry *logrus.Entry) error { // get event type by trying to convert to string entryEventType, ok := entry.Data["EventType"].(string) @@ -221,35 +294,55 @@ func (pgC pgConn) Fire(entry *logrus.Entry) error { return nil } - // Check for event delete - if eventMatch(entryEventType, []string{"s3:ObjectRemoved:*"}) { - // delete row from the table - _, err := pgC.preparedStmts["deleteRow"].Exec(entry.Data["Key"]) - if err != nil { - return fmt.Errorf( - "Error deleting event with key = %v - got postgres error - %v", - entry.Data["Key"], err, - ) + switch pgC.format { + case formatNamespace: + // Check for event delete + if eventMatch(entryEventType, []string{"s3:ObjectRemoved:*"}) { + // delete row from the table + _, err := pgC.preparedStmts["deleteRow"].Exec(entry.Data["Key"]) + if err != nil { + return makePGError( + "Error deleting event with key=%v: %v", + entry.Data["Key"], err, + ) + } + } else { + value, err := jsonEncodeEventData(entry.Data["Records"]) + if err != nil { + return err + } + + // upsert row into the table + _, err = pgC.preparedStmts["upsertRow"].Exec(entry.Data["Key"], value) + if err != nil { + return makePGError( + "Unable to upsert event with key=%v and value=%v: %v", + entry.Data["Key"], entry.Data["Records"], err, + ) + } } - } else { - // json encode the value for the row - value, err := json.Marshal(map[string]interface{}{ - "Records": entry.Data["Records"], - }) + case formatAccess: + // eventTime is taken from the first entry in the + // records. + events, ok := entry.Data["Records"].([]NotificationEvent) + if !ok { + return makePGError("unable to extract event time due to conversion error of entry.Data[\"Records\"]=%v", entry.Data["Records"]) + } + eventTime, err := time.Parse(timeFormatAMZ, events[0].EventTime) if err != nil { - return fmt.Errorf( - "Unable to encode event %v to JSON - got error - %v", - entry.Data["Records"], err, - ) + return makePGError("unable to parse event time \"%s\": %v", + events[0].EventTime, err) } - // upsert row into the table - _, err = pgC.preparedStmts["upsertRow"].Exec(entry.Data["Key"], value) + value, err := jsonEncodeEventData(entry.Data["Records"]) if err != nil { - return fmt.Errorf( - "Unable to upsert event with Key=%v and Value=%v - got postgres error - %v", - entry.Data["Key"], entry.Data["Records"], err, - ) + return err + } + + _, err = pgC.preparedStmts["insertRow"].Exec(eventTime, value) + if err != nil { + return makePGError("Unable to insert event with value=%v: %v", + value, err) } } diff --git a/cmd/notify-redis.go b/cmd/notify-redis.go index 1f385d472..c25f0ba71 100644 --- a/cmd/notify-redis.go +++ b/cmd/notify-redis.go @@ -17,6 +17,7 @@ package cmd import ( + "fmt" "io/ioutil" "time" @@ -27,6 +28,7 @@ import ( // redisNotify to send logs to Redis server type redisNotify struct { Enable bool `json:"enable"` + Format string `json:"format"` Addr string `json:"address"` Password string `json:"password"` Key string `json:"key"` @@ -36,6 +38,11 @@ func (r *redisNotify) Validate() error { if !r.Enable { return nil } + if r.Format != formatNamespace { + return fmt.Errorf( + "Redis Notifier Error: \"format\" must be \"%s\"", + formatNamespace) + } if _, err := checkURL(r.Addr); err != nil { return err } diff --git a/docs/bucket/notifications/README.md b/docs/bucket/notifications/README.md index c7fdd7c6c..441348704 100644 --- a/docs/bucket/notifications/README.md +++ b/docs/bucket/notifications/README.md @@ -372,36 +372,72 @@ go run nats.go ## Publish Minio events via PostgreSQL -Install PostgreSQL from [here](https://www.postgresql.org/). +Install [PostgreSQL](https://www.postgresql.org/) database server. For illustrative purposes, we have set the "postgres" user password as `password` and created a database called `minio_events` to store the events. + +This notification target supports two formats: _namespace_ and _access_. + +When the _namespace_ format is used, Minio synchronizes objects in the bucket with rows in the table. It creates rows with two columns: key and value. The key is the bucket and object name of an object that exists in Minio. The value is JSON encoded event data about the operation that created/replaced the object in Minio. When objects are updated or deleted, the corresponding row from this table is updated or deleted respectively. + +When the _access_ format is used, Minio appends events to a table. It creates rows with two columns: event_time and event_data. The event_time is the time at which the event occurred in the Minio server. The event_data is the JSON encoded event data about the operation on an object. No rows are deleted or modified in this format. + +The steps below show how to use this notification target in `namespace` format. The other format is very similar and is omitted for brevity. ### Step 1: Add PostgreSQL endpoint to Minio -The default location of Minio server configuration file is ``~/.minio/config.json``. Update the PostgreSQL configuration block in ``config.json`` as follows: +The default location of Minio server configuration file is ``~/.minio/config.json``. The PostgreSQL configuration is located in the `postgresql` key under the `notify` top-level key. Create a configuration key-value pair here for your PostgreSQL instance. The key is a name for your PostgreSQL endpoint, and the value is a collection of key-value parameters described in the table below. + +| Parameter | Type | Description | +|:---|:---|:---| +| `enable` | _bool_ | (Required) Is this server endpoint configuration active/enabled? | +| `format` | _string_ | (Required) Either `namespace` or `access`. | +| `connectionString` | _string_ | (Optional) [Connection string parameters](https://godoc.org/github.com/lib/pq#hdr-Connection_String_Parameters) for the PostgreSQL server. Can be used to set `sslmode` for example. | +| `table` | _string_ | (Required) Table name in which events will be stored/updated. If the table does not exist, the Minio server creates it at start-up.| +| `host` | _string_ | (Optional) Host name of the PostgreSQL server. Defaults to `localhost`| +| `port` | _string_ | (Optional) Port on which to connect to PostgreSQL server. Defaults to `5432`. | +| `user` | _string_ | (Optional) Database user name. Defaults to user running the server process. | +| `password` | _string_ | (Optional) Database password. | +| `database` | _string_ | (Optional) Database name. | + +An example of PostgreSQL configuration is as follows: ``` "postgresql": { "1": { "enable": true, - "connectionString": "", + "format": "namespace", + "connectionString": "sslmode=disable", "table": "bucketevents", "host": "127.0.0.1", "port": "5432", "user": "postgres", - "password": "mypassword", - "database": "bucketevents_db" + "password": "password", + "database": "minio_events" } } ``` -Restart Minio server to reflect config changes. ``bucketevents`` is the database table used by PostgreSQL in this example. +Note that for illustration here, we have disabled SSL. In the interest of security, for production this is not recommended. + +After updating the configuration file, restart the Minio server to put the changes into effect. The server will print a line like `SQS ARNs: arn:minio:sqs:us-east-1:1:postgresql` at start-up if there were no errors. + +Note that, you can add as many PostgreSQL server endpoint configurations as needed by providing an identifier (like "1" in the example above) for the PostgreSQL instance and an object of per-server configuration parameters. + ### Step 2: Enable bucket notification using Minio client -We will enable bucket event notification to trigger whenever a JPEG image is uploaded or deleted from ``images`` bucket on ``myminio`` server. Here ARN value is ``arn:minio:sqs:us-east-1:1:postgresql``. To understand more about ARN please follow [AWS ARN](http://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html) documentation. +We will now enable bucket event notifications on a bucket named `images`. Whenever a JPEG image is created/overwritten, a new row is added or an existing row is updated in the PostgreSQL configured above. When an existing object is deleted, the corresponding row is deleted from the PostgreSQL table. Thus, the rows in the PostgreSQL table, reflect the `.jpg` objects in the `images` bucket. + +To configure this bucket notification, we need the ARN printed by Minio in the previous step. Additional information about ARN is available [here](http://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html). + +With the `mc` tool, the configuration is very simple to add. Let us say that the Minio server is aliased as `myminio` in our mc configuration. Execute the following: ``` +# Create bucket named `images` in myminio mc mb myminio/images -mc events add myminio/images arn:minio:sqs:us-east-1:1:postgresql --suffix .jpg +# Add notification configuration on the `images` bucket using the MySQL ARN. The --suffix argument filters events. +mc events add myminio/images arn:minio:sqs:us-east-1:1:postgresql --suffix .jpg +# Print out the notification configuration on the `images` bucket. +mc events list myminio/images mc events list myminio/images arn:minio:sqs:us-east-1:1:postgresql s3:ObjectCreated:*,s3:ObjectRemoved:* Filter: suffix=”.jpg” ``` @@ -414,12 +450,13 @@ Open another terminal and upload a JPEG image into ``images`` bucket. mc cp myphoto.jpg myminio/images ``` -Open PostgreSQL terminal to list the saved event notification logs. +Open PostgreSQL terminal to list the rows in the `bucketevents` table. ``` -bucketevents_db=# select * from bucketevents; +$ psql -h 127.0.0.1 -u postgres -p minio_events +minio_events=# select * from bucketevents; -key | value +key | value --------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- images/myphoto.jpg | {"Records": [{"s3": {"bucket": {"arn": "arn:aws:s3:::images", "name": "images", "ownerIdentity": {"principalId": "minio"}}, "object": {"key": "myphoto.jpg", "eTag": "1d97bf45ecb37f7a7b699418070df08f", "size": 56060, "sequencer": "147CE57C70B31931"}, "configurationId": "Config", "s3SchemaVersion": "1.0"}, "awsRegion": "us-east-1", "eventName": "s3:ObjectCreated:Put", "eventTime": "2016-10-12T21:18:20Z", "eventSource": "aws:s3", "eventVersion": "2.0", "userIdentity": {"principalId": "minio"}, "responseElements": {}, "requestParameters": {"sourceIPAddress": "[::1]:39706"}}]} (1 row) @@ -430,20 +467,29 @@ key | value Install MySQL from [here](https://dev.mysql.com/downloads/mysql/). For illustrative purposes, we have set the root password as `password` and created a database called `miniodb` to store the events. +This notification target supports two formats: _namespace_ and _access_. + +When the _namespace_ format is used, Minio synchronizes objects in the bucket with rows in the table. It creates rows with two columns: key_name and value. The key_name is the bucket and object name of an object that exists in Minio. The value is JSON encoded event data about the operation that created/replaced the object in Minio. When objects are updated or deleted, the corresponding row from this table is updated or deleted respectively. + +When the _access_ format is used, Minio appends events to a table. It creates rows with two columns: event_time and event_data. The event_time is the time at which the event occurred in the Minio server. The event_data is the JSON encoded event data about the operation on an object. No rows are deleted or modified in this format. + +The steps below show how to use this notification target in `namespace` format. The other format is very similar and is omitted for brevity. + ### Step 1: Add MySQL server endpoint configuration to Minio The default location of Minio server configuration file is ``~/.minio/config.json``. The MySQL configuration is located in the `mysql` key under the `notify` top-level key. Create a configuration key-value pair here for your MySQL instance. The key is a name for your MySQL endpoint, and the value is a collection of key-value parameters described in the table below. -| Parameter | Value type | Description | +| Parameter | Type | Description | |:---|:---|:---| -| `enable` | Boolean | (Required) Is this server endpoint configuration active/enabled? | -| `dsnString` | String | (Optional) [Data-Source-Name connection string](https://github.com/go-sql-driver/mysql#dsn-data-source-name) for the MySQL server. If not specified, the connection information specified by the `host`, `port`, `user`, `password` and `database` parameters are used. | -| `table` | String | (Required) Table name in which events will be stored/updated. If the table does not exist, the Minio server creates it at start-up.| -| `host` | String | Host name of the MySQL server (used only if `dsnString` is empty). | -| `port` | String | Port on which to connect to the MySQL server (used only if `dsnString` is empty). | -| `user` | String | Database user-name (used only if `dsnString` is empty). | -| `password` | String | Database password (used only if `dsnString` is empty). | -| `database` | String | Database name (used only if `dsnString` is empty). | +| `enable` | _bool_ | (Required) Is this server endpoint configuration active/enabled? | +| `format` | _string_ | (Required) Either `namespace` or `access`. | +| `dsnString` | _string_ | (Optional) [Data-Source-Name connection string](https://github.com/go-sql-driver/mysql#dsn-data-source-name) for the MySQL server. If not specified, the connection information specified by the `host`, `port`, `user`, `password` and `database` parameters are used. | +| `table` | _string_ | (Required) Table name in which events will be stored/updated. If the table does not exist, the Minio server creates it at start-up.| +| `host` | _string_ | Host name of the MySQL server (used only if `dsnString` is empty). | +| `port` | _string_ | Port on which to connect to the MySQL server (used only if `dsnString` is empty). | +| `user` | _string_ | Database user-name (used only if `dsnString` is empty). | +| `password` | _string_ | Database password (used only if `dsnString` is empty). | +| `database` | _string_ | Database name (used only if `dsnString` is empty). | An example of MySQL configuration is as follows: @@ -479,7 +525,7 @@ With the `mc` tool, the configuration is very simple to add. Let us say that the # Create bucket named `images` in myminio mc mb myminio/images # Add notification configuration on the `images` bucket using the MySQL ARN. The --suffix argument filters events. -mc events add myminio/images arn:minio:sqs:us-east-1:1:postgresql --suffix .jpg +mc events add myminio/images arn:minio:sqs:us-east-1:1:postgresql --suffix .jpg # Print out the notification configuration on the `images` bucket. mc events list myminio/images arn:minio:sqs:us-east-1:1:postgresql s3:ObjectCreated:*,s3:ObjectRemoved:* Filter: suffix=”.jpg” diff --git a/docs/config/README.md b/docs/config/README.md index 5b7cb5a08..508e0e0e6 100644 --- a/docs/config/README.md +++ b/docs/config/README.md @@ -1,4 +1,4 @@ -# Minio Server `config.json` (v16) Guide [![Slack](https://slack.minio.io/slack?type=svg)](https://slack.minio.io) [![Go Report Card](https://goreportcard.com/badge/minio/minio)](https://goreportcard.com/report/minio/minio) [![Docker Pulls](https://img.shields.io/docker/pulls/minio/minio.svg?maxAge=604800)](https://hub.docker.com/r/minio/minio/) [![codecov](https://codecov.io/gh/minio/minio/branch/master/graph/badge.svg)](https://codecov.io/gh/minio/minio) +# Minio Server `config.json` (v17) Guide [![Slack](https://slack.minio.io/slack?type=svg)](https://slack.minio.io) [![Go Report Card](https://goreportcard.com/badge/minio/minio)](https://goreportcard.com/report/minio/minio) [![Docker Pulls](https://img.shields.io/docker/pulls/minio/minio.svg?maxAge=604800)](https://hub.docker.com/r/minio/minio/) [![codecov](https://codecov.io/gh/minio/minio/branch/master/graph/badge.svg)](https://codecov.io/gh/minio/minio) Minio server stores all its configuration data in `${HOME}/.minio/config.json` file by default. Following sections provide detailed explanation of each fields and how to customize them. A complete example of `config.json` is available [here](https://raw.githubusercontent.com/minio/minio/master/docs/config/config.sample.json) diff --git a/docs/config/config.sample.json b/docs/config/config.sample.json index 8e421c3fc..65f521f0b 100644 --- a/docs/config/config.sample.json +++ b/docs/config/config.sample.json @@ -53,6 +53,7 @@ "elasticsearch": { "1": { "enable": true, + "format": "namespace", "url": "http://127.0.0.1:9200", "index": "bucketevents" } @@ -60,6 +61,7 @@ "redis": { "1": { "enable": true, + "format": "namespace", "address": "127.0.0.1:6379", "password": "yoursecret", "key": "bucketevents" @@ -68,6 +70,7 @@ "postgresql": { "1": { "enable": true, + "format": "namespace", "connectionString": "", "table": "bucketevents", "host": "127.0.0.1", @@ -93,6 +96,7 @@ "mysql": { "1": { "enable": true, + "format": "namespace", "dsnString": "", "table": "minio_images", "host": "172.17.0.1",