mirror of
https://github.com/minio/minio.git
synced 2024-12-24 06:05:55 -05:00
fix: config to support keys with special values (#9304)
This PR adds context-based `k=v` splits based on the sub-system which was obtained, if the keys are not provided an error will be thrown during parsing, if keys are provided with wrong values an error will be thrown. Keys can now have values which are of a much more complex form such as `k="v=v"` or `k=" v = v"` and other variations. additionally, deprecate unnecessary postgres/mysql configuration styles, support only - connection_string for Postgres - dsn_string for MySQL All other parameters are removed.
This commit is contained in:
parent
7c919329e8
commit
3184205519
@ -22,6 +22,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/minio/minio-go/v6/pkg/set"
|
||||
@ -196,6 +197,15 @@ func (kvs KVS) Empty() bool {
|
||||
return len(kvs) == 0
|
||||
}
|
||||
|
||||
// Keys returns the list of keys for the current KVS
|
||||
func (kvs KVS) Keys() []string {
|
||||
var keys = make([]string, len(kvs))
|
||||
for i := range kvs {
|
||||
keys[i] = kvs[i].Key
|
||||
}
|
||||
return keys
|
||||
}
|
||||
|
||||
func (kvs KVS) String() string {
|
||||
var s strings.Builder
|
||||
for _, kv := range kvs {
|
||||
@ -559,6 +569,33 @@ func (c Config) Clone() Config {
|
||||
return cp
|
||||
}
|
||||
|
||||
// Converts an input string of form "k1=v1 k2=v2" into fields
|
||||
// of ["k1=v1", "k2=v2"], the tokenization of each `k=v`
|
||||
// happens with the right number of input keys, if keys
|
||||
// input is empty returned value is empty slice as well.
|
||||
func kvFields(input string, keys []string) []string {
|
||||
var valueIndexes []int
|
||||
for _, key := range keys {
|
||||
i := strings.Index(input, key+KvSeparator)
|
||||
if i == -1 {
|
||||
continue
|
||||
}
|
||||
valueIndexes = append(valueIndexes, i)
|
||||
}
|
||||
|
||||
sort.Ints(valueIndexes)
|
||||
var fields = make([]string, len(valueIndexes))
|
||||
for i := range valueIndexes {
|
||||
j := i + 1
|
||||
if j < len(valueIndexes) {
|
||||
fields[i] = strings.TrimSpace(input[valueIndexes[i]:valueIndexes[j]])
|
||||
} else {
|
||||
fields[i] = strings.TrimSpace(input[valueIndexes[i]:])
|
||||
}
|
||||
}
|
||||
return fields
|
||||
}
|
||||
|
||||
// SetKVS - set specific key values per sub-system.
|
||||
func (c Config) SetKVS(s string, defaultKVS map[string]KVS) error {
|
||||
if len(s) == 0 {
|
||||
@ -581,9 +618,20 @@ func (c Config) SetKVS(s string, defaultKVS map[string]KVS) error {
|
||||
return Errorf("sub-system '%s' only supports single target", subSystemValue[0])
|
||||
}
|
||||
|
||||
tgt := Default
|
||||
subSys := subSystemValue[0]
|
||||
if len(subSystemValue) == 2 {
|
||||
tgt = subSystemValue[1]
|
||||
}
|
||||
|
||||
fields := kvFields(inputs[1], defaultKVS[subSys].Keys())
|
||||
if len(fields) == 0 {
|
||||
return Errorf("sub-system '%s' cannot have empty keys", subSys)
|
||||
}
|
||||
|
||||
var kvs = KVS{}
|
||||
var prevK string
|
||||
for _, v := range strings.Fields(inputs[1]) {
|
||||
for _, v := range fields {
|
||||
kv := strings.SplitN(v, KvSeparator, 2)
|
||||
if len(kv) == 0 {
|
||||
continue
|
||||
@ -604,12 +652,6 @@ func (c Config) SetKVS(s string, defaultKVS map[string]KVS) error {
|
||||
return Errorf("key '%s', cannot have empty value", kv[0])
|
||||
}
|
||||
|
||||
tgt := Default
|
||||
subSys := subSystemValue[0]
|
||||
if len(subSystemValue) == 2 {
|
||||
tgt = subSystemValue[1]
|
||||
}
|
||||
|
||||
_, ok := kvs.Lookup(Enable)
|
||||
// Check if state is required
|
||||
_, enableRequired := defaultKVS[subSys].Lookup(Enable)
|
||||
|
@ -17,7 +17,94 @@
|
||||
|
||||
package config
|
||||
|
||||
import "testing"
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestKVFields(t *testing.T) {
|
||||
tests := []struct {
|
||||
input string
|
||||
keys []string
|
||||
expectedFields map[string]struct{}
|
||||
}{
|
||||
// No keys present
|
||||
{
|
||||
input: "",
|
||||
keys: []string{"comment"},
|
||||
expectedFields: map[string]struct{}{},
|
||||
},
|
||||
// No keys requested for tokenizing
|
||||
{
|
||||
input: `comment="Hi this is my comment ="`,
|
||||
keys: []string{},
|
||||
expectedFields: map[string]struct{}{},
|
||||
},
|
||||
// Single key requested and present
|
||||
{
|
||||
input: `comment="Hi this is my comment ="`,
|
||||
keys: []string{"comment"},
|
||||
expectedFields: map[string]struct{}{`comment="Hi this is my comment ="`: {}},
|
||||
},
|
||||
// Keys and input order of k=v is same.
|
||||
{
|
||||
input: `connection_string="host=localhost port=2832" comment="really long comment"`,
|
||||
keys: []string{"connection_string", "comment"},
|
||||
expectedFields: map[string]struct{}{
|
||||
`connection_string="host=localhost port=2832"`: {},
|
||||
`comment="really long comment"`: {},
|
||||
},
|
||||
},
|
||||
// Keys with spaces in between
|
||||
{
|
||||
input: `enable=on format=namespace connection_string=" host=localhost port=5432 dbname = cesnietor sslmode=disable" table=holicrayoli`,
|
||||
keys: []string{"enable", "connection_string", "comment", "format", "table"},
|
||||
expectedFields: map[string]struct{}{
|
||||
`enable=on`: {},
|
||||
`format=namespace`: {},
|
||||
`connection_string=" host=localhost port=5432 dbname = cesnietor sslmode=disable"`: {},
|
||||
`table=holicrayoli`: {},
|
||||
},
|
||||
},
|
||||
// One of the keys is not present and order of input has changed.
|
||||
{
|
||||
input: `comment="really long comment" connection_string="host=localhost port=2832"`,
|
||||
keys: []string{"connection_string", "comment", "format"},
|
||||
expectedFields: map[string]struct{}{
|
||||
`connection_string="host=localhost port=2832"`: {},
|
||||
`comment="really long comment"`: {},
|
||||
},
|
||||
},
|
||||
// Incorrect delimiter, expected fields should be empty.
|
||||
{
|
||||
input: `comment:"really long comment" connection_string:"host=localhost port=2832"`,
|
||||
keys: []string{"connection_string", "comment"},
|
||||
expectedFields: map[string]struct{}{},
|
||||
},
|
||||
// Incorrect type of input v/s required keys.
|
||||
{
|
||||
input: `comme="really long comment" connection_str="host=localhost port=2832"`,
|
||||
keys: []string{"connection_string", "comment"},
|
||||
expectedFields: map[string]struct{}{},
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
test := test
|
||||
t.Run("", func(t *testing.T) {
|
||||
gotFields := kvFields(test.input, test.keys)
|
||||
if len(gotFields) != len(test.expectedFields) {
|
||||
t.Errorf("Expected keys %d, found %d", len(test.expectedFields), len(gotFields))
|
||||
}
|
||||
found := true
|
||||
for _, field := range gotFields {
|
||||
_, ok := test.expectedFields[field]
|
||||
found = found && ok
|
||||
}
|
||||
if !found {
|
||||
t.Errorf("Expected %s, got %s", test.expectedFields, gotFields)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidRegion(t *testing.T) {
|
||||
tests := []struct {
|
||||
|
@ -297,7 +297,7 @@ var (
|
||||
HelpPostgres = config.HelpKVS{
|
||||
config.HelpKV{
|
||||
Key: target.PostgresConnectionString,
|
||||
Description: "Postgres server connection-string",
|
||||
Description: `Postgres server connection-string e.g. "host=localhost port=5432 dbname=minio_events user=postgres password=password sslmode=disable"`,
|
||||
Type: "string",
|
||||
},
|
||||
config.HelpKV{
|
||||
@ -310,36 +310,6 @@ var (
|
||||
Description: formatComment,
|
||||
Type: "namespace*|access",
|
||||
},
|
||||
config.HelpKV{
|
||||
Key: target.PostgresHost,
|
||||
Description: "Postgres server hostname (used only if `connection_string` is empty)",
|
||||
Optional: true,
|
||||
Type: "hostname",
|
||||
},
|
||||
config.HelpKV{
|
||||
Key: target.PostgresPort,
|
||||
Description: "Postgres server port, defaults to `5432` (used only if `connection_string` is empty)",
|
||||
Optional: true,
|
||||
Type: "port",
|
||||
},
|
||||
config.HelpKV{
|
||||
Key: target.PostgresUsername,
|
||||
Description: "database username (used only if `connection_string` is empty)",
|
||||
Optional: true,
|
||||
Type: "string",
|
||||
},
|
||||
config.HelpKV{
|
||||
Key: target.PostgresPassword,
|
||||
Description: "database password (used only if `connection_string` is empty)",
|
||||
Optional: true,
|
||||
Type: "string",
|
||||
},
|
||||
config.HelpKV{
|
||||
Key: target.PostgresDatabase,
|
||||
Description: "database name (used only if `connection_string` is empty)",
|
||||
Optional: true,
|
||||
Type: "string",
|
||||
},
|
||||
config.HelpKV{
|
||||
Key: target.PostgresQueueDir,
|
||||
Description: queueDirComment,
|
||||
@ -363,7 +333,7 @@ var (
|
||||
HelpMySQL = config.HelpKVS{
|
||||
config.HelpKV{
|
||||
Key: target.MySQLDSNString,
|
||||
Description: "MySQL data-source-name connection string",
|
||||
Description: `MySQL data-source-name connection string e.g. "<user>:<password>@tcp(<host>:<port>)/<database>"`,
|
||||
Optional: true,
|
||||
Type: "string",
|
||||
},
|
||||
@ -377,36 +347,6 @@ var (
|
||||
Description: formatComment,
|
||||
Type: "namespace*|access",
|
||||
},
|
||||
config.HelpKV{
|
||||
Key: target.MySQLHost,
|
||||
Description: "MySQL server hostname (used only if `dsn_string` is empty)",
|
||||
Optional: true,
|
||||
Type: "hostname",
|
||||
},
|
||||
config.HelpKV{
|
||||
Key: target.MySQLPort,
|
||||
Description: "MySQL server port (used only if `dsn_string` is empty)",
|
||||
Optional: true,
|
||||
Type: "port",
|
||||
},
|
||||
config.HelpKV{
|
||||
Key: target.MySQLUsername,
|
||||
Description: "database username (used only if `dsn_string` is empty)",
|
||||
Optional: true,
|
||||
Type: "string",
|
||||
},
|
||||
config.HelpKV{
|
||||
Key: target.MySQLPassword,
|
||||
Description: "database password (used only if `dsn_string` is empty)",
|
||||
Optional: true,
|
||||
Type: "string",
|
||||
},
|
||||
config.HelpKV{
|
||||
Key: target.MySQLDatabase,
|
||||
Description: "database name (used only if `dsn_string` is empty)",
|
||||
Optional: true,
|
||||
Type: "string",
|
||||
},
|
||||
config.HelpKV{
|
||||
Key: target.MySQLQueueDir,
|
||||
Description: queueDirComment,
|
||||
|
@ -306,8 +306,10 @@ func checkValidNotificationKeys(cfg config.Config) error {
|
||||
if tname != config.Default {
|
||||
subSysTarget = subSys + config.SubSystemSeparator + tname
|
||||
}
|
||||
if err := config.CheckValidKeys(subSysTarget, kv, validKVS); err != nil {
|
||||
return err
|
||||
if v, ok := kv.Lookup(config.Enable); ok && v == config.EnableOn {
|
||||
if err := config.CheckValidKeys(subSysTarget, kv, validKVS); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -696,26 +698,6 @@ var (
|
||||
Key: target.MySQLFormat,
|
||||
Value: formatNamespace,
|
||||
},
|
||||
config.KV{
|
||||
Key: target.MySQLHost,
|
||||
Value: "",
|
||||
},
|
||||
config.KV{
|
||||
Key: target.MySQLPort,
|
||||
Value: "",
|
||||
},
|
||||
config.KV{
|
||||
Key: target.MySQLUsername,
|
||||
Value: "",
|
||||
},
|
||||
config.KV{
|
||||
Key: target.MySQLPassword,
|
||||
Value: "",
|
||||
},
|
||||
config.KV{
|
||||
Key: target.MySQLDatabase,
|
||||
Value: "",
|
||||
},
|
||||
config.KV{
|
||||
Key: target.MySQLDSNString,
|
||||
Value: "",
|
||||
@ -752,16 +734,6 @@ func GetNotifyMySQL(mysqlKVS map[string]config.KVS) (map[string]target.MySQLArgs
|
||||
continue
|
||||
}
|
||||
|
||||
hostEnv := target.EnvMySQLHost
|
||||
if k != config.Default {
|
||||
hostEnv = hostEnv + config.Default + k
|
||||
}
|
||||
|
||||
host, err := xnet.ParseURL(env.Get(hostEnv, kv.Get(target.MySQLHost)))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
queueLimitEnv := target.EnvMySQLQueueLimit
|
||||
if k != config.Default {
|
||||
queueLimitEnv = queueLimitEnv + config.Default + k
|
||||
@ -775,30 +747,17 @@ func GetNotifyMySQL(mysqlKVS map[string]config.KVS) (map[string]target.MySQLArgs
|
||||
if k != config.Default {
|
||||
formatEnv = formatEnv + config.Default + k
|
||||
}
|
||||
|
||||
dsnStringEnv := target.EnvMySQLDSNString
|
||||
if k != config.Default {
|
||||
dsnStringEnv = dsnStringEnv + config.Default + k
|
||||
}
|
||||
|
||||
tableEnv := target.EnvMySQLTable
|
||||
if k != config.Default {
|
||||
tableEnv = tableEnv + config.Default + k
|
||||
}
|
||||
portEnv := target.EnvMySQLPort
|
||||
if k != config.Default {
|
||||
portEnv = portEnv + config.Default + k
|
||||
}
|
||||
usernameEnv := target.EnvMySQLUsername
|
||||
if k != config.Default {
|
||||
usernameEnv = usernameEnv + config.Default + k
|
||||
}
|
||||
passwordEnv := target.EnvMySQLPassword
|
||||
if k != config.Default {
|
||||
passwordEnv = passwordEnv + config.Default + k
|
||||
}
|
||||
databaseEnv := target.EnvMySQLDatabase
|
||||
if k != config.Default {
|
||||
databaseEnv = databaseEnv + config.Default + k
|
||||
}
|
||||
|
||||
queueDirEnv := target.EnvMySQLQueueDir
|
||||
if k != config.Default {
|
||||
queueDirEnv = queueDirEnv + config.Default + k
|
||||
@ -808,11 +767,6 @@ func GetNotifyMySQL(mysqlKVS map[string]config.KVS) (map[string]target.MySQLArgs
|
||||
Format: env.Get(formatEnv, kv.Get(target.MySQLFormat)),
|
||||
DSN: env.Get(dsnStringEnv, kv.Get(target.MySQLDSNString)),
|
||||
Table: env.Get(tableEnv, kv.Get(target.MySQLTable)),
|
||||
Host: *host,
|
||||
Port: env.Get(portEnv, kv.Get(target.MySQLPort)),
|
||||
User: env.Get(usernameEnv, kv.Get(target.MySQLUsername)),
|
||||
Password: env.Get(passwordEnv, kv.Get(target.MySQLPassword)),
|
||||
Database: env.Get(databaseEnv, kv.Get(target.MySQLDatabase)),
|
||||
QueueDir: env.Get(queueDirEnv, kv.Get(target.MySQLQueueDir)),
|
||||
QueueLimit: queueLimit,
|
||||
}
|
||||
@ -1180,26 +1134,6 @@ var (
|
||||
Key: target.PostgresTable,
|
||||
Value: "",
|
||||
},
|
||||
config.KV{
|
||||
Key: target.PostgresHost,
|
||||
Value: "",
|
||||
},
|
||||
config.KV{
|
||||
Key: target.PostgresPort,
|
||||
Value: "",
|
||||
},
|
||||
config.KV{
|
||||
Key: target.PostgresUsername,
|
||||
Value: "",
|
||||
},
|
||||
config.KV{
|
||||
Key: target.PostgresPassword,
|
||||
Value: "",
|
||||
},
|
||||
config.KV{
|
||||
Key: target.PostgresDatabase,
|
||||
Value: "",
|
||||
},
|
||||
config.KV{
|
||||
Key: target.PostgresQueueDir,
|
||||
Value: "",
|
||||
@ -1228,16 +1162,6 @@ func GetNotifyPostgres(postgresKVS map[string]config.KVS) (map[string]target.Pos
|
||||
continue
|
||||
}
|
||||
|
||||
hostEnv := target.EnvPostgresHost
|
||||
if k != config.Default {
|
||||
hostEnv = hostEnv + config.Default + k
|
||||
}
|
||||
|
||||
host, err := xnet.ParseHost(env.Get(hostEnv, kv.Get(target.PostgresHost)))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
queueLimitEnv := target.EnvPostgresQueueLimit
|
||||
if k != config.Default {
|
||||
queueLimitEnv = queueLimitEnv + config.Default + k
|
||||
@ -1263,26 +1187,6 @@ func GetNotifyPostgres(postgresKVS map[string]config.KVS) (map[string]target.Pos
|
||||
tableEnv = tableEnv + config.Default + k
|
||||
}
|
||||
|
||||
portEnv := target.EnvPostgresPort
|
||||
if k != config.Default {
|
||||
portEnv = portEnv + config.Default + k
|
||||
}
|
||||
|
||||
usernameEnv := target.EnvPostgresUsername
|
||||
if k != config.Default {
|
||||
usernameEnv = usernameEnv + config.Default + k
|
||||
}
|
||||
|
||||
passwordEnv := target.EnvPostgresPassword
|
||||
if k != config.Default {
|
||||
passwordEnv = passwordEnv + config.Default + k
|
||||
}
|
||||
|
||||
databaseEnv := target.EnvPostgresDatabase
|
||||
if k != config.Default {
|
||||
databaseEnv = databaseEnv + config.Default + k
|
||||
}
|
||||
|
||||
queueDirEnv := target.EnvPostgresQueueDir
|
||||
if k != config.Default {
|
||||
queueDirEnv = queueDirEnv + config.Default + k
|
||||
@ -1293,11 +1197,6 @@ func GetNotifyPostgres(postgresKVS map[string]config.KVS) (map[string]target.Pos
|
||||
Format: env.Get(formatEnv, kv.Get(target.PostgresFormat)),
|
||||
ConnectionString: env.Get(connectionStringEnv, kv.Get(target.PostgresConnectionString)),
|
||||
Table: env.Get(tableEnv, kv.Get(target.PostgresTable)),
|
||||
Host: *host,
|
||||
Port: env.Get(portEnv, kv.Get(target.PostgresPort)),
|
||||
User: env.Get(usernameEnv, kv.Get(target.PostgresUsername)),
|
||||
Password: env.Get(passwordEnv, kv.Get(target.PostgresPassword)),
|
||||
Database: env.Get(databaseEnv, kv.Get(target.PostgresDatabase)),
|
||||
QueueDir: env.Get(queueDirEnv, kv.Get(target.PostgresQueueDir)),
|
||||
QueueLimit: uint64(queueLimit),
|
||||
}
|
||||
|
@ -38,9 +38,10 @@ notify_elasticsearch publish bucket notifications to Elasticsearch endpoints
|
||||
notify_redis publish bucket notifications to Redis datastores
|
||||
```
|
||||
|
||||
> NOTE: '\*' at the end of arg means its mandatory.
|
||||
> NOTE: '\*' at the end of the values, means its the default value for the arg.
|
||||
> NOTE: When configured using environment variables, the `:name` can be specified using this format `MINIO_NOTIFY_WEBHOOK_ENABLE_<name>`.
|
||||
> NOTE:
|
||||
> - '\*' at the end of arg means its mandatory.
|
||||
> - '\*' at the end of the values, means its the default value for the arg.
|
||||
> - When configured using environment variables, the `:name` can be specified using this format `MINIO_NOTIFY_WEBHOOK_ENABLE_<name>`.
|
||||
|
||||
<a name="AMQP"></a>
|
||||
|
||||
@ -829,14 +830,9 @@ KEY:
|
||||
notify_postgres[:name] publish bucket notifications to Postgres databases
|
||||
|
||||
ARGS:
|
||||
connection_string* (string) Postgres server connection-string
|
||||
connection_string* (string) Postgres server connection-string e.g. "host=localhost port=5432 dbname=minio_events user=postgres password=password sslmode=disable"
|
||||
table* (string) DB table name to store/update events, table is auto-created
|
||||
format* (namespace*|access) 'namespace' reflects current bucket/object list and 'access' reflects a journal of object operations, defaults to 'namespace'
|
||||
host (hostname) Postgres server hostname (used only if `connection_string` is empty)
|
||||
port (port) Postgres server port, defaults to `5432` (used only if `connection_string` is empty)
|
||||
username (string) database username (used only if `connection_string` is empty)
|
||||
password (string) database password (used only if `connection_string` is empty)
|
||||
database (string) database name (used only if `connection_string` is empty)
|
||||
queue_dir (path) staging dir for undelivered messages e.g. '/home/events'
|
||||
queue_limit (number) maximum limit for undelivered messages, defaults to '10000'
|
||||
comment (sentence) optionally add a comment to this setting
|
||||
@ -849,14 +845,9 @@ notify_postgres[:name] publish bucket notifications to Postgres databases
|
||||
|
||||
ARGS:
|
||||
MINIO_NOTIFY_POSTGRES_ENABLE* (on|off) enable notify_postgres target, default is 'off'
|
||||
MINIO_NOTIFY_POSTGRES_CONNECTION_STRING* (string) Postgres server connection-string
|
||||
MINIO_NOTIFY_POSTGRES_CONNECTION_STRING* (string) Postgres server connection-string e.g. "host=localhost port=5432 dbname=minio_events user=postgres password=password sslmode=disable"
|
||||
MINIO_NOTIFY_POSTGRES_TABLE* (string) DB table name to store/update events, table is auto-created
|
||||
MINIO_NOTIFY_POSTGRES_FORMAT* (namespace*|access) 'namespace' reflects current bucket/object list and 'access' reflects a journal of object operations, defaults to 'namespace'
|
||||
MINIO_NOTIFY_POSTGRES_HOST (hostname) Postgres server hostname (used only if `connection_string` is empty)
|
||||
MINIO_NOTIFY_POSTGRES_PORT (port) Postgres server port, defaults to `5432` (used only if `connection_string` is empty)
|
||||
MINIO_NOTIFY_POSTGRES_USERNAME (string) database username (used only if `connection_string` is empty)
|
||||
MINIO_NOTIFY_POSTGRES_PASSWORD (string) database password (used only if `connection_string` is empty)
|
||||
MINIO_NOTIFY_POSTGRES_DATABASE (string) database name (used only if `connection_string` is empty)
|
||||
MINIO_NOTIFY_POSTGRES_QUEUE_DIR (path) staging dir for undelivered messages e.g. '/home/events'
|
||||
MINIO_NOTIFY_POSTGRES_QUEUE_LIMIT (number) maximum limit for undelivered messages, defaults to '10000'
|
||||
MINIO_NOTIFY_POSTGRES_COMMENT (sentence) optionally add a comment to this setting
|
||||
@ -869,13 +860,13 @@ To update the configuration, use `mc admin config get` command to get the curren
|
||||
|
||||
```sh
|
||||
$ mc admin config get myminio notify_postgres
|
||||
notify_postgres:1 password="" port="" queue_dir="" connection_string="" host="" queue_limit="0" table="" username="" database="" format="namespace"
|
||||
notify_postgres:1 queue_dir="" connection_string="" queue_limit="0" table="" format="namespace"
|
||||
```
|
||||
|
||||
Use `mc admin config set` command to update the configuration for the deployment. Restart the MinIO server to put the changes into effect. The server will print a line like `SQS ARNs: arn:minio:sqs::1:postgresql` at start-up if there were no errors.
|
||||
|
||||
```sh
|
||||
$ mc admin config set myminio notify_postgres:1 password="password" port="5432" queue_dir="" connection_string="sslmode=disable" host="127.0.0.1" queue_limit="0" table="bucketevents" username="postgres" database="minio_events" format="namespace"
|
||||
$ mc admin config set myminio notify_postgres:1 connection_string="host=localhost port=5432 dbname=minio_events user=postgres password=password sslmode=disable" table="bucketevents" format="namespace"
|
||||
```
|
||||
|
||||
Note that, you can add as many PostgreSQL server endpoint configurations as needed by providing an identifier (like "1" in the example above) for the PostgreSQL instance and an object of per-server configuration parameters.
|
||||
@ -946,14 +937,9 @@ KEY:
|
||||
notify_mysql[:name] publish bucket notifications to MySQL databases. When multiple MySQL server endpoints are needed, a user specified "name" can be added for each configuration, (e.g."notify_mysql:myinstance").
|
||||
|
||||
ARGS:
|
||||
dsn_string* (string) MySQL data-source-name connection string
|
||||
dsn_string* (string) MySQL data-source-name connection string e.g. "<user>:<password>@tcp(<host>:<port>)/<database>"
|
||||
table* (string) DB table name to store/update events, table is auto-created
|
||||
format* (namespace*|access) 'namespace' reflects current bucket/object list and 'access' reflects a journal of object operations, defaults to 'namespace'
|
||||
host (hostname) MySQL server hostname (used only if `dsn_string` is empty)
|
||||
port (port) MySQL server port (used only if `dsn_string` is empty)
|
||||
username (string) database username (used only if `dsn_string` is empty)
|
||||
password (string) database password (used only if `dsn_string` is empty)
|
||||
database (string) database name (used only if `dsn_string` is empty)
|
||||
queue_dir (path) staging dir for undelivered messages e.g. '/home/events'
|
||||
queue_limit (number) maximum limit for undelivered messages, defaults to '10000'
|
||||
comment (sentence) optionally add a comment to this setting
|
||||
@ -966,20 +952,15 @@ notify_mysql[:name] publish bucket notifications to MySQL databases
|
||||
|
||||
ARGS:
|
||||
MINIO_NOTIFY_MYSQL_ENABLE* (on|off) enable notify_mysql target, default is 'off'
|
||||
MINIO_NOTIFY_MYSQL_DSN_STRING* (string) MySQL data-source-name connection string
|
||||
MINIO_NOTIFY_MYSQL_DSN_STRING* (string) MySQL data-source-name connection string e.g. "<user>:<password>@tcp(<host>:<port>)/<database>"
|
||||
MINIO_NOTIFY_MYSQL_TABLE* (string) DB table name to store/update events, table is auto-created
|
||||
MINIO_NOTIFY_MYSQL_FORMAT* (namespace*|access) 'namespace' reflects current bucket/object list and 'access' reflects a journal of object operations, defaults to 'namespace'
|
||||
MINIO_NOTIFY_MYSQL_HOST (hostname) MySQL server hostname (used only if `dsn_string` is empty)
|
||||
MINIO_NOTIFY_MYSQL_PORT (port) MySQL server port (used only if `dsn_string` is empty)
|
||||
MINIO_NOTIFY_MYSQL_USERNAME (string) database username (used only if `dsn_string` is empty)
|
||||
MINIO_NOTIFY_MYSQL_PASSWORD (string) database password (used only if `dsn_string` is empty)
|
||||
MINIO_NOTIFY_MYSQL_DATABASE (string) database name (used only if `dsn_string` is empty)
|
||||
MINIO_NOTIFY_MYSQL_QUEUE_DIR (path) staging dir for undelivered messages e.g. '/home/events'
|
||||
MINIO_NOTIFY_MYSQL_QUEUE_LIMIT (number) maximum limit for undelivered messages, defaults to '10000'
|
||||
MINIO_NOTIFY_MYSQL_COMMENT (sentence) optionally add a comment to this setting
|
||||
```
|
||||
|
||||
`dsn_string` is optional. If not specified, the connection information specified by the `user`, `password`, `host`, `port`, and `database` parameters are used. `dsn_string` is formed as `"<user>:<password>@tcp(<host>:<port>)/<database>"`
|
||||
`dsn_string` is required and is of form `"<user>:<password>@tcp(<host>:<port>)/<database>"`
|
||||
|
||||
MinIO supports persistent event store. The persistent store will backup events if MySQL connection goes offline and then replays the stored events when the broken connection comes back up. The event store can be configured by setting a directory path in `queue_dir` field, and the maximum number of events, which can be stored in a `queue_dir`, in `queue_limit` field. For example, `queue_dir` can be set to `/home/events` and `queue_limit` can be set to `1000`. By default, the `queue_limit` is set to `10000`.
|
||||
|
||||
@ -987,15 +968,11 @@ Before updating the configuration, let's start with `mc admin config get` comman
|
||||
|
||||
```sh
|
||||
$ mc admin config get myminio/ notify_mysql
|
||||
notify_mysql:myinstance enable=off format=namespace host= port= username= password= database= dsn_string= table= queue_dir= queue_limit=0
|
||||
notify_mysql:myinstance enable=off format=namespace host= port= username= password= database= dsn_string= table= queue_dir= queue_limit=0
|
||||
```
|
||||
|
||||
Use `mc admin config set` command to update MySQL notification configuration for the deployment.
|
||||
Use `mc admin config set` command to update MySQL notification configuration for the deployment with `dsn_string` parameter:
|
||||
|
||||
```sh
|
||||
$ mc admin config set myminio notify_mysql:myinstance table="minio_images" username="root" password="xxxx" host="172.17.0.1" port="3306" database="miniodb"
|
||||
```
|
||||
or with `dsn_string` parameter;
|
||||
```sh
|
||||
$ mc admin config set myminio notify_mysql:myinstance table="minio_images" dsn_string="root:xxxx@tcp(172.17.0.1:3306)/miniodb"
|
||||
```
|
||||
|
@ -78,7 +78,7 @@ func (adm *AdminClient) SetConfigKV(ctx context.Context, kv string) (err error)
|
||||
}
|
||||
|
||||
// GetConfigKV - returns the key, value of the requested key, incoming data is encrypted.
|
||||
func (adm *AdminClient) GetConfigKV(ctx context.Context, key string) (Targets, error) {
|
||||
func (adm *AdminClient) GetConfigKV(ctx context.Context, key string) ([]byte, error) {
|
||||
v := url.Values{}
|
||||
v.Set("key", key)
|
||||
|
||||
@ -100,10 +100,5 @@ func (adm *AdminClient) GetConfigKV(ctx context.Context, key string) (Targets, e
|
||||
return nil, httpRespToErrorResponse(resp)
|
||||
}
|
||||
|
||||
data, err := DecryptData(adm.getSecretKey(), resp.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return ParseSubSysTarget(data)
|
||||
return DecryptData(adm.getSecretKey(), resp.Body)
|
||||
}
|
||||
|
@ -18,73 +18,10 @@
|
||||
package madmin
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"fmt"
|
||||
"strings"
|
||||
"unicode"
|
||||
)
|
||||
|
||||
// KV - is a shorthand of each key value.
|
||||
type KV struct {
|
||||
Key string `json:"key"`
|
||||
Value string `json:"value"`
|
||||
}
|
||||
|
||||
// KVS - is a shorthand for some wrapper functions
|
||||
// to operate on list of key values.
|
||||
type KVS []KV
|
||||
|
||||
// Empty - return if kv is empty
|
||||
func (kvs KVS) Empty() bool {
|
||||
return len(kvs) == 0
|
||||
}
|
||||
|
||||
// Set sets a value, if not sets a default value.
|
||||
func (kvs *KVS) Set(key, value string) {
|
||||
for i, kv := range *kvs {
|
||||
if kv.Key == key {
|
||||
(*kvs)[i] = KV{
|
||||
Key: key,
|
||||
Value: value,
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
*kvs = append(*kvs, KV{
|
||||
Key: key,
|
||||
Value: value,
|
||||
})
|
||||
}
|
||||
|
||||
// Get - returns the value of a key, if not found returns empty.
|
||||
func (kvs KVS) Get(key string) string {
|
||||
v, ok := kvs.Lookup(key)
|
||||
if ok {
|
||||
return v
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// Lookup - lookup a key in a list of KVS
|
||||
func (kvs KVS) Lookup(key string) (string, bool) {
|
||||
for _, kv := range kvs {
|
||||
if kv.Key == key {
|
||||
return kv.Value, true
|
||||
}
|
||||
}
|
||||
return "", false
|
||||
}
|
||||
|
||||
// Target signifies an individual target
|
||||
type Target struct {
|
||||
SubSystem string `json:"subSys"`
|
||||
KVS KVS `json:"kvs"`
|
||||
}
|
||||
|
||||
// Targets sub-system targets
|
||||
type Targets []Target
|
||||
|
||||
// Standard config keys and values.
|
||||
const (
|
||||
EnableKey = "enable"
|
||||
@ -95,36 +32,6 @@ const (
|
||||
EnableOff = "off"
|
||||
)
|
||||
|
||||
func (kvs KVS) String() string {
|
||||
var s strings.Builder
|
||||
for _, kv := range kvs {
|
||||
// Do not need to print state which is on.
|
||||
if kv.Key == EnableKey && kv.Value == EnableOn {
|
||||
continue
|
||||
}
|
||||
if kv.Key == CommentKey && kv.Value == "" {
|
||||
continue
|
||||
}
|
||||
s.WriteString(kv.Key)
|
||||
s.WriteString(KvSeparator)
|
||||
spc := HasSpace(kv.Value)
|
||||
if spc {
|
||||
s.WriteString(KvDoubleQuote)
|
||||
}
|
||||
s.WriteString(kv.Value)
|
||||
if spc {
|
||||
s.WriteString(KvDoubleQuote)
|
||||
}
|
||||
s.WriteString(KvSpaceSeparator)
|
||||
}
|
||||
return s.String()
|
||||
}
|
||||
|
||||
// Count - returns total numbers of target
|
||||
func (t Targets) Count() int {
|
||||
return len(t)
|
||||
}
|
||||
|
||||
// HasSpace - returns if given string has space.
|
||||
func HasSpace(s string) bool {
|
||||
for _, r := range s {
|
||||
@ -135,23 +42,6 @@ func HasSpace(s string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (t Targets) String() string {
|
||||
var s strings.Builder
|
||||
count := t.Count()
|
||||
// Print all "on" states entries
|
||||
for _, targetKV := range t {
|
||||
kv := targetKV.KVS
|
||||
count--
|
||||
s.WriteString(targetKV.SubSystem)
|
||||
s.WriteString(KvSpaceSeparator)
|
||||
s.WriteString(kv.String())
|
||||
if len(t) > 1 && count > 0 {
|
||||
s.WriteString(KvNewline)
|
||||
}
|
||||
}
|
||||
return s.String()
|
||||
}
|
||||
|
||||
// Constant separators
|
||||
const (
|
||||
SubSystemSeparator = `:`
|
||||
@ -170,69 +60,3 @@ func SanitizeValue(v string) string {
|
||||
v = strings.TrimSuffix(strings.TrimPrefix(strings.TrimSpace(v), KvDoubleQuote), KvDoubleQuote)
|
||||
return strings.TrimSuffix(strings.TrimPrefix(v, KvSingleQuote), KvSingleQuote)
|
||||
}
|
||||
|
||||
// AddTarget - adds new targets, by parsing the input string s.
|
||||
func (t *Targets) AddTarget(s string) error {
|
||||
inputs := strings.SplitN(s, KvSpaceSeparator, 2)
|
||||
if len(inputs) <= 1 {
|
||||
return fmt.Errorf("invalid number of arguments '%s'", s)
|
||||
}
|
||||
|
||||
subSystemValue := strings.SplitN(inputs[0], SubSystemSeparator, 2)
|
||||
if len(subSystemValue) == 0 {
|
||||
return fmt.Errorf("invalid number of arguments %s", s)
|
||||
}
|
||||
|
||||
var kvs = KVS{}
|
||||
var prevK string
|
||||
for _, v := range strings.Fields(inputs[1]) {
|
||||
kv := strings.SplitN(v, KvSeparator, 2)
|
||||
if len(kv) == 0 {
|
||||
continue
|
||||
}
|
||||
if len(kv) == 1 && prevK != "" {
|
||||
value := strings.Join([]string{
|
||||
kvs.Get(prevK),
|
||||
SanitizeValue(kv[0]),
|
||||
}, KvSpaceSeparator)
|
||||
kvs.Set(prevK, value)
|
||||
continue
|
||||
}
|
||||
if len(kv) == 2 {
|
||||
prevK = kv[0]
|
||||
kvs.Set(prevK, SanitizeValue(kv[1]))
|
||||
continue
|
||||
}
|
||||
return fmt.Errorf("value for key '%s' cannot be empty", kv[0])
|
||||
}
|
||||
|
||||
for i := range *t {
|
||||
if (*t)[i].SubSystem == inputs[0] {
|
||||
(*t)[i] = Target{
|
||||
SubSystem: inputs[0],
|
||||
KVS: kvs,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
*t = append(*t, Target{
|
||||
SubSystem: inputs[0],
|
||||
KVS: kvs,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
// ParseSubSysTarget - parse sub-system target
|
||||
func ParseSubSysTarget(buf []byte) (Targets, error) {
|
||||
var targets Targets
|
||||
bio := bufio.NewScanner(bytes.NewReader(buf))
|
||||
for bio.Scan() {
|
||||
if err := targets.AddTarget(bio.Text()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if err := bio.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return targets, nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user