From a4305742e848845fe3038f8eefe3894f52e3986e Mon Sep 17 00:00:00 2001 From: Aditya Manthramurthy Date: Wed, 19 Apr 2017 11:26:35 -0700 Subject: [PATCH] Add key for Kafka messages (fixes #4143) (#4151) --- cmd/notify-kafka.go | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/cmd/notify-kafka.go b/cmd/notify-kafka.go index 8a938855a..acd4ec80b 100644 --- a/cmd/notify-kafka.go +++ b/cmd/notify-kafka.go @@ -17,8 +17,6 @@ package cmd import ( - "errors" - "fmt" "io/ioutil" "net" @@ -27,6 +25,10 @@ import ( sarama "gopkg.in/Shopify/sarama.v1" ) +var ( + kkErrFunc = newNotificationErrorFactory("Kafka") +) + // kafkaNotify holds the configuration of the Kafka server/cluster to // send notifications to. type kafkaNotify struct { @@ -46,7 +48,7 @@ func (k *kafkaNotify) Validate() error { return nil } if len(k.Brokers) == 0 { - return errors.New("No broker specified") + return kkErrFunc("No broker(s) specified.") } // Validate all specified brokers. for _, brokerAddr := range k.Brokers { @@ -70,8 +72,8 @@ func dialKafka(kn kafkaNotify) (kafkaConn, error) { } if kn.Topic == "" { - return kafkaConn{}, fmt.Errorf( - "Kafka Notifier Error: Topic was not specified in configuration") + return kafkaConn{}, kkErrFunc( + "Topic was not specified in configuration") } config := sarama.NewConfig() @@ -83,10 +85,7 @@ func dialKafka(kn kafkaNotify) (kafkaConn, error) { p, err := sarama.NewSyncProducer(kn.Brokers, config) if err != nil { - return kafkaConn{}, fmt.Errorf( - "Kafka Notifier Error: Failed to start producer: %v", - err, - ) + return kafkaConn{}, kkErrFunc("Failed to start producer: %v", err) } return kafkaConn{p, kn.Topic}, nil @@ -121,16 +120,24 @@ func (kC kafkaConn) Fire(entry *logrus.Entry) error { return err } + // Extract the key of the event as a string + keyStr, ok := entry.Data["Key"].(string) + if !ok { + return kkErrFunc("Unable to convert event key %v to string.", + entry.Data["Key"]) + } + // Construct message to send to Kafka msg := sarama.ProducerMessage{ Topic: kC.topic, + Key: sarama.StringEncoder(keyStr), Value: sarama.ByteEncoder(body.Bytes()), } // Attempt sending the message to Kafka _, _, err = kC.producer.SendMessage(&msg) if err != nil { - return fmt.Errorf("Error sending event to Kafka - %v", err) + return kkErrFunc("Error sending event to Kafka - %v", err) } return nil }