// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.

package target

import (
	"context"
	"fmt"
	"net/http"
	"net/url"
	"os"
	"path/filepath"
	"strings"
	"time"

	"github.com/minio/minio/pkg/event"
	xnet "github.com/minio/minio/pkg/net"
	"github.com/pkg/errors"

	"github.com/olivere/elastic/v7"
)

// Elastic constants
const (
	// Short key names, one per user-settable ElasticsearchArgs field.
	// NOTE(review): presumably these are the configuration keys for the
	// target; they are not referenced in this file - confirm at the caller.
	ElasticFormat     = "format"
	ElasticURL        = "url"
	ElasticIndex      = "index"
	ElasticQueueDir   = "queue_dir"
	ElasticQueueLimit = "queue_limit"
	ElasticUsername   = "username"
	ElasticPassword   = "password"

	// MINIO_NOTIFY_ELASTICSEARCH_* names mirroring the keys above, plus an
	// enable switch.
	// NOTE(review): presumably environment variable names; they are not
	// read in this file - confirm at the caller.
	EnvElasticEnable     = "MINIO_NOTIFY_ELASTICSEARCH_ENABLE"
	EnvElasticFormat     = "MINIO_NOTIFY_ELASTICSEARCH_FORMAT"
	EnvElasticURL        = "MINIO_NOTIFY_ELASTICSEARCH_URL"
	EnvElasticIndex      = "MINIO_NOTIFY_ELASTICSEARCH_INDEX"
	EnvElasticQueueDir   = "MINIO_NOTIFY_ELASTICSEARCH_QUEUE_DIR"
	EnvElasticQueueLimit = "MINIO_NOTIFY_ELASTICSEARCH_QUEUE_LIMIT"
	EnvElasticUsername   = "MINIO_NOTIFY_ELASTICSEARCH_USERNAME"
	EnvElasticPassword   = "MINIO_NOTIFY_ELASTICSEARCH_PASSWORD"
)

// ElasticsearchArgs - Elasticsearch target arguments.
type ElasticsearchArgs struct { Enable bool `json:"enable"` Format string `json:"format"` URL xnet.URL `json:"url"` Index string `json:"index"` QueueDir string `json:"queueDir"` QueueLimit uint64 `json:"queueLimit"` Transport *http.Transport `json:"-"` Username string `json:"username"` Password string `json:"password"` } // Validate ElasticsearchArgs fields func (a ElasticsearchArgs) Validate() error { if !a.Enable { return nil } if a.URL.IsEmpty() { return errors.New("empty URL") } if a.Format != "" { f := strings.ToLower(a.Format) if f != event.NamespaceFormat && f != event.AccessFormat { return errors.New("format value unrecognized") } } if a.Index == "" { return errors.New("empty index value") } if (a.Username == "" && a.Password != "") || (a.Username != "" && a.Password == "") { return errors.New("username and password should be set in pairs") } return nil } // ElasticsearchTarget - Elasticsearch target. type ElasticsearchTarget struct { id event.TargetID args ElasticsearchArgs client *elastic.Client store Store loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}) } // ID - returns target ID. 
func (target *ElasticsearchTarget) ID() event.TargetID {
	return target.id
}

// HasQueueStore - reports whether a queue store is configured for the target.
func (target *ElasticsearchTarget) HasQueueStore() bool {
	return target.store != nil
}

// IsActive - pings the configured URL (HEAD request, 5 second timeout) and
// reports whether the response status is below 400. The client is created
// on demand if it does not exist yet. Connection, context and network
// failures from the ping are reported as errNotConnected.
func (target *ElasticsearchTarget) IsActive() (bool, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	if target.client == nil {
		freshClient, err := newClient(target.args)
		if err != nil {
			return false, err
		}
		target.client = freshClient
	}

	_, status, err := target.client.Ping(target.args.URL.String()).HttpHeadOnly(true).Do(ctx)
	switch {
	case err == nil:
		return status < http.StatusBadRequest, nil
	case elastic.IsConnErr(err), elastic.IsContextErr(err), xnet.IsNetworkOrHostDown(err, false):
		return false, errNotConnected
	default:
		return false, err
	}
}

// Save - puts the event into the queue store when one is configured (it is
// then delivered later through Send); otherwise sends it to Elasticsearch
// immediately. Connection, context and network failures are reported as
// errNotConnected.
func (target *ElasticsearchTarget) Save(eventData event.Event) error {
	if target.HasQueueStore() {
		return target.store.Put(eventData)
	}

	err := target.send(eventData)
	switch {
	case elastic.IsConnErr(err), elastic.IsContextErr(err), xnet.IsNetworkOrHostDown(err, false):
		return errNotConnected
	}
	return err
}

// send - sends the event to the target.
func (target *ElasticsearchTarget) send(eventData event.Event) error { var key string exists := func() (bool, error) { return target.client.Exists().Index(target.args.Index).Type("event").Id(key).Do(context.Background()) } remove := func() error { exists, err := exists() if err == nil && exists { _, err = target.client.Delete().Index(target.args.Index).Type("event").Id(key).Do(context.Background()) } return err } update := func() error { _, err := target.client.Index().Index(target.args.Index).Type("event").BodyJson(map[string]interface{}{"Records": []event.Event{eventData}}).Id(key).Do(context.Background()) return err } add := func() error { _, err := target.client.Index().Index(target.args.Index).Type("event").BodyJson(map[string]interface{}{"Records": []event.Event{eventData}}).Do(context.Background()) return err } if target.args.Format == event.NamespaceFormat { objectName, err := url.QueryUnescape(eventData.S3.Object.Key) if err != nil { return err } key = eventData.S3.Bucket.Name + "/" + objectName if eventData.EventName == event.ObjectRemovedDelete { err = remove() } else { err = update() } return err } if target.args.Format == event.AccessFormat { return add() } return nil } // Send - reads an event from store and sends it to Elasticsearch. func (target *ElasticsearchTarget) Send(eventKey string) error { var err error if target.client == nil { target.client, err = newClient(target.args) if err != nil { return err } } eventData, eErr := target.store.Get(eventKey) if eErr != nil { // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() // Such events will not exist and wouldve been already been sent successfully. if os.IsNotExist(eErr) { return nil } return eErr } if err := target.send(eventData); err != nil { if elastic.IsConnErr(err) || elastic.IsContextErr(err) || xnet.IsNetworkOrHostDown(err, false) { return errNotConnected } return err } // Delete the event from store. 
return target.store.Del(eventKey) } // Close - does nothing and available for interface compatibility. func (target *ElasticsearchTarget) Close() error { if target.client != nil { // Stops the background processes that the client is running. target.client.Stop() } return nil } // createIndex - creates the index if it does not exist. func createIndex(client *elastic.Client, args ElasticsearchArgs) error { exists, err := client.IndexExists(args.Index).Do(context.Background()) if err != nil { return err } if !exists { var createIndex *elastic.IndicesCreateResult if createIndex, err = client.CreateIndex(args.Index).Do(context.Background()); err != nil { return err } if !createIndex.Acknowledged { return fmt.Errorf("index %v not created", args.Index) } } return nil } // newClient - creates a new elastic client with args provided. func newClient(args ElasticsearchArgs) (*elastic.Client, error) { // Client options options := []elastic.ClientOptionFunc{elastic.SetURL(args.URL.String()), elastic.SetMaxRetries(10), elastic.SetSniff(false), elastic.SetHttpClient(&http.Client{Transport: args.Transport})} // Set basic auth if args.Username != "" && args.Password != "" { options = append(options, elastic.SetBasicAuth(args.Username, args.Password)) } // Create a client client, err := elastic.NewClient(options...) if err != nil { // https://github.com/olivere/elastic/wiki/Connection-Errors if elastic.IsConnErr(err) || elastic.IsContextErr(err) || xnet.IsNetworkOrHostDown(err, false) { return nil, errNotConnected } return nil, err } if err = createIndex(client, args); err != nil { return nil, err } return client, nil } // NewElasticsearchTarget - creates new Elasticsearch target. 
func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*ElasticsearchTarget, error) { target := &ElasticsearchTarget{ id: event.TargetID{ID: id, Name: "elasticsearch"}, args: args, loggerOnce: loggerOnce, } if args.QueueDir != "" { queueDir := filepath.Join(args.QueueDir, storePrefix+"-elasticsearch-"+id) target.store = NewQueueStore(queueDir, args.QueueLimit) if err := target.store.Open(); err != nil { target.loggerOnce(context.Background(), err, target.ID()) return target, err } } var err error target.client, err = newClient(args) if err != nil { if target.store == nil || err != errNotConnected { target.loggerOnce(context.Background(), err, target.ID()) return target, err } } if target.store != nil && !test { // Replays the events from the store. eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID()) // Start replaying events from the store. go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce) } return target, nil }