mirror of
https://github.com/minio/minio.git
synced 2025-01-11 23:13:23 -05:00
a2a8d54bb6
This change adds `access` format support for notifications to an Elasticsearch server, and it refactors `namespace` format support. In the case of `access` format, for each event in Minio, a JSON document is inserted into Elasticsearch with its timestamp set to the event's timestamp, and with the ID generated automatically by Elasticsearch. No events are modified or deleted in this mode. In the case of `namespace` format, for each event in Minio, a JSON document keyed by the bucket and object name is updated in Elasticsearch. In the case of an object being created or over-written in Minio, a new document is inserted, or the existing document is updated, in the Elasticsearch index. If an object is deleted in Minio, the corresponding document is deleted from the Elasticsearch index. Additionally, this change upgrades Elasticsearch support to the 5.x series. This is a breaking change, and users of previous Elasticsearch versions should upgrade. It also updates the documentation on Elasticsearch notification target usage and adds a link to an Elasticsearch upgrade guide. This is the last patch that finally resolves #3928.
340 lines
8.8 KiB
Go
340 lines
8.8 KiB
Go
/*
|
|
* Minio Cloud Storage, (C) 2017 Minio, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
// MySQL Notifier implementation. Two formats, "namespace" and
|
|
// "access" are supported.
|
|
//
|
|
// * Namespace format
|
|
//
|
|
// On each create or update object event in Minio Object storage
|
|
// server, a row is created or updated in the table in MySQL. On each
|
|
// object removal, the corresponding row is deleted from the table.
|
|
//
|
|
// A table with a specific structure (column names, column types, and
|
|
// primary key/uniqueness constraint) is used. The user may set the
|
|
// table name in the configuration. A sample SQL command that creates
|
|
// a table with the required structure is:
|
|
//
|
|
// CREATE TABLE myminio (
|
|
// key_name VARCHAR(2048),
|
|
// value JSON,
|
|
// PRIMARY KEY (key_name)
|
|
// );
|
|
//
|
|
// MySQL's "INSERT ... ON DUPLICATE ..." feature (UPSERT) is used
|
|
// here. The implementation has been tested with MySQL Ver 14.14
|
|
// Distrib 5.7.17.
|
|
//
|
|
// * Access format
|
|
//
|
|
// On each event, a row is appended to the configured table. There is
|
|
// no deletion or modification of existing rows.
|
|
//
|
|
// A different table schema is used for this format. A sample SQL
|
|
// command that creates a table with the required structure is:
|
|
//
|
|
// CREATE TABLE myminio (
|
|
// event_time DATETIME NOT NULL,
|
|
// event_data JSON
|
|
// );
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"time"
|
|
|
|
"github.com/Sirupsen/logrus"
|
|
"github.com/go-sql-driver/mysql"
|
|
)
|
|
|
|
// SQL statements used by the MySQL notification target. Every statement
// contains a single %s verb that is replaced with the configured table
// name via fmt.Sprintf before the statement is executed or prepared.
const (
	// Queries for format=namespace mode.

	// upsertRowForNSMySQL inserts the (key_name, value) pair, or
	// replaces the value when a row with that key already exists.
	upsertRowForNSMySQL = `INSERT INTO %s (key_name, value)
VALUES (?, ?)
ON DUPLICATE KEY UPDATE value=VALUES(value);
`
	// deleteRowForNSMySQL removes the row stored under the given key.
	deleteRowForNSMySQL = ` DELETE FROM %s
WHERE key_name = ?;`
	// createTableForNSMySQL creates the table used in namespace mode.
	createTableForNSMySQL = `CREATE TABLE %s (
key_name VARCHAR(2048),
value JSON,
PRIMARY KEY (key_name)
);`

	// Queries for format=access mode.

	// insertRowForAccessMySQL appends one event row.
	insertRowForAccessMySQL = `INSERT INTO %s (event_time, event_data)
VALUES (?, ?);`
	// createTableForAccessMySQL creates the table used in access mode.
	createTableForAccessMySQL = `CREATE TABLE %s (
event_time DATETIME NOT NULL,
event_data JSON
);`

	// Query to check if a table already exists. It still fails when
	// the table is missing, but LIMIT 1 keeps the probe from reading
	// every row of a large, already-populated table.
	tableExistsMySQL = `SELECT 1 FROM %s LIMIT 1;`
)
|
|
|
|
var (
|
|
mysqlErrFunc = newNotificationErrorFactory("MySQL")
|
|
|
|
errMysqlFormat = mysqlErrFunc(`"format" value is invalid - it must be one of "%s" or "%s".`, formatNamespace, formatAccess)
|
|
errMysqlTable = mysqlErrFunc("Table was not specified in the configuration.")
|
|
)
|
|
|
|
// mySQLNotify holds the configuration of a single MySQL notification
// target as it is stored in the JSON configuration.
type mySQLNotify struct {
	// Enable switches the target on; Validate and dialMySQL do no
	// further work for a disabled target.
	Enable bool `json:"enable"`

	// Format selects the notification format and must be either
	// formatNamespace or formatAccess.
	Format string `json:"format"`

	// DsnString is a complete data-source-name connection string,
	// formatted according to
	// https://github.com/go-sql-driver/mysql#dsn-data-source-name
	// When set it is used as-is to open the connection.
	DsnString string `json:"dsnString"`
	// Table is the name of the table events are written to. It is
	// required.
	Table string `json:"table"`

	// The fields below are used to build the connection string only
	// when DsnString is empty - the connection string method offers
	// more flexibility.
	Host     string `json:"host"`
	Port     string `json:"port"`
	User     string `json:"user"`
	Password string `json:"password"`
	Database string `json:"database"`
}
|
|
|
|
func (m *mySQLNotify) Validate() error {
|
|
if !m.Enable {
|
|
return nil
|
|
}
|
|
if m.Format != formatNamespace && m.Format != formatAccess {
|
|
return errMysqlFormat
|
|
}
|
|
if m.DsnString == "" {
|
|
if _, err := checkURL(m.Host); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if m.Table == "" {
|
|
return errMysqlTable
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// mySQLConn bundles an open database handle with the settings and the
// prepared statements of one MySQL notification target.
//
// The field order is significant: values may be built with positional
// composite literals.
type mySQLConn struct {
	// dsnStr is the data-source-name the handle was opened with.
	dsnStr string
	// table is the name of the table events are written to.
	table string
	// format is the notification format in use.
	format string
	// preparedStmts maps a statement name (for example "upsertRow")
	// to its prepared statement.
	preparedStmts map[string]*sql.Stmt
	// DB is the embedded database handle.
	*sql.DB
}
|
|
|
|
func dialMySQL(msql mySQLNotify) (mySQLConn, error) {
|
|
if !msql.Enable {
|
|
return mySQLConn{}, errNotifyNotEnabled
|
|
}
|
|
|
|
dsnStr := msql.DsnString
|
|
// check if connection string is specified
|
|
if dsnStr == "" {
|
|
// build from other parameters
|
|
config := mysql.Config{
|
|
User: msql.User,
|
|
Passwd: msql.Password,
|
|
Net: "tcp",
|
|
Addr: msql.Host + ":" + msql.Port,
|
|
DBName: msql.Database,
|
|
}
|
|
dsnStr = config.FormatDSN()
|
|
}
|
|
|
|
db, err := sql.Open("mysql", dsnStr)
|
|
if err != nil {
|
|
return mySQLConn{}, mysqlErrFunc(
|
|
"Connection opening failure (dsnStr=%s): %v",
|
|
dsnStr, err)
|
|
}
|
|
|
|
// ping to check that server is actually reachable.
|
|
err = db.Ping()
|
|
if err != nil {
|
|
return mySQLConn{}, mysqlErrFunc(
|
|
"Ping to server failed with: %v", err)
|
|
}
|
|
|
|
// check that table exists - if not, create it.
|
|
_, err = db.Exec(fmt.Sprintf(tableExistsMySQL, msql.Table))
|
|
if err != nil {
|
|
createStmt := createTableForNSMySQL
|
|
if msql.Format == formatAccess {
|
|
createStmt = createTableForAccessMySQL
|
|
}
|
|
|
|
// most likely, table does not exist. try to create it:
|
|
_, errCreate := db.Exec(fmt.Sprintf(createStmt, msql.Table))
|
|
if errCreate != nil {
|
|
// failed to create the table. error out.
|
|
return mySQLConn{}, mysqlErrFunc(
|
|
"'Select' failed with %v, then 'Create Table' failed with %v",
|
|
err, errCreate,
|
|
)
|
|
}
|
|
}
|
|
|
|
// create prepared statements
|
|
stmts := make(map[string]*sql.Stmt)
|
|
switch msql.Format {
|
|
case formatNamespace:
|
|
// insert or update statement
|
|
stmts["upsertRow"], err = db.Prepare(fmt.Sprintf(upsertRowForNSMySQL,
|
|
msql.Table))
|
|
if err != nil {
|
|
return mySQLConn{},
|
|
mysqlErrFunc("create UPSERT prepared statement failed with: %v", err)
|
|
}
|
|
// delete statement
|
|
stmts["deleteRow"], err = db.Prepare(fmt.Sprintf(deleteRowForNSMySQL,
|
|
msql.Table))
|
|
if err != nil {
|
|
return mySQLConn{},
|
|
mysqlErrFunc("create DELETE prepared statement failed with: %v", err)
|
|
}
|
|
case formatAccess:
|
|
// insert statement
|
|
stmts["insertRow"], err = db.Prepare(fmt.Sprintf(insertRowForAccessMySQL,
|
|
msql.Table))
|
|
if err != nil {
|
|
return mySQLConn{}, mysqlErrFunc(
|
|
"create INSERT prepared statement failed with: %v", err)
|
|
}
|
|
|
|
}
|
|
return mySQLConn{dsnStr, msql.Table, msql.Format, stmts, db}, nil
|
|
}
|
|
|
|
func newMySQLNotify(accountID string) (*logrus.Logger, error) {
|
|
mysqlNotify := serverConfig.Notify.GetMySQLByID(accountID)
|
|
|
|
// Dial mysql
|
|
myC, err := dialMySQL(mysqlNotify)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
mySQLLog := logrus.New()
|
|
|
|
mySQLLog.Out = ioutil.Discard
|
|
|
|
mySQLLog.Formatter = new(logrus.JSONFormatter)
|
|
|
|
mySQLLog.Hooks.Add(myC)
|
|
|
|
return mySQLLog, nil
|
|
}
|
|
|
|
func (myC mySQLConn) Close() {
|
|
// first close all prepared statements
|
|
for _, v := range myC.preparedStmts {
|
|
_ = v.Close()
|
|
}
|
|
// close db connection
|
|
_ = myC.DB.Close()
|
|
}
|
|
|
|
func (myC mySQLConn) Fire(entry *logrus.Entry) error {
|
|
// get event type by trying to convert to string
|
|
entryEventType, ok := entry.Data["EventType"].(string)
|
|
if !ok {
|
|
// ignore event if converting EventType to string
|
|
// fails.
|
|
return nil
|
|
}
|
|
|
|
jsonEncoder := func(d interface{}) ([]byte, error) {
|
|
value, err := json.Marshal(map[string]interface{}{
|
|
"Records": d,
|
|
})
|
|
if err != nil {
|
|
return nil, mysqlErrFunc(
|
|
"Unable to encode event %v to JSON: %v", d, err)
|
|
}
|
|
return value, nil
|
|
}
|
|
|
|
switch myC.format {
|
|
case formatNamespace:
|
|
// Check for event delete
|
|
if eventMatch(entryEventType, []string{"s3:ObjectRemoved:*"}) {
|
|
// delete row from the table
|
|
_, err := myC.preparedStmts["deleteRow"].Exec(entry.Data["Key"])
|
|
if err != nil {
|
|
return mysqlErrFunc(
|
|
"Error deleting event with key = %v - got mysql error - %v",
|
|
entry.Data["Key"], err,
|
|
)
|
|
}
|
|
} else {
|
|
value, err := jsonEncoder(entry.Data["Records"])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// upsert row into the table
|
|
_, err = myC.preparedStmts["upsertRow"].Exec(entry.Data["Key"], value)
|
|
if err != nil {
|
|
return mysqlErrFunc(
|
|
"Unable to upsert event with Key=%v and Value=%v - got mysql error - %v",
|
|
entry.Data["Key"], entry.Data["Records"], err,
|
|
)
|
|
}
|
|
}
|
|
case formatAccess:
|
|
// eventTime is taken from the first entry in the
|
|
// records.
|
|
events, ok := entry.Data["Records"].([]NotificationEvent)
|
|
if !ok {
|
|
return mysqlErrFunc("unable to extract event time due to conversion error of entry.Data[\"Records\"]=%v", entry.Data["Records"])
|
|
}
|
|
eventTime, err := time.Parse(timeFormatAMZ, events[0].EventTime)
|
|
if err != nil {
|
|
return mysqlErrFunc("unable to parse event time \"%s\": %v",
|
|
events[0].EventTime, err)
|
|
}
|
|
|
|
value, err := jsonEncodeEventData(entry.Data["Records"])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = myC.preparedStmts["insertRow"].Exec(eventTime, value)
|
|
if err != nil {
|
|
return mysqlErrFunc("Unable to insert event with value=%v: %v",
|
|
value, err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (myC mySQLConn) Levels() []logrus.Level {
|
|
return []logrus.Level{
|
|
logrus.InfoLevel,
|
|
}
|
|
}
|