rename all remaining packages to internal/ (#12418)

This is to ensure that there are no projects
that try to import `minio/minio/pkg` into
their own repo. Any such common packages should
go to `https://github.com/minio/pkg`
This commit is contained in:
Harshavardhana
2021-06-01 14:59:40 -07:00
committed by GitHub
parent bf87c4b1e4
commit 1f262daf6f
540 changed files with 757 additions and 778 deletions

View File

@@ -0,0 +1,311 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package target
import (
"context"
"encoding/json"
"errors"
"net"
"net/url"
"os"
"path/filepath"
"sync"
"github.com/minio/minio/internal/event"
xnet "github.com/minio/minio/internal/net"
"github.com/streadway/amqp"
)
// AMQPArgs - AMQP target arguments.
type AMQPArgs struct {
Enable bool `json:"enable"`
URL xnet.URL `json:"url"`
Exchange string `json:"exchange"`
RoutingKey string `json:"routingKey"`
ExchangeType string `json:"exchangeType"`
DeliveryMode uint8 `json:"deliveryMode"`
Mandatory bool `json:"mandatory"`
Immediate bool `json:"immediate"`
Durable bool `json:"durable"`
Internal bool `json:"internal"`
NoWait bool `json:"noWait"`
AutoDeleted bool `json:"autoDeleted"`
QueueDir string `json:"queueDir"`
QueueLimit uint64 `json:"queueLimit"`
}
//lint:file-ignore ST1003 We cannot change these exported names.
// AMQP input constants.
const (
AmqpQueueDir = "queue_dir"
AmqpQueueLimit = "queue_limit"
AmqpURL = "url"
AmqpExchange = "exchange"
AmqpRoutingKey = "routing_key"
AmqpExchangeType = "exchange_type"
AmqpDeliveryMode = "delivery_mode"
AmqpMandatory = "mandatory"
AmqpImmediate = "immediate"
AmqpDurable = "durable"
AmqpInternal = "internal"
AmqpNoWait = "no_wait"
AmqpAutoDeleted = "auto_deleted"
AmqpArguments = "arguments"
AmqpPublishingHeaders = "publishing_headers"
EnvAMQPEnable = "MINIO_NOTIFY_AMQP_ENABLE"
EnvAMQPURL = "MINIO_NOTIFY_AMQP_URL"
EnvAMQPExchange = "MINIO_NOTIFY_AMQP_EXCHANGE"
EnvAMQPRoutingKey = "MINIO_NOTIFY_AMQP_ROUTING_KEY"
EnvAMQPExchangeType = "MINIO_NOTIFY_AMQP_EXCHANGE_TYPE"
EnvAMQPDeliveryMode = "MINIO_NOTIFY_AMQP_DELIVERY_MODE"
EnvAMQPMandatory = "MINIO_NOTIFY_AMQP_MANDATORY"
EnvAMQPImmediate = "MINIO_NOTIFY_AMQP_IMMEDIATE"
EnvAMQPDurable = "MINIO_NOTIFY_AMQP_DURABLE"
EnvAMQPInternal = "MINIO_NOTIFY_AMQP_INTERNAL"
EnvAMQPNoWait = "MINIO_NOTIFY_AMQP_NO_WAIT"
EnvAMQPAutoDeleted = "MINIO_NOTIFY_AMQP_AUTO_DELETED"
EnvAMQPArguments = "MINIO_NOTIFY_AMQP_ARGUMENTS"
EnvAMQPPublishingHeaders = "MINIO_NOTIFY_AMQP_PUBLISHING_HEADERS"
EnvAMQPQueueDir = "MINIO_NOTIFY_AMQP_QUEUE_DIR"
EnvAMQPQueueLimit = "MINIO_NOTIFY_AMQP_QUEUE_LIMIT"
)
// Validate AMQP arguments
func (a *AMQPArgs) Validate() error {
if !a.Enable {
return nil
}
if _, err := amqp.ParseURI(a.URL.String()); err != nil {
return err
}
if a.QueueDir != "" {
if !filepath.IsAbs(a.QueueDir) {
return errors.New("queueDir path should be absolute")
}
}
return nil
}
// AMQPTarget - AMQP target
type AMQPTarget struct {
id event.TargetID
args AMQPArgs
conn *amqp.Connection
connMutex sync.Mutex
store Store
loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
}
// ID - returns TargetID.
func (target *AMQPTarget) ID() event.TargetID {
return target.id
}
// IsActive - Return true if target is up and active
func (target *AMQPTarget) IsActive() (bool, error) {
ch, err := target.channel()
if err != nil {
return false, err
}
defer func() {
ch.Close()
}()
return true, nil
}
// HasQueueStore - Checks if the queueStore has been configured for the target
func (target *AMQPTarget) HasQueueStore() bool {
return target.store != nil
}
func (target *AMQPTarget) channel() (*amqp.Channel, error) {
var err error
var conn *amqp.Connection
var ch *amqp.Channel
isAMQPClosedErr := func(err error) bool {
if err == amqp.ErrClosed {
return true
}
if nerr, ok := err.(*net.OpError); ok {
return (nerr.Err.Error() == "use of closed network connection")
}
return false
}
target.connMutex.Lock()
defer target.connMutex.Unlock()
if target.conn != nil {
ch, err = target.conn.Channel()
if err == nil {
return ch, nil
}
if !isAMQPClosedErr(err) {
return nil, err
}
}
conn, err = amqp.Dial(target.args.URL.String())
if err != nil {
if IsConnRefusedErr(err) {
return nil, errNotConnected
}
return nil, err
}
ch, err = conn.Channel()
if err != nil {
return nil, err
}
target.conn = conn
return ch, nil
}
// send - sends an event to the AMQP.
func (target *AMQPTarget) send(eventData event.Event, ch *amqp.Channel) error {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
return err
}
key := eventData.S3.Bucket.Name + "/" + objectName
data, err := json.Marshal(event.Log{EventName: eventData.EventName, Key: key, Records: []event.Event{eventData}})
if err != nil {
return err
}
if err = ch.ExchangeDeclare(target.args.Exchange, target.args.ExchangeType, target.args.Durable,
target.args.AutoDeleted, target.args.Internal, target.args.NoWait, nil); err != nil {
return err
}
return ch.Publish(target.args.Exchange, target.args.RoutingKey, target.args.Mandatory,
target.args.Immediate, amqp.Publishing{
ContentType: "application/json",
DeliveryMode: target.args.DeliveryMode,
Body: data,
})
}
// Save - saves the events to the store which will be replayed when the amqp connection is active.
func (target *AMQPTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
ch, err := target.channel()
if err != nil {
return err
}
defer func() {
cErr := ch.Close()
target.loggerOnce(context.Background(), cErr, target.ID())
}()
return target.send(eventData, ch)
}
// Send - sends event to AMQP.
func (target *AMQPTarget) Send(eventKey string) error {
ch, err := target.channel()
if err != nil {
return err
}
defer func() {
cErr := ch.Close()
target.loggerOnce(context.Background(), cErr, target.ID())
}()
eventData, eErr := target.store.Get(eventKey)
if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and wouldve been already been sent successfully.
if os.IsNotExist(eErr) {
return nil
}
return eErr
}
if err := target.send(eventData, ch); err != nil {
return err
}
// Delete the event from store.
return target.store.Del(eventKey)
}
// Close - does nothing and available for interface compatibility.
func (target *AMQPTarget) Close() error {
if target.conn != nil {
return target.conn.Close()
}
return nil
}
// NewAMQPTarget - creates new AMQP target.
func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}), test bool) (*AMQPTarget, error) {
var conn *amqp.Connection
var err error
var store Store
target := &AMQPTarget{
id: event.TargetID{ID: id, Name: "amqp"},
args: args,
loggerOnce: loggerOnce,
}
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-amqp-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil {
target.loggerOnce(context.Background(), oErr, target.ID())
return target, oErr
}
target.store = store
}
conn, err = amqp.Dial(args.URL.String())
if err != nil {
if store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) {
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
}
target.conn = conn
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce)
}
return target, nil
}

View File

@@ -0,0 +1,29 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package target
import "github.com/google/uuid"
func getNewUUID() (string, error) {
u, err := uuid.NewRandom()
if err != nil {
return "", err
}
return u.String(), nil
}

View File

