Enable event persistence in MySQL and PostgreSQL (#7629)

This commit is contained in:
Praveen raj Mani
2019-07-24 22:48:29 +05:30
committed by kannappanr
parent ac82798d0a
commit 55d4eee6f1
7 changed files with 328 additions and 111 deletions

View File

@@ -56,8 +56,11 @@ package target
import (
"database/sql"
"encoding/json"
"errors"
"fmt"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"time"
@@ -79,15 +82,17 @@ const (
// MySQLArgs - MySQL target arguments.
type MySQLArgs struct {
Enable bool `json:"enable"`
Format string `json:"format"`
DSN string `json:"dsnString"`
Table string `json:"table"`
Host xnet.URL `json:"host"`
Port string `json:"port"`
User string `json:"user"`
Password string `json:"password"`
Database string `json:"database"`
Enable bool `json:"enable"`
Format string `json:"format"`
DSN string `json:"dsnString"`
Table string `json:"table"`
Host xnet.URL `json:"host"`
Port string `json:"port"`
User string `json:"user"`
Password string `json:"password"`
Database string `json:"database"`
QueueDir string `json:"queueDir"`
QueueLimit uint64 `json:"queueLimit"`
}
// Validate MySQLArgs fields
@@ -123,6 +128,16 @@ func (m MySQLArgs) Validate() error {
return fmt.Errorf("database unspecified")
}
}
if m.QueueDir != "" {
if !filepath.IsAbs(m.QueueDir) {
return errors.New("queueDir path should be absolute")
}
}
if m.QueueLimit > 10000 {
return errors.New("queueLimit should not exceed 10000")
}
return nil
}
@@ -134,6 +149,8 @@ type MySQLTarget struct {
deleteStmt *sql.Stmt
insertStmt *sql.Stmt
db *sql.DB
store Store
firstPing bool
}
// ID - returns target ID.
@@ -141,11 +158,21 @@ func (target *MySQLTarget) ID() event.TargetID {
return target.id
}
// Save - Sends event directly without persisting.
// Save - saves the events to the store which will be replayed when the SQL connection is active.
func (target *MySQLTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
if err := target.db.Ping(); err != nil {
if IsConnErr(err) {
return errNotConnected
}
return err
}
return target.send(eventData)
}
// send - sends an event to the mysql.
func (target *MySQLTarget) send(eventData event.Event) error {
if target.args.Format == event.NamespaceFormat {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
@@ -164,6 +191,7 @@ func (target *MySQLTarget) send(eventData event.Event) error {
_, err = target.updateStmt.Exec(key, data)
}
return err
}
@@ -179,15 +207,51 @@ func (target *MySQLTarget) send(eventData event.Event) error {
}
_, err = target.insertStmt.Exec(eventTime, data)
return err
}
return nil
}
// Send - interface compatible method does no-op.
// Send - reads an event from store and sends it to MySQL.
func (target *MySQLTarget) Send(eventKey string) error {
return nil
if err := target.db.Ping(); err != nil {
if IsConnErr(err) {
return errNotConnected
}
return err
}
if !target.firstPing {
if err := target.executeStmts(); err != nil {
if IsConnErr(err) {
return errNotConnected
}
return err
}
}
eventData, eErr := target.store.Get(eventKey)
if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and wouldve been already been sent successfully.
if os.IsNotExist(eErr) {
return nil
}
return eErr
}
if err := target.send(eventData); err != nil {
if IsConnErr(err) {
return errNotConnected
}
return err
}
// Delete the event from store.
return target.store.Del(eventKey)
}
// Close - closes underneath connections to MySQL database.
@@ -210,8 +274,45 @@ func (target *MySQLTarget) Close() error {
return target.db.Close()
}
// Executes the table creation statements.
func (target *MySQLTarget) executeStmts() error {
_, err := target.db.Exec(fmt.Sprintf(mysqlTableExists, target.args.Table))
if err != nil {
createStmt := mysqlCreateNamespaceTable
if target.args.Format == event.AccessFormat {
createStmt = mysqlCreateAccessTable
}
if _, dbErr := target.db.Exec(fmt.Sprintf(createStmt, target.args.Table)); dbErr != nil {
return dbErr
}
}
switch target.args.Format {
case event.NamespaceFormat:
// insert or update statement
if target.updateStmt, err = target.db.Prepare(fmt.Sprintf(mysqlUpdateRow, target.args.Table)); err != nil {
return err
}
// delete statement
if target.deleteStmt, err = target.db.Prepare(fmt.Sprintf(mysqlDeleteRow, target.args.Table)); err != nil {
return err
}
case event.AccessFormat:
// insert statement
if target.insertStmt, err = target.db.Prepare(fmt.Sprintf(mysqlInsertRow, target.args.Table)); err != nil {
return err
}
}
return nil
}
// NewMySQLTarget - creates new MySQL target.
func NewMySQLTarget(id string, args MySQLArgs) (*MySQLTarget, error) {
func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}) (*MySQLTarget, error) {
var firstPing bool
if args.DSN == "" {
config := mysql.Config{
User: args.User,
@@ -230,45 +331,42 @@ func NewMySQLTarget(id string, args MySQLArgs) (*MySQLTarget, error) {
return nil, err
}
if err = db.Ping(); err != nil {
return nil, err
}
var store Store
if _, err = db.Exec(fmt.Sprintf(mysqlTableExists, args.Table)); err != nil {
createStmt := mysqlCreateNamespaceTable
if args.Format == event.AccessFormat {
createStmt = mysqlCreateAccessTable
}
if _, err = db.Exec(fmt.Sprintf(createStmt, args.Table)); err != nil {
return nil, err
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-mysql-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil {
return nil, oErr
}
}
var updateStmt, deleteStmt, insertStmt *sql.Stmt
switch args.Format {
case event.NamespaceFormat:
// insert or update statement
if updateStmt, err = db.Prepare(fmt.Sprintf(mysqlUpdateRow, args.Table)); err != nil {
return nil, err
}
// delete statement
if deleteStmt, err = db.Prepare(fmt.Sprintf(mysqlDeleteRow, args.Table)); err != nil {
return nil, err
}
case event.AccessFormat:
// insert statement
if insertStmt, err = db.Prepare(fmt.Sprintf(mysqlInsertRow, args.Table)); err != nil {
return nil, err
}
target := &MySQLTarget{
id: event.TargetID{ID: id, Name: "mysql"},
args: args,
db: db,
store: store,
firstPing: firstPing,
}
return &MySQLTarget{
id: event.TargetID{ID: id, Name: "mysql"},
args: args,
updateStmt: updateStmt,
deleteStmt: deleteStmt,
insertStmt: insertStmt,
db: db,
}, nil
err = target.db.Ping()
if err != nil {
if target.store == nil || !IsConnRefusedErr(err) {
return nil, err
}
} else {
if err = target.executeStmts(); err != nil {
return nil, err
}
target.firstPing = true
}
if target.store != nil {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh)
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh)
}
return target, nil
}

View File

@@ -56,8 +56,11 @@ package target
import (
"database/sql"
"encoding/json"
"errors"
"fmt"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"time"
@@ -89,6 +92,8 @@ type PostgreSQLArgs struct {
User string `json:"user"` // default: user running minio
Password string `json:"password"` // default: no password
Database string `json:"database"` // default: same as user
QueueDir string `json:"queueDir"`
QueueLimit uint64 `json:"queueLimit"`
}
// Validate PostgreSQLArgs fields
@@ -122,6 +127,15 @@ func (p PostgreSQLArgs) Validate() error {
}
}
if p.QueueDir != "" {
if !filepath.IsAbs(p.QueueDir) {
return errors.New("queueDir path should be absolute")
}
}
if p.QueueLimit > 10000 {
return errors.New("queueLimit should not exceed 10000")
}
return nil
}
@@ -133,6 +147,8 @@ type PostgreSQLTarget struct {
deleteStmt *sql.Stmt
insertStmt *sql.Stmt
db *sql.DB
store Store
firstPing bool
}
// ID - returns target ID.
@@ -140,11 +156,26 @@ func (target *PostgreSQLTarget) ID() event.TargetID {
return target.id
}
// Save - Sends event directly without persisting.
// Save - saves the events to the store if questore is configured, which will be replayed when the PostgreSQL connection is active.
func (target *PostgreSQLTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
if err := target.db.Ping(); err != nil {
if IsConnErr(err) {
return errNotConnected
}
return err
}
return target.send(eventData)
}
// IsConnErr - To detect a connection error.
func IsConnErr(err error) bool {
return IsConnRefusedErr(err) || err.Error() == "sql: database is closed" || err.Error() == "sql: statement is closed" || err.Error() == "invalid connection"
}
// send - sends an event to the PostgreSQL.
func (target *PostgreSQLTarget) send(eventData event.Event) error {
if target.args.Format == event.NamespaceFormat {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
@@ -177,16 +208,52 @@ func (target *PostgreSQLTarget) send(eventData event.Event) error {
return err
}
_, err = target.insertStmt.Exec(eventTime, data)
return err
if _, err = target.insertStmt.Exec(eventTime, data); err != nil {
return err
}
}
return nil
}
// Send - interface compatible method does no-op.
// Send - reads an event from store and sends it to PostgreSQL.
func (target *PostgreSQLTarget) Send(eventKey string) error {
return nil
if err := target.db.Ping(); err != nil {
if IsConnErr(err) {
return errNotConnected
}
return err
}
if !target.firstPing {
if err := target.executeStmts(); err != nil {
if IsConnErr(err) {
return errNotConnected
}
return err
}
}
eventData, eErr := target.store.Get(eventKey)
if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and wouldve been already been sent successfully.
if os.IsNotExist(eErr) {
return nil
}
return eErr
}
if err := target.send(eventData); err != nil {
if IsConnErr(err) {
return errNotConnected
}
return err
}
// Delete the event from store.
return target.store.Del(eventKey)
}
// Close - closes underneath connections to PostgreSQL database.
@@ -209,8 +276,45 @@ func (target *PostgreSQLTarget) Close() error {
return target.db.Close()
}
// Executes the table creation statements.
func (target *PostgreSQLTarget) executeStmts() error {
_, err := target.db.Exec(fmt.Sprintf(psqlTableExists, target.args.Table))
if err != nil {
createStmt := psqlCreateNamespaceTable
if target.args.Format == event.AccessFormat {
createStmt = psqlCreateAccessTable
}
if _, dbErr := target.db.Exec(fmt.Sprintf(createStmt, target.args.Table)); dbErr != nil {
return dbErr
}
}
switch target.args.Format {
case event.NamespaceFormat:
// insert or update statement
if target.updateStmt, err = target.db.Prepare(fmt.Sprintf(psqlUpdateRow, target.args.Table)); err != nil {
return err
}
// delete statement
if target.deleteStmt, err = target.db.Prepare(fmt.Sprintf(psqlDeleteRow, target.args.Table)); err != nil {
return err
}
case event.AccessFormat:
// insert statement
if target.insertStmt, err = target.db.Prepare(fmt.Sprintf(psqlInsertRow, target.args.Table)); err != nil {
return err
}
}
return nil
}
// NewPostgreSQLTarget - creates new PostgreSQL target.
func NewPostgreSQLTarget(id string, args PostgreSQLArgs) (*PostgreSQLTarget, error) {
func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{}) (*PostgreSQLTarget, error) {
var firstPing bool
params := []string{args.ConnectionString}
if !args.Host.IsEmpty() {
params = append(params, "host="+args.Host.String())
@@ -234,45 +338,42 @@ func NewPostgreSQLTarget(id string, args PostgreSQLArgs) (*PostgreSQLTarget, err
return nil, err
}
if err = db.Ping(); err != nil {
return nil, err
}
var store Store
if _, err = db.Exec(fmt.Sprintf(psqlTableExists, args.Table)); err != nil {
createStmt := psqlCreateNamespaceTable
if args.Format == event.AccessFormat {
createStmt = psqlCreateAccessTable
}
if _, err = db.Exec(fmt.Sprintf(createStmt, args.Table)); err != nil {
return nil, err
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-postgresql-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil {
return nil, oErr
}
}
var updateStmt, deleteStmt, insertStmt *sql.Stmt
switch args.Format {
case event.NamespaceFormat:
// insert or update statement
if updateStmt, err = db.Prepare(fmt.Sprintf(psqlUpdateRow, args.Table)); err != nil {
return nil, err
}
// delete statement
if deleteStmt, err = db.Prepare(fmt.Sprintf(psqlDeleteRow, args.Table)); err != nil {
return nil, err
}
case event.AccessFormat:
// insert statement
if insertStmt, err = db.Prepare(fmt.Sprintf(psqlInsertRow, args.Table)); err != nil {
return nil, err
}
target := &PostgreSQLTarget{
id: event.TargetID{ID: id, Name: "postgresql"},
args: args,
db: db,
store: store,
firstPing: firstPing,
}
return &PostgreSQLTarget{
id: event.TargetID{ID: id, Name: "postgresql"},
args: args,
updateStmt: updateStmt,
deleteStmt: deleteStmt,
insertStmt: insertStmt,
db: db,
}, nil
err = target.db.Ping()
if err != nil {
if target.store == nil || !IsConnRefusedErr(err) {
return nil, err
}
} else {
if err = target.executeStmts(); err != nil {
return nil, err
}
target.firstPing = true
}
if target.store != nil {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh)
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh)
}
return target, nil
}