mirror of
https://github.com/minio/minio.git
synced 2024-12-25 22:55:54 -05:00
a099319e66
* Add configuration parameter "format" for db targets and perform configuration migration. * Add PostgreSQL `access` format: This causes Minio to append all events to the configured table. Prefix, suffix and event filters continue to be supported for this mode too. * Update documentation for PostgreSQL notification target. * Add MySQL `access` format: It is very similar to the same format for PostgreSQL. * Update MySQL notification documentation.
343 lines
8.9 KiB
Go
343 lines
8.9 KiB
Go
/*
|
|
* Minio Cloud Storage, (C) 2017 Minio, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
// MySQL Notifier implementation. Two formats, "namespace" and
|
|
// "access" are supported.
|
|
//
|
|
// * Namespace format
|
|
//
|
|
// On each create or update object event in Minio Object storage
|
|
// server, a row is created or updated in the table in MySQL. On each
|
|
// object removal, the corresponding row is deleted from the table.
|
|
//
|
|
// A table with a specific structure (column names, column types, and
|
|
// primary key/uniqueness constraint) is used. The user may set the
|
|
// table name in the configuration. A sample SQL command that creates
|
|
// a command with the required structure is:
|
|
//
|
|
// CREATE TABLE myminio (
|
|
// key_name VARCHAR(2048),
|
|
// value JSONB,
|
|
// PRIMARY KEY (key_name),
|
|
// );
|
|
//
|
|
// MySQL's "INSERT ... ON DUPLICATE ..." feature (UPSERT) is used
|
|
// here. The implementation has been tested with MySQL Ver 14.14
|
|
// Distrib 5.7.17.
|
|
//
|
|
// * Access format
|
|
//
|
|
// On each event, a row is appended to the configured table. There is
|
|
// no deletion or modification of existing rows.
|
|
//
|
|
// A different table schema is used for this format. A sample SQL
|
|
// commant that creates a table with the required structure is:
|
|
//
|
|
// CREATE TABLE myminio (
|
|
// event_time TIMESTAMP WITH TIME ZONE NOT NULL,
|
|
// event_data JSONB
|
|
// );
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"time"
|
|
|
|
"github.com/Sirupsen/logrus"
|
|
"github.com/go-sql-driver/mysql"
|
|
)
|
|
|
|
const (
|
|
// Queries for format=namespace mode.
|
|
upsertRowForNSMySQL = `INSERT INTO %s (key_name, value)
|
|
VALUES (?, ?)
|
|
ON DUPLICATE KEY UPDATE value=VALUES(value);
|
|
`
|
|
deleteRowForNSMySQL = ` DELETE FROM %s
|
|
WHERE key_name = ?;`
|
|
createTableForNSMySQL = `CREATE TABLE %s (
|
|
key_name VARCHAR(2048),
|
|
value JSON,
|
|
PRIMARY KEY (key_name)
|
|
);`
|
|
|
|
// Queries for format=access mode.
|
|
insertRowForAccessMySQL = `INSERT INTO %s (event_time, event_data)
|
|
VALUES (?, ?);`
|
|
createTableForAccessMySQL = `CREATE TABLE %s (
|
|
event_time DATETIME NOT NULL,
|
|
event_data JSON
|
|
);`
|
|
|
|
// Query to check if a table already exists.
|
|
tableExistsMySQL = `SELECT 1 FROM %s;`
|
|
)
|
|
|
|
func makeMySQLError(msg string, a ...interface{}) error {
|
|
s := fmt.Sprintf(msg, a...)
|
|
return fmt.Errorf("MySQL Notifier Error: %s", s)
|
|
}
|
|
|
|
var (
|
|
myNFormatError = makeMySQLError(`"format" value is invalid - it must be one of "%s" or "%s".`, formatNamespace, formatAccess)
|
|
myNTableError = makeMySQLError("Table was not specified in the configuration.")
|
|
)
|
|
|
|
type mySQLNotify struct {
|
|
Enable bool `json:"enable"`
|
|
|
|
Format string `json:"format"`
|
|
|
|
// pass data-source-name connection string in config
|
|
// directly. This string is formatted according to
|
|
// https://github.com/go-sql-driver/mysql#dsn-data-source-name
|
|
DsnString string `json:"dsnString"`
|
|
// specifying a table name is required.
|
|
Table string `json:"table"`
|
|
|
|
// uses the values below if no connection string is specified
|
|
// - however the connection string method offers more
|
|
// flexibility.
|
|
Host string `json:"host"`
|
|
Port string `json:"port"`
|
|
User string `json:"user"`
|
|
Password string `json:"password"`
|
|
Database string `json:"database"`
|
|
}
|
|
|
|
func (m *mySQLNotify) Validate() error {
|
|
if !m.Enable {
|
|
return nil
|
|
}
|
|
if m.Format != formatNamespace && m.Format != formatAccess {
|
|
return myNFormatError
|
|
}
|
|
if m.DsnString == "" {
|
|
if _, err := checkURL(m.Host); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if m.Table == "" {
|
|
return myNTableError
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type mySQLConn struct {
|
|
dsnStr string
|
|
table string
|
|
format string
|
|
preparedStmts map[string]*sql.Stmt
|
|
*sql.DB
|
|
}
|
|
|
|
func dialMySQL(msql mySQLNotify) (mySQLConn, error) {
|
|
if !msql.Enable {
|
|
return mySQLConn{}, errNotifyNotEnabled
|
|
}
|
|
|
|
dsnStr := msql.DsnString
|
|
// check if connection string is specified
|
|
if dsnStr == "" {
|
|
// build from other parameters
|
|
config := mysql.Config{
|
|
User: msql.User,
|
|
Passwd: msql.Password,
|
|
Net: "tcp",
|
|
Addr: msql.Host + ":" + msql.Port,
|
|
DBName: msql.Database,
|
|
}
|
|
dsnStr = config.FormatDSN()
|
|
}
|
|
|
|
db, err := sql.Open("mysql", dsnStr)
|
|
if err != nil {
|
|
return mySQLConn{}, makeMySQLError(
|
|
"Connection opening failure (dsnStr=%s): %v",
|
|
dsnStr, err)
|
|
}
|
|
|
|
// ping to check that server is actually reachable.
|
|
err = db.Ping()
|
|
if err != nil {
|
|
return mySQLConn{}, makeMySQLError(
|
|
"Ping to server failed with: %v", err)
|
|
}
|
|
|
|
// check that table exists - if not, create it.
|
|
_, err = db.Exec(fmt.Sprintf(tableExistsMySQL, msql.Table))
|
|
if err != nil {
|
|
createStmt := createTableForNSMySQL
|
|
if msql.Format == formatAccess {
|
|
createStmt = createTableForAccessMySQL
|
|
}
|
|
|
|
// most likely, table does not exist. try to create it:
|
|
_, errCreate := db.Exec(fmt.Sprintf(createStmt, msql.Table))
|
|
if errCreate != nil {
|
|
// failed to create the table. error out.
|
|
return mySQLConn{}, makeMySQLError(
|
|
"'Select' failed with %v, then 'Create Table' failed with %v",
|
|
err, errCreate,
|
|
)
|
|
}
|
|
}
|
|
|
|
// create prepared statements
|
|
stmts := make(map[string]*sql.Stmt)
|
|
switch msql.Format {
|
|
case formatNamespace:
|
|
// insert or update statement
|
|
stmts["upsertRow"], err = db.Prepare(fmt.Sprintf(upsertRowForNSMySQL,
|
|
msql.Table))
|
|
if err != nil {
|
|
return mySQLConn{},
|
|
makeMySQLError("create UPSERT prepared statement failed with: %v", err)
|
|
}
|
|
// delete statement
|
|
stmts["deleteRow"], err = db.Prepare(fmt.Sprintf(deleteRowForNSMySQL,
|
|
msql.Table))
|
|
if err != nil {
|
|
return mySQLConn{},
|
|
makeMySQLError("create DELETE prepared statement failed with: %v", err)
|
|
}
|
|
case formatAccess:
|
|
// insert statement
|
|
stmts["insertRow"], err = db.Prepare(fmt.Sprintf(insertRowForAccessMySQL,
|
|
msql.Table))
|
|
if err != nil {
|
|
return mySQLConn{}, makeMySQLError(
|
|
"create INSERT prepared statement failed with: %v", err)
|
|
}
|
|
|
|
}
|
|
return mySQLConn{dsnStr, msql.Table, msql.Format, stmts, db}, nil
|
|
}
|
|
|
|
func newMySQLNotify(accountID string) (*logrus.Logger, error) {
|
|
mysqlNotify := serverConfig.Notify.GetMySQLByID(accountID)
|
|
|
|
// Dial mysql
|
|
myC, err := dialMySQL(mysqlNotify)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
mySQLLog := logrus.New()
|
|
|
|
mySQLLog.Out = ioutil.Discard
|
|
|
|
mySQLLog.Formatter = new(logrus.JSONFormatter)
|
|
|
|
mySQLLog.Hooks.Add(myC)
|
|
|
|
return mySQLLog, nil
|
|
}
|
|
|
|
func (myC mySQLConn) Close() {
|
|
// first close all prepared statements
|
|
for _, v := range myC.preparedStmts {
|
|
_ = v.Close()
|
|
}
|
|
// close db connection
|
|
_ = myC.DB.Close()
|
|
}
|
|
|
|
func (myC mySQLConn) Fire(entry *logrus.Entry) error {
|
|
// get event type by trying to convert to string
|
|
entryEventType, ok := entry.Data["EventType"].(string)
|
|
if !ok {
|
|
// ignore event if converting EventType to string
|
|
// fails.
|
|
return nil
|
|
}
|
|
|
|
jsonEncoder := func(d interface{}) ([]byte, error) {
|
|
value, err := json.Marshal(map[string]interface{}{
|
|
"Records": d,
|
|
})
|
|
if err != nil {
|
|
return nil, makeMySQLError(
|
|
"Unable to encode event %v to JSON: %v", d, err)
|
|
}
|
|
return value, nil
|
|
}
|
|
|
|
switch myC.format {
|
|
case formatNamespace:
|
|
// Check for event delete
|
|
if eventMatch(entryEventType, []string{"s3:ObjectRemoved:*"}) {
|
|
// delete row from the table
|
|
_, err := myC.preparedStmts["deleteRow"].Exec(entry.Data["Key"])
|
|
if err != nil {
|
|
return makeMySQLError(
|
|
"Error deleting event with key = %v - got mysql error - %v",
|
|
entry.Data["Key"], err,
|
|
)
|
|
}
|
|
} else {
|
|
value, err := jsonEncoder(entry.Data["Records"])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// upsert row into the table
|
|
_, err = myC.preparedStmts["upsertRow"].Exec(entry.Data["Key"], value)
|
|
if err != nil {
|
|
return makeMySQLError(
|
|
"Unable to upsert event with Key=%v and Value=%v - got mysql error - %v",
|
|
entry.Data["Key"], entry.Data["Records"], err,
|
|
)
|
|
}
|
|
}
|
|
case formatAccess:
|
|
// eventTime is taken from the first entry in the
|
|
// records.
|
|
events, ok := entry.Data["Records"].([]NotificationEvent)
|
|
if !ok {
|
|
return makeMySQLError("unable to extract event time due to conversion error of entry.Data[\"Records\"]=%v", entry.Data["Records"])
|
|
}
|
|
eventTime, err := time.Parse(timeFormatAMZ, events[0].EventTime)
|
|
if err != nil {
|
|
return makeMySQLError("unable to parse event time \"%s\": %v",
|
|
events[0].EventTime, err)
|
|
}
|
|
|
|
value, err := jsonEncodeEventData(entry.Data["Records"])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = myC.preparedStmts["insertRow"].Exec(eventTime, value)
|
|
if err != nil {
|
|
return makeMySQLError("Unable to insert event with value=%v: %v",
|
|
value, err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (myC mySQLConn) Levels() []logrus.Level {
|
|
return []logrus.Level{
|
|
logrus.InfoLevel,
|
|
}
|
|
}
|