mirror of
https://github.com/minio/minio.git
synced 2025-04-04 11:50:36 -04:00
parent
640ebb2f79
commit
a4305742e8
@ -17,8 +17,6 @@
|
|||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net"
|
"net"
|
||||||
|
|
||||||
@ -27,6 +25,10 @@ import (
|
|||||||
sarama "gopkg.in/Shopify/sarama.v1"
|
sarama "gopkg.in/Shopify/sarama.v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
kkErrFunc = newNotificationErrorFactory("Kafka")
|
||||||
|
)
|
||||||
|
|
||||||
// kafkaNotify holds the configuration of the Kafka server/cluster to
|
// kafkaNotify holds the configuration of the Kafka server/cluster to
|
||||||
// send notifications to.
|
// send notifications to.
|
||||||
type kafkaNotify struct {
|
type kafkaNotify struct {
|
||||||
@ -46,7 +48,7 @@ func (k *kafkaNotify) Validate() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if len(k.Brokers) == 0 {
|
if len(k.Brokers) == 0 {
|
||||||
return errors.New("No broker specified")
|
return kkErrFunc("No broker(s) specified.")
|
||||||
}
|
}
|
||||||
// Validate all specified brokers.
|
// Validate all specified brokers.
|
||||||
for _, brokerAddr := range k.Brokers {
|
for _, brokerAddr := range k.Brokers {
|
||||||
@ -70,8 +72,8 @@ func dialKafka(kn kafkaNotify) (kafkaConn, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if kn.Topic == "" {
|
if kn.Topic == "" {
|
||||||
return kafkaConn{}, fmt.Errorf(
|
return kafkaConn{}, kkErrFunc(
|
||||||
"Kafka Notifier Error: Topic was not specified in configuration")
|
"Topic was not specified in configuration")
|
||||||
}
|
}
|
||||||
|
|
||||||
config := sarama.NewConfig()
|
config := sarama.NewConfig()
|
||||||
@ -83,10 +85,7 @@ func dialKafka(kn kafkaNotify) (kafkaConn, error) {
|
|||||||
|
|
||||||
p, err := sarama.NewSyncProducer(kn.Brokers, config)
|
p, err := sarama.NewSyncProducer(kn.Brokers, config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return kafkaConn{}, fmt.Errorf(
|
return kafkaConn{}, kkErrFunc("Failed to start producer: %v", err)
|
||||||
"Kafka Notifier Error: Failed to start producer: %v",
|
|
||||||
err,
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return kafkaConn{p, kn.Topic}, nil
|
return kafkaConn{p, kn.Topic}, nil
|
||||||
@ -121,16 +120,24 @@ func (kC kafkaConn) Fire(entry *logrus.Entry) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Extract the key of the event as a string
|
||||||
|
keyStr, ok := entry.Data["Key"].(string)
|
||||||
|
if !ok {
|
||||||
|
return kkErrFunc("Unable to convert event key %v to string.",
|
||||||
|
entry.Data["Key"])
|
||||||
|
}
|
||||||
|
|
||||||
// Construct message to send to Kafka
|
// Construct message to send to Kafka
|
||||||
msg := sarama.ProducerMessage{
|
msg := sarama.ProducerMessage{
|
||||||
Topic: kC.topic,
|
Topic: kC.topic,
|
||||||
|
Key: sarama.StringEncoder(keyStr),
|
||||||
Value: sarama.ByteEncoder(body.Bytes()),
|
Value: sarama.ByteEncoder(body.Bytes()),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Attempt sending the message to Kafka
|
// Attempt sending the message to Kafka
|
||||||
_, _, err = kC.producer.SendMessage(&msg)
|
_, _, err = kC.producer.SendMessage(&msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Error sending event to Kafka - %v", err)
|
return kkErrFunc("Error sending event to Kafka - %v", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user