@@ -0,0 +1,649 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package target
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/minio/minio/internal/event"
xnet "github.com/minio/minio/internal/net"
"github.com/pkg/errors"
elasticsearch7 "github.com/elastic/go-elasticsearch/v7"
"github.com/minio/highwayhash"
"github.com/olivere/elastic/v7"
)
// Elastic constants
const (
ElasticFormat = "format"
ElasticURL = "url"
ElasticIndex = "index"
ElasticQueueDir = "queue_dir"
ElasticQueueLimit = "queue_limit"
ElasticUsername = "username"
ElasticPassword = "password"
EnvElasticEnable = "MINIO_NOTIFY_ELASTICSEARCH_ENABLE"
EnvElasticFormat = "MINIO_NOTIFY_ELASTICSEARCH_FORMAT"
EnvElasticURL = "MINIO_NOTIFY_ELASTICSEARCH_URL"
EnvElasticIndex = "MINIO_NOTIFY_ELASTICSEARCH_INDEX"
EnvElasticQueueDir = "MINIO_NOTIFY_ELASTICSEARCH_QUEUE_DIR"
EnvElasticQueueLimit = "MINIO_NOTIFY_ELASTICSEARCH_QUEUE_LIMIT"
EnvElasticUsername = "MINIO_NOTIFY_ELASTICSEARCH_USERNAME"
EnvElasticPassword = "MINIO_NOTIFY_ELASTICSEARCH_PASSWORD"
)
// ESSupportStatus is a typed string representing the support status for
// Elasticsearch
type ESSupportStatus string
const (
// ESSUnknown is default value
ESSUnknown ESSupportStatus = "ESSUnknown"
// ESSDeprecated -> support will be removed in future
ESSDeprecated ESSupportStatus = "ESSDeprecated"
// ESSUnsupported -> we wont work with this ES server
ESSUnsupported ESSupportStatus = "ESSUnsupported"
// ESSSupported -> all good!
ESSSupported ESSupportStatus = "ESSSupported"
)
func getESVersionSupportStatus(version string) (res ESSupportStatus, err error) {
parts := strings.Split(version, ".")
if len(parts) < 1 {
err = fmt.Errorf("bad ES version string: %s", version)
return
}
majorVersion, err := strconv.Atoi(parts[0])
if err != nil {
err = fmt.Errorf("bad ES version string: %s", version)
return
}
switch {
case majorVersion <= 4:
res = ESSUnsupported
case majorVersion <= 6:
res = ESSDeprecated
default:
res = ESSSupported
}
return
}
// magic HH-256 key as HH-256 hash of the first 100 decimals of π as utf-8 string with a zero key.
var magicHighwayHash256Key = []byte("\x4b\xe7\x34\xfa\x8e\x23\x8a\xcd\x26\x3e\x83\xe6\xbb\x96\x85\x52\x04\x0f\x93\x5d\xa3\x9f\x44\x14\x97\xe0\x9d\x13\x22\xde\x36\xa0")
// Interface for elasticsearch client objects
type esClient interface {
isAtleastV7() bool
createIndex(ElasticsearchArgs) error
ping(context.Context, ElasticsearchArgs) (bool, error)
stop()
entryExists(context.Context, string, string) (bool, error)
removeEntry(context.Context, string, string) error
updateEntry(context.Context, string, string, event.Event) error
addEntry(context.Context, string, event.Event) error
}
// ElasticsearchArgs - Elasticsearch target arguments.
type ElasticsearchArgs struct {
Enable bool `json:"enable"`
Format string `json:"format"`
URL xnet.URL `json:"url"`
Index string `json:"index"`
QueueDir string `json:"queueDir"`
QueueLimit uint64 `json:"queueLimit"`
Transport *http.Transport `json:"-"`
Username string `json:"username"`
Password string `json:"password"`
}
// Validate ElasticsearchArgs fields
func (a ElasticsearchArgs) Validate() error {
if !a.Enable {
return nil
}
if a.URL.IsEmpty() {
return errors.New("empty URL")
}
if a.Format != "" {
f := strings.ToLower(a.Format)
if f != event.NamespaceFormat && f != event.AccessFormat {
return errors.New("format value unrecognized")
}
}
if a.Index == "" {
return errors.New("empty index value")
}
if (a.Username == "" && a.Password != "") || (a.Username != "" && a.Password == "") {
return errors.New("username and password should be set in pairs")
}
return nil
}
// ElasticsearchTarget - Elasticsearch target.
type ElasticsearchTarget struct {
id event.TargetID
args ElasticsearchArgs
client esClient
store Store
loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
}
// ID - returns target ID.
func (target *ElasticsearchTarget) ID() event.TargetID {
return target.id
}
// HasQueueStore - Checks if the queueStore has been configured for the target
func (target *ElasticsearchTarget) HasQueueStore() bool {
return target.store != nil
}
// IsActive - Return true if target is up and active
func (target *ElasticsearchTarget) IsActive() (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := target.checkAndInitClient(ctx)
if err != nil {
return false, err
}
return target.client.ping(ctx, target.args)
}
// Save - saves the events to the store if queuestore is configured, which will be replayed when the elasticsearch connection is active.
func (target *ElasticsearchTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
err := target.send(eventData)
if elastic.IsConnErr(err) || elastic.IsContextErr(err) || xnet.IsNetworkOrHostDown(err, false) {
return errNotConnected
}
return err
}
// send - sends the event to the target.
func (target *ElasticsearchTarget) send(eventData event.Event) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if target.args.Format == event.NamespaceFormat {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
return err
}
// Calculate a hash of the key for the id of the ES document.
// Id's are limited to 512 bytes in V7+, so we need to do this.
var keyHash string
{
key := eventData.S3.Bucket.Name + "/" + objectName
if target.client.isAtleastV7() {
hh, _ := highwayhash.New(magicHighwayHash256Key) // New will never return error since key is 256 bit
hh.Write([]byte(key))
hashBytes := hh.Sum(nil)
keyHash = base64.URLEncoding.EncodeToString(hashBytes)
} else {
keyHash = key
}
}
if eventData.EventName == event.ObjectRemovedDelete {
err = target.client.removeEntry(ctx, target.args.Index, keyHash)
} else {
err = target.client.updateEntry(ctx, target.args.Index, keyHash, eventData)
}
return err
}
if target.args.Format == event.AccessFormat {
return target.client.addEntry(ctx, target.args.Index, eventData)
}
return nil
}
// Send - reads an event from store and sends it to Elasticsearch.
func (target *ElasticsearchTarget) Send(eventKey string) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := target.checkAndInitClient(ctx)
if err != nil {
return err
}
eventData, eErr := target.store.Get(eventKey)
if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and wouldve been already been sent successfully.
if os.IsNotExist(eErr) {
return nil
}
return eErr
}
if err := target.send(eventData); err != nil {
if elastic.IsConnErr(err) || elastic.IsContextErr(err) || xnet.IsNetworkOrHostDown(err, false) {
return errNotConnected
}
return err
}
// Delete the event from store.
return target.store.Del(eventKey)
}
// Close - does nothing and available for interface compatibility.
func (target *ElasticsearchTarget) Close() error {
if target.client != nil {
// Stops the background processes that the client is running.
target.client.stop()
}
return nil
}
func (target *ElasticsearchTarget) checkAndInitClient(ctx context.Context) error {
if target.client != nil {
return nil
}
clientV7, err := newClientV7(target.args)
if err != nil {
return err
}
// Check es version to confirm if it is supported.
serverSupportStatus, version, err := clientV7.getServerSupportStatus(ctx)
if err != nil {
return err
}
switch serverSupportStatus {
case ESSUnknown:
return errors.New("unable to determine support status of ES (should not happen)")
case ESSDeprecated:
fmt.Printf("DEPRECATION WARNING: Support for Elasticsearch version '%s' will be dropped in a future release. Please upgrade to a version >= 7.x.", version)
target.client, err = newClientV56(target.args)
if err != nil {
return err
}
case ESSSupported:
target.client = clientV7
default:
// ESSUnsupported case
return fmt.Errorf("Elasticsearch version '%s' is not supported! Please use at least version 7.x.", version)
}
target.client.createIndex(target.args)
return nil
}
// NewElasticsearchTarget - creates new Elasticsearch target.
func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*ElasticsearchTarget, error) {
target := &ElasticsearchTarget{
id: event.TargetID{ID: id, Name: "elasticsearch"},
args: args,
loggerOnce: loggerOnce,
}
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-elasticsearch-"+id)
target.store = NewQueueStore(queueDir, args.QueueLimit)
if err := target.store.Open(); err != nil {
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := target.checkAndInitClient(ctx)
if err != nil {
if target.store == nil || err != errNotConnected {
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
}
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce)
}
return target, nil
}
// ES Client definitions and methods
type esClientV7 struct {
*elasticsearch7.Client
}
func newClientV7(args ElasticsearchArgs) (*esClientV7, error) {
// Client options
elasticConfig := elasticsearch7.Config{
Addresses: []string{args.URL.String()},
Transport: args.Transport,
MaxRetries: 10,
}
// Set basic auth
if args.Username != "" && args.Password != "" {
elasticConfig.Username = args.Username
elasticConfig.Password = args.Password
}
// Create a client
client, err := elasticsearch7.NewClient(elasticConfig)
if err != nil {
return nil, err
}
clientV7 := &esClientV7{client}
return clientV7, nil
}
func (c *esClientV7) getServerSupportStatus(ctx context.Context) (ESSupportStatus, string, error) {
resp, err := c.Info(
c.Info.WithContext(ctx),
)
if err != nil {
return ESSUnknown, "", errNotConnected
}
defer resp.Body.Close()
m := make(map[string]interface{})
err = json.NewDecoder(resp.Body).Decode(&m)
if err != nil {
return ESSUnknown, "", fmt.Errorf("unable to get ES Server version - json parse error: %v", err)
}
if v, ok := m["version"].(map[string]interface{}); ok {
if ver, ok := v["number"].(string); ok {
status, err := getESVersionSupportStatus(ver)
return status, ver, err
}
}
return ESSUnknown, "", fmt.Errorf("Unable to get ES Server Version - got INFO response: %v", m)
}
func (c *esClientV7) isAtleastV7() bool {
return true
}
// createIndex - creates the index if it does not exist.
func (c *esClientV7) createIndex(args ElasticsearchArgs) error {
res, err := c.Indices.ResolveIndex([]string{args.Index})
if err != nil {
return err
}
defer res.Body.Close()
var v map[string]interface{}
found := false
if err := json.NewDecoder(res.Body).Decode(&v); err != nil {
return fmt.Errorf("Error parsing response body: %v", err)
}
indices := v["indices"].([]interface{})
for _, index := range indices {
name := index.(map[string]interface{})["name"]
if name == args.Index {
found = true
break
}
}
if !found {
resp, err := c.Indices.Create(args.Index)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.IsError() {
err := fmt.Errorf("Create index err: %s", res.String())
return err
}
io.Copy(ioutil.Discard, resp.Body)
return nil
}
return nil
}
func (c *esClientV7) ping(ctx context.Context, _ ElasticsearchArgs) (bool, error) {
resp, err := c.Ping(
c.Ping.WithContext(ctx),
)
if err != nil {
return false, errNotConnected
}
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
return !resp.IsError(), nil
}
func (c *esClientV7) entryExists(ctx context.Context, index string, key string) (bool, error) {
res, err := c.Exists(
index,
key,
c.Exists.WithContext(ctx),
)
if err != nil {
return false, err
}
io.Copy(ioutil.Discard, res.Body)
res.Body.Close()
return !res.IsError(), nil
}
func (c *esClientV7) removeEntry(ctx context.Context, index string, key string) error {
exists, err := c.entryExists(ctx, index, key)
if err == nil && exists {
res, err := c.Delete(
index,
key,
c.Delete.WithContext(ctx),
)
if err != nil {
return err
}
defer res.Body.Close()
if res.IsError() {
err := fmt.Errorf("Delete err: %s", res.String())
return err
}
io.Copy(ioutil.Discard, res.Body)
return nil
}
return err
}
func (c *esClientV7) updateEntry(ctx context.Context, index string, key string, eventData event.Event) error {
doc := map[string]interface{}{
"Records": []event.Event{eventData},
}
var buf bytes.Buffer
enc := json.NewEncoder(&buf)
err := enc.Encode(doc)
if err != nil {
return err
}
res, err := c.Index(
index,
&buf,
c.Index.WithDocumentID(key),
c.Index.WithContext(ctx),
)
if err != nil {
return err
}
defer res.Body.Close()
if res.IsError() {
err := fmt.Errorf("Update err: %s", res.String())
return err
}
io.Copy(ioutil.Discard, res.Body)
return nil
}
func (c *esClientV7) addEntry(ctx context.Context, index string, eventData event.Event) error {
doc := map[string]interface{}{
"Records": []event.Event{eventData},
}
var buf bytes.Buffer
enc := json.NewEncoder(&buf)
err := enc.Encode(doc)
if err != nil {
return err
}
res, err := c.Index(
index,
&buf,
c.Index.WithContext(ctx),
)
if err != nil {
return err
}
defer res.Body.Close()
if res.IsError() {
err := fmt.Errorf("Add err: %s", res.String())
return err
}
io.Copy(ioutil.Discard, res.Body)
return nil
}
func (c *esClientV7) stop() {
}
// For versions under 7
type esClientV56 struct {
*elastic.Client
}
func newClientV56(args ElasticsearchArgs) (*esClientV56, error) {
// Client options
options := []elastic.ClientOptionFunc{elastic.SetURL(args.URL.String()),
elastic.SetMaxRetries(10),
elastic.SetSniff(false),
elastic.SetHttpClient(&http.Client{Transport: args.Transport})}
// Set basic auth
if args.Username != "" && args.Password != "" {
options = append(options, elastic.SetBasicAuth(args.Username, args.Password))
}
// Create a client
client, err := elastic.NewClient(options...)
if err != nil {
// https://github.com/olivere/elastic/wiki/Connection-Errors
if elastic.IsConnErr(err) || elastic.IsContextErr(err) || xnet.IsNetworkOrHostDown(err, false) {
return nil, errNotConnected
}
return nil, err
}
return &esClientV56{client}, nil
}
func (c *esClientV56) isAtleastV7() bool {
return false
}
// createIndex - creates the index if it does not exist.
func (c *esClientV56) createIndex(args ElasticsearchArgs) error {
exists, err := c.IndexExists(args.Index).Do(context.Background())
if err != nil {
return err
}
if !exists {
var createIndex *elastic.IndicesCreateResult
if createIndex, err = c.CreateIndex(args.Index).Do(context.Background()); err != nil {
return err
}
if !createIndex.Acknowledged {
return fmt.Errorf("index %v not created", args.Index)
}
}
return nil
}
func (c *esClientV56) ping(ctx context.Context, args ElasticsearchArgs) (bool, error) {
_, code, err := c.Ping(args.URL.String()).HttpHeadOnly(true).Do(ctx)
if err != nil {
if elastic.IsConnErr(err) || elastic.IsContextErr(err) || xnet.IsNetworkOrHostDown(err, false) {
return false, errNotConnected
}
return false, err
}
return !(code >= http.StatusBadRequest), nil
}
func (c *esClientV56) entryExists(ctx context.Context, index string, key string) (bool, error) {
return c.Exists().Index(index).Type("event").Id(key).Do(ctx)
}
func (c *esClientV56) removeEntry(ctx context.Context, index string, key string) error {
exists, err := c.entryExists(ctx, index, key)
if err == nil && exists {
_, err = c.Delete().Index(index).Type("event").Id(key).Do(ctx)
}
return err
}
func (c *esClientV56) updateEntry(ctx context.Context, index string, key string, eventData event.Event) error {
_, err := c.Index().Index(index).Type("event").BodyJson(map[string]interface{}{"Records": []event.Event{eventData}}).Id(key).Do(ctx)
return err
}
func (c *esClientV56) addEntry(ctx context.Context, index string, eventData event.Event) error {
_, err := c.Index().Index(index).Type("event").BodyJson(map[string]interface{}{"Records": []event.Event{eventData}}).Do(ctx)
return err
}
func (c *esClientV56) stop() {
c.Stop()
}

View File

