mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
events: ElasticSearch doesnt support objects with '/' in them. (#2747)
Fix this by using a unique sha256 generated for each unique key.
This commit is contained in:
parent
a5066e8f76
commit
018c90dae7
@ -204,7 +204,7 @@ func eventNotify(event eventData) {
|
|||||||
targetLog := globalEventNotifier.GetQueueTarget(qConfig.QueueARN)
|
targetLog := globalEventNotifier.GetQueueTarget(qConfig.QueueARN)
|
||||||
if targetLog != nil {
|
if targetLog != nil {
|
||||||
targetLog.WithFields(logrus.Fields{
|
targetLog.WithFields(logrus.Fields{
|
||||||
"Key": objectName,
|
"Key": path.Join(event.Bucket, objectName),
|
||||||
"EventType": eventType,
|
"EventType": eventType,
|
||||||
"Records": notificationEvent,
|
"Records": notificationEvent,
|
||||||
}).Info()
|
}).Info()
|
||||||
|
@ -17,10 +17,12 @@
|
|||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/hex"
|
||||||
"errors"
|
"errors"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
|
||||||
"github.com/Sirupsen/logrus"
|
"github.com/Sirupsen/logrus"
|
||||||
|
"github.com/minio/sha256-simd"
|
||||||
"gopkg.in/olivere/elastic.v3"
|
"gopkg.in/olivere/elastic.v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -41,7 +43,11 @@ func dialElastic(esNotify elasticSearchNotify) (*elastic.Client, error) {
|
|||||||
if !esNotify.Enable {
|
if !esNotify.Enable {
|
||||||
return nil, errNotifyNotEnabled
|
return nil, errNotifyNotEnabled
|
||||||
}
|
}
|
||||||
client, err := elastic.NewClient(elastic.SetURL(esNotify.URL), elastic.SetSniff(false))
|
client, err := elastic.NewClient(
|
||||||
|
elastic.SetURL(esNotify.URL),
|
||||||
|
elastic.SetSniff(false),
|
||||||
|
elastic.SetMaxRetries(10),
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -70,7 +76,7 @@ func newElasticNotify(accountID string) (*logrus.Logger, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if !createIndex.Acknowledged {
|
if !createIndex.Acknowledged {
|
||||||
return nil, errors.New("index not created")
|
return nil, errors.New("Index not created.")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -106,19 +112,24 @@ func (q elasticClient) Fire(entry *logrus.Entry) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Calculate a unique key id. Choosing sha256 here.
|
||||||
|
shaKey := sha256.Sum256([]byte(keyStr))
|
||||||
|
keyStr = hex.EncodeToString(shaKey[:])
|
||||||
|
|
||||||
// If event matches as delete, we purge the previous index.
|
// If event matches as delete, we purge the previous index.
|
||||||
if eventMatch(entryStr, []string{"s3:ObjectRemoved:*"}) {
|
if eventMatch(entryStr, []string{"s3:ObjectRemoved:*"}) {
|
||||||
_, err := q.Client.DeleteIndex(keyStr).Do()
|
_, err := q.Client.Delete().Index(q.params.Index).
|
||||||
|
Type("event").Id(keyStr).Do()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
} // else we update elastic index or create a new one.
|
} // else we update elastic index or create a new one.
|
||||||
_, err := q.Client.Index().Index(keyStr).
|
_, err := q.Client.Index().Index(q.params.Index).
|
||||||
Type("event").
|
Type("event").
|
||||||
BodyJson(map[string]interface{}{
|
BodyJson(map[string]interface{}{
|
||||||
"Records": entry.Data["Records"],
|
"Records": entry.Data["Records"],
|
||||||
}).Do()
|
}).Id(keyStr).Do()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user