minio/pkg/event/target/redis.go

298 lines
6.9 KiB
Go

/*
* MinIO Cloud Storage, (C) 2018 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package target
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/url"
"os"
"path/filepath"
"strings"
"time"
"github.com/gomodule/redigo/redis"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/event"
xnet "github.com/minio/minio/pkg/net"
)
// RedisArgs - Redis target arguments.
type RedisArgs struct {
Enable bool `json:"enable"`
Format string `json:"format"`
Addr xnet.Host `json:"address"`
Password string `json:"password"`
Key string `json:"key"`
QueueDir string `json:"queueDir"`
QueueLimit uint64 `json:"queueLimit"`
}
// Validate RedisArgs fields
func (r RedisArgs) Validate() error {
if !r.Enable {
return nil
}
if r.Format != "" {
f := strings.ToLower(r.Format)
if f != event.NamespaceFormat && f != event.AccessFormat {
return fmt.Errorf("unrecognized format")
}
}
if r.Key == "" {
return fmt.Errorf("empty key")
}
if r.QueueDir != "" {
if !filepath.IsAbs(r.QueueDir) {
return errors.New("queueDir path should be absolute")
}
}
if r.QueueLimit > 10000 {
return errors.New("queueLimit should not exceed 10000")
}
return nil
}
func (r RedisArgs) validateFormat(c redis.Conn) error {
typeAvailable, err := redis.String(c.Do("TYPE", r.Key))
if err != nil {
return err
}
if typeAvailable != "none" {
expectedType := "hash"
if r.Format == event.AccessFormat {
expectedType = "list"
}
if typeAvailable != expectedType {
return fmt.Errorf("expected type %v does not match with available type %v", expectedType, typeAvailable)
}
}
return nil
}
// RedisTarget - Redis target.
type RedisTarget struct {
id event.TargetID
args RedisArgs
pool *redis.Pool
store Store
firstPing bool
}
// ID - returns target ID.
func (target *RedisTarget) ID() event.TargetID {
return target.id
}
// Save - saves the events to the store if questore is configured, which will be replayed when the redis connection is active.
func (target *RedisTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
conn := target.pool.Get()
defer func() {
cErr := conn.Close()
logger.LogOnceIf(context.Background(), cErr, target.ID())
}()
_, pingErr := conn.Do("PING")
if pingErr != nil {
if IsConnRefusedErr(pingErr) {
return errNotConnected
}
return pingErr
}
return target.send(eventData)
}
// send - sends an event to the redis.
func (target *RedisTarget) send(eventData event.Event) error {
conn := target.pool.Get()
defer func() {
cErr := conn.Close()
logger.LogOnceIf(context.Background(), cErr, target.ID())
}()
if target.args.Format == event.NamespaceFormat {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
return err
}
key := eventData.S3.Bucket.Name + "/" + objectName
if eventData.EventName == event.ObjectRemovedDelete {
_, err = conn.Do("HDEL", target.args.Key, key)
} else {
var data []byte
if data, err = json.Marshal(struct{ Records []event.Event }{[]event.Event{eventData}}); err != nil {
return err
}
_, err = conn.Do("HSET", target.args.Key, key, data)
}
if err != nil {
return err
}
}
if target.args.Format == event.AccessFormat {
data, err := json.Marshal([]interface{}{eventData.EventTime, []event.Event{eventData}})
if err != nil {
return err
}
if _, err := conn.Do("RPUSH", target.args.Key, data); err != nil {
return err
}
}
return nil
}
// Send - reads an event from store and sends it to redis.
func (target *RedisTarget) Send(eventKey string) error {
conn := target.pool.Get()
defer func() {
cErr := conn.Close()
logger.LogOnceIf(context.Background(), cErr, target.ID())
}()
_, pingErr := conn.Do("PING")
if pingErr != nil {
if IsConnRefusedErr(pingErr) {
return errNotConnected
}
return pingErr
}
if !target.firstPing {
if err := target.args.validateFormat(conn); err != nil {
if IsConnRefusedErr(err) {
return errNotConnected
}
return err
}
target.firstPing = true
}
eventData, eErr := target.store.Get(eventKey)
if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and would've been already been sent successfully.
if os.IsNotExist(eErr) {
return nil
}
return eErr
}
if err := target.send(eventData); err != nil {
if IsConnRefusedErr(err) {
return errNotConnected
}
return err
}
// Delete the event from store.
return target.store.Del(eventKey)
}
// Close - does nothing and available for interface compatibility.
func (target *RedisTarget) Close() error {
return nil
}
// NewRedisTarget - creates new Redis target.
func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}) (*RedisTarget, error) {
pool := &redis.Pool{
MaxIdle: 3,
IdleTimeout: 2 * 60 * time.Second,
Dial: func() (redis.Conn, error) {
conn, err := redis.Dial("tcp", args.Addr.String())
if err != nil {
return nil, err
}
if args.Password == "" {
return conn, nil
}
if _, err = conn.Do("AUTH", args.Password); err != nil {
cErr := conn.Close()
targetID := event.TargetID{ID: id, Name: "redis"}
logger.LogOnceIf(context.Background(), cErr, targetID.String())
return nil, err
}
return conn, nil
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("PING")
return err
},
}
var store Store
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-redis-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil {
return nil, oErr
}
}
target := &RedisTarget{
id: event.TargetID{ID: id, Name: "redis"},
args: args,
pool: pool,
store: store,
}
conn := target.pool.Get()
defer func() {
cErr := conn.Close()
logger.LogOnceIf(context.Background(), cErr, target.ID())
}()
_, pingErr := conn.Do("PING")
if pingErr != nil {
if target.store == nil || !(IsConnRefusedErr(pingErr) || IsConnResetErr(pingErr)) {
return nil, pingErr
}
} else {
if err := target.args.validateFormat(conn); err != nil {
return nil, err
}
target.firstPing = true
}
if target.store != nil {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh)
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh)
}
return target, nil
}