@@ -0,0 +1,331 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package target
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"net"
"net/url"
"os"
"path/filepath"
"github.com/minio/minio/internal/event"
xnet "github.com/minio/minio/internal/net"
sarama "github.com/Shopify/sarama"
saramatls "github.com/Shopify/sarama/tools/tls"
)
// Kafka input constants
const (
KafkaBrokers = "brokers"
KafkaTopic = "topic"
KafkaQueueDir = "queue_dir"
KafkaQueueLimit = "queue_limit"
KafkaTLS = "tls"
KafkaTLSSkipVerify = "tls_skip_verify"
KafkaTLSClientAuth = "tls_client_auth"
KafkaSASL = "sasl"
KafkaSASLUsername = "sasl_username"
KafkaSASLPassword = "sasl_password"
KafkaSASLMechanism = "sasl_mechanism"
KafkaClientTLSCert = "client_tls_cert"
KafkaClientTLSKey = "client_tls_key"
KafkaVersion = "version"
EnvKafkaEnable = "MINIO_NOTIFY_KAFKA_ENABLE"
EnvKafkaBrokers = "MINIO_NOTIFY_KAFKA_BROKERS"
EnvKafkaTopic = "MINIO_NOTIFY_KAFKA_TOPIC"
EnvKafkaQueueDir = "MINIO_NOTIFY_KAFKA_QUEUE_DIR"
EnvKafkaQueueLimit = "MINIO_NOTIFY_KAFKA_QUEUE_LIMIT"
EnvKafkaTLS = "MINIO_NOTIFY_KAFKA_TLS"
EnvKafkaTLSSkipVerify = "MINIO_NOTIFY_KAFKA_TLS_SKIP_VERIFY"
EnvKafkaTLSClientAuth = "MINIO_NOTIFY_KAFKA_TLS_CLIENT_AUTH"
EnvKafkaSASLEnable = "MINIO_NOTIFY_KAFKA_SASL"
EnvKafkaSASLUsername = "MINIO_NOTIFY_KAFKA_SASL_USERNAME"
EnvKafkaSASLPassword = "MINIO_NOTIFY_KAFKA_SASL_PASSWORD"
EnvKafkaSASLMechanism = "MINIO_NOTIFY_KAFKA_SASL_MECHANISM"
EnvKafkaClientTLSCert = "MINIO_NOTIFY_KAFKA_CLIENT_TLS_CERT"
EnvKafkaClientTLSKey = "MINIO_NOTIFY_KAFKA_CLIENT_TLS_KEY"
EnvKafkaVersion = "MINIO_NOTIFY_KAFKA_VERSION"
)
// KafkaArgs - Kafka target arguments.
type KafkaArgs struct {
Enable bool `json:"enable"`
Brokers []xnet.Host `json:"brokers"`
Topic string `json:"topic"`
QueueDir string `json:"queueDir"`
QueueLimit uint64 `json:"queueLimit"`
Version string `json:"version"`
TLS struct {
Enable bool `json:"enable"`
RootCAs *x509.CertPool `json:"-"`
SkipVerify bool `json:"skipVerify"`
ClientAuth tls.ClientAuthType `json:"clientAuth"`
ClientTLSCert string `json:"clientTLSCert"`
ClientTLSKey string `json:"clientTLSKey"`
} `json:"tls"`
SASL struct {
Enable bool `json:"enable"`
User string `json:"username"`
Password string `json:"password"`
Mechanism string `json:"mechanism"`
} `json:"sasl"`
}
// Validate KafkaArgs fields
func (k KafkaArgs) Validate() error {
if !k.Enable {
return nil
}
if len(k.Brokers) == 0 {
return errors.New("no broker address found")
}
for _, b := range k.Brokers {
if _, err := xnet.ParseHost(b.String()); err != nil {
return err
}
}
if k.QueueDir != "" {
if !filepath.IsAbs(k.QueueDir) {
return errors.New("queueDir path should be absolute")
}
}
if k.Version != "" {
if _, err := sarama.ParseKafkaVersion(k.Version); err != nil {
return err
}
}
return nil
}
// KafkaTarget - Kafka target.
type KafkaTarget struct {
id event.TargetID
args KafkaArgs
producer sarama.SyncProducer
config *sarama.Config
store Store
loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
}
// ID - returns target ID.
func (target *KafkaTarget) ID() event.TargetID {
return target.id
}
// HasQueueStore - Checks if the queueStore has been configured for the target
func (target *KafkaTarget) HasQueueStore() bool {
return target.store != nil
}
// IsActive - Return true if target is up and active
func (target *KafkaTarget) IsActive() (bool, error) {
if !target.args.pingBrokers() {
return false, errNotConnected
}
return true, nil
}
// Save - saves the events to the store which will be replayed when the Kafka connection is active.
func (target *KafkaTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
_, err := target.IsActive()
if err != nil {
return err
}
return target.send(eventData)
}
// send - sends an event to the kafka.
func (target *KafkaTarget) send(eventData event.Event) error {
if target.producer == nil {
return errNotConnected
}
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
return err
}
key := eventData.S3.Bucket.Name + "/" + objectName
data, err := json.Marshal(event.Log{EventName: eventData.EventName, Key: key, Records: []event.Event{eventData}})
if err != nil {
return err
}
msg := sarama.ProducerMessage{
Topic: target.args.Topic,
Key: sarama.StringEncoder(key),
Value: sarama.ByteEncoder(data),
}
_, _, err = target.producer.SendMessage(&msg)
return err
}
// Send - reads an event from store and sends it to Kafka.
func (target *KafkaTarget) Send(eventKey string) error {
var err error
_, err = target.IsActive()
if err != nil {
return err
}
if target.producer == nil {
brokers := []string{}
for _, broker := range target.args.Brokers {
brokers = append(brokers, broker.String())
}
target.producer, err = sarama.NewSyncProducer(brokers, target.config)
if err != nil {
if err != sarama.ErrOutOfBrokers {
return err
}
return errNotConnected
}
}
eventData, eErr := target.store.Get(eventKey)
if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and wouldve been already been sent successfully.
if os.IsNotExist(eErr) {
return nil
}
return eErr
}
err = target.send(eventData)
if err != nil {
// Sarama opens the ciruit breaker after 3 consecutive connection failures.
if err == sarama.ErrLeaderNotAvailable || err.Error() == "circuit breaker is open" {
return errNotConnected
}
return err
}
// Delete the event from store.
return target.store.Del(eventKey)
}
// Close - closes underneath kafka connection.
func (target *KafkaTarget) Close() error {
if target.producer != nil {
return target.producer.Close()
}
return nil
}
// Check if atleast one broker in cluster is active
func (k KafkaArgs) pingBrokers() bool {
for _, broker := range k.Brokers {
_, dErr := net.Dial("tcp", broker.String())
if dErr == nil {
return true
}
}
return false
}
// NewKafkaTarget - creates new Kafka target with auth credentials.
func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*KafkaTarget, error) {
config := sarama.NewConfig()
target := &KafkaTarget{
id: event.TargetID{ID: id, Name: "kafka"},
args: args,
loggerOnce: loggerOnce,
}
if args.Version != "" {
kafkaVersion, err := sarama.ParseKafkaVersion(args.Version)
if err != nil {
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
config.Version = kafkaVersion
}
config.Net.SASL.User = args.SASL.User
config.Net.SASL.Password = args.SASL.Password
initScramClient(args, config) // initializes configured scram client.
config.Net.SASL.Enable = args.SASL.Enable
tlsConfig, err := saramatls.NewConfig(args.TLS.ClientTLSCert, args.TLS.ClientTLSKey)
if err != nil {
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
config.Net.TLS.Enable = args.TLS.Enable
config.Net.TLS.Config = tlsConfig
config.Net.TLS.Config.InsecureSkipVerify = args.TLS.SkipVerify
config.Net.TLS.Config.ClientAuth = args.TLS.ClientAuth
config.Net.TLS.Config.RootCAs = args.TLS.RootCAs
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 10
config.Producer.Return.Successes = true
target.config = config
brokers := []string{}
for _, broker := range args.Brokers {
brokers = append(brokers, broker.String())
}
var store Store
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-kafka-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil {
target.loggerOnce(context.Background(), oErr, target.ID())
return target, oErr
}
target.store = store
}
producer, err := sarama.NewSyncProducer(brokers, config)
if err != nil {
if store == nil || err != sarama.ErrOutOfBrokers {
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
}
target.producer = producer
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce)
}
return target, nil
}

View File

@@ -0,0 +1,83 @@
/*
* MinIO Object Storage (c) 2021 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package target
import (
"crypto/sha256"
"crypto/sha512"
"github.com/Shopify/sarama"
"github.com/xdg/scram"
)
func initScramClient(args KafkaArgs, config *sarama.Config) {
if args.SASL.Mechanism == "sha512" {
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: KafkaSHA512} }
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
} else if args.SASL.Mechanism == "sha256" {
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: KafkaSHA256} }
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)
} else {
// default to PLAIN
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypePlaintext)
}
}
// KafkaSHA256 is a function that returns a crypto/sha256 hasher and should be used
// to create Client objects configured for SHA-256 hashing.
var KafkaSHA256 scram.HashGeneratorFcn = sha256.New
// KafkaSHA512 is a function that returns a crypto/sha512 hasher and should be used
// to create Client objects configured for SHA-512 hashing.
var KafkaSHA512 scram.HashGeneratorFcn = sha512.New
// XDGSCRAMClient implements the client-side of an authentication
// conversation with a server. A new conversation must be created for
// each authentication attempt.
type XDGSCRAMClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}
// Begin constructs a SCRAM client component based on a given hash.Hash
// factory receiver. This constructor will normalize the username, password
// and authzID via the SASLprep algorithm, as recommended by RFC-5802. If
// SASLprep fails, the method returns an error.
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
x.ClientConversation = x.Client.NewConversation()
return nil
}
// Step takes a string provided from a server (or just an empty string for the
// very first conversation step) and attempts to move the authentication
// conversation forward. It returns a string to be sent to the server or an
// error if the server message is invalid. Calling Step after a conversation
// completes is also an error.
func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
response, err = x.ClientConversation.Step(challenge)
return
}
// Done returns true if the conversation is completed or has errored.
func (x *XDGSCRAMClient) Done() bool {
return x.ClientConversation.Done()
}

View File

@@ -0,0 +1,281 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package target
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"net/url"
"os"
"path/filepath"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/minio/minio/internal/event"
xnet "github.com/minio/minio/internal/net"
)
const (
reconnectInterval = 5 // In Seconds
storePrefix = "minio"
)
// MQTT input constants
const (
MqttBroker = "broker"
MqttTopic = "topic"
MqttQoS = "qos"
MqttUsername = "username"
MqttPassword = "password"
MqttReconnectInterval = "reconnect_interval"
MqttKeepAliveInterval = "keep_alive_interval"
MqttQueueDir = "queue_dir"
MqttQueueLimit = "queue_limit"
EnvMQTTEnable = "MINIO_NOTIFY_MQTT_ENABLE"
EnvMQTTBroker = "MINIO_NOTIFY_MQTT_BROKER"
EnvMQTTTopic = "MINIO_NOTIFY_MQTT_TOPIC"
EnvMQTTQoS = "MINIO_NOTIFY_MQTT_QOS"
EnvMQTTUsername = "MINIO_NOTIFY_MQTT_USERNAME"
EnvMQTTPassword = "MINIO_NOTIFY_MQTT_PASSWORD"
EnvMQTTReconnectInterval = "MINIO_NOTIFY_MQTT_RECONNECT_INTERVAL"
EnvMQTTKeepAliveInterval = "MINIO_NOTIFY_MQTT_KEEP_ALIVE_INTERVAL"
EnvMQTTQueueDir = "MINIO_NOTIFY_MQTT_QUEUE_DIR"
EnvMQTTQueueLimit = "MINIO_NOTIFY_MQTT_QUEUE_LIMIT"
)
// MQTTArgs - MQTT target arguments.
type MQTTArgs struct {
Enable bool `json:"enable"`
Broker xnet.URL `json:"broker"`
Topic string `json:"topic"`
QoS byte `json:"qos"`
User string `json:"username"`
Password string `json:"password"`
MaxReconnectInterval time.Duration `json:"reconnectInterval"`
KeepAlive time.Duration `json:"keepAliveInterval"`
RootCAs *x509.CertPool `json:"-"`
QueueDir string `json:"queueDir"`
QueueLimit uint64 `json:"queueLimit"`
}
// Validate MQTTArgs fields
func (m MQTTArgs) Validate() error {
if !m.Enable {
return nil
}
u, err := xnet.ParseURL(m.Broker.String())
if err != nil {
return err
}
switch u.Scheme {
case "ws", "wss", "tcp", "ssl", "tls", "tcps":
default:
return errors.New("unknown protocol in broker address")
}
if m.QueueDir != "" {
if !filepath.IsAbs(m.QueueDir) {
return errors.New("queueDir path should be absolute")
}
if m.QoS == 0 {
return errors.New("qos should be set to 1 or 2 if queueDir is set")
}
}
return nil
}
// MQTTTarget - MQTT target.
type MQTTTarget struct {
id event.TargetID
args MQTTArgs
client mqtt.Client
store Store
quitCh chan struct{}
loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})
}
// ID - returns target ID.
func (target *MQTTTarget) ID() event.TargetID {
return target.id
}
// HasQueueStore - Checks if the queueStore has been configured for the target
func (target *MQTTTarget) HasQueueStore() bool {
return target.store != nil
}
// IsActive - Return true if target is up and active
func (target *MQTTTarget) IsActive() (bool, error) {
if !target.client.IsConnectionOpen() {
return false, errNotConnected
}
return true, nil
}
// send - sends an event to the mqtt.
func (target *MQTTTarget) send(eventData event.Event) error {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
return err
}
key := eventData.S3.Bucket.Name + "/" + objectName
data, err := json.Marshal(event.Log{EventName: eventData.EventName, Key: key, Records: []event.Event{eventData}})
if err != nil {
return err
}
token := target.client.Publish(target.args.Topic, target.args.QoS, false, string(data))
if !token.WaitTimeout(reconnectInterval * time.Second) {
return errNotConnected
}
return token.Error()
}
// Send - reads an event from store and sends it to MQTT.
func (target *MQTTTarget) Send(eventKey string) error {
// Do not send if the connection is not active.
_, err := target.IsActive()
if err != nil {
return err
}
eventData, err := target.store.Get(eventKey)
if err != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and wouldve been already been sent successfully.
if os.IsNotExist(err) {
return nil
}
return err
}
if err = target.send(eventData); err != nil {
return err
}
// Delete the event from store.
return target.store.Del(eventKey)
}
// Save - saves the events to the store if queuestore is configured, which will
// be replayed when the mqtt connection is active.
func (target *MQTTTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
// Do not send if the connection is not active.
_, err := target.IsActive()
if err != nil {
return err
}
return target.send(eventData)
}
// Close - does nothing and available for interface compatibility.
func (target *MQTTTarget) Close() error {
target.client.Disconnect(100)
close(target.quitCh)
return nil
}
// NewMQTTTarget - creates new MQTT target.
func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*MQTTTarget, error) {
if args.MaxReconnectInterval == 0 {
// Default interval
// https://github.com/eclipse/paho.mqtt.golang/blob/master/options.go#L115
args.MaxReconnectInterval = 10 * time.Minute
}
options := mqtt.NewClientOptions().
SetClientID("").
SetCleanSession(true).
SetUsername(args.User).
SetPassword(args.Password).
SetMaxReconnectInterval(args.MaxReconnectInterval).
SetKeepAlive(args.KeepAlive).
SetTLSConfig(&tls.Config{RootCAs: args.RootCAs}).
AddBroker(args.Broker.String())
client := mqtt.NewClient(options)
target := &MQTTTarget{
id: event.TargetID{ID: id, Name: "mqtt"},
args: args,
client: client,
quitCh: make(chan struct{}),
loggerOnce: loggerOnce,
}
token := client.Connect()
retryRegister := func() {
for {
retry:
select {
case <-doneCh:
return
case <-target.quitCh:
return
default:
ok := token.WaitTimeout(reconnectInterval * time.Second)
if ok && token.Error() != nil {
target.loggerOnce(context.Background(),
fmt.Errorf("Previous connect failed with %w attempting a reconnect",
token.Error()),
target.ID())
time.Sleep(reconnectInterval * time.Second)
token = client.Connect()
goto retry
}
if ok {
// Successfully connected.
return
}
}
}
}
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-mqtt-"+id)
target.store = NewQueueStore(queueDir, args.QueueLimit)
if err := target.store.Open(); err != nil {
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
if !test {
go retryRegister()
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce)
}
} else {
if token.Wait() && token.Error() != nil {
return target, token.Error()
}
}
return target, nil
}

View File

@@ -0,0 +1,401 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package target
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/go-sql-driver/mysql"
"github.com/minio/minio/internal/event"
xnet "github.com/minio/minio/internal/net"
)
const (
mysqlTableExists = `SELECT 1 FROM %s;`
mysqlCreateNamespaceTable = `CREATE TABLE %s (key_name VARCHAR(2048), value JSON, PRIMARY KEY (key_name));`
mysqlCreateAccessTable = `CREATE TABLE %s (event_time DATETIME NOT NULL, event_data JSON);`
mysqlUpdateRow = `INSERT INTO %s (key_name, value) VALUES (?, ?) ON DUPLICATE KEY UPDATE value=VALUES(value);`
mysqlDeleteRow = `DELETE FROM %s WHERE key_name = ?;`
mysqlInsertRow = `INSERT INTO %s (event_time, event_data) VALUES (?, ?);`
)
// MySQL related constants
const (
MySQLFormat = "format"
MySQLDSNString = "dsn_string"
MySQLTable = "table"
MySQLHost = "host"
MySQLPort = "port"
MySQLUsername = "username"
MySQLPassword = "password"
MySQLDatabase = "database"
MySQLQueueLimit = "queue_limit"
MySQLQueueDir = "queue_dir"
MySQLMaxOpenConnections = "max_open_connections"
EnvMySQLEnable = "MINIO_NOTIFY_MYSQL_ENABLE"
EnvMySQLFormat = "MINIO_NOTIFY_MYSQL_FORMAT"
EnvMySQLDSNString = "MINIO_NOTIFY_MYSQL_DSN_STRING"
EnvMySQLTable = "MINIO_NOTIFY_MYSQL_TABLE"
EnvMySQLHost = "MINIO_NOTIFY_MYSQL_HOST"
EnvMySQLPort = "MINIO_NOTIFY_MYSQL_PORT"
EnvMySQLUsername = "MINIO_NOTIFY_MYSQL_USERNAME"
EnvMySQLPassword = "MINIO_NOTIFY_MYSQL_PASSWORD"
EnvMySQLDatabase = "MINIO_NOTIFY_MYSQL_DATABASE"
EnvMySQLQueueLimit = "MINIO_NOTIFY_MYSQL_QUEUE_LIMIT"
EnvMySQLQueueDir = "MINIO_NOTIFY_MYSQL_QUEUE_DIR"
EnvMySQLMaxOpenConnections = "MINIO_NOTIFY_MYSQL_MAX_OPEN_CONNECTIONS"
)
// MySQLArgs - MySQL target arguments.
type MySQLArgs struct {
Enable bool `json:"enable"`
Format string `json:"format"`
DSN string `json:"dsnString"`
Table string `json:"table"`
Host xnet.URL `json:"host"`
Port string `json:"port"`
User string `json:"user"`
Password string `json:"password"`
Database string `json:"database"`
QueueDir string `json:"queueDir"`
QueueLimit uint64 `json:"queueLimit"`
MaxOpenConnections int `json:"maxOpenConnections"`
}
// Validate MySQLArgs fields
func (m MySQLArgs) Validate() error {
if !m.Enable {
return nil
}
if m.Format != "" {
f := strings.ToLower(m.Format)
if f != event.NamespaceFormat && f != event.AccessFormat {
return fmt.Errorf("unrecognized format")
}
}
if m.Table == "" {
return fmt.Errorf("table unspecified")
}
if m.DSN != "" {
if _, err := mysql.ParseDSN(m.DSN); err != nil {
return err
}
} else {
// Some fields need to be specified when DSN is unspecified
if m.Port == "" {
return fmt.Errorf("unspecified port")
}
if _, err := strconv.Atoi(m.Port); err != nil {
return fmt.Errorf("invalid port")
}
if m.Database == "" {
return fmt.Errorf("database unspecified")
}
}
if m.QueueDir != "" {
if !filepath.IsAbs(m.QueueDir) {
return errors.New("queueDir path should be absolute")
}
}
if m.MaxOpenConnections < 0 {
return errors.New("maxOpenConnections cannot be less than zero")
}
return nil
}
// MySQLTarget - MySQL target.
type MySQLTarget struct {
id event.TargetID
args MySQLArgs
updateStmt *sql.Stmt
deleteStmt *sql.Stmt
insertStmt *sql.Stmt
db *sql.DB
store Store
firstPing bool
loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
}
// ID - returns target ID.
func (target *MySQLTarget) ID() event.TargetID {
return target.id
}
// HasQueueStore - Checks if the queueStore has been configured for the target
func (target *MySQLTarget) HasQueueStore() bool {
return target.store != nil
}
// IsActive - Return true if target is up and active
func (target *MySQLTarget) IsActive() (bool, error) {
if target.db == nil {
db, sErr := sql.Open("mysql", target.args.DSN)
if sErr != nil {
return false, sErr
}
target.db = db
if target.args.MaxOpenConnections > 0 {
// Set the maximum connections limit
target.db.SetMaxOpenConns(target.args.MaxOpenConnections)
}
}
if err := target.db.Ping(); err != nil {
if IsConnErr(err) {
return false, errNotConnected
}
return false, err
}
return true, nil
}
// Save - saves the events to the store which will be replayed when the SQL connection is active.
func (target *MySQLTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
_, err := target.IsActive()
if err != nil {
return err
}
return target.send(eventData)
}
// send - sends an event to the mysql.
func (target *MySQLTarget) send(eventData event.Event) error {
if target.args.Format == event.NamespaceFormat {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
return err
}
key := eventData.S3.Bucket.Name + "/" + objectName
if eventData.EventName == event.ObjectRemovedDelete {
_, err = target.deleteStmt.Exec(key)
} else {
var data []byte
if data, err = json.Marshal(struct{ Records []event.Event }{[]event.Event{eventData}}); err != nil {
return err
}
_, err = target.updateStmt.Exec(key, data)
}
return err
}
if target.args.Format == event.AccessFormat {
eventTime, err := time.Parse(event.AMZTimeFormat, eventData.EventTime)
if err != nil {
return err
}
data, err := json.Marshal(struct{ Records []event.Event }{[]event.Event{eventData}})
if err != nil {
return err
}
_, err = target.insertStmt.Exec(eventTime, data)
return err
}
return nil
}
// Send - reads an event from store and sends it to MySQL.
func (target *MySQLTarget) Send(eventKey string) error {
_, err := target.IsActive()
if err != nil {
return err
}
if !target.firstPing {
if err := target.executeStmts(); err != nil {
if IsConnErr(err) {
return errNotConnected
}
return err
}
}
eventData, eErr := target.store.Get(eventKey)
if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and wouldve been already been sent successfully.
if os.IsNotExist(eErr) {
return nil
}
return eErr
}
if err := target.send(eventData); err != nil {
if IsConnErr(err) {
return errNotConnected
}
return err
}
// Delete the event from store.
return target.store.Del(eventKey)
}
// Close - closes underneath connections to MySQL database.
func (target *MySQLTarget) Close() error {
if target.updateStmt != nil {
// FIXME: log returned error. ignore time being.
_ = target.updateStmt.Close()
}
if target.deleteStmt != nil {
// FIXME: log returned error. ignore time being.
_ = target.deleteStmt.Close()
}
if target.insertStmt != nil {
// FIXME: log returned error. ignore time being.
_ = target.insertStmt.Close()
}
return target.db.Close()
}
// Executes the table creation statements.
func (target *MySQLTarget) executeStmts() error {
_, err := target.db.Exec(fmt.Sprintf(mysqlTableExists, target.args.Table))
if err != nil {
createStmt := mysqlCreateNamespaceTable
if target.args.Format == event.AccessFormat {
createStmt = mysqlCreateAccessTable
}
if _, dbErr := target.db.Exec(fmt.Sprintf(createStmt, target.args.Table)); dbErr != nil {
return dbErr
}
}
switch target.args.Format {
case event.NamespaceFormat:
// insert or update statement
if target.updateStmt, err = target.db.Prepare(fmt.Sprintf(mysqlUpdateRow, target.args.Table)); err != nil {
return err
}
// delete statement
if target.deleteStmt, err = target.db.Prepare(fmt.Sprintf(mysqlDeleteRow, target.args.Table)); err != nil {
return err
}
case event.AccessFormat:
// insert statement
if target.insertStmt, err = target.db.Prepare(fmt.Sprintf(mysqlInsertRow, target.args.Table)); err != nil {
return err
}
}
return nil
}
// NewMySQLTarget - creates new MySQL target.
func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*MySQLTarget, error) {
if args.DSN == "" {
config := mysql.Config{
User: args.User,
Passwd: args.Password,
Net: "tcp",
Addr: args.Host.String() + ":" + args.Port,
DBName: args.Database,
AllowNativePasswords: true,
CheckConnLiveness: true,
}
args.DSN = config.FormatDSN()
}
target := &MySQLTarget{
id: event.TargetID{ID: id, Name: "mysql"},
args: args,
firstPing: false,
loggerOnce: loggerOnce,
}
db, err := sql.Open("mysql", args.DSN)
if err != nil {
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
target.db = db
if args.MaxOpenConnections > 0 {
// Set the maximum connections limit
target.db.SetMaxOpenConns(args.MaxOpenConnections)
}
var store Store
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-mysql-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil {
target.loggerOnce(context.Background(), oErr, target.ID())
return target, oErr
}
target.store = store
}
err = target.db.Ping()
if err != nil {
if target.store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) {
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
} else {
if err = target.executeStmts(); err != nil {
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
target.firstPing = true
}
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce)
}
return target, nil
}

View File

@@ -0,0 +1,38 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package target
import (
"database/sql"
"testing"
)
// TestPostgreSQLRegistration checks if sql driver
// is registered and fails otherwise.
func TestMySQLRegistration(t *testing.T) {
var found bool
for _, drv := range sql.Drivers() {
if drv == "mysql" {
found = true
break
}
}
if !found {
t.Fatal("mysql driver not registered")
}
}

View File

@@ -0,0 +1,373 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package target
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"net/url"
"os"
"path/filepath"
"github.com/minio/minio/internal/event"
xnet "github.com/minio/minio/internal/net"
"github.com/nats-io/nats.go"
"github.com/nats-io/stan.go"
)
// NATS related constants
const (
NATSAddress = "address"
NATSSubject = "subject"
NATSUsername = "username"
NATSPassword = "password"
NATSToken = "token"
NATSTLS = "tls"
NATSTLSSkipVerify = "tls_skip_verify"
NATSPingInterval = "ping_interval"
NATSQueueDir = "queue_dir"
NATSQueueLimit = "queue_limit"
NATSCertAuthority = "cert_authority"
NATSClientCert = "client_cert"
NATSClientKey = "client_key"
// Streaming constants
NATSStreaming = "streaming"
NATSStreamingClusterID = "streaming_cluster_id"
NATSStreamingAsync = "streaming_async"
NATSStreamingMaxPubAcksInFlight = "streaming_max_pub_acks_in_flight"
EnvNATSEnable = "MINIO_NOTIFY_NATS_ENABLE"
EnvNATSAddress = "MINIO_NOTIFY_NATS_ADDRESS"
EnvNATSSubject = "MINIO_NOTIFY_NATS_SUBJECT"
EnvNATSUsername = "MINIO_NOTIFY_NATS_USERNAME"
EnvNATSPassword = "MINIO_NOTIFY_NATS_PASSWORD"
EnvNATSToken = "MINIO_NOTIFY_NATS_TOKEN"
EnvNATSTLS = "MINIO_NOTIFY_NATS_TLS"
EnvNATSTLSSkipVerify = "MINIO_NOTIFY_NATS_TLS_SKIP_VERIFY"
EnvNATSPingInterval = "MINIO_NOTIFY_NATS_PING_INTERVAL"
EnvNATSQueueDir = "MINIO_NOTIFY_NATS_QUEUE_DIR"
EnvNATSQueueLimit = "MINIO_NOTIFY_NATS_QUEUE_LIMIT"
EnvNATSCertAuthority = "MINIO_NOTIFY_NATS_CERT_AUTHORITY"
EnvNATSClientCert = "MINIO_NOTIFY_NATS_CLIENT_CERT"
EnvNATSClientKey = "MINIO_NOTIFY_NATS_CLIENT_KEY"
// Streaming constants
EnvNATSStreaming = "MINIO_NOTIFY_NATS_STREAMING"
EnvNATSStreamingClusterID = "MINIO_NOTIFY_NATS_STREAMING_CLUSTER_ID"
EnvNATSStreamingAsync = "MINIO_NOTIFY_NATS_STREAMING_ASYNC"
EnvNATSStreamingMaxPubAcksInFlight = "MINIO_NOTIFY_NATS_STREAMING_MAX_PUB_ACKS_IN_FLIGHT"
)
// NATSArgs - NATS target arguments.
type NATSArgs struct {
Enable bool `json:"enable"`
Address xnet.Host `json:"address"`
Subject string `json:"subject"`
Username string `json:"username"`
Password string `json:"password"`
Token string `json:"token"`
TLS bool `json:"tls"`
TLSSkipVerify bool `json:"tlsSkipVerify"`
Secure bool `json:"secure"`
CertAuthority string `json:"certAuthority"`
ClientCert string `json:"clientCert"`
ClientKey string `json:"clientKey"`
PingInterval int64 `json:"pingInterval"`
QueueDir string `json:"queueDir"`
QueueLimit uint64 `json:"queueLimit"`
Streaming struct {
Enable bool `json:"enable"`
ClusterID string `json:"clusterID"`
Async bool `json:"async"`
MaxPubAcksInflight int `json:"maxPubAcksInflight"`
} `json:"streaming"`
RootCAs *x509.CertPool `json:"-"`
}
// Validate NATSArgs fields
func (n NATSArgs) Validate() error {
if !n.Enable {
return nil
}
if n.Address.IsEmpty() {
return errors.New("empty address")
}
if n.Subject == "" {
return errors.New("empty subject")
}
if n.ClientCert != "" && n.ClientKey == "" || n.ClientCert == "" && n.ClientKey != "" {
return errors.New("cert and key must be specified as a pair")
}
if n.Username != "" && n.Password == "" || n.Username == "" && n.Password != "" {
return errors.New("username and password must be specified as a pair")
}
if n.Streaming.Enable {
if n.Streaming.ClusterID == "" {
return errors.New("empty cluster id")
}
}
if n.QueueDir != "" {
if !filepath.IsAbs(n.QueueDir) {
return errors.New("queueDir path should be absolute")
}
}
return nil
}
// To obtain a nats connection from args.
func (n NATSArgs) connectNats() (*nats.Conn, error) {
connOpts := []nats.Option{nats.Name("Minio Notification")}
if n.Username != "" && n.Password != "" {
connOpts = append(connOpts, nats.UserInfo(n.Username, n.Password))
}
if n.Token != "" {
connOpts = append(connOpts, nats.Token(n.Token))
}
if n.Secure || n.TLS && n.TLSSkipVerify {
connOpts = append(connOpts, nats.Secure(nil))
} else if n.TLS {
connOpts = append(connOpts, nats.Secure(&tls.Config{RootCAs: n.RootCAs}))
}
if n.CertAuthority != "" {
connOpts = append(connOpts, nats.RootCAs(n.CertAuthority))
}
if n.ClientCert != "" && n.ClientKey != "" {
connOpts = append(connOpts, nats.ClientCert(n.ClientCert, n.ClientKey))
}
return nats.Connect(n.Address.String(), connOpts...)
}
// To obtain a streaming connection from args.
func (n NATSArgs) connectStan() (stan.Conn, error) {
scheme := "nats"
if n.Secure {
scheme = "tls"
}
var addressURL string
if n.Username != "" && n.Password != "" {
addressURL = scheme + "://" + n.Username + ":" + n.Password + "@" + n.Address.String()
} else if n.Token != "" {
addressURL = scheme + "://" + n.Token + "@" + n.Address.String()
} else {
addressURL = scheme + "://" + n.Address.String()
}
clientID, err := getNewUUID()
if err != nil {
return nil, err
}
connOpts := []stan.Option{stan.NatsURL(addressURL)}
if n.Streaming.MaxPubAcksInflight > 0 {
connOpts = append(connOpts, stan.MaxPubAcksInflight(n.Streaming.MaxPubAcksInflight))
}
return stan.Connect(n.Streaming.ClusterID, clientID, connOpts...)
}
// NATSTarget - NATS target.
type NATSTarget struct {
id event.TargetID
args NATSArgs
natsConn *nats.Conn
stanConn stan.Conn
store Store
loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
}
// ID - returns target ID.
func (target *NATSTarget) ID() event.TargetID {
return target.id
}
// HasQueueStore - Checks if the queueStore has been configured for the target
func (target *NATSTarget) HasQueueStore() bool {
return target.store != nil
}
// IsActive - Return true if target is up and active
func (target *NATSTarget) IsActive() (bool, error) {
var connErr error
if target.args.Streaming.Enable {
if target.stanConn == nil || target.stanConn.NatsConn() == nil {
target.stanConn, connErr = target.args.connectStan()
} else {
if !target.stanConn.NatsConn().IsConnected() {
return false, errNotConnected
}
}
} else {
if target.natsConn == nil {
target.natsConn, connErr = target.args.connectNats()
} else {
if !target.natsConn.IsConnected() {
return false, errNotConnected
}
}
}
if connErr != nil {
if connErr.Error() == nats.ErrNoServers.Error() {
return false, errNotConnected
}
return false, connErr
}
return true, nil
}
// Save - saves the events to the store which will be replayed when the Nats connection is active.
func (target *NATSTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
_, err := target.IsActive()
if err != nil {
return err
}
return target.send(eventData)
}
// send - sends an event to the Nats.
func (target *NATSTarget) send(eventData event.Event) error {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
return err
}
key := eventData.S3.Bucket.Name + "/" + objectName
data, err := json.Marshal(event.Log{EventName: eventData.EventName, Key: key, Records: []event.Event{eventData}})
if err != nil {
return err
}
if target.stanConn != nil {
if target.args.Streaming.Async {
_, err = target.stanConn.PublishAsync(target.args.Subject, data, nil)
} else {
err = target.stanConn.Publish(target.args.Subject, data)
}
} else {
err = target.natsConn.Publish(target.args.Subject, data)
}
return err
}
// Send - sends event to Nats.
func (target *NATSTarget) Send(eventKey string) error {
_, err := target.IsActive()
if err != nil {
return err
}
eventData, eErr := target.store.Get(eventKey)
if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and wouldve been already been sent successfully.
if os.IsNotExist(eErr) {
return nil
}
return eErr
}
if err := target.send(eventData); err != nil {
return err
}
return target.store.Del(eventKey)
}
// Close - closes underneath connections to NATS server.
func (target *NATSTarget) Close() (err error) {
if target.stanConn != nil {
// closing the streaming connection does not close the provided NATS connection.
if target.stanConn.NatsConn() != nil {
target.stanConn.NatsConn().Close()
}
err = target.stanConn.Close()
}
if target.natsConn != nil {
target.natsConn.Close()
}
return err
}
// NewNATSTarget - creates new NATS target.
func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*NATSTarget, error) {
var natsConn *nats.Conn
var stanConn stan.Conn
var err error
var store Store
target := &NATSTarget{
id: event.TargetID{ID: id, Name: "nats"},
args: args,
loggerOnce: loggerOnce,
}
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-nats-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil {
target.loggerOnce(context.Background(), oErr, target.ID())
return target, oErr
}
target.store = store
}
if args.Streaming.Enable {
stanConn, err = args.connectStan()
target.stanConn = stanConn
} else {
natsConn, err = args.connectNats()
target.natsConn = natsConn
}
if err != nil {
if store == nil || err.Error() != nats.ErrNoServers.Error() {
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
}
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce)
}
return target, nil
}

View File

@@ -0,0 +1,92 @@
/*
* MinIO Object Storage (c) 2021 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package target
import (
"testing"
xnet "github.com/minio/minio/internal/net"
natsserver "github.com/nats-io/nats-server/v2/test"
)
func TestNatsConnPlain(t *testing.T) {
opts := natsserver.DefaultTestOptions
opts.Port = 14222
s := natsserver.RunServer(&opts)
defer s.Shutdown()
clientConfig := &NATSArgs{
Enable: true,
Address: xnet.Host{Name: "localhost",
Port: (xnet.Port(opts.Port)),
IsPortSet: true},
Subject: "test",
}
con, err := clientConfig.connectNats()
if err != nil {
t.Errorf("Could not connect to nats: %v", err)
}
defer con.Close()
}
func TestNatsConnUserPass(t *testing.T) {
opts := natsserver.DefaultTestOptions
opts.Port = 14223
opts.Username = "testminio"
opts.Password = "miniotest"
s := natsserver.RunServer(&opts)
defer s.Shutdown()
clientConfig := &NATSArgs{
Enable: true,
Address: xnet.Host{Name: "localhost",
Port: (xnet.Port(opts.Port)),
IsPortSet: true},
Subject: "test",
Username: opts.Username,
Password: opts.Password,
}
con, err := clientConfig.connectNats()
if err != nil {
t.Errorf("Could not connect to nats: %v", err)
}
defer con.Close()
}
func TestNatsConnToken(t *testing.T) {
opts := natsserver.DefaultTestOptions
opts.Port = 14223
opts.Authorization = "s3cr3t"
s := natsserver.RunServer(&opts)
defer s.Shutdown()
clientConfig := &NATSArgs{
Enable: true,
Address: xnet.Host{Name: "localhost",
Port: (xnet.Port(opts.Port)),
IsPortSet: true},
Subject: "test",
Token: opts.Authorization,
}
con, err := clientConfig.connectNats()
if err != nil {
t.Errorf("Could not connect to nats: %v", err)
}
defer con.Close()
}

View File

@@ -0,0 +1,70 @@
/*
* MinIO Object Storage (c) 2021 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package target
import (
"path"
"path/filepath"
"testing"
xnet "github.com/minio/minio/internal/net"
natsserver "github.com/nats-io/nats-server/v2/test"
)
func TestNatsConnTLSCustomCA(t *testing.T) {
s, opts := natsserver.RunServerWithConfig(filepath.Join("testdata", "contrib", "nats_tls.conf"))
defer s.Shutdown()
clientConfig := &NATSArgs{
Enable: true,
Address: xnet.Host{Name: "localhost",
Port: (xnet.Port(opts.Port)),
IsPortSet: true},
Subject: "test",
Secure: true,
CertAuthority: path.Join("testdata", "contrib", "certs", "root_ca_cert.pem"),
}
con, err := clientConfig.connectNats()
if err != nil {
t.Errorf("Could not connect to nats: %v", err)
}
defer con.Close()
}
func TestNatsConnTLSClientAuthorization(t *testing.T) {
s, opts := natsserver.RunServerWithConfig(filepath.Join("testdata", "contrib", "nats_tls_client_cert.conf"))
defer s.Shutdown()
clientConfig := &NATSArgs{
Enable: true,
Address: xnet.Host{Name: "localhost",
Port: (xnet.Port(opts.Port)),
IsPortSet: true},
Subject: "test",
Secure: true,
CertAuthority: path.Join("testdata", "contrib", "certs", "root_ca_cert.pem"),
ClientCert: path.Join("testdata", "contrib", "certs", "nats_client_cert.pem"),
ClientKey: path.Join("testdata", "contrib", "certs", "nats_client_key.pem"),
}
con, err := clientConfig.connectNats()
if err != nil {
t.Errorf("Could not connect to nats: %v", err)
}
defer con.Close()
}

View File

@@ -0,0 +1,242 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package target
import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"net/url"
"os"
"path/filepath"
"github.com/nsqio/go-nsq"
"github.com/minio/minio/internal/event"
xnet "github.com/minio/minio/internal/net"
)
// NSQ constants
const (
NSQAddress = "nsqd_address"
NSQTopic = "topic"
NSQTLS = "tls"
NSQTLSSkipVerify = "tls_skip_verify"
NSQQueueDir = "queue_dir"
NSQQueueLimit = "queue_limit"
EnvNSQEnable = "MINIO_NOTIFY_NSQ_ENABLE"
EnvNSQAddress = "MINIO_NOTIFY_NSQ_NSQD_ADDRESS"
EnvNSQTopic = "MINIO_NOTIFY_NSQ_TOPIC"
EnvNSQTLS = "MINIO_NOTIFY_NSQ_TLS"
EnvNSQTLSSkipVerify = "MINIO_NOTIFY_NSQ_TLS_SKIP_VERIFY"
EnvNSQQueueDir = "MINIO_NOTIFY_NSQ_QUEUE_DIR"
EnvNSQQueueLimit = "MINIO_NOTIFY_NSQ_QUEUE_LIMIT"
)
// NSQArgs - NSQ target arguments.
type NSQArgs struct {
Enable bool `json:"enable"`
NSQDAddress xnet.Host `json:"nsqdAddress"`
Topic string `json:"topic"`
TLS struct {
Enable bool `json:"enable"`
SkipVerify bool `json:"skipVerify"`
} `json:"tls"`
QueueDir string `json:"queueDir"`
QueueLimit uint64 `json:"queueLimit"`
}
// Validate NSQArgs fields
func (n NSQArgs) Validate() error {
if !n.Enable {
return nil
}
if n.NSQDAddress.IsEmpty() {
return errors.New("empty nsqdAddress")
}
if n.Topic == "" {
return errors.New("empty topic")
}
if n.QueueDir != "" {
if !filepath.IsAbs(n.QueueDir) {
return errors.New("queueDir path should be absolute")
}
}
return nil
}
// NSQTarget - NSQ target.
type NSQTarget struct {
id event.TargetID
args NSQArgs
producer *nsq.Producer
store Store
config *nsq.Config
loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
}
// ID - returns target ID.
func (target *NSQTarget) ID() event.TargetID {
return target.id
}
// HasQueueStore - Checks if the queueStore has been configured for the target
func (target *NSQTarget) HasQueueStore() bool {
return target.store != nil
}
// IsActive - Return true if target is up and active
func (target *NSQTarget) IsActive() (bool, error) {
if target.producer == nil {
producer, err := nsq.NewProducer(target.args.NSQDAddress.String(), target.config)
if err != nil {
return false, err
}
target.producer = producer
}
if err := target.producer.Ping(); err != nil {
// To treat "connection refused" errors as errNotConnected.
if IsConnRefusedErr(err) {
return false, errNotConnected
}
return false, err
}
return true, nil
}
// Save - saves the events to the store which will be replayed when the nsq connection is active.
func (target *NSQTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
_, err := target.IsActive()
if err != nil {
return err
}
return target.send(eventData)
}
// send - sends an event to the NSQ.
func (target *NSQTarget) send(eventData event.Event) error {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
return err
}
key := eventData.S3.Bucket.Name + "/" + objectName
data, err := json.Marshal(event.Log{EventName: eventData.EventName, Key: key, Records: []event.Event{eventData}})
if err != nil {
return err
}
return target.producer.Publish(target.args.Topic, data)
}
// Send - reads an event from store and sends it to NSQ.
func (target *NSQTarget) Send(eventKey string) error {
_, err := target.IsActive()
if err != nil {
return err
}
eventData, eErr := target.store.Get(eventKey)
if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and wouldve been already been sent successfully.
if os.IsNotExist(eErr) {
return nil
}
return eErr
}
if err := target.send(eventData); err != nil {
return err
}
// Delete the event from store.
return target.store.Del(eventKey)
}
// Close - closes underneath connections to NSQD server.
func (target *NSQTarget) Close() (err error) {
if target.producer != nil {
// this blocks until complete:
target.producer.Stop()
}
return nil
}
// NewNSQTarget - creates new NSQ target.
func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*NSQTarget, error) {
config := nsq.NewConfig()
if args.TLS.Enable {
config.TlsV1 = true
config.TlsConfig = &tls.Config{
InsecureSkipVerify: args.TLS.SkipVerify,
}
}
var store Store
target := &NSQTarget{
id: event.TargetID{ID: id, Name: "nsq"},
args: args,
config: config,
loggerOnce: loggerOnce,
}
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-nsq-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil {
target.loggerOnce(context.Background(), oErr, target.ID())
return target, oErr
}
target.store = store
}
producer, err := nsq.NewProducer(args.NSQDAddress.String(), config)
if err != nil {
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
target.producer = producer
if err := target.producer.Ping(); err != nil {
// To treat "connection refused" errors as errNotConnected.
if target.store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) {
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
}
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce)
}
return target, nil
}

View File

@@ -0,0 +1,98 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package target
import (
"testing"
xnet "github.com/minio/minio/internal/net"
)
func TestNSQArgs_Validate(t *testing.T) {
type fields struct {
Enable bool
NSQDAddress xnet.Host
Topic string
TLS struct {
Enable bool
SkipVerify bool
}
}
tests := []struct {
name string
fields fields
wantErr bool
}{
{
name: "test1_missing_topic",
fields: fields{
Enable: true,
NSQDAddress: xnet.Host{
Name: "127.0.0.1",
Port: 4150,
IsPortSet: true,
},
Topic: "",
},
wantErr: true,
},
{
name: "test2_disabled",
fields: fields{
Enable: false,
NSQDAddress: xnet.Host{},
Topic: "topic",
},
wantErr: false,
},
{
name: "test3_OK",
fields: fields{
Enable: true,
NSQDAddress: xnet.Host{
Name: "127.0.0.1",
Port: 4150,
IsPortSet: true,
},
Topic: "topic",
},
wantErr: false,
},
{
name: "test4_emptynsqdaddr",
fields: fields{
Enable: true,
NSQDAddress: xnet.Host{},
Topic: "topic",
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
n := NSQArgs{
Enable: tt.fields.Enable,
NSQDAddress: tt.fields.NSQDAddress,
Topic: tt.fields.Topic,
}
if err := n.Validate(); (err != nil) != tt.wantErr {
t.Errorf("NSQArgs.Validate() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}

View File

@@ -0,0 +1,408 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package target
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"time"
_ "github.com/lib/pq" // Register postgres driver
"github.com/minio/minio/internal/event"
xnet "github.com/minio/minio/internal/net"
)
const (
psqlTableExists = `SELECT 1 FROM %s;`
psqlCreateNamespaceTable = `CREATE TABLE %s (key VARCHAR PRIMARY KEY, value JSONB);`
psqlCreateAccessTable = `CREATE TABLE %s (event_time TIMESTAMP WITH TIME ZONE NOT NULL, event_data JSONB);`
psqlUpdateRow = `INSERT INTO %s (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value;`
psqlDeleteRow = `DELETE FROM %s WHERE key = $1;`
psqlInsertRow = `INSERT INTO %s (event_time, event_data) VALUES ($1, $2);`
)
// Postgres constants
const (
PostgresFormat = "format"
PostgresConnectionString = "connection_string"
PostgresTable = "table"
PostgresHost = "host"
PostgresPort = "port"
PostgresUsername = "username"
PostgresPassword = "password"
PostgresDatabase = "database"
PostgresQueueDir = "queue_dir"
PostgresQueueLimit = "queue_limit"
PostgresMaxOpenConnections = "max_open_connections"
EnvPostgresEnable = "MINIO_NOTIFY_POSTGRES_ENABLE"
EnvPostgresFormat = "MINIO_NOTIFY_POSTGRES_FORMAT"
EnvPostgresConnectionString = "MINIO_NOTIFY_POSTGRES_CONNECTION_STRING"
EnvPostgresTable = "MINIO_NOTIFY_POSTGRES_TABLE"
EnvPostgresHost = "MINIO_NOTIFY_POSTGRES_HOST"
EnvPostgresPort = "MINIO_NOTIFY_POSTGRES_PORT"
EnvPostgresUsername = "MINIO_NOTIFY_POSTGRES_USERNAME"
EnvPostgresPassword = "MINIO_NOTIFY_POSTGRES_PASSWORD"
EnvPostgresDatabase = "MINIO_NOTIFY_POSTGRES_DATABASE"
EnvPostgresQueueDir = "MINIO_NOTIFY_POSTGRES_QUEUE_DIR"
EnvPostgresQueueLimit = "MINIO_NOTIFY_POSTGRES_QUEUE_LIMIT"
EnvPostgresMaxOpenConnections = "MINIO_NOTIFY_POSTGRES_MAX_OPEN_CONNECTIONS"
)
// PostgreSQLArgs - PostgreSQL target arguments.
type PostgreSQLArgs struct {
Enable bool `json:"enable"`
Format string `json:"format"`
ConnectionString string `json:"connectionString"`
Table string `json:"table"`
Host xnet.Host `json:"host"` // default: localhost
Port string `json:"port"` // default: 5432
Username string `json:"username"` // default: user running minio
Password string `json:"password"` // default: no password
Database string `json:"database"` // default: same as user
QueueDir string `json:"queueDir"`
QueueLimit uint64 `json:"queueLimit"`
MaxOpenConnections int `json:"maxOpenConnections"`
}
// Validate PostgreSQLArgs fields
func (p PostgreSQLArgs) Validate() error {
if !p.Enable {
return nil
}
if p.Table == "" {
return fmt.Errorf("empty table name")
}
if p.Format != "" {
f := strings.ToLower(p.Format)
if f != event.NamespaceFormat && f != event.AccessFormat {
return fmt.Errorf("unrecognized format value")
}
}
if p.ConnectionString != "" {
// No pq API doesn't help to validate connection string
// prior connection, so no validation for now.
} else {
// Some fields need to be specified when ConnectionString is unspecified
if p.Port == "" {
return fmt.Errorf("unspecified port")
}
if _, err := strconv.Atoi(p.Port); err != nil {
return fmt.Errorf("invalid port")
}
if p.Database == "" {
return fmt.Errorf("database unspecified")
}
}
if p.QueueDir != "" {
if !filepath.IsAbs(p.QueueDir) {
return errors.New("queueDir path should be absolute")
}
}
if p.MaxOpenConnections < 0 {
return errors.New("maxOpenConnections cannot be less than zero")
}
return nil
}
// PostgreSQLTarget - PostgreSQL target.
type PostgreSQLTarget struct {
id event.TargetID
args PostgreSQLArgs
updateStmt *sql.Stmt
deleteStmt *sql.Stmt
insertStmt *sql.Stmt
db *sql.DB
store Store
firstPing bool
connString string
loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
}
// ID - returns target ID.
func (target *PostgreSQLTarget) ID() event.TargetID {
return target.id
}
// HasQueueStore - Checks if the queueStore has been configured for the target
func (target *PostgreSQLTarget) HasQueueStore() bool {
return target.store != nil
}
// IsActive - Return true if target is up and active
func (target *PostgreSQLTarget) IsActive() (bool, error) {
if target.db == nil {
db, err := sql.Open("postgres", target.connString)
if err != nil {
return false, err
}
target.db = db
if target.args.MaxOpenConnections > 0 {
// Set the maximum connections limit
target.db.SetMaxOpenConns(target.args.MaxOpenConnections)
}
}
if err := target.db.Ping(); err != nil {
if IsConnErr(err) {
return false, errNotConnected
}
return false, err
}
return true, nil
}
// Save - saves the events to the store if questore is configured, which will be replayed when the PostgreSQL connection is active.
func (target *PostgreSQLTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
_, err := target.IsActive()
if err != nil {
return err
}
return target.send(eventData)
}
// IsConnErr - To detect a connection error.
func IsConnErr(err error) bool {
return IsConnRefusedErr(err) || err.Error() == "sql: database is closed" || err.Error() == "sql: statement is closed" || err.Error() == "invalid connection"
}
// send - sends an event to the PostgreSQL.
func (target *PostgreSQLTarget) send(eventData event.Event) error {
if target.args.Format == event.NamespaceFormat {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
return err
}
key := eventData.S3.Bucket.Name + "/" + objectName
if eventData.EventName == event.ObjectRemovedDelete {
_, err = target.deleteStmt.Exec(key)
} else {
var data []byte
if data, err = json.Marshal(struct{ Records []event.Event }{[]event.Event{eventData}}); err != nil {
return err
}
_, err = target.updateStmt.Exec(key, data)
}
return err
}
if target.args.Format == event.AccessFormat {
eventTime, err := time.Parse(event.AMZTimeFormat, eventData.EventTime)
if err != nil {
return err
}
data, err := json.Marshal(struct{ Records []event.Event }{[]event.Event{eventData}})
if err != nil {
return err
}
if _, err = target.insertStmt.Exec(eventTime, data); err != nil {
return err
}
}
return nil
}
// Send - reads an event from store and sends it to PostgreSQL.
func (target *PostgreSQLTarget) Send(eventKey string) error {
_, err := target.IsActive()
if err != nil {
return err
}
if !target.firstPing {
if err := target.executeStmts(); err != nil {
if IsConnErr(err) {
return errNotConnected
}
return err
}
}
eventData, eErr := target.store.Get(eventKey)
if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and wouldve been already been sent successfully.
if os.IsNotExist(eErr) {
return nil
}
return eErr
}
if err := target.send(eventData); err != nil {
if IsConnErr(err) {
return errNotConnected
}
return err
}
// Delete the event from store.
return target.store.Del(eventKey)
}
// Close - closes underneath connections to PostgreSQL database.
func (target *PostgreSQLTarget) Close() error {
if target.updateStmt != nil {
// FIXME: log returned error. ignore time being.
_ = target.updateStmt.Close()
}
if target.deleteStmt != nil {
// FIXME: log returned error. ignore time being.
_ = target.deleteStmt.Close()
}
if target.insertStmt != nil {
// FIXME: log returned error. ignore time being.
_ = target.insertStmt.Close()
}
return target.db.Close()
}
// Executes the table creation statements.
func (target *PostgreSQLTarget) executeStmts() error {
_, err := target.db.Exec(fmt.Sprintf(psqlTableExists, target.args.Table))
if err != nil {
createStmt := psqlCreateNamespaceTable
if target.args.Format == event.AccessFormat {
createStmt = psqlCreateAccessTable
}
if _, dbErr := target.db.Exec(fmt.Sprintf(createStmt, target.args.Table)); dbErr != nil {
return dbErr
}
}
switch target.args.Format {
case event.NamespaceFormat:
// insert or update statement
if target.updateStmt, err = target.db.Prepare(fmt.Sprintf(psqlUpdateRow, target.args.Table)); err != nil {
return err
}
// delete statement
if target.deleteStmt, err = target.db.Prepare(fmt.Sprintf(psqlDeleteRow, target.args.Table)); err != nil {
return err
}
case event.AccessFormat:
// insert statement
if target.insertStmt, err = target.db.Prepare(fmt.Sprintf(psqlInsertRow, target.args.Table)); err != nil {
return err
}
}
return nil
}
// NewPostgreSQLTarget - creates new PostgreSQL target.
func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*PostgreSQLTarget, error) {
params := []string{args.ConnectionString}
if args.ConnectionString == "" {
params = []string{}
if !args.Host.IsEmpty() {
params = append(params, "host="+args.Host.String())
}
if args.Port != "" {
params = append(params, "port="+args.Port)
}
if args.Username != "" {
params = append(params, "username="+args.Username)
}
if args.Password != "" {
params = append(params, "password="+args.Password)
}
if args.Database != "" {
params = append(params, "dbname="+args.Database)
}
}
connStr := strings.Join(params, " ")
target := &PostgreSQLTarget{
id: event.TargetID{ID: id, Name: "postgresql"},
args: args,
firstPing: false,
connString: connStr,
loggerOnce: loggerOnce,
}
db, err := sql.Open("postgres", connStr)
if err != nil {
return target, err
}
target.db = db
if args.MaxOpenConnections > 0 {
// Set the maximum connections limit
target.db.SetMaxOpenConns(args.MaxOpenConnections)
}
var store Store
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-postgresql-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil {
target.loggerOnce(context.Background(), oErr, target.ID())
return target, oErr
}
target.store = store
}
err = target.db.Ping()
if err != nil {
if target.store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) {
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
} else {
if err = target.executeStmts(); err != nil {
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
target.firstPing = true
}
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce)
}
return target, nil
}

View File

@@ -0,0 +1,38 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package target
import (
"database/sql"
"testing"
)
// TestPostgreSQLRegistration checks if postgres driver
// is registered and fails otherwise.
func TestPostgreSQLRegistration(t *testing.T) {
var found bool
for _, drv := range sql.Drivers() {
if drv == "postgres" {
found = true
break
}
}
if !found {
t.Fatal("postgres driver not registered")
}
}

View File

@@ -0,0 +1,207 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package target
import (
"encoding/json"
"io/ioutil"
"math"
"os"
"path/filepath"
"sort"
"sync"
"github.com/minio/minio/internal/event"
"github.com/minio/pkg/sys"
)
const (
defaultLimit = 100000 // Default store limit.
eventExt = ".event"
)
// QueueStore - Filestore for persisting events.
type QueueStore struct {
sync.RWMutex
currentEntries uint64
entryLimit uint64
directory string
}
// NewQueueStore - Creates an instance for QueueStore.
func NewQueueStore(directory string, limit uint64) Store {
if limit == 0 {
limit = defaultLimit
_, maxRLimit, err := sys.GetMaxOpenFileLimit()
if err == nil {
// Limit the maximum number of entries
// to maximum open file limit
if maxRLimit < limit {
limit = maxRLimit
}
}
}
return &QueueStore{
directory: directory,
entryLimit: limit,
}
}
// Open - Creates the directory if not present.
func (store *QueueStore) Open() error {
store.Lock()
defer store.Unlock()
if err := os.MkdirAll(store.directory, os.FileMode(0770)); err != nil {
return err
}
names, err := store.list()
if err != nil {
return err
}
currentEntries := uint64(len(names))
if currentEntries >= store.entryLimit {
return errLimitExceeded
}
store.currentEntries = currentEntries
return nil
}
// write - writes event to the directory.
func (store *QueueStore) write(key string, e event.Event) error {
// Marshalls the event.
eventData, err := json.Marshal(e)
if err != nil {
return err
}
path := filepath.Join(store.directory, key+eventExt)
if err := ioutil.WriteFile(path, eventData, os.FileMode(0770)); err != nil {
return err
}
// Increment the event count.
store.currentEntries++
return nil
}
// Put - puts a event to the store.
func (store *QueueStore) Put(e event.Event) error {
store.Lock()
defer store.Unlock()
if store.currentEntries >= store.entryLimit {
return errLimitExceeded
}
key, err := getNewUUID()
if err != nil {
return err
}
return store.write(key, e)
}
// Get - gets a event from the store.
func (store *QueueStore) Get(key string) (event event.Event, err error) {
store.RLock()
defer func(store *QueueStore) {
store.RUnlock()
if err != nil {
// Upon error we remove the entry.
store.Del(key)
}
}(store)
var eventData []byte
eventData, err = ioutil.ReadFile(filepath.Join(store.directory, key+eventExt))
if err != nil {
return event, err
}
if len(eventData) == 0 {
return event, os.ErrNotExist
}
if err = json.Unmarshal(eventData, &event); err != nil {
return event, err
}
return event, nil
}
// Del - Deletes an entry from the store.
func (store *QueueStore) Del(key string) error {
store.Lock()
defer store.Unlock()
return store.del(key)
}
// lockless call
func (store *QueueStore) del(key string) error {
if err := os.Remove(filepath.Join(store.directory, key+eventExt)); err != nil {
return err
}
// Decrement the current entries count.
store.currentEntries--
// Current entries can underflow, when multiple
// events are being pushed in parallel, this code
// is needed to ensure that we don't underflow.
//
// queueStore replayEvents is not serialized,
// this code is needed to protect us under
// such situations.
if store.currentEntries == math.MaxUint64 {
store.currentEntries = 0
}
return nil
}
// List - lists all files from the directory.
func (store *QueueStore) List() ([]string, error) {
store.RLock()
defer store.RUnlock()
return store.list()
}
// list lock less.
func (store *QueueStore) list() ([]string, error) {
var names []string
files, err := ioutil.ReadDir(store.directory)
if err != nil {
return names, err
}
// Sort the dentries.
sort.Slice(files, func(i, j int) bool {
return files[i].ModTime().Before(files[j].ModTime())
})
for _, file := range files {
names = append(names, file.Name())
}
return names, nil
}

View File

@@ -0,0 +1,214 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package target
import (
"os"
"path/filepath"
"reflect"
"strings"
"testing"
"github.com/minio/minio/internal/event"
)
// TestDir
var queueDir = filepath.Join(os.TempDir(), "minio_test")
// Sample test event.
var testEvent = event.Event{EventVersion: "1.0", EventSource: "test_source", AwsRegion: "test_region", EventTime: "test_time", EventName: event.ObjectAccessedGet}
// Initialize the store.
func setUpStore(directory string, limit uint64) (Store, error) {
store := NewQueueStore(queueDir, limit)
if oErr := store.Open(); oErr != nil {
return nil, oErr
}
return store, nil
}
// Tear down store
func tearDownStore() error {
return os.RemoveAll(queueDir)
}
// TestQueueStorePut - tests for store.Put
func TestQueueStorePut(t *testing.T) {
defer func() {
if err := tearDownStore(); err != nil {
t.Fatal("Failed to tear down store ", err)
}
}()
store, err := setUpStore(queueDir, 100)
if err != nil {
t.Fatal("Failed to create a queue store ", err)
}
// Put 100 events.
for i := 0; i < 100; i++ {
if err := store.Put(testEvent); err != nil {
t.Fatal("Failed to put to queue store ", err)
}
}
// Count the events.
names, err := store.List()
if err != nil {
t.Fatal(err)
}
if len(names) != 100 {
t.Fatalf("List() Expected: 100, got %d", len(names))
}
}
// TestQueueStoreGet - tests for store.Get
func TestQueueStoreGet(t *testing.T) {
defer func() {
if err := tearDownStore(); err != nil {
t.Fatal("Failed to tear down store ", err)
}
}()
store, err := setUpStore(queueDir, 10)
if err != nil {
t.Fatal("Failed to create a queue store ", err)
}
// Put 10 events
for i := 0; i < 10; i++ {
if err := store.Put(testEvent); err != nil {
t.Fatal("Failed to put to queue store ", err)
}
}
eventKeys, err := store.List()
if err != nil {
t.Fatal(err)
}
// Get 10 events.
if len(eventKeys) == 10 {
for _, key := range eventKeys {
event, eErr := store.Get(strings.TrimSuffix(key, eventExt))
if eErr != nil {
t.Fatal("Failed to Get the event from the queue store ", eErr)
}
if !reflect.DeepEqual(testEvent, event) {
t.Fatalf("Failed to read the event: error: expected = %v, got = %v", testEvent, event)
}
}
} else {
t.Fatalf("List() Expected: 10, got %d", len(eventKeys))
}
}
// TestQueueStoreDel - tests for store.Del
func TestQueueStoreDel(t *testing.T) {
defer func() {
if err := tearDownStore(); err != nil {
t.Fatal("Failed to tear down store ", err)
}
}()
store, err := setUpStore(queueDir, 20)
if err != nil {
t.Fatal("Failed to create a queue store ", err)
}
// Put 20 events.
for i := 0; i < 20; i++ {
if err := store.Put(testEvent); err != nil {
t.Fatal("Failed to put to queue store ", err)
}
}
eventKeys, err := store.List()
if err != nil {
t.Fatal(err)
}
// Remove all the events.
if len(eventKeys) == 20 {
for _, key := range eventKeys {
err := store.Del(strings.TrimSuffix(key, eventExt))
if err != nil {
t.Fatal("queue store Del failed with ", err)
}
}
} else {
t.Fatalf("List() Expected: 20, got %d", len(eventKeys))
}
names, err := store.List()
if err != nil {
t.Fatal(err)
}
if len(names) != 0 {
t.Fatalf("List() Expected: 0, got %d", len(names))
}
}
// TestQueueStoreLimit - tests the event limit for the store.
func TestQueueStoreLimit(t *testing.T) {
defer func() {
if err := tearDownStore(); err != nil {
t.Fatal("Failed to tear down store ", err)
}
}()
// The max limit is set to 5.
store, err := setUpStore(queueDir, 5)
if err != nil {
t.Fatal("Failed to create a queue store ", err)
}
for i := 0; i < 5; i++ {
if err := store.Put(testEvent); err != nil {
t.Fatal("Failed to put to queue store ", err)
}
}
// Should not allow 6th Put.
if err := store.Put(testEvent); err == nil {
t.Fatalf("Expected to fail with %s, but passes", errLimitExceeded)
}
}
// TestQueueStoreLimit - tests for store.LimitN.
func TestQueueStoreListN(t *testing.T) {
defer func() {
if err := tearDownStore(); err != nil {
t.Fatal("Failed to tear down store ", err)
}
}()
store, err := setUpStore(queueDir, 10)
if err != nil {
t.Fatal("Failed to create a queue store ", err)
}
for i := 0; i < 10; i++ {
if err := store.Put(testEvent); err != nil {
t.Fatal("Failed to put to queue store ", err)
}
}
// Should return all the event keys in the store.
names, err := store.List()
if err != nil {
t.Fatal(err)
}
if len(names) != 10 {
t.Fatalf("List() Expected: 10, got %d", len(names))
}
if err = os.RemoveAll(queueDir); err != nil {
t.Fatal(err)
}
_, err = store.List()
if !os.IsNotExist(err) {
t.Fatalf("Expected List() to fail with os.ErrNotExist, %s", err)
}
}

View File

@@ -0,0 +1,343 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package target
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/url"
"os"
"path/filepath"
"strings"
"time"
"github.com/gomodule/redigo/redis"
"github.com/minio/minio/internal/event"
xnet "github.com/minio/minio/internal/net"
)
// Redis constants
const (
RedisFormat = "format"
RedisAddress = "address"
RedisPassword = "password"
RedisKey = "key"
RedisQueueDir = "queue_dir"
RedisQueueLimit = "queue_limit"
EnvRedisEnable = "MINIO_NOTIFY_REDIS_ENABLE"
EnvRedisFormat = "MINIO_NOTIFY_REDIS_FORMAT"
EnvRedisAddress = "MINIO_NOTIFY_REDIS_ADDRESS"
EnvRedisPassword = "MINIO_NOTIFY_REDIS_PASSWORD"
EnvRedisKey = "MINIO_NOTIFY_REDIS_KEY"
EnvRedisQueueDir = "MINIO_NOTIFY_REDIS_QUEUE_DIR"
EnvRedisQueueLimit = "MINIO_NOTIFY_REDIS_QUEUE_LIMIT"
)
// RedisArgs - Redis target arguments.
type RedisArgs struct {
Enable bool `json:"enable"`
Format string `json:"format"`
Addr xnet.Host `json:"address"`
Password string `json:"password"`
Key string `json:"key"`
QueueDir string `json:"queueDir"`
QueueLimit uint64 `json:"queueLimit"`
}
// RedisAccessEvent holds event log data and timestamp
type RedisAccessEvent struct {
Event []event.Event
EventTime string
}
// Validate RedisArgs fields
func (r RedisArgs) Validate() error {
if !r.Enable {
return nil
}
if r.Format != "" {
f := strings.ToLower(r.Format)
if f != event.NamespaceFormat && f != event.AccessFormat {
return fmt.Errorf("unrecognized format")
}
}
if r.Key == "" {
return fmt.Errorf("empty key")
}
if r.QueueDir != "" {
if !filepath.IsAbs(r.QueueDir) {
return errors.New("queueDir path should be absolute")
}
}
return nil
}
func (r RedisArgs) validateFormat(c redis.Conn) error {
typeAvailable, err := redis.String(c.Do("TYPE", r.Key))
if err != nil {
return err
}
if typeAvailable != "none" {
expectedType := "hash"
if r.Format == event.AccessFormat {
expectedType = "list"
}
if typeAvailable != expectedType {
return fmt.Errorf("expected type %v does not match with available type %v", expectedType, typeAvailable)
}
}
return nil
}
// RedisTarget - Redis target.
type RedisTarget struct {
id event.TargetID
args RedisArgs
pool *redis.Pool
store Store
firstPing bool
loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
}
// ID - returns target ID.
func (target *RedisTarget) ID() event.TargetID {
return target.id
}
// HasQueueStore - Checks if the queueStore has been configured for the target
func (target *RedisTarget) HasQueueStore() bool {
return target.store != nil
}
// IsActive - Return true if target is up and active
func (target *RedisTarget) IsActive() (bool, error) {
conn := target.pool.Get()
defer func() {
cErr := conn.Close()
target.loggerOnce(context.Background(), cErr, target.ID())
}()
_, pingErr := conn.Do("PING")
if pingErr != nil {
if IsConnRefusedErr(pingErr) {
return false, errNotConnected
}
return false, pingErr
}
return true, nil
}
// Save - saves the events to the store if questore is configured, which will be replayed when the redis connection is active.
func (target *RedisTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
_, err := target.IsActive()
if err != nil {
return err
}
return target.send(eventData)
}
// send - sends an event to the redis.
func (target *RedisTarget) send(eventData event.Event) error {
conn := target.pool.Get()
defer func() {
cErr := conn.Close()
target.loggerOnce(context.Background(), cErr, target.ID())
}()
if target.args.Format == event.NamespaceFormat {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
return err
}
key := eventData.S3.Bucket.Name + "/" + objectName
if eventData.EventName == event.ObjectRemovedDelete {
_, err = conn.Do("HDEL", target.args.Key, key)
} else {
var data []byte
if data, err = json.Marshal(struct{ Records []event.Event }{[]event.Event{eventData}}); err != nil {
return err
}
_, err = conn.Do("HSET", target.args.Key, key, data)
}
if err != nil {
return err
}
}
if target.args.Format == event.AccessFormat {
data, err := json.Marshal([]RedisAccessEvent{{Event: []event.Event{eventData}, EventTime: eventData.EventTime}})
if err != nil {
return err
}
if _, err := conn.Do("RPUSH", target.args.Key, data); err != nil {
return err
}
}
return nil
}
// Send - reads an event from store and sends it to redis.
func (target *RedisTarget) Send(eventKey string) error {
conn := target.pool.Get()
defer func() {
cErr := conn.Close()
target.loggerOnce(context.Background(), cErr, target.ID())
}()
_, pingErr := conn.Do("PING")
if pingErr != nil {
if IsConnRefusedErr(pingErr) {
return errNotConnected
}
return pingErr
}
if !target.firstPing {
if err := target.args.validateFormat(conn); err != nil {
if IsConnRefusedErr(err) {
return errNotConnected
}
return err
}
target.firstPing = true
}
eventData, eErr := target.store.Get(eventKey)
if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and would've been already been sent successfully.
if os.IsNotExist(eErr) {
return nil
}
return eErr
}
if err := target.send(eventData); err != nil {
if IsConnRefusedErr(err) {
return errNotConnected
}
return err
}
// Delete the event from store.
return target.store.Del(eventKey)
}
// Close - releases the resources used by the pool.
func (target *RedisTarget) Close() error {
return target.pool.Close()
}
// NewRedisTarget - creates new Redis target.
func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}), test bool) (*RedisTarget, error) {
pool := &redis.Pool{
MaxIdle: 3,
IdleTimeout: 2 * 60 * time.Second,
Dial: func() (redis.Conn, error) {
conn, err := redis.Dial("tcp", args.Addr.String())
if err != nil {
return nil, err
}
if args.Password != "" {
if _, err = conn.Do("AUTH", args.Password); err != nil {
cErr := conn.Close()
targetID := event.TargetID{ID: id, Name: "redis"}
loggerOnce(context.Background(), cErr, targetID)
return nil, err
}
}
// Must be done after AUTH
if _, err = conn.Do("CLIENT", "SETNAME", "MinIO"); err != nil {
cErr := conn.Close()
targetID := event.TargetID{ID: id, Name: "redis"}
loggerOnce(context.Background(), cErr, targetID)
return nil, err
}
return conn, nil
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("PING")
return err
},
}
var store Store
target := &RedisTarget{
id: event.TargetID{ID: id, Name: "redis"},
args: args,
pool: pool,
loggerOnce: loggerOnce,
}
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-redis-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil {
target.loggerOnce(context.Background(), oErr, target.ID())
return target, oErr
}
target.store = store
}
conn := target.pool.Get()
defer func() {
cErr := conn.Close()
target.loggerOnce(context.Background(), cErr, target.ID())
}()
_, pingErr := conn.Do("PING")
if pingErr != nil {
if target.store == nil || !(IsConnRefusedErr(pingErr) || IsConnResetErr(pingErr)) {
target.loggerOnce(context.Background(), pingErr, target.ID())
return target, pingErr
}
} else {
if err := target.args.validateFormat(conn); err != nil {
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
target.firstPing = true
}
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce)
}
return target, nil
}

View File

@@ -0,0 +1,144 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package target
import (
"context"
"errors"
"fmt"
"strings"
"syscall"
"time"
"github.com/minio/minio/internal/event"
)
const retryInterval = 3 * time.Second
// errNotConnected - indicates that the target connection is not active.
var errNotConnected = errors.New("not connected to target server/service")
// errLimitExceeded error is sent when the maximum limit is reached.
var errLimitExceeded = errors.New("the maximum store limit reached")
// Store - To persist the events.
type Store interface {
Put(event event.Event) error
Get(key string) (event.Event, error)
List() ([]string, error)
Del(key string) error
Open() error
}
// replayEvents - Reads the events from the store and replays.
func replayEvents(store Store, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), id event.TargetID) <-chan string {
eventKeyCh := make(chan string)
go func() {
retryTicker := time.NewTicker(retryInterval)
defer retryTicker.Stop()
defer close(eventKeyCh)
for {
names, err := store.List()
if err == nil {
for _, name := range names {
select {
case eventKeyCh <- strings.TrimSuffix(name, eventExt):
// Get next key.
case <-doneCh:
return
}
}
}
if len(names) < 2 {
select {
case <-retryTicker.C:
if err != nil {
loggerOnce(context.Background(),
fmt.Errorf("store.List() failed '%w'", err), id)
}
case <-doneCh:
return
}
}
}
}()
return eventKeyCh
}
// IsConnRefusedErr - To check fot "connection refused" error.
func IsConnRefusedErr(err error) bool {
return errors.Is(err, syscall.ECONNREFUSED)
}
// IsConnResetErr - Checks for connection reset errors.
func IsConnResetErr(err error) bool {
if strings.Contains(err.Error(), "connection reset by peer") {
return true
}
// incase if error message is wrapped.
return errors.Is(err, syscall.ECONNRESET)
}
// sendEvents - Reads events from the store and re-plays.
func sendEvents(target event.Target, eventKeyCh <-chan string, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})) {
retryTicker := time.NewTicker(retryInterval)
defer retryTicker.Stop()
send := func(eventKey string) bool {
for {
err := target.Send(eventKey)
if err == nil {
break
}
if err != errNotConnected && !IsConnResetErr(err) {
loggerOnce(context.Background(),
fmt.Errorf("target.Send() failed with '%w'", err),
target.ID())
}
// Retrying after 3secs back-off
select {
case <-retryTicker.C:
case <-doneCh:
return false
}
}
return true
}
for {
select {
case eventKey, ok := <-eventKeyCh:
if !ok {
// closed channel.
return
}
if !send(eventKey) {
return
}
case <-doneCh:
return
}
}
}

View File

@@ -0,0 +1,13 @@
-----BEGIN CERTIFICATE-----
MIICCjCCAbGgAwIBAgIUKLFyLD0Ze9gR3A2aBxgEiT6MgZUwCgYIKoZIzj0EAwIw
GDEWMBQGA1UEAwwNTWluaW8gUm9vdCBDQTAeFw0yMDA5MTQxMzI0MzNaFw0zMDA5
MTIxMzI0MzNaMEIxCzAJBgNVBAYTAkNBMQ4wDAYDVQQKDAVNaW5JTzEPMA0GA1UE
CwwGQ2xpZW50MRIwEAYDVQQDDAlsb2NhbGhvc3QwWTATBgcqhkjOPQIBBggqhkjO
PQMBBwNCAARAhYrQXYbzeKyVSw8nf57gBphwFP1o5S7CjxoGKCfghzdhExKiEmbi
sK+FSS2YtltU7cM7L7AduLIbuEnGHHYQo4GuMIGrMAkGA1UdEwQCMAAwUwYDVR0j
BEwwSoAUWN6Fr30E5vvvNOBkuGGkqGzA3SihHKQaMBgxFjAUBgNVBAMMDU1pbmlv
IFJvb3QgQ0GCFHiTsAON45VvwFb0MxHEdLPeWi95MA4GA1UdDwEB/wQEAwIFoDAd
BgNVHSUEFjAUBggrBgEFBQcDAQYIKwYBBQUHAwIwGgYDVR0RBBMwEYcEfwAAAYIJ
bG9jYWxob3N0MAoGCCqGSM49BAMCA0cAMEQCIC7MHOEf0C/zqw/ZOaCffeJIMeFm
iT8ugBfhFbgGkd5YAiBz9FEfV4JMZQ4N29WLmvxxDSxkL8g5e3fnIK8Aa4excw==
-----END CERTIFICATE-----

View File

@@ -0,0 +1,5 @@
-----BEGIN EC PRIVATE KEY-----
MHcCAQEEIBluB2BuspJcz1e58rnXpQEx48/ZwNmygNw06NbdTZDroAoGCCqGSM49
AwEHoUQDQgAEQIWK0F2G83islUsPJ3+e4AaYcBT9aOUuwo8aBign4Ic3YRMSohJm
4rCvhUktmLZbVO3DOy+wHbiyG7hJxhx2EA==
-----END EC PRIVATE KEY-----

View File

@@ -0,0 +1,12 @@
-----BEGIN CERTIFICATE-----
MIIByTCCAW+gAwIBAgIUdAg80BTm1El7s5ZZezgjsls9BwkwCgYIKoZIzj0EAwIw
GDEWMBQGA1UEAwwNTWluaW8gUm9vdCBDQTAeFw0yMDA5MTQxMjQzMjNaFw0zMDA5
MTIxMjQzMjNaMAAwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAASolKUI7FVSA2Ts
+GSW/DHDKNczDNjfccI2GLETso6ie8buveOODj1JIL9ff5pRDN+U6QvwwlDmXEqh
1a6XBI4Ho4GuMIGrMAkGA1UdEwQCMAAwUwYDVR0jBEwwSoAUWN6Fr30E5vvvNOBk
uGGkqGzA3SihHKQaMBgxFjAUBgNVBAMMDU1pbmlvIFJvb3QgQ0GCFHiTsAON45Vv
wFb0MxHEdLPeWi95MA4GA1UdDwEB/wQEAwIFoDAdBgNVHSUEFjAUBggrBgEFBQcD
AQYIKwYBBQUHAwIwGgYDVR0RBBMwEYcEfwAAAYIJbG9jYWxob3N0MAoGCCqGSM49
BAMCA0gAMEUCIB7WXnQAkmjw2QE6A3uOscOIctJnlVNREfm4V9CrF6UGAiEA734B
vKlhMk8H459BRoIp8GpOuUWqLqocSmMM1febvcg=
-----END CERTIFICATE-----

View File

@@ -0,0 +1,5 @@
-----BEGIN EC PRIVATE KEY-----
MHcCAQEEILFuMS2xvsc/CsuqtSv3S2iSCcc28rZsg1wpR2kirXFloAoGCCqGSM49
AwEHoUQDQgAEqJSlCOxVUgNk7PhklvwxwyjXMwzY33HCNhixE7KOonvG7r3jjg49
SSC/X3+aUQzflOkL8MJQ5lxKodWulwSOBw==
-----END EC PRIVATE KEY-----

View File

@@ -0,0 +1,11 @@
-----BEGIN CERTIFICATE-----
MIIBlTCCATygAwIBAgIUeJOwA43jlW/AVvQzEcR0s95aL3kwCgYIKoZIzj0EAwIw
GDEWMBQGA1UEAwwNTWluaW8gUm9vdCBDQTAeFw0yMDA5MTQxMjMwMDJaFw0zMDA5
MTIxMjMwMDJaMBgxFjAUBgNVBAMMDU1pbmlvIFJvb3QgQ0EwWTATBgcqhkjOPQIB
BggqhkjOPQMBBwNCAARK9fVNGHc1h5B5fpOMyEdyhh18xNNcNUGQ5iGLO97Z0KtK
5vRlDeeE1I0SaJgqppm9OEHw32JU0HMi4FBZi2Rso2QwYjAdBgNVHQ4EFgQUWN6F
r30E5vvvNOBkuGGkqGzA3SgwHwYDVR0jBBgwFoAUWN6Fr30E5vvvNOBkuGGkqGzA
3SgwDwYDVR0TAQH/BAUwAwEB/zAPBgNVHREECDAGhwR/AAABMAoGCCqGSM49BAMC
A0cAMEQCIDPOiks2Vs3RmuJZl5HHjuqaFSOAp1g7pZpMb3Qrh9YDAiAtjO2xOpkS
WynK8P7EfyQP/IUa7GxJIoHk6/H/TCsYvQ==
-----END CERTIFICATE-----

View File

@@ -0,0 +1,5 @@
-----BEGIN EC PRIVATE KEY-----
MHcCAQEEIB8tAGuc9FP4XbYqMP67TKgjL7OTrACGgEmTf+zMvYRhoAoGCCqGSM49
AwEHoUQDQgAESvX1TRh3NYeQeX6TjMhHcoYdfMTTXDVBkOYhizve2dCrSub0ZQ3n
hNSNEmiYKqaZvThB8N9iVNBzIuBQWYtkbA==
-----END EC PRIVATE KEY-----

View File

@@ -0,0 +1,7 @@
port: 14225
net: localhost
tls {
cert_file: "./testdata/contrib/certs/nats_server_cert.pem"
key_file: "./testdata/contrib/certs/nats_server_key.pem"
}

View File

@@ -0,0 +1,18 @@
port: 14226
net: localhost
tls {
cert_file: "./testdata/contrib/certs/nats_server_cert.pem"
key_file: "./testdata/contrib/certs/nats_server_key.pem"
ca_file: "./testdata/contrib/certs/root_ca_cert.pem"
verify_and_map: true
}
authorization {
ADMIN = {
publish = ">"
subscribe = ">"
}
users = [
{user: "CN=localhost,OU=Client,O=MinIO,C=CA", permissions: $ADMIN}
]
}

View File

@@ -0,0 +1,262 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package target
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"path/filepath"
"time"
"github.com/minio/minio/internal/event"
xnet "github.com/minio/minio/internal/net"
"github.com/minio/pkg/certs"
)
// Webhook constants
const (
WebhookEndpoint = "endpoint"
WebhookAuthToken = "auth_token"
WebhookQueueDir = "queue_dir"
WebhookQueueLimit = "queue_limit"
WebhookClientCert = "client_cert"
WebhookClientKey = "client_key"
EnvWebhookEnable = "MINIO_NOTIFY_WEBHOOK_ENABLE"
EnvWebhookEndpoint = "MINIO_NOTIFY_WEBHOOK_ENDPOINT"
EnvWebhookAuthToken = "MINIO_NOTIFY_WEBHOOK_AUTH_TOKEN"
EnvWebhookQueueDir = "MINIO_NOTIFY_WEBHOOK_QUEUE_DIR"
EnvWebhookQueueLimit = "MINIO_NOTIFY_WEBHOOK_QUEUE_LIMIT"
EnvWebhookClientCert = "MINIO_NOTIFY_WEBHOOK_CLIENT_CERT"
EnvWebhookClientKey = "MINIO_NOTIFY_WEBHOOK_CLIENT_KEY"
)
// WebhookArgs - Webhook target arguments.
type WebhookArgs struct {
Enable bool `json:"enable"`
Endpoint xnet.URL `json:"endpoint"`
AuthToken string `json:"authToken"`
Transport *http.Transport `json:"-"`
QueueDir string `json:"queueDir"`
QueueLimit uint64 `json:"queueLimit"`
ClientCert string `json:"clientCert"`
ClientKey string `json:"clientKey"`
}
// Validate WebhookArgs fields
func (w WebhookArgs) Validate() error {
if !w.Enable {
return nil
}
if w.Endpoint.IsEmpty() {
return errors.New("endpoint empty")
}
if w.QueueDir != "" {
if !filepath.IsAbs(w.QueueDir) {
return errors.New("queueDir path should be absolute")
}
}
if w.ClientCert != "" && w.ClientKey == "" || w.ClientCert == "" && w.ClientKey != "" {
return errors.New("cert and key must be specified as a pair")
}
return nil
}
// WebhookTarget - Webhook target.
type WebhookTarget struct {
id event.TargetID
args WebhookArgs
httpClient *http.Client
store Store
loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
}
// ID - returns target ID.
func (target WebhookTarget) ID() event.TargetID {
return target.id
}
// HasQueueStore - Checks if the queueStore has been configured for the target
func (target *WebhookTarget) HasQueueStore() bool {
return target.store != nil
}
// IsActive - Return true if target is up and active
func (target *WebhookTarget) IsActive() (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodHead, target.args.Endpoint.String(), nil)
if err != nil {
if xnet.IsNetworkOrHostDown(err, false) {
return false, errNotConnected
}
return false, err
}
resp, err := target.httpClient.Do(req)
if err != nil {
if xnet.IsNetworkOrHostDown(err, false) || errors.Is(err, context.DeadlineExceeded) {
return false, errNotConnected
}
return false, err
}
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
// No network failure i.e response from the target means its up
return true, nil
}
// Save - saves the events to the store if queuestore is configured, which will be replayed when the wenhook connection is active.
func (target *WebhookTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
err := target.send(eventData)
if err != nil {
if xnet.IsNetworkOrHostDown(err, false) {
return errNotConnected
}
}
return err
}
// send - sends an event to the webhook.
func (target *WebhookTarget) send(eventData event.Event) error {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
return err
}
key := eventData.S3.Bucket.Name + "/" + objectName
data, err := json.Marshal(event.Log{EventName: eventData.EventName, Key: key, Records: []event.Event{eventData}})
if err != nil {
return err
}
req, err := http.NewRequest("POST", target.args.Endpoint.String(), bytes.NewReader(data))
if err != nil {
return err
}
if target.args.AuthToken != "" {
req.Header.Set("Authorization", "Bearer "+target.args.AuthToken)
}
req.Header.Set("Content-Type", "application/json")
resp, err := target.httpClient.Do(req)
if err != nil {
target.Close()
return err
}
defer resp.Body.Close()
io.Copy(ioutil.Discard, resp.Body)
if resp.StatusCode < 200 || resp.StatusCode > 299 {
target.Close()
return fmt.Errorf("sending event failed with %v", resp.Status)
}
return nil
}
// Send - reads an event from store and sends it to webhook.
func (target *WebhookTarget) Send(eventKey string) error {
eventData, eErr := target.store.Get(eventKey)
if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and would've been already been sent successfully.
if os.IsNotExist(eErr) {
return nil
}
return eErr
}
if err := target.send(eventData); err != nil {
if xnet.IsNetworkOrHostDown(err, false) {
return errNotConnected
}
return err
}
// Delete the event from store.
return target.store.Del(eventKey)
}
// Close - does nothing and available for interface compatibility.
func (target *WebhookTarget) Close() error {
// Close idle connection with "keep-alive" states
target.httpClient.CloseIdleConnections()
return nil
}
// NewWebhookTarget - creates new Webhook target.
func NewWebhookTarget(ctx context.Context, id string, args WebhookArgs, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), transport *http.Transport, test bool) (*WebhookTarget, error) {
var store Store
target := &WebhookTarget{
id: event.TargetID{ID: id, Name: "webhook"},
args: args,
loggerOnce: loggerOnce,
}
if target.args.ClientCert != "" && target.args.ClientKey != "" {
manager, err := certs.NewManager(ctx, target.args.ClientCert, target.args.ClientKey, tls.LoadX509KeyPair)
if err != nil {
return target, err
}
transport.TLSClientConfig.GetClientCertificate = manager.GetClientCertificate
}
target.httpClient = &http.Client{Transport: transport}
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-webhook-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if err := store.Open(); err != nil {
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
target.store = store
}
_, err := target.IsActive()
if err != nil {
if target.store == nil || err != errNotConnected {
target.loggerOnce(ctx, err, target.ID())
return target, err
}
}
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, ctx.Done(), target.loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, ctx.Done(), target.loggerOnce)
}
return target, nil
}