/*
 * Minio Cloud Storage, (C) 2017 Minio, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// MySQL Notifier implementation. Two formats, "namespace" and
// "access" are supported.
//
// * Namespace format
//
// On each create or update object event in Minio Object storage
// server, a row is created or updated in the table in MySQL. On each
// object removal, the corresponding row is deleted from the table.
//
// A table with a specific structure (column names, column types, and
// primary key/uniqueness constraint) is used. The user may set the
// table name in the configuration. A sample SQL command that creates
// a table with the required structure is:
//
//     CREATE TABLE myminio (
//         key_name VARCHAR(2048),
//         value JSON,
//         PRIMARY KEY (key_name)
//     );
//
// MySQL's "INSERT ... ON DUPLICATE ..." feature (UPSERT) is used
// here. The implementation has been tested with MySQL Ver 14.14
// Distrib 5.7.17.
//
// * Access format
//
// On each event, a row is appended to the configured table. There is
// no deletion or modification of existing rows.
//
// A different table schema is used for this format. A sample SQL
// command that creates a table with the required structure is:
//
//     CREATE TABLE myminio (
//         event_time DATETIME NOT NULL,
//         event_data JSON
//     );

package cmd

import (
	"database/sql"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"time"

	"github.com/Sirupsen/logrus"
	"github.com/go-sql-driver/mysql"
)

// SQL templates used by the MySQL notifier. Every template takes the
// table name as its single fmt verb (%s); row values are bound through
// "?" placeholders when the statement is executed.
const (
	// Queries for format=namespace mode.

	// upsertRowForNSMySQL inserts a (key_name, value) row, or replaces
	// the value when a row with the same key already exists.
	upsertRowForNSMySQL = `INSERT INTO %s (key_name, value)
VALUES (?, ?)
ON DUPLICATE KEY UPDATE value=VALUES(value);
`
	// deleteRowForNSMySQL removes the row for a given key.
	deleteRowForNSMySQL = ` DELETE FROM %s
WHERE key_name = ?;`
	// createTableForNSMySQL creates the namespace-format table.
	createTableForNSMySQL = `CREATE TABLE %s (
    key_name VARCHAR(2048),
    value JSON,
    PRIMARY KEY (key_name)
);`

	// Queries for format=access mode.

	// insertRowForAccessMySQL appends one (event_time, event_data) row.
	insertRowForAccessMySQL = `INSERT INTO %s (event_time, event_data)
VALUES (?, ?);`
	// createTableForAccessMySQL creates the access-format table.
	createTableForAccessMySQL = `CREATE TABLE %s (
    event_time DATETIME NOT NULL,
    event_data JSON
);`

	// Query to check if a table already exists. It fails when the
	// table is missing; LIMIT 1 keeps the probe from returning one row
	// per existing table row when the table is large.
	tableExistsMySQL = `SELECT 1 FROM %s LIMIT 1;`
)

var (
	// mysqlErrFunc builds the errors reported by this notifier. It is
	// obtained from newNotificationErrorFactory with the "MySQL" label
	// and is called with a format string plus optional arguments.
	mysqlErrFunc = newNotificationErrorFactory("MySQL")

	// Configuration errors returned by Validate: errMysqlFormat when
	// "format" is neither formatNamespace nor formatAccess, and
	// errMysqlTable when no table name is configured.
	errMysqlFormat = mysqlErrFunc(`"format" value is invalid - it must be one of "%s" or "%s".`, formatNamespace, formatAccess)
	errMysqlTable  = mysqlErrFunc("Table was not specified in the configuration.")
)

// mySQLNotify holds the configuration of a single MySQL notification
// target. Field names in the serialized configuration are given by the
// json tags.
type mySQLNotify struct {
	// Enable turns the target on. When false, Validate skips all
	// checks and dialMySQL returns errNotifyNotEnabled.
	Enable bool `json:"enable"`

	// Format selects the table layout and statements used; Validate
	// accepts only formatNamespace and formatAccess.
	Format string `json:"format"`

	// pass data-source-name connection string in config
	// directly. This string is formatted according to
	// https://github.com/go-sql-driver/mysql#dsn-data-source-name
	DsnString string `json:"dsnString"`
	// specifying a table name is required.
	Table string `json:"table"`

	// uses the values below if no connection string is specified
	// - however the connection string method offers more
	// flexibility. dialMySQL combines them into a TCP DSN with
	// Host and Port joined as "host:port".
	Host     string `json:"host"`
	Port     string `json:"port"`
	User     string `json:"user"`
	Password string `json:"password"`
	Database string `json:"database"`
}

// Validate checks the MySQL target configuration and returns the first
// problem found, or nil when the configuration is acceptable.
//
// A disabled target is always considered valid. For an enabled target
// the format must be one of the supported values, the host must pass
// checkURL when no DSN string is given, and a table name must be set.
func (m *mySQLNotify) Validate() error {
	switch {
	case !m.Enable:
		// Nothing to verify for a target that is switched off.
		return nil
	case !(m.Format == formatNamespace || m.Format == formatAccess):
		return errMysqlFormat
	}

	// The host only matters when the DSN has to be assembled from the
	// individual connection fields.
	if m.DsnString == "" {
		_, hostErr := checkURL(m.Host)
		if hostErr != nil {
			return hostErr
		}
	}

	if m.Table == "" {
		return errMysqlTable
	}
	return nil
}

// mySQLConn is an open MySQL notification target as produced by
// dialMySQL. It is registered as a logrus hook by newMySQLNotify and
// writes events to the configured table from its Fire method.
//
// dsnStr is the DSN the connection was opened with, table and format
// are copied from the target configuration, and preparedStmts holds
// the statements prepared for the format: "upsertRow" and "deleteRow"
// for the namespace format, "insertRow" for the access format. The
// embedded *sql.DB is the underlying database handle.
type mySQLConn struct {
	dsnStr        string
	table         string
	format        string
	preparedStmts map[string]*sql.Stmt
	*sql.DB
}

// dialMySQL opens a connection to the MySQL server described by msql,
// makes sure the target table exists (creating it with the schema for
// the configured format when the existence probe fails) and prepares
// the statements needed by that format.
//
// The DSN is msql.DsnString when it is non-empty; otherwise it is
// built from the User, Password, Host, Port and Database fields using
// a TCP connection.
//
// It returns errNotifyNotEnabled when the target is disabled. On any
// later failure the database handle and every statement prepared so
// far are closed before the error is returned, so a failed dial does
// not leak connections.
func dialMySQL(msql mySQLNotify) (mc mySQLConn, e error) {
	if !msql.Enable {
		return mc, errNotifyNotEnabled
	}

	dsnStr := msql.DsnString
	// check if connection string is specified
	if dsnStr == "" {
		// build from other parameters
		config := mysql.Config{
			User:   msql.User,
			Passwd: msql.Password,
			Net:    "tcp",
			Addr:   msql.Host + ":" + msql.Port,
			DBName: msql.Database,
		}
		dsnStr = config.FormatDSN()
	}

	db, err := sql.Open("mysql", dsnStr)
	if err != nil {
		// NOTE(review): the DSN can embed the password, so this
		// message may expose credentials wherever the error is
		// logged - consider redacting it.
		return mc, mysqlErrFunc(
			"Connection opening failure (dsnStr=%s): %v",
			dsnStr, err)
	}

	// From here on, every error return must release what has been
	// acquired. The named result e carries the returned error, so a
	// single deferred cleanup covers all failure paths below.
	stmts := make(map[string]*sql.Stmt)
	defer func() {
		if e == nil {
			return
		}
		for _, stmt := range stmts {
			// A failed Prepare stores a nil statement in the map.
			if stmt != nil {
				// Best effort: the dial error is what gets reported.
				_ = stmt.Close()
			}
		}
		// Best effort, for the same reason as above.
		_ = db.Close()
	}()

	// ping to check that server is actually reachable.
	if err = db.Ping(); err != nil {
		return mc, mysqlErrFunc(
			"Ping to server failed with: %v", err)
	}

	// check that table exists - if not, create it.
	_, err = db.Exec(fmt.Sprintf(tableExistsMySQL, msql.Table))
	if err != nil {
		createStmt := createTableForNSMySQL
		if msql.Format == formatAccess {
			createStmt = createTableForAccessMySQL
		}

		// most likely, table does not exist. try to create it:
		_, errCreate := db.Exec(fmt.Sprintf(createStmt, msql.Table))
		if errCreate != nil {
			// failed to create the table. error out.
			return mc, mysqlErrFunc(
				"'Select' failed with %v, then 'Create Table' failed with %v",
				err, errCreate,
			)
		}
	}

	// create prepared statements
	switch msql.Format {
	case formatNamespace:
		// insert or update statement
		stmts["upsertRow"], err = db.Prepare(fmt.Sprintf(upsertRowForNSMySQL,
			msql.Table))
		if err != nil {
			return mc, mysqlErrFunc("create UPSERT prepared statement failed with: %v", err)
		}
		// delete statement
		stmts["deleteRow"], err = db.Prepare(fmt.Sprintf(deleteRowForNSMySQL,
			msql.Table))
		if err != nil {
			return mc, mysqlErrFunc("create DELETE prepared statement failed with: %v", err)
		}
	case formatAccess:
		// insert statement
		stmts["insertRow"], err = db.Prepare(fmt.Sprintf(insertRowForAccessMySQL,
			msql.Table))
		if err != nil {
			return mc, mysqlErrFunc(
				"create INSERT prepared statement failed with: %v", err)
		}
	}
	return mySQLConn{dsnStr, msql.Table, msql.Format, stmts, db}, nil
}

// newMySQLNotify builds a logrus logger whose only effect is to feed
// entries to the MySQL target configured under accountID.
//
// The target configuration is looked up in serverConfig and dialed;
// any dial error is returned unchanged. The resulting logger discards
// its regular output, uses the JSON formatter and has the MySQL
// connection registered as a hook.
func newMySQLNotify(accountID string) (*logrus.Logger, error) {
	conn, err := dialMySQL(serverConfig.Notify.GetMySQLByID(accountID))
	if err != nil {
		return nil, err
	}

	logger := logrus.New()
	logger.Formatter = &logrus.JSONFormatter{}
	// Entries reach MySQL through the hook only; nothing is written
	// to the logger's own output.
	logger.Out = ioutil.Discard
	logger.Hooks.Add(conn)

	return logger, nil
}

// Close releases the prepared statements and then the database handle.
// Errors from the individual Close calls are deliberately ignored, as
// the method has no way to report them.
func (myC mySQLConn) Close() {
	// Statements go first, while the handle they belong to is still open.
	for name := range myC.preparedStmts {
		_ = myC.preparedStmts[name].Close()
	}
	// Finally shut down the database handle itself.
	_ = myC.DB.Close()
}

// Fire handles one log entry delivered to this hook and writes it to
// the configured MySQL table.
//
// Entries whose "EventType" field is not a string are ignored and nil
// is returned. Otherwise the action depends on the configured format:
//
//   - namespace: an event matching "s3:ObjectRemoved:*" deletes the row
//     keyed by entry.Data["Key"]; any other event upserts that row with
//     the JSON-encoded records.
//   - access: a new row is inserted holding the event time of the first
//     record and the JSON-encoded records.
//
// An error is returned when the records cannot be encoded, when the
// access format receives records of an unexpected type or an empty
// record list, when the event time cannot be parsed, or when the SQL
// statement fails.
func (myC mySQLConn) Fire(entry *logrus.Entry) error {
	// get event type by trying to convert to string
	entryEventType, ok := entry.Data["EventType"].(string)
	if !ok {
		// ignore event if converting EventType to string
		// fails.
		return nil
	}

	// jsonEncoder wraps the records in a {"Records": ...} object and
	// serializes it for storage in the JSON column.
	jsonEncoder := func(d interface{}) ([]byte, error) {
		value, err := json.Marshal(map[string]interface{}{
			"Records": d,
		})
		if err != nil {
			return nil, mysqlErrFunc(
				"Unable to encode event %v to JSON: %v", d, err)
		}
		return value, nil
	}

	switch myC.format {
	case formatNamespace:
		// Check for event delete
		if eventMatch(entryEventType, []string{"s3:ObjectRemoved:*"}) {
			// delete row from the table
			_, err := myC.preparedStmts["deleteRow"].Exec(entry.Data["Key"])
			if err != nil {
				return mysqlErrFunc(
					"Error deleting event with key = %v - got mysql error - %v",
					entry.Data["Key"], err,
				)
			}
		} else {
			value, err := jsonEncoder(entry.Data["Records"])
			if err != nil {
				return err
			}

			// upsert row into the table
			_, err = myC.preparedStmts["upsertRow"].Exec(entry.Data["Key"], value)
			if err != nil {
				return mysqlErrFunc(
					"Unable to upsert event with Key=%v and Value=%v - got mysql error - %v",
					entry.Data["Key"], entry.Data["Records"], err,
				)
			}
		}
	case formatAccess:
		// eventTime is taken from the first entry in the
		// records.
		events, ok := entry.Data["Records"].([]NotificationEvent)
		if !ok {
			return mysqlErrFunc("unable to extract event time due to conversion error of entry.Data[\"Records\"]=%v", entry.Data["Records"])
		}
		// An empty record list has no first entry; report it instead
		// of panicking on events[0].
		if len(events) == 0 {
			return mysqlErrFunc("unable to extract event time: entry.Data[\"Records\"] is empty")
		}
		eventTime, err := time.Parse(timeFormatAMZ, events[0].EventTime)
		if err != nil {
			return mysqlErrFunc("unable to parse event time \"%s\": %v",
				events[0].EventTime, err)
		}

		// NOTE(review): this branch encodes with the package-level
		// jsonEncodeEventData helper rather than the local jsonEncoder
		// used by the namespace branch - confirm that is intended.
		value, err := jsonEncodeEventData(entry.Data["Records"])
		if err != nil {
			return err
		}

		_, err = myC.preparedStmts["insertRow"].Exec(eventTime, value)
		if err != nil {
			return mysqlErrFunc("Unable to insert event with value=%v: %v",
				value, err)
		}
	}

	return nil
}

// Levels lists the log levels for which this hook is fired; only
// entries logged at Info level are forwarded to MySQL.
func (myC mySQLConn) Levels() []logrus.Level {
	return []logrus.Level{logrus.InfoLevel}
}