Enable persistent event store in elasticsearch (#7564)

This commit is contained in:
Praveen raj Mani
2019-07-12 08:23:20 +05:30
committed by Nitish Tiwari
parent 2337e5f803
commit bba562235b
13 changed files with 164 additions and 45 deletions

View File

@@ -18,23 +18,28 @@ package target
import (
"context"
"errors"
"fmt"
"net"
"net/url"
"os"
"path/filepath"
"strings"
"github.com/minio/minio/pkg/event"
xnet "github.com/minio/minio/pkg/net"
"github.com/pkg/errors"
"gopkg.in/olivere/elastic.v5"
)
// ElasticsearchArgs - Elasticsearch target arguments.
type ElasticsearchArgs struct {
Enable bool `json:"enable"`
Format string `json:"format"`
URL xnet.URL `json:"url"`
Index string `json:"index"`
Enable bool `json:"enable"`
Format string `json:"format"`
URL xnet.URL `json:"url"`
Index string `json:"index"`
QueueDir string `json:"queueDir"`
QueueLimit uint64 `json:"queueLimit"`
}
// Validate ElasticsearchArgs fields
@@ -54,6 +59,9 @@ func (a ElasticsearchArgs) Validate() error {
if a.Index == "" {
return errors.New("empty index value")
}
if a.QueueLimit > 10000 {
return errors.New("queueLimit should not exceed 10000")
}
return nil
}
@@ -62,6 +70,7 @@ type ElasticsearchTarget struct {
id event.TargetID
args ElasticsearchArgs
client *elastic.Client
store Store
}
// ID - returns target ID.
@@ -69,17 +78,31 @@ func (target *ElasticsearchTarget) ID() event.TargetID {
return target.id
}
// Save - Sends event directly without persisting.
// Save - saves the events to the store if queuestore is configured, which will be replayed when the elasticsearch connection is active.
func (target *ElasticsearchTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
if _, err := net.Dial("tcp", target.args.URL.Host); err != nil {
return errNotConnected
}
return target.send(eventData)
}
// send - sends the event to the target.
func (target *ElasticsearchTarget) send(eventData event.Event) error {
var key string
exists := func() (bool, error) {
return target.client.Exists().Index(target.args.Index).Type("event").Id(key).Do(context.Background())
}
remove := func() error {
_, err := target.client.Delete().Index(target.args.Index).Type("event").Id(key).Do(context.Background())
exists, err := exists()
if err == nil && exists {
_, err = target.client.Delete().Index(target.args.Index).Type("event").Id(key).Do(context.Background())
}
return err
}
@@ -116,9 +139,38 @@ func (target *ElasticsearchTarget) send(eventData event.Event) error {
return nil
}
// Send - interface compatible method does no-op.
// Send - reads an event from store and sends it to Elasticsearch.
func (target *ElasticsearchTarget) Send(eventKey string) error {
return nil
var err error
if target.client == nil {
target.client, err = newClient(target.args)
if err != nil {
return err
}
}
if _, err := net.Dial("tcp", target.args.URL.Host); err != nil {
return errNotConnected
}
eventData, eErr := target.store.Get(eventKey)
if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and wouldve been already been sent successfully.
if os.IsNotExist(eErr) {
return nil
}
return eErr
}
if err := target.send(eventData); err != nil {
return err
}
// Delete the event from store.
return target.store.Del(eventKey)
}
// Close - does nothing and available for interface compatibility.
@@ -126,32 +178,80 @@ func (target *ElasticsearchTarget) Close() error {
return nil
}
// NewElasticsearchTarget - creates new Elasticsearch target.
func NewElasticsearchTarget(id string, args ElasticsearchArgs) (*ElasticsearchTarget, error) {
client, err := elastic.NewClient(elastic.SetURL(args.URL.String()), elastic.SetSniff(false), elastic.SetMaxRetries(10))
if err != nil {
return nil, err
}
// createIndex - creates the index if it does not exist.
func createIndex(client *elastic.Client, args ElasticsearchArgs) error {
exists, err := client.IndexExists(args.Index).Do(context.Background())
if err != nil {
return nil, err
return err
}
if !exists {
var createIndex *elastic.IndicesCreateResult
if createIndex, err = client.CreateIndex(args.Index).Do(context.Background()); err != nil {
return nil, err
return err
}
if !createIndex.Acknowledged {
return nil, fmt.Errorf("index %v not created", args.Index)
return fmt.Errorf("index %v not created", args.Index)
}
}
return nil
}
// newClient - creates a new elastic client with args provided.
func newClient(args ElasticsearchArgs) (*elastic.Client, error) {
client, clientErr := elastic.NewClient(elastic.SetURL(args.URL.String()), elastic.SetSniff(false), elastic.SetMaxRetries(10))
if clientErr != nil {
if !(errors.Cause(clientErr) == elastic.ErrNoClient) {
return nil, clientErr
}
} else {
if err := createIndex(client, args); err != nil {
return nil, err
}
}
return client, nil
}
// NewElasticsearchTarget - creates new Elasticsearch target.
func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan struct{}) (*ElasticsearchTarget, error) {
var client *elastic.Client
var err error
var store Store
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-elasticsearch-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil {
return nil, oErr
}
}
return &ElasticsearchTarget{
_, derr := net.Dial("tcp", args.URL.Host)
if derr != nil {
if store == nil {
return nil, derr
}
} else {
client, err = newClient(args)
if err != nil {
return nil, err
}
}
target := &ElasticsearchTarget{
id: event.TargetID{ID: id, Name: "elasticsearch"},
args: args,
client: client,
}, nil
store: store,
}
if target.store != nil {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh)
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh)
}
return target, nil
}

