minio/cmd/notifiers.go
Harshavardhana 533338bdeb all/windows: Be case in-sensitive about pattern matching. (#3682)
Resource strings and paths are case insensitive on windows
deployments but if user happens to use upper case instead of
lower case for certain configuration params like bucket
policies and bucket notification config. We might not honor
them which leads to a wrong behavior on windows.

This is windows only behavior, for all other platforms case
is still kept sensitive.
2017-02-03 23:27:50 -08:00

215 lines
5.6 KiB
Go

/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"errors"
"github.com/minio/minio/pkg/wildcard"
)
// SQS type.
const (
// Minio sqs ARN prefix.
minioSqs = "arn:minio:sqs:"
// Static string indicating queue type 'amqp'.
queueTypeAMQP = "amqp"
// Static string indicating queue type 'nats'.
queueTypeNATS = "nats"
// Static string indicating queue type 'elasticsearch'.
queueTypeElastic = "elasticsearch"
// Static string indicating queue type 'redis'.
queueTypeRedis = "redis"
// Static string indicating queue type 'postgresql'.
queueTypePostgreSQL = "postgresql"
// Static string indicating queue type 'kafka'.
queueTypeKafka = "kafka"
// Static string for Webhooks
queueTypeWebhook = "webhook"
)
// Topic type.
const (
// Minio topic ARN prefix.
minioTopic = "arn:minio:sns:"
// Static string indicating sns type 'listen'.
snsTypeMinio = "listen"
)
var errNotifyNotEnabled = errors.New("requested notifier not enabled")
// Notifier represents collection of supported notification queues.
type notifier struct {
AMQP map[string]amqpNotify `json:"amqp"`
NATS map[string]natsNotify `json:"nats"`
ElasticSearch map[string]elasticSearchNotify `json:"elasticsearch"`
Redis map[string]redisNotify `json:"redis"`
PostgreSQL map[string]postgreSQLNotify `json:"postgresql"`
Kafka map[string]kafkaNotify `json:"kafka"`
Webhook map[string]webhookNotify `json:"webhook"`
// Add new notification queues.
}
// Returns true if queueArn is for an AMQP queue.
func isAMQPQueue(sqsArn arnSQS) bool {
if sqsArn.Type != queueTypeAMQP {
return false
}
amqpL := serverConfig.GetAMQPNotifyByID(sqsArn.AccountID)
if !amqpL.Enable {
return false
}
// Connect to amqp server to validate.
amqpC, err := dialAMQP(amqpL)
if err != nil {
errorIf(err, "Unable to connect to amqp service. %#v", amqpL)
return false
}
defer amqpC.Close()
return true
}
// Returns true if natsArn is for an NATS queue.
func isNATSQueue(sqsArn arnSQS) bool {
if sqsArn.Type != queueTypeNATS {
return false
}
natsL := serverConfig.GetNATSNotifyByID(sqsArn.AccountID)
if !natsL.Enable {
return false
}
// Connect to nats server to validate.
natsC, err := dialNATS(natsL, true)
if err != nil {
errorIf(err, "Unable to connect to nats service. %#v", natsL)
return false
}
closeNATS(natsC)
return true
}
// Returns true if queueArn is for an Webhook queue
func isWebhookQueue(sqsArn arnSQS) bool {
if sqsArn.Type != queueTypeWebhook {
return false
}
rNotify := serverConfig.GetWebhookNotifyByID(sqsArn.AccountID)
if !rNotify.Enable {
return false
}
return true
}
// Returns true if queueArn is for an Redis queue.
func isRedisQueue(sqsArn arnSQS) bool {
if sqsArn.Type != queueTypeRedis {
return false
}
rNotify := serverConfig.GetRedisNotifyByID(sqsArn.AccountID)
if !rNotify.Enable {
return false
}
// Connect to redis server to validate.
rPool, err := dialRedis(rNotify)
if err != nil {
errorIf(err, "Unable to connect to redis service. %#v", rNotify)
return false
}
defer rPool.Close()
return true
}
// Returns true if queueArn is for an ElasticSearch queue.
func isElasticQueue(sqsArn arnSQS) bool {
if sqsArn.Type != queueTypeElastic {
return false
}
esNotify := serverConfig.GetElasticSearchNotifyByID(sqsArn.AccountID)
if !esNotify.Enable {
return false
}
elasticC, err := dialElastic(esNotify)
if err != nil {
errorIf(err, "Unable to connect to elasticsearch service %#v", esNotify)
return false
}
defer elasticC.Stop()
return true
}
// Returns true if queueArn is for PostgreSQL.
func isPostgreSQLQueue(sqsArn arnSQS) bool {
if sqsArn.Type != queueTypePostgreSQL {
return false
}
pgNotify := serverConfig.GetPostgreSQLNotifyByID(sqsArn.AccountID)
if !pgNotify.Enable {
return false
}
pgC, err := dialPostgreSQL(pgNotify)
if err != nil {
errorIf(err, "Unable to connect to PostgreSQL server %#v", pgNotify)
return false
}
defer pgC.Close()
return true
}
// Returns true if queueArn is for Kafka.
func isKafkaQueue(sqsArn arnSQS) bool {
if sqsArn.Type != queueTypeKafka {
return false
}
kafkaNotifyCfg := serverConfig.GetKafkaNotifyByID(sqsArn.AccountID)
if !kafkaNotifyCfg.Enable {
return false
}
kafkaC, err := dialKafka(kafkaNotifyCfg)
if err != nil {
errorIf(err, "Unable to dial Kafka server %#v", kafkaNotifyCfg)
return false
}
defer kafkaC.Close()
return true
}
// Match function matches wild cards in 'pattern' for events.
func eventMatch(eventType string, events []string) (ok bool) {
for _, event := range events {
ok = wildcard.MatchSimple(event, eventType)
if ok {
break
}
}
return ok
}
// Filter rule match, matches an object against the filter rules.
func filterRuleMatch(object string, frs []filterRule) bool {
var prefixMatch, suffixMatch = true, true
for _, fr := range frs {
if isValidFilterNamePrefix(fr.Name) {
prefixMatch = hasPrefix(object, fr.Value)
} else if isValidFilterNameSuffix(fr.Name) {
suffixMatch = hasSuffix(object, fr.Value)
}
}
return prefixMatch && suffixMatch
}