2021-04-18 15:41:13 -04:00
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
2018-03-15 16:03:41 -04:00
package target
import (
2021-05-10 18:06:58 -04:00
"bytes"
2018-03-15 16:03:41 -04:00
"context"
2021-05-10 18:06:58 -04:00
"encoding/base64"
"encoding/json"
2018-03-15 16:03:41 -04:00
"fmt"
2021-05-10 18:06:58 -04:00
"io"
"io/ioutil"
2020-04-28 16:57:56 -04:00
"net/http"
2018-03-15 16:03:41 -04:00
"net/url"
2019-07-11 22:53:20 -04:00
"os"
"path/filepath"
2021-05-10 18:06:58 -04:00
"strconv"
2018-07-18 14:22:29 -04:00
"strings"
2020-04-28 16:57:56 -04:00
"time"
2018-03-15 16:03:41 -04:00
2021-06-01 17:59:40 -04:00
"github.com/minio/minio/internal/event"
2021-06-14 17:54:37 -04:00
xnet "github.com/minio/pkg/net"
2019-07-11 22:53:20 -04:00
"github.com/pkg/errors"
2018-03-15 16:03:41 -04:00
2021-05-10 18:06:58 -04:00
elasticsearch7 "github.com/elastic/go-elasticsearch/v7"
"github.com/minio/highwayhash"
2020-12-10 16:23:06 -05:00
"github.com/olivere/elastic/v7"
2018-03-15 16:03:41 -04:00
)
2019-10-23 01:59:13 -04:00
// Elastic constants
const (
ElasticFormat = "format"
ElasticURL = "url"
ElasticIndex = "index"
ElasticQueueDir = "queue_dir"
ElasticQueueLimit = "queue_limit"
2020-08-23 12:43:48 -04:00
ElasticUsername = "username"
ElasticPassword = "password"
2019-10-23 01:59:13 -04:00
2019-12-04 18:32:37 -05:00
EnvElasticEnable = "MINIO_NOTIFY_ELASTICSEARCH_ENABLE"
2019-10-23 01:59:13 -04:00
EnvElasticFormat = "MINIO_NOTIFY_ELASTICSEARCH_FORMAT"
EnvElasticURL = "MINIO_NOTIFY_ELASTICSEARCH_URL"
EnvElasticIndex = "MINIO_NOTIFY_ELASTICSEARCH_INDEX"
EnvElasticQueueDir = "MINIO_NOTIFY_ELASTICSEARCH_QUEUE_DIR"
EnvElasticQueueLimit = "MINIO_NOTIFY_ELASTICSEARCH_QUEUE_LIMIT"
2020-08-23 12:43:48 -04:00
EnvElasticUsername = "MINIO_NOTIFY_ELASTICSEARCH_USERNAME"
EnvElasticPassword = "MINIO_NOTIFY_ELASTICSEARCH_PASSWORD"
2019-10-23 01:59:13 -04:00
)
2021-05-10 18:06:58 -04:00
// ESSupportStatus is a typed string representing the support status for
// Elasticsearch
type ESSupportStatus string
const (
// ESSUnknown is default value
ESSUnknown ESSupportStatus = "ESSUnknown"
// ESSDeprecated -> support will be removed in future
ESSDeprecated ESSupportStatus = "ESSDeprecated"
// ESSUnsupported -> we wont work with this ES server
ESSUnsupported ESSupportStatus = "ESSUnsupported"
// ESSSupported -> all good!
ESSSupported ESSupportStatus = "ESSSupported"
)
func getESVersionSupportStatus ( version string ) ( res ESSupportStatus , err error ) {
parts := strings . Split ( version , "." )
if len ( parts ) < 1 {
err = fmt . Errorf ( "bad ES version string: %s" , version )
return
}
majorVersion , err := strconv . Atoi ( parts [ 0 ] )
if err != nil {
err = fmt . Errorf ( "bad ES version string: %s" , version )
return
}
switch {
case majorVersion <= 4 :
res = ESSUnsupported
case majorVersion <= 6 :
res = ESSDeprecated
default :
res = ESSSupported
}
return
}
// magic HH-256 key as HH-256 hash of the first 100 decimals of π as utf-8 string with a zero key.
var magicHighwayHash256Key = [ ] byte ( "\x4b\xe7\x34\xfa\x8e\x23\x8a\xcd\x26\x3e\x83\xe6\xbb\x96\x85\x52\x04\x0f\x93\x5d\xa3\x9f\x44\x14\x97\xe0\x9d\x13\x22\xde\x36\xa0" )
// Interface for elasticsearch client objects
type esClient interface {
isAtleastV7 ( ) bool
createIndex ( ElasticsearchArgs ) error
ping ( context . Context , ElasticsearchArgs ) ( bool , error )
stop ( )
entryExists ( context . Context , string , string ) ( bool , error )
removeEntry ( context . Context , string , string ) error
updateEntry ( context . Context , string , string , event . Event ) error
addEntry ( context . Context , string , event . Event ) error
}
2018-03-15 16:03:41 -04:00
// ElasticsearchArgs - Elasticsearch target arguments.
type ElasticsearchArgs struct {
2020-08-23 12:43:48 -04:00
Enable bool ` json:"enable" `
Format string ` json:"format" `
URL xnet . URL ` json:"url" `
Index string ` json:"index" `
QueueDir string ` json:"queueDir" `
QueueLimit uint64 ` json:"queueLimit" `
Transport * http . Transport ` json:"-" `
Username string ` json:"username" `
Password string ` json:"password" `
2018-03-15 16:03:41 -04:00
}
2018-07-18 14:22:29 -04:00
// Validate ElasticsearchArgs fields
func ( a ElasticsearchArgs ) Validate ( ) error {
if ! a . Enable {
return nil
}
if a . URL . IsEmpty ( ) {
return errors . New ( "empty URL" )
}
if a . Format != "" {
f := strings . ToLower ( a . Format )
if f != event . NamespaceFormat && f != event . AccessFormat {
return errors . New ( "format value unrecognized" )
}
}
if a . Index == "" {
return errors . New ( "empty index value" )
}
2020-08-23 12:43:48 -04:00
if ( a . Username == "" && a . Password != "" ) || ( a . Username != "" && a . Password == "" ) {
return errors . New ( "username and password should be set in pairs" )
}
2018-07-18 14:22:29 -04:00
return nil
}
2018-03-15 16:03:41 -04:00
// ElasticsearchTarget - Elasticsearch target.
type ElasticsearchTarget struct {
2020-04-14 14:19:25 -04:00
id event . TargetID
args ElasticsearchArgs
2021-05-10 18:06:58 -04:00
client esClient
2020-04-14 14:19:25 -04:00
store Store
loggerOnce func ( ctx context . Context , err error , id interface { } , errKind ... interface { } )
2018-03-15 16:03:41 -04:00
}
// ID - returns target ID.
func ( target * ElasticsearchTarget ) ID ( ) event . TargetID {
return target . id
}
2020-04-21 12:38:32 -04:00
// HasQueueStore - Checks if the queueStore has been configured for the target
func ( target * ElasticsearchTarget ) HasQueueStore ( ) bool {
return target . store != nil
}
2019-12-11 17:27:03 -05:00
// IsActive - Return true if target is up and active
func ( target * ElasticsearchTarget ) IsActive ( ) ( bool , error ) {
2020-04-28 16:57:56 -04:00
ctx , cancel := context . WithTimeout ( context . Background ( ) , 5 * time . Second )
defer cancel ( )
2021-05-10 18:06:58 -04:00
err := target . checkAndInitClient ( ctx )
2020-04-28 16:57:56 -04:00
if err != nil {
return false , err
2019-12-11 17:27:03 -05:00
}
2021-05-10 18:06:58 -04:00
return target . client . ping ( ctx , target . args )
2019-12-11 17:27:03 -05:00
}
2019-07-11 22:53:20 -04:00
// Save - saves the events to the store if queuestore is configured, which will be replayed when the elasticsearch connection is active.
2019-04-10 08:46:01 -04:00
func ( target * ElasticsearchTarget ) Save ( eventData event . Event ) error {
2019-07-11 22:53:20 -04:00
if target . store != nil {
return target . store . Put ( eventData )
}
2020-04-28 16:57:56 -04:00
err := target . send ( eventData )
2020-10-29 12:52:11 -04:00
if elastic . IsConnErr ( err ) || elastic . IsContextErr ( err ) || xnet . IsNetworkOrHostDown ( err , false ) {
2020-04-28 16:57:56 -04:00
return errNotConnected
2019-07-11 22:53:20 -04:00
}
2020-04-28 16:57:56 -04:00
return err
2019-04-10 08:46:01 -04:00
}
2019-07-11 22:53:20 -04:00
// send - sends the event to the target.
2019-04-10 08:46:01 -04:00
func ( target * ElasticsearchTarget ) send ( eventData event . Event ) error {
2021-05-10 18:06:58 -04:00
ctx , cancel := context . WithTimeout ( context . Background ( ) , 5 * time . Second )
defer cancel ( )
2018-03-15 16:03:41 -04:00
if target . args . Format == event . NamespaceFormat {
objectName , err := url . QueryUnescape ( eventData . S3 . Object . Key )
if err != nil {
return err
}
2021-05-10 18:06:58 -04:00
// Calculate a hash of the key for the id of the ES document.
// Id's are limited to 512 bytes in V7+, so we need to do this.
var keyHash string
{
key := eventData . S3 . Bucket . Name + "/" + objectName
if target . client . isAtleastV7 ( ) {
hh , _ := highwayhash . New ( magicHighwayHash256Key ) // New will never return error since key is 256 bit
hh . Write ( [ ] byte ( key ) )
hashBytes := hh . Sum ( nil )
keyHash = base64 . URLEncoding . EncodeToString ( hashBytes )
} else {
keyHash = key
}
}
2018-03-15 16:03:41 -04:00
if eventData . EventName == event . ObjectRemovedDelete {
2021-05-10 18:06:58 -04:00
err = target . client . removeEntry ( ctx , target . args . Index , keyHash )
2018-03-15 16:03:41 -04:00
} else {
2021-05-10 18:06:58 -04:00
err = target . client . updateEntry ( ctx , target . args . Index , keyHash , eventData )
2018-03-15 16:03:41 -04:00
}
return err
}
if target . args . Format == event . AccessFormat {
2021-05-10 18:06:58 -04:00
return target . client . addEntry ( ctx , target . args . Index , eventData )
2018-03-15 16:03:41 -04:00
}
return nil
}
2019-07-11 22:53:20 -04:00
// Send - reads an event from store and sends it to Elasticsearch.
2019-04-10 08:46:01 -04:00
func ( target * ElasticsearchTarget ) Send ( eventKey string ) error {
2021-05-10 18:06:58 -04:00
ctx , cancel := context . WithTimeout ( context . Background ( ) , 5 * time . Second )
defer cancel ( )
err := target . checkAndInitClient ( ctx )
if err != nil {
return err
2019-07-11 22:53:20 -04:00
}
eventData , eErr := target . store . Get ( eventKey )
if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and wouldve been already been sent successfully.
if os . IsNotExist ( eErr ) {
return nil
}
return eErr
}
if err := target . send ( eventData ) ; err != nil {
2020-10-29 12:52:11 -04:00
if elastic . IsConnErr ( err ) || elastic . IsContextErr ( err ) || xnet . IsNetworkOrHostDown ( err , false ) {
2019-09-19 11:23:43 -04:00
return errNotConnected
}
2019-07-11 22:53:20 -04:00
return err
}
// Delete the event from store.
return target . store . Del ( eventKey )
2019-04-10 08:46:01 -04:00
}
2018-03-15 16:03:41 -04:00
// Close - does nothing and available for interface compatibility.
func ( target * ElasticsearchTarget ) Close ( ) error {
2020-01-09 09:15:44 -05:00
if target . client != nil {
// Stops the background processes that the client is running.
2021-05-10 18:06:58 -04:00
target . client . stop ( )
2020-01-09 09:15:44 -05:00
}
2018-03-15 16:03:41 -04:00
return nil
}
2021-05-10 18:06:58 -04:00
func ( target * ElasticsearchTarget ) checkAndInitClient ( ctx context . Context ) error {
if target . client != nil {
return nil
}
clientV7 , err := newClientV7 ( target . args )
if err != nil {
return err
}
// Check es version to confirm if it is supported.
serverSupportStatus , version , err := clientV7 . getServerSupportStatus ( ctx )
if err != nil {
return err
}
switch serverSupportStatus {
case ESSUnknown :
return errors . New ( "unable to determine support status of ES (should not happen)" )
case ESSDeprecated :
fmt . Printf ( "DEPRECATION WARNING: Support for Elasticsearch version '%s' will be dropped in a future release. Please upgrade to a version >= 7.x." , version )
target . client , err = newClientV56 ( target . args )
if err != nil {
return err
}
case ESSSupported :
target . client = clientV7
default :
// ESSUnsupported case
return fmt . Errorf ( "Elasticsearch version '%s' is not supported! Please use at least version 7.x." , version )
}
target . client . createIndex ( target . args )
return nil
}
// NewElasticsearchTarget - creates new Elasticsearch target.
func NewElasticsearchTarget ( id string , args ElasticsearchArgs , doneCh <- chan struct { } , loggerOnce func ( ctx context . Context , err error , id interface { } , kind ... interface { } ) , test bool ) ( * ElasticsearchTarget , error ) {
target := & ElasticsearchTarget {
id : event . TargetID { ID : id , Name : "elasticsearch" } ,
args : args ,
loggerOnce : loggerOnce ,
}
if args . QueueDir != "" {
queueDir := filepath . Join ( args . QueueDir , storePrefix + "-elasticsearch-" + id )
target . store = NewQueueStore ( queueDir , args . QueueLimit )
if err := target . store . Open ( ) ; err != nil {
target . loggerOnce ( context . Background ( ) , err , target . ID ( ) )
return target , err
}
}
ctx , cancel := context . WithTimeout ( context . Background ( ) , 5 * time . Second )
defer cancel ( )
err := target . checkAndInitClient ( ctx )
if err != nil {
if target . store == nil || err != errNotConnected {
target . loggerOnce ( context . Background ( ) , err , target . ID ( ) )
return target , err
}
}
if target . store != nil && ! test {
// Replays the events from the store.
eventKeyCh := replayEvents ( target . store , doneCh , target . loggerOnce , target . ID ( ) )
// Start replaying events from the store.
go sendEvents ( target , eventKeyCh , doneCh , target . loggerOnce )
}
return target , nil
}
// ES Client definitions and methods
type esClientV7 struct {
* elasticsearch7 . Client
}
func newClientV7 ( args ElasticsearchArgs ) ( * esClientV7 , error ) {
// Client options
elasticConfig := elasticsearch7 . Config {
Addresses : [ ] string { args . URL . String ( ) } ,
Transport : args . Transport ,
MaxRetries : 10 ,
}
// Set basic auth
if args . Username != "" && args . Password != "" {
elasticConfig . Username = args . Username
elasticConfig . Password = args . Password
}
// Create a client
client , err := elasticsearch7 . NewClient ( elasticConfig )
if err != nil {
return nil , err
}
clientV7 := & esClientV7 { client }
return clientV7 , nil
}
func ( c * esClientV7 ) getServerSupportStatus ( ctx context . Context ) ( ESSupportStatus , string , error ) {
resp , err := c . Info (
c . Info . WithContext ( ctx ) ,
)
if err != nil {
return ESSUnknown , "" , errNotConnected
}
defer resp . Body . Close ( )
m := make ( map [ string ] interface { } )
err = json . NewDecoder ( resp . Body ) . Decode ( & m )
if err != nil {
return ESSUnknown , "" , fmt . Errorf ( "unable to get ES Server version - json parse error: %v" , err )
}
if v , ok := m [ "version" ] . ( map [ string ] interface { } ) ; ok {
if ver , ok := v [ "number" ] . ( string ) ; ok {
status , err := getESVersionSupportStatus ( ver )
return status , ver , err
}
}
return ESSUnknown , "" , fmt . Errorf ( "Unable to get ES Server Version - got INFO response: %v" , m )
}
func ( c * esClientV7 ) isAtleastV7 ( ) bool {
return true
}
2019-07-11 22:53:20 -04:00
// createIndex - creates the index if it does not exist.
2021-05-10 18:06:58 -04:00
func ( c * esClientV7 ) createIndex ( args ElasticsearchArgs ) error {
res , err := c . Indices . ResolveIndex ( [ ] string { args . Index } )
2018-03-15 16:03:41 -04:00
if err != nil {
2019-07-11 22:53:20 -04:00
return err
2018-03-15 16:03:41 -04:00
}
2021-05-10 18:06:58 -04:00
defer res . Body . Close ( )
var v map [ string ] interface { }
found := false
if err := json . NewDecoder ( res . Body ) . Decode ( & v ) ; err != nil {
return fmt . Errorf ( "Error parsing response body: %v" , err )
}
indices := v [ "indices" ] . ( [ ] interface { } )
for _ , index := range indices {
name := index . ( map [ string ] interface { } ) [ "name" ]
if name == args . Index {
found = true
break
}
}
if ! found {
resp , err := c . Indices . Create ( args . Index )
if err != nil {
2019-07-11 22:53:20 -04:00
return err
2018-03-15 16:03:41 -04:00
}
2021-05-10 18:06:58 -04:00
defer resp . Body . Close ( )
if resp . IsError ( ) {
err := fmt . Errorf ( "Create index err: %s" , res . String ( ) )
return err
}
io . Copy ( ioutil . Discard , resp . Body )
return nil
}
return nil
}
2018-03-15 16:03:41 -04:00
2021-05-10 18:06:58 -04:00
func ( c * esClientV7 ) ping ( ctx context . Context , _ ElasticsearchArgs ) ( bool , error ) {
resp , err := c . Ping (
c . Ping . WithContext ( ctx ) ,
)
if err != nil {
return false , errNotConnected
}
io . Copy ( ioutil . Discard , resp . Body )
resp . Body . Close ( )
return ! resp . IsError ( ) , nil
}
func ( c * esClientV7 ) entryExists ( ctx context . Context , index string , key string ) ( bool , error ) {
res , err := c . Exists (
index ,
key ,
c . Exists . WithContext ( ctx ) ,
)
if err != nil {
return false , err
}
io . Copy ( ioutil . Discard , res . Body )
res . Body . Close ( )
return ! res . IsError ( ) , nil
}
func ( c * esClientV7 ) removeEntry ( ctx context . Context , index string , key string ) error {
exists , err := c . entryExists ( ctx , index , key )
if err == nil && exists {
res , err := c . Delete (
index ,
key ,
c . Delete . WithContext ( ctx ) ,
)
if err != nil {
return err
2019-07-11 22:53:20 -04:00
}
2021-05-10 18:06:58 -04:00
defer res . Body . Close ( )
if res . IsError ( ) {
err := fmt . Errorf ( "Delete err: %s" , res . String ( ) )
return err
}
io . Copy ( ioutil . Discard , res . Body )
return nil
}
return err
}
func ( c * esClientV7 ) updateEntry ( ctx context . Context , index string , key string , eventData event . Event ) error {
doc := map [ string ] interface { } {
"Records" : [ ] event . Event { eventData } ,
}
var buf bytes . Buffer
enc := json . NewEncoder ( & buf )
err := enc . Encode ( doc )
if err != nil {
return err
}
res , err := c . Index (
index ,
& buf ,
c . Index . WithDocumentID ( key ) ,
c . Index . WithContext ( ctx ) ,
)
if err != nil {
return err
}
defer res . Body . Close ( )
if res . IsError ( ) {
err := fmt . Errorf ( "Update err: %s" , res . String ( ) )
return err
}
io . Copy ( ioutil . Discard , res . Body )
return nil
}
func ( c * esClientV7 ) addEntry ( ctx context . Context , index string , eventData event . Event ) error {
doc := map [ string ] interface { } {
"Records" : [ ] event . Event { eventData } ,
}
var buf bytes . Buffer
enc := json . NewEncoder ( & buf )
err := enc . Encode ( doc )
if err != nil {
return err
}
res , err := c . Index (
index ,
& buf ,
c . Index . WithContext ( ctx ) ,
)
if err != nil {
return err
}
defer res . Body . Close ( )
if res . IsError ( ) {
err := fmt . Errorf ( "Add err: %s" , res . String ( ) )
return err
2019-07-11 22:53:20 -04:00
}
2021-05-10 18:06:58 -04:00
io . Copy ( ioutil . Discard , res . Body )
2019-07-11 22:53:20 -04:00
return nil
}
2021-05-10 18:06:58 -04:00
func ( c * esClientV7 ) stop ( ) {
}
// For versions under 7
type esClientV56 struct {
* elastic . Client
}
func newClientV56 ( args ElasticsearchArgs ) ( * esClientV56 , error ) {
2020-08-23 12:43:48 -04:00
// Client options
options := [ ] elastic . ClientOptionFunc { elastic . SetURL ( args . URL . String ( ) ) ,
elastic . SetMaxRetries ( 10 ) ,
elastic . SetSniff ( false ) ,
elastic . SetHttpClient ( & http . Client { Transport : args . Transport } ) }
// Set basic auth
if args . Username != "" && args . Password != "" {
options = append ( options , elastic . SetBasicAuth ( args . Username , args . Password ) )
}
// Create a client
client , err := elastic . NewClient ( options ... )
2020-04-28 16:57:56 -04:00
if err != nil {
// https://github.com/olivere/elastic/wiki/Connection-Errors
2020-10-29 12:52:11 -04:00
if elastic . IsConnErr ( err ) || elastic . IsContextErr ( err ) || xnet . IsNetworkOrHostDown ( err , false ) {
2020-04-28 16:57:56 -04:00
return nil , errNotConnected
2018-03-15 16:03:41 -04:00
}
2020-04-28 16:57:56 -04:00
return nil , err
}
2021-05-10 18:06:58 -04:00
return & esClientV56 { client } , nil
2019-07-11 22:53:20 -04:00
}
2021-05-10 18:06:58 -04:00
func ( c * esClientV56 ) isAtleastV7 ( ) bool {
return false
}
// createIndex - creates the index if it does not exist.
func ( c * esClientV56 ) createIndex ( args ElasticsearchArgs ) error {
exists , err := c . IndexExists ( args . Index ) . Do ( context . Background ( ) )
if err != nil {
return err
2020-04-14 14:19:25 -04:00
}
2021-05-10 18:06:58 -04:00
if ! exists {
var createIndex * elastic . IndicesCreateResult
if createIndex , err = c . CreateIndex ( args . Index ) . Do ( context . Background ( ) ) ; err != nil {
return err
}
2020-04-14 14:19:25 -04:00
2021-05-10 18:06:58 -04:00
if ! createIndex . Acknowledged {
return fmt . Errorf ( "index %v not created" , args . Index )
2019-07-11 22:53:20 -04:00
}
}
2021-05-10 18:06:58 -04:00
return nil
}
2019-07-11 22:53:20 -04:00
2021-05-10 18:06:58 -04:00
func ( c * esClientV56 ) ping ( ctx context . Context , args ElasticsearchArgs ) ( bool , error ) {
_ , code , err := c . Ping ( args . URL . String ( ) ) . HttpHeadOnly ( true ) . Do ( ctx )
2020-04-28 16:57:56 -04:00
if err != nil {
2021-05-10 18:06:58 -04:00
if elastic . IsConnErr ( err ) || elastic . IsContextErr ( err ) || xnet . IsNetworkOrHostDown ( err , false ) {
return false , errNotConnected
2019-07-11 22:53:20 -04:00
}
2021-05-10 18:06:58 -04:00
return false , err
2019-07-11 22:53:20 -04:00
}
2021-05-10 18:06:58 -04:00
return ! ( code >= http . StatusBadRequest ) , nil
2019-07-11 22:53:20 -04:00
2021-05-10 18:06:58 -04:00
}
func ( c * esClientV56 ) entryExists ( ctx context . Context , index string , key string ) ( bool , error ) {
return c . Exists ( ) . Index ( index ) . Type ( "event" ) . Id ( key ) . Do ( ctx )
}
func ( c * esClientV56 ) removeEntry ( ctx context . Context , index string , key string ) error {
exists , err := c . entryExists ( ctx , index , key )
if err == nil && exists {
_ , err = c . Delete ( ) . Index ( index ) . Type ( "event" ) . Id ( key ) . Do ( ctx )
2019-07-11 22:53:20 -04:00
}
2021-05-10 18:06:58 -04:00
return err
2019-07-11 22:53:20 -04:00
2021-05-10 18:06:58 -04:00
}
func ( c * esClientV56 ) updateEntry ( ctx context . Context , index string , key string , eventData event . Event ) error {
_ , err := c . Index ( ) . Index ( index ) . Type ( "event" ) . BodyJson ( map [ string ] interface { } { "Records" : [ ] event . Event { eventData } } ) . Id ( key ) . Do ( ctx )
return err
}
func ( c * esClientV56 ) addEntry ( ctx context . Context , index string , eventData event . Event ) error {
_ , err := c . Index ( ) . Index ( index ) . Type ( "event" ) . BodyJson ( map [ string ] interface { } { "Records" : [ ] event . Event { eventData } } ) . Do ( ctx )
return err
}
func ( c * esClientV56 ) stop ( ) {
c . Stop ( )
2018-03-15 16:03:41 -04:00
}