View File

@@ -37,7 +37,7 @@ type KafkaArgs struct {
Brokers []xnet.Host `json:"brokers"`
Topic string `json:"topic"`
QueueDir string `json:"queueDir"`
QueueLimit uint16 `json:"queueLimit"`
QueueLimit uint64 `json:"queueLimit"`
TLS struct {
Enable bool `json:"enable"`
SkipVerify bool `json:"skipVerify"`

View File

@@ -48,7 +48,7 @@ type MQTTArgs struct {
KeepAlive time.Duration `json:"keepAliveInterval"`
RootCAs *x509.CertPool `json:"-"`
QueueDir string `json:"queueDir"`
QueueLimit uint16 `json:"queueLimit"`
QueueLimit uint64 `json:"queueLimit"`
}
// Validate MQTTArgs fields
@@ -139,7 +139,7 @@ func (target *MQTTTarget) Send(eventKey string) error {
return target.store.Del(eventKey)
}
// Save - saves the events to the store if questore is configured, which will be replayed when the mqtt connection is active.
// Save - saves the events to the store if queuestore is configured, which will be replayed when the mqtt connection is active.
func (target *MQTTTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)

View File

@@ -25,6 +25,7 @@ import (
"sync"
"github.com/minio/minio/pkg/event"
"github.com/minio/minio/pkg/sys"
)
const (
@@ -36,15 +37,22 @@ const (
type QueueStore struct {
sync.RWMutex
directory string
eC uint16
limit uint16
eC uint64
limit uint64
}
// NewQueueStore - Creates an instance for QueueStore.
func NewQueueStore(directory string, limit uint16) *QueueStore {
func NewQueueStore(directory string, limit uint64) *QueueStore {
if limit == 0 {
limit = maxLimit
currRlimit, _, err := sys.GetMaxOpenFileLimit()
if err == nil {
if currRlimit > limit {
limit = currRlimit
}
}
}
queueStore := &QueueStore{
directory: directory,
limit: limit,
@@ -61,7 +69,7 @@ func (store *QueueStore) Open() error {
return terr
}
eCount := uint16(len(store.list()))
eCount := uint64(len(store.list()))
if eCount >= store.limit {
return errLimitExceeded
}

View File

@@ -33,7 +33,7 @@ var queueDir = filepath.Join(os.TempDir(), "minio_test")
var testEvent = event.Event{EventVersion: "1.0", EventSource: "test_source", AwsRegion: "test_region", EventTime: "test_time", EventName: event.ObjectAccessedGet}
// Initialize the store.
func setUpStore(directory string, limit uint16) (Store, error) {
func setUpStore(directory string, limit uint64) (Store, error) {
store := NewQueueStore(queueDir, limit)
if oErr := store.Open(); oErr != nil {
return nil, oErr

View File

@@ -43,7 +43,7 @@ type WebhookArgs struct {
Endpoint xnet.URL `json:"endpoint"`
RootCAs *x509.CertPool `json:"-"`
QueueDir string `json:"queueDir"`
QueueLimit uint16 `json:"queueLimit"`
QueueLimit uint64 `json:"queueLimit"`
}
// Validate WebhookArgs fields