Add support for Kafka as a notifications target (#2869) (#3439)

This commit is contained in:
Aditya Manthramurthy
2016-12-15 21:53:48 +05:30
committed by Harshavardhana
parent 664ff063a1
commit 8e6e9301ce
113 changed files with 16290 additions and 35 deletions

View File

@@ -38,6 +38,8 @@ const (
queueTypeRedis = "redis"
// Static string indicating queue type 'postgresql'.
queueTypePostgreSQL = "postgresql"
// Static string indicating queue type 'kafka'.
queueTypeKafka = "kafka"
)
// Topic type.
@@ -58,6 +60,7 @@ type notifier struct {
ElasticSearch map[string]elasticSearchNotify `json:"elasticsearch"`
Redis map[string]redisNotify `json:"redis"`
PostgreSQL map[string]postgreSQLNotify `json:"postgresql"`
Kafka map[string]kafkaNotify `json:"kafka"`
// Add new notification queues.
}
@@ -154,6 +157,24 @@ func isPostgreSQLQueue(sqsArn arnSQS) bool {
return true
}
// Returns true if queueArn is for Kafka.
func isKafkaQueue(sqsArn arnSQS) bool {
if sqsArn.Type != queueTypeKafka {
return false
}
kafkaNotifyCfg := serverConfig.GetKafkaNotifyByID(sqsArn.AccountID)
if !kafkaNotifyCfg.Enable {
return false
}
kafkaC, err := dialKafka(kafkaNotifyCfg)
if err != nil {
errorIf(err, "Unable to dial Kafka server %#v", kafkaNotifyCfg)
return false
}
defer kafkaC.Close()
return true
}
// Match function matches wild cards in 'pattern' for events.
func eventMatch(eventType string, events []string) (ok bool) {
for _, event := range events {