mirror of
https://github.com/minio/minio.git
synced 2025-01-02 10:33:21 -05:00
47ca411163
- Avoids code duplication across the other targets. By having a centralized function call. - Reduce the room for race.
275 lines
7.2 KiB
Go
275 lines
7.2 KiB
Go
/*
|
|
* MinIO Cloud Storage, (C) 2018 MinIO, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
// MySQL Notifier implementation. Two formats, "namespace" and
|
|
// "access" are supported.
|
|
//
|
|
// * Namespace format
|
|
//
|
|
// On each create or update object event in MinIO Object storage
|
|
// server, a row is created or updated in the table in MySQL. On each
|
|
// object removal, the corresponding row is deleted from the table.
|
|
//
|
|
// A table with a specific structure (column names, column types, and
|
|
// primary key/uniqueness constraint) is used. The user may set the
|
|
// table name in the configuration. A sample SQL command that creates
|
|
// a command with the required structure is:
|
|
//
|
|
// CREATE TABLE myminio (
|
|
// key_name VARCHAR(2048),
|
|
// value JSONB,
|
|
// PRIMARY KEY (key_name),
|
|
// );
|
|
//
|
|
// MySQL's "INSERT ... ON DUPLICATE ..." feature (UPSERT) is used
|
|
// here. The implementation has been tested with MySQL Ver 14.14
|
|
// Distrib 5.7.17.
|
|
//
|
|
// * Access format
|
|
//
|
|
// On each event, a row is appended to the configured table. There is
|
|
// no deletion or modification of existing rows.
|
|
//
|
|
// A different table schema is used for this format. A sample SQL
|
|
// commant that creates a table with the required structure is:
|
|
//
|
|
// CREATE TABLE myminio (
|
|
// event_time TIMESTAMP WITH TIME ZONE NOT NULL,
|
|
// event_data JSONB
|
|
// );
|
|
|
|
package target
|
|
|
|
import (
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/url"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/go-sql-driver/mysql"
|
|
"github.com/minio/minio/pkg/event"
|
|
xnet "github.com/minio/minio/pkg/net"
|
|
)
|
|
|
|
const (
|
|
mysqlTableExists = `SELECT 1 FROM %s;`
|
|
mysqlCreateNamespaceTable = `CREATE TABLE %s (key_name VARCHAR(2048), value JSON, PRIMARY KEY (key_name));`
|
|
mysqlCreateAccessTable = `CREATE TABLE %s (event_time DATETIME NOT NULL, event_data JSON);`
|
|
|
|
mysqlUpdateRow = `INSERT INTO %s (key_name, value) VALUES (?, ?) ON DUPLICATE KEY UPDATE value=VALUES(value);`
|
|
mysqlDeleteRow = `DELETE FROM %s WHERE key_name = ?;`
|
|
mysqlInsertRow = `INSERT INTO %s (event_time, event_data) VALUES (?, ?);`
|
|
)
|
|
|
|
// MySQLArgs - MySQL target arguments.
|
|
type MySQLArgs struct {
|
|
Enable bool `json:"enable"`
|
|
Format string `json:"format"`
|
|
DSN string `json:"dsnString"`
|
|
Table string `json:"table"`
|
|
Host xnet.URL `json:"host"`
|
|
Port string `json:"port"`
|
|
User string `json:"user"`
|
|
Password string `json:"password"`
|
|
Database string `json:"database"`
|
|
}
|
|
|
|
// Validate MySQLArgs fields
|
|
func (m MySQLArgs) Validate() error {
|
|
if !m.Enable {
|
|
return nil
|
|
}
|
|
|
|
if m.Format != "" {
|
|
f := strings.ToLower(m.Format)
|
|
if f != event.NamespaceFormat && f != event.AccessFormat {
|
|
return fmt.Errorf("unrecognized format")
|
|
}
|
|
}
|
|
|
|
if m.Table == "" {
|
|
return fmt.Errorf("table unspecified")
|
|
}
|
|
|
|
if m.DSN != "" {
|
|
if _, err := mysql.ParseDSN(m.DSN); err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
// Some fields need to be specified when DSN is unspecified
|
|
if m.Port == "" {
|
|
return fmt.Errorf("unspecified port")
|
|
}
|
|
if _, err := strconv.Atoi(m.Port); err != nil {
|
|
return fmt.Errorf("invalid port")
|
|
}
|
|
if m.Database == "" {
|
|
return fmt.Errorf("database unspecified")
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// MySQLTarget - MySQL target.
|
|
type MySQLTarget struct {
|
|
id event.TargetID
|
|
args MySQLArgs
|
|
updateStmt *sql.Stmt
|
|
deleteStmt *sql.Stmt
|
|
insertStmt *sql.Stmt
|
|
db *sql.DB
|
|
}
|
|
|
|
// ID - returns target ID.
|
|
func (target *MySQLTarget) ID() event.TargetID {
|
|
return target.id
|
|
}
|
|
|
|
// Save - Sends event directly without persisting.
|
|
func (target *MySQLTarget) Save(eventData event.Event) error {
|
|
return target.send(eventData)
|
|
}
|
|
|
|
func (target *MySQLTarget) send(eventData event.Event) error {
|
|
if target.args.Format == event.NamespaceFormat {
|
|
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
key := eventData.S3.Bucket.Name + "/" + objectName
|
|
|
|
if eventData.EventName == event.ObjectRemovedDelete {
|
|
_, err = target.deleteStmt.Exec(key)
|
|
} else {
|
|
var data []byte
|
|
if data, err = json.Marshal(struct{ Records []event.Event }{[]event.Event{eventData}}); err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = target.updateStmt.Exec(key, data)
|
|
}
|
|
return err
|
|
}
|
|
|
|
if target.args.Format == event.AccessFormat {
|
|
eventTime, err := time.Parse(event.AMZTimeFormat, eventData.EventTime)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
data, err := json.Marshal(struct{ Records []event.Event }{[]event.Event{eventData}})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = target.insertStmt.Exec(eventTime, data)
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Send - interface compatible method does no-op.
|
|
func (target *MySQLTarget) Send(eventKey string) error {
|
|
return nil
|
|
}
|
|
|
|
// Close - closes underneath connections to MySQL database.
|
|
func (target *MySQLTarget) Close() error {
|
|
if target.updateStmt != nil {
|
|
// FIXME: log returned error. ignore time being.
|
|
_ = target.updateStmt.Close()
|
|
}
|
|
|
|
if target.deleteStmt != nil {
|
|
// FIXME: log returned error. ignore time being.
|
|
_ = target.deleteStmt.Close()
|
|
}
|
|
|
|
if target.insertStmt != nil {
|
|
// FIXME: log returned error. ignore time being.
|
|
_ = target.insertStmt.Close()
|
|
}
|
|
|
|
return target.db.Close()
|
|
}
|
|
|
|
// NewMySQLTarget - creates new MySQL target.
|
|
func NewMySQLTarget(id string, args MySQLArgs) (*MySQLTarget, error) {
|
|
if args.DSN == "" {
|
|
config := mysql.Config{
|
|
User: args.User,
|
|
Passwd: args.Password,
|
|
Net: "tcp",
|
|
Addr: args.Host.String() + ":" + args.Port,
|
|
DBName: args.Database,
|
|
AllowNativePasswords: true,
|
|
}
|
|
|
|
args.DSN = config.FormatDSN()
|
|
}
|
|
|
|
db, err := sql.Open("mysql", args.DSN)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err = db.Ping(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if _, err = db.Exec(fmt.Sprintf(mysqlTableExists, args.Table)); err != nil {
|
|
createStmt := mysqlCreateNamespaceTable
|
|
if args.Format == event.AccessFormat {
|
|
createStmt = mysqlCreateAccessTable
|
|
}
|
|
|
|
if _, err = db.Exec(fmt.Sprintf(createStmt, args.Table)); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
var updateStmt, deleteStmt, insertStmt *sql.Stmt
|
|
switch args.Format {
|
|
case event.NamespaceFormat:
|
|
// insert or update statement
|
|
if updateStmt, err = db.Prepare(fmt.Sprintf(mysqlUpdateRow, args.Table)); err != nil {
|
|
return nil, err
|
|
}
|
|
// delete statement
|
|
if deleteStmt, err = db.Prepare(fmt.Sprintf(mysqlDeleteRow, args.Table)); err != nil {
|
|
return nil, err
|
|
}
|
|
case event.AccessFormat:
|
|
// insert statement
|
|
if insertStmt, err = db.Prepare(fmt.Sprintf(mysqlInsertRow, args.Table)); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return &MySQLTarget{
|
|
id: event.TargetID{ID: id, Name: "mysql"},
|
|
args: args,
|
|
updateStmt: updateStmt,
|
|
deleteStmt: deleteStmt,
|
|
insertStmt: insertStmt,
|
|
db: db,
|
|
}, nil
|
|
}
|