From 1692bab60932e5d9f727698b9fc09960b4eb1038 Mon Sep 17 00:00:00 2001 From: Aditya Manthramurthy Date: Mon, 10 May 2021 15:06:58 -0700 Subject: [PATCH] Add support for Elasticsearch 7.x (#12053) - Check ES server version by querying its API - Minimum required version of ES is 5.x - Add deprecation warnings for ES versions < 7.x - Still works with 5.x and 6.x, but support to be removed at a later date. Signed-off-by: Aditya Manthramurthy --- go.mod | 1 + go.sum | 3 + pkg/event/target/elasticsearch.go | 503 +++++++++++++++++++++++++----- 3 files changed, 422 insertions(+), 85 deletions(-) diff --git a/go.mod b/go.mod index 5d0b1a025..43508dcdb 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/dswarbrick/smart v0.0.0-20190505152634-909a45200d6d github.com/dustin/go-humanize v1.0.0 github.com/eclipse/paho.mqtt.golang v1.3.0 + github.com/elastic/go-elasticsearch/v7 v7.12.0 github.com/fatih/color v1.10.0 github.com/fatih/structs v1.1.0 github.com/go-ldap/ldap/v3 v3.2.4 diff --git a/go.sum b/go.sum index 9a1847410..168b570e4 100644 --- a/go.sum +++ b/go.sum @@ -164,6 +164,9 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP github.com/eclipse/paho.mqtt.golang v1.3.0 h1:MU79lqr3FKNKbSrGN7d7bNYqh8MwWW7Zcx0iG+VIw9I= github.com/eclipse/paho.mqtt.golang v1.3.0/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc= github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= +github.com/elastic/go-elasticsearch v0.0.0 h1:Pd5fqOuBxKxv83b0+xOAJDAkziWYwFinWnBO0y+TZaA= +github.com/elastic/go-elasticsearch/v7 v7.12.0 h1:j4tvcMrZJLp39L2NYvBb7f+lHKPqPHSL3nvB8+/DV+s= +github.com/elastic/go-elasticsearch/v7 v7.12.0/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= diff --git a/pkg/event/target/elasticsearch.go b/pkg/event/target/elasticsearch.go index 2c948dead..b8a0d7de7 100644 --- a/pkg/event/target/elasticsearch.go +++ b/pkg/event/target/elasticsearch.go @@ -18,12 +18,18 @@ package target import ( + "bytes" "context" + "encoding/base64" + "encoding/json" "fmt" + "io" + "io/ioutil" "net/http" "net/url" "os" "path/filepath" + "strconv" "strings" "time" @@ -31,6 +37,8 @@ import ( xnet "github.com/minio/minio/pkg/net" "github.com/pkg/errors" + elasticsearch7 "github.com/elastic/go-elasticsearch/v7" + "github.com/minio/highwayhash" "github.com/olivere/elastic/v7" ) @@ -54,6 +62,61 @@ const ( EnvElasticPassword = "MINIO_NOTIFY_ELASTICSEARCH_PASSWORD" ) +// ESSupportStatus is a typed string representing the support status for +// Elasticsearch +type ESSupportStatus string + +const ( + // ESSUnknown is default value + ESSUnknown ESSupportStatus = "ESSUnknown" + // ESSDeprecated -> support will be removed in future + ESSDeprecated ESSupportStatus = "ESSDeprecated" + // ESSUnsupported -> we wont work with this ES server + ESSUnsupported ESSupportStatus = "ESSUnsupported" + // ESSSupported -> all good! + ESSSupported ESSupportStatus = "ESSSupported" +) + +func getESVersionSupportStatus(version string) (res ESSupportStatus, err error) { + parts := strings.Split(version, ".") + if len(parts) < 1 { + err = fmt.Errorf("bad ES version string: %s", version) + return + } + + majorVersion, err := strconv.Atoi(parts[0]) + if err != nil { + err = fmt.Errorf("bad ES version string: %s", version) + return + } + + switch { + case majorVersion <= 4: + res = ESSUnsupported + case majorVersion <= 6: + res = ESSDeprecated + default: + res = ESSSupported + } + return +} + +// magic HH-256 key as HH-256 hash of the first 100 decimals of π as utf-8 string with a zero key. +var magicHighwayHash256Key = []byte("\x4b\xe7\x34\xfa\x8e\x23\x8a\xcd\x26\x3e\x83\xe6\xbb\x96\x85\x52\x04\x0f\x93\x5d\xa3\x9f\x44\x14\x97\xe0\x9d\x13\x22\xde\x36\xa0") + +// Interface for elasticsearch client objects +type esClient interface { + isAtleastV7() bool + createIndex(ElasticsearchArgs) error + ping(context.Context, ElasticsearchArgs) (bool, error) + stop() + + entryExists(context.Context, string, string) (bool, error) + removeEntry(context.Context, string, string) error + updateEntry(context.Context, string, string, event.Event) error + addEntry(context.Context, string, event.Event) error +} + // ElasticsearchArgs - Elasticsearch target arguments. type ElasticsearchArgs struct { Enable bool `json:"enable"` @@ -96,7 +159,7 @@ func (a ElasticsearchArgs) Validate() error { type ElasticsearchTarget struct { id event.TargetID args ElasticsearchArgs - client *elastic.Client + client esClient store Store loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}) } @@ -116,21 +179,11 @@ func (target *ElasticsearchTarget) IsActive() (bool, error) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - if target.client == nil { - client, err := newClient(target.args) - if err != nil { - return false, err - } - target.client = client - } - _, code, err := target.client.Ping(target.args.URL.String()).HttpHeadOnly(true).Do(ctx) + err := target.checkAndInitClient(ctx) if err != nil { - if elastic.IsConnErr(err) || elastic.IsContextErr(err) || xnet.IsNetworkOrHostDown(err, false) { - return false, errNotConnected - } return false, err } - return !(code >= http.StatusBadRequest), nil + return target.client.ping(ctx, target.args) } // Save - saves the events to the store if queuestore is configured, which will be replayed when the elasticsearch connection is active. @@ -147,30 +200,8 @@ func (target *ElasticsearchTarget) Save(eventData event.Event) error { // send - sends the event to the target. func (target *ElasticsearchTarget) send(eventData event.Event) error { - - var key string - - exists := func() (bool, error) { - return target.client.Exists().Index(target.args.Index).Type("event").Id(key).Do(context.Background()) - } - - remove := func() error { - exists, err := exists() - if err == nil && exists { - _, err = target.client.Delete().Index(target.args.Index).Type("event").Id(key).Do(context.Background()) - } - return err - } - - update := func() error { - _, err := target.client.Index().Index(target.args.Index).Type("event").BodyJson(map[string]interface{}{"Records": []event.Event{eventData}}).Id(key).Do(context.Background()) - return err - } - - add := func() error { - _, err := target.client.Index().Index(target.args.Index).Type("event").BodyJson(map[string]interface{}{"Records": []event.Event{eventData}}).Do(context.Background()) - return err - } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() if target.args.Format == event.NamespaceFormat { objectName, err := url.QueryUnescape(eventData.S3.Object.Key) @@ -178,17 +209,31 @@ func (target *ElasticsearchTarget) send(eventData event.Event) error { return err } - key = eventData.S3.Bucket.Name + "/" + objectName + // Calculate a hash of the key for the id of the ES document. + // Id's are limited to 512 bytes in V7+, so we need to do this. + var keyHash string + { + key := eventData.S3.Bucket.Name + "/" + objectName + if target.client.isAtleastV7() { + hh, _ := highwayhash.New(magicHighwayHash256Key) // New will never return error since key is 256 bit + hh.Write([]byte(key)) + hashBytes := hh.Sum(nil) + keyHash = base64.URLEncoding.EncodeToString(hashBytes) + } else { + keyHash = key + } + } + if eventData.EventName == event.ObjectRemovedDelete { - err = remove() + err = target.client.removeEntry(ctx, target.args.Index, keyHash) } else { - err = update() + err = target.client.updateEntry(ctx, target.args.Index, keyHash, eventData) } return err } if target.args.Format == event.AccessFormat { - return add() + return target.client.addEntry(ctx, target.args.Index, eventData) } return nil @@ -196,12 +241,12 @@ func (target *ElasticsearchTarget) send(eventData event.Event) error { // Send - reads an event from store and sends it to Elasticsearch. func (target *ElasticsearchTarget) Send(eventKey string) error { - var err error - if target.client == nil { - target.client, err = newClient(target.args) - if err != nil { - return err - } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + err := target.checkAndInitClient(ctx) + if err != nil { + return err } eventData, eErr := target.store.Get(eventKey) @@ -229,54 +274,48 @@ func (target *ElasticsearchTarget) Send(eventKey string) error { func (target *ElasticsearchTarget) Close() error { if target.client != nil { // Stops the background processes that the client is running. - target.client.Stop() + target.client.stop() } return nil } -// createIndex - creates the index if it does not exist. -func createIndex(client *elastic.Client, args ElasticsearchArgs) error { - exists, err := client.IndexExists(args.Index).Do(context.Background()) +func (target *ElasticsearchTarget) checkAndInitClient(ctx context.Context) error { + if target.client != nil { + return nil + } + + clientV7, err := newClientV7(target.args) if err != nil { return err } - if !exists { - var createIndex *elastic.IndicesCreateResult - if createIndex, err = client.CreateIndex(args.Index).Do(context.Background()); err != nil { + + // Check es version to confirm if it is supported. + serverSupportStatus, version, err := clientV7.getServerSupportStatus(ctx) + if err != nil { + return err + } + + switch serverSupportStatus { + case ESSUnknown: + return errors.New("unable to determine support status of ES (should not happen)") + + case ESSDeprecated: + fmt.Printf("DEPRECATION WARNING: Support for Elasticsearch version '%s' will be dropped in a future release. Please upgrade to a version >= 7.x.", version) + target.client, err = newClientV56(target.args) + if err != nil { return err } - if !createIndex.Acknowledged { - return fmt.Errorf("index %v not created", args.Index) - } - } - return nil -} + case ESSSupported: + target.client = clientV7 -// newClient - creates a new elastic client with args provided. -func newClient(args ElasticsearchArgs) (*elastic.Client, error) { - // Client options - options := []elastic.ClientOptionFunc{elastic.SetURL(args.URL.String()), - elastic.SetMaxRetries(10), - elastic.SetSniff(false), - elastic.SetHttpClient(&http.Client{Transport: args.Transport})} - // Set basic auth - if args.Username != "" && args.Password != "" { - options = append(options, elastic.SetBasicAuth(args.Username, args.Password)) + default: + // ESSUnsupported case + return fmt.Errorf("Elasticsearch version '%s' is not supported! Please use at least version 7.x.", version) } - // Create a client - client, err := elastic.NewClient(options...) - if err != nil { - // https://github.com/olivere/elastic/wiki/Connection-Errors - if elastic.IsConnErr(err) || elastic.IsContextErr(err) || xnet.IsNetworkOrHostDown(err, false) { - return nil, errNotConnected - } - return nil, err - } - if err = createIndex(client, args); err != nil { - return nil, err - } - return client, nil + + target.client.createIndex(target.args) + return nil } // NewElasticsearchTarget - creates new Elasticsearch target. @@ -296,8 +335,10 @@ func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan str } } - var err error - target.client, err = newClient(args) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + err := target.checkAndInitClient(ctx) if err != nil { if target.store == nil || err != errNotConnected { target.loggerOnce(context.Background(), err, target.ID()) @@ -314,3 +355,295 @@ func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan str return target, nil } + +// ES Client definitions and methods + +type esClientV7 struct { + *elasticsearch7.Client +} + +func newClientV7(args ElasticsearchArgs) (*esClientV7, error) { + // Client options + elasticConfig := elasticsearch7.Config{ + Addresses: []string{args.URL.String()}, + Transport: args.Transport, + MaxRetries: 10, + } + // Set basic auth + if args.Username != "" && args.Password != "" { + elasticConfig.Username = args.Username + elasticConfig.Password = args.Password + } + // Create a client + client, err := elasticsearch7.NewClient(elasticConfig) + if err != nil { + return nil, err + } + clientV7 := &esClientV7{client} + return clientV7, nil +} + +func (c *esClientV7) getServerSupportStatus(ctx context.Context) (ESSupportStatus, string, error) { + resp, err := c.Info( + c.Info.WithContext(ctx), + ) + if err != nil { + return ESSUnknown, "", errNotConnected + } + + defer resp.Body.Close() + + m := make(map[string]interface{}) + err = json.NewDecoder(resp.Body).Decode(&m) + if err != nil { + return ESSUnknown, "", fmt.Errorf("unable to get ES Server version - json parse error: %v", err) + } + + if v, ok := m["version"].(map[string]interface{}); ok { + if ver, ok := v["number"].(string); ok { + status, err := getESVersionSupportStatus(ver) + return status, ver, err + } + } + return ESSUnknown, "", fmt.Errorf("Unable to get ES Server Version - got INFO response: %v", m) + +} + +func (c *esClientV7) isAtleastV7() bool { + return true +} + +// createIndex - creates the index if it does not exist. +func (c *esClientV7) createIndex(args ElasticsearchArgs) error { + res, err := c.Indices.ResolveIndex([]string{args.Index}) + if err != nil { + return err + } + defer res.Body.Close() + + var v map[string]interface{} + found := false + if err := json.NewDecoder(res.Body).Decode(&v); err != nil { + return fmt.Errorf("Error parsing response body: %v", err) + } + + indices := v["indices"].([]interface{}) + for _, index := range indices { + name := index.(map[string]interface{})["name"] + if name == args.Index { + found = true + break + } + } + + if !found { + resp, err := c.Indices.Create(args.Index) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.IsError() { + err := fmt.Errorf("Create index err: %s", res.String()) + return err + } + io.Copy(ioutil.Discard, resp.Body) + return nil + } + return nil +} + +func (c *esClientV7) ping(ctx context.Context, _ ElasticsearchArgs) (bool, error) { + resp, err := c.Ping( + c.Ping.WithContext(ctx), + ) + if err != nil { + return false, errNotConnected + } + io.Copy(ioutil.Discard, resp.Body) + resp.Body.Close() + return !resp.IsError(), nil +} + +func (c *esClientV7) entryExists(ctx context.Context, index string, key string) (bool, error) { + res, err := c.Exists( + index, + key, + c.Exists.WithContext(ctx), + ) + if err != nil { + return false, err + } + io.Copy(ioutil.Discard, res.Body) + res.Body.Close() + return !res.IsError(), nil +} + +func (c *esClientV7) removeEntry(ctx context.Context, index string, key string) error { + exists, err := c.entryExists(ctx, index, key) + if err == nil && exists { + res, err := c.Delete( + index, + key, + c.Delete.WithContext(ctx), + ) + if err != nil { + return err + } + defer res.Body.Close() + if res.IsError() { + err := fmt.Errorf("Delete err: %s", res.String()) + return err + } + io.Copy(ioutil.Discard, res.Body) + return nil + } + return err +} + +func (c *esClientV7) updateEntry(ctx context.Context, index string, key string, eventData event.Event) error { + doc := map[string]interface{}{ + "Records": []event.Event{eventData}, + } + var buf bytes.Buffer + enc := json.NewEncoder(&buf) + err := enc.Encode(doc) + if err != nil { + return err + } + res, err := c.Index( + index, + &buf, + c.Index.WithDocumentID(key), + c.Index.WithContext(ctx), + ) + if err != nil { + return err + } + defer res.Body.Close() + if res.IsError() { + err := fmt.Errorf("Update err: %s", res.String()) + return err + } + io.Copy(ioutil.Discard, res.Body) + return nil +} + +func (c *esClientV7) addEntry(ctx context.Context, index string, eventData event.Event) error { + doc := map[string]interface{}{ + "Records": []event.Event{eventData}, + } + var buf bytes.Buffer + enc := json.NewEncoder(&buf) + err := enc.Encode(doc) + if err != nil { + return err + } + res, err := c.Index( + index, + &buf, + c.Index.WithContext(ctx), + ) + if err != nil { + return err + } + defer res.Body.Close() + if res.IsError() { + err := fmt.Errorf("Add err: %s", res.String()) + return err + } + io.Copy(ioutil.Discard, res.Body) + return nil +} + +func (c *esClientV7) stop() { +} + +// For versions under 7 +type esClientV56 struct { + *elastic.Client +} + +func newClientV56(args ElasticsearchArgs) (*esClientV56, error) { + // Client options + options := []elastic.ClientOptionFunc{elastic.SetURL(args.URL.String()), + elastic.SetMaxRetries(10), + elastic.SetSniff(false), + elastic.SetHttpClient(&http.Client{Transport: args.Transport})} + // Set basic auth + if args.Username != "" && args.Password != "" { + options = append(options, elastic.SetBasicAuth(args.Username, args.Password)) + } + // Create a client + client, err := elastic.NewClient(options...) + if err != nil { + // https://github.com/olivere/elastic/wiki/Connection-Errors + if elastic.IsConnErr(err) || elastic.IsContextErr(err) || xnet.IsNetworkOrHostDown(err, false) { + return nil, errNotConnected + } + return nil, err + } + return &esClientV56{client}, nil +} + +func (c *esClientV56) isAtleastV7() bool { + return false +} + +// createIndex - creates the index if it does not exist. +func (c *esClientV56) createIndex(args ElasticsearchArgs) error { + exists, err := c.IndexExists(args.Index).Do(context.Background()) + if err != nil { + return err + } + if !exists { + var createIndex *elastic.IndicesCreateResult + if createIndex, err = c.CreateIndex(args.Index).Do(context.Background()); err != nil { + return err + } + + if !createIndex.Acknowledged { + return fmt.Errorf("index %v not created", args.Index) + } + } + return nil +} + +func (c *esClientV56) ping(ctx context.Context, args ElasticsearchArgs) (bool, error) { + _, code, err := c.Ping(args.URL.String()).HttpHeadOnly(true).Do(ctx) + if err != nil { + if elastic.IsConnErr(err) || elastic.IsContextErr(err) || xnet.IsNetworkOrHostDown(err, false) { + return false, errNotConnected + } + return false, err + } + return !(code >= http.StatusBadRequest), nil + +} + +func (c *esClientV56) entryExists(ctx context.Context, index string, key string) (bool, error) { + return c.Exists().Index(index).Type("event").Id(key).Do(ctx) +} + +func (c *esClientV56) removeEntry(ctx context.Context, index string, key string) error { + exists, err := c.entryExists(ctx, index, key) + if err == nil && exists { + _, err = c.Delete().Index(index).Type("event").Id(key).Do(ctx) + } + return err + +} + +func (c *esClientV56) updateEntry(ctx context.Context, index string, key string, eventData event.Event) error { + _, err := c.Index().Index(index).Type("event").BodyJson(map[string]interface{}{"Records": []event.Event{eventData}}).Id(key).Do(ctx) + return err +} + +func (c *esClientV56) addEntry(ctx context.Context, index string, eventData event.Event) error { + _, err := c.Index().Index(index).Type("event").BodyJson(map[string]interface{}{"Records": []event.Event{eventData}}).Do(ctx) + return err + +} + +func (c *esClientV56) stop() { + c.Stop() +}