mirror of
https://github.com/minio/minio.git
synced 2025-11-07 21:02:58 -05:00
use logger.LogOnce to reduce printing disconnection logs (#15408)
fixes #15334 - re-use net/url parsed value for http.Request{} - remove gosimple, structcheck and unusued due to https://github.com/golangci/golangci-lint/issues/2649 - unwrapErrs upto leafErr to ensure that we store exactly the correct errors
This commit is contained in:
@@ -29,6 +29,7 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/minio/minio/internal/event"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
xnet "github.com/minio/pkg/net"
|
||||
"github.com/streadway/amqp"
|
||||
)
|
||||
@@ -115,7 +116,7 @@ type AMQPTarget struct {
|
||||
conn *amqp.Connection
|
||||
connMutex sync.Mutex
|
||||
store Store
|
||||
loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
|
||||
loggerOnce logger.LogOnce
|
||||
}
|
||||
|
||||
// ID - returns TargetID.
|
||||
@@ -262,10 +263,7 @@ func (target *AMQPTarget) Save(eventData event.Event) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
cErr := ch.Close()
|
||||
target.loggerOnce(context.Background(), cErr, target.ID())
|
||||
}()
|
||||
defer ch.Close()
|
||||
|
||||
return target.send(eventData, ch, confirms)
|
||||
}
|
||||
@@ -276,10 +274,7 @@ func (target *AMQPTarget) Send(eventKey string) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
cErr := ch.Close()
|
||||
target.loggerOnce(context.Background(), cErr, target.ID())
|
||||
}()
|
||||
defer ch.Close()
|
||||
|
||||
eventData, eErr := target.store.Get(eventKey)
|
||||
if eErr != nil {
|
||||
@@ -308,7 +303,7 @@ func (target *AMQPTarget) Close() error {
|
||||
}
|
||||
|
||||
// NewAMQPTarget - creates new AMQP target.
|
||||
func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}), test bool) (*AMQPTarget, error) {
|
||||
func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*AMQPTarget, error) {
|
||||
var conn *amqp.Connection
|
||||
var err error
|
||||
|
||||
@@ -324,7 +319,7 @@ func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}, loggerOnce
|
||||
queueDir := filepath.Join(args.QueueDir, storePrefix+"-amqp-"+id)
|
||||
store = NewQueueStore(queueDir, args.QueueLimit)
|
||||
if oErr := store.Open(); oErr != nil {
|
||||
target.loggerOnce(context.Background(), oErr, target.ID())
|
||||
target.loggerOnce(context.Background(), oErr, target.ID().String())
|
||||
return target, oErr
|
||||
}
|
||||
target.store = store
|
||||
@@ -333,7 +328,7 @@ func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}, loggerOnce
|
||||
conn, err = amqp.Dial(args.URL.String())
|
||||
if err != nil {
|
||||
if store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) {
|
||||
target.loggerOnce(context.Background(), err, target.ID())
|
||||
target.loggerOnce(context.Background(), err, target.ID().String())
|
||||
return target, err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,6 +36,7 @@ import (
|
||||
elasticsearch7 "github.com/elastic/go-elasticsearch/v7"
|
||||
"github.com/minio/highwayhash"
|
||||
"github.com/minio/minio/internal/event"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
xnet "github.com/minio/pkg/net"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
@@ -156,7 +157,7 @@ type ElasticsearchTarget struct {
|
||||
args ElasticsearchArgs
|
||||
client esClient
|
||||
store Store
|
||||
loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
|
||||
loggerOnce logger.LogOnce
|
||||
}
|
||||
|
||||
// ID - returns target ID.
|
||||
@@ -320,7 +321,7 @@ func (target *ElasticsearchTarget) checkAndInitClient(ctx context.Context) error
|
||||
}
|
||||
|
||||
// NewElasticsearchTarget - creates new Elasticsearch target.
|
||||
func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*ElasticsearchTarget, error) {
|
||||
func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*ElasticsearchTarget, error) {
|
||||
target := &ElasticsearchTarget{
|
||||
id: event.TargetID{ID: id, Name: "elasticsearch"},
|
||||
args: args,
|
||||
@@ -331,7 +332,7 @@ func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan str
|
||||
queueDir := filepath.Join(args.QueueDir, storePrefix+"-elasticsearch-"+id)
|
||||
target.store = NewQueueStore(queueDir, args.QueueLimit)
|
||||
if err := target.store.Open(); err != nil {
|
||||
target.loggerOnce(context.Background(), err, target.ID())
|
||||
target.loggerOnce(context.Background(), err, target.ID().String())
|
||||
return target, err
|
||||
}
|
||||
}
|
||||
@@ -342,7 +343,7 @@ func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan str
|
||||
err := target.checkAndInitClient(ctx)
|
||||
if err != nil {
|
||||
if target.store == nil || err != errNotConnected {
|
||||
target.loggerOnce(context.Background(), err, target.ID())
|
||||
target.loggerOnce(context.Background(), err, target.ID().String())
|
||||
return target, err
|
||||
}
|
||||
}
|
||||
@@ -492,11 +493,10 @@ func (c *esClientV7) removeEntry(ctx context.Context, index string, key string)
|
||||
return err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
defer io.Copy(ioutil.Discard, res.Body)
|
||||
if res.IsError() {
|
||||
err := fmt.Errorf("Delete err: %s", res.String())
|
||||
return err
|
||||
return fmt.Errorf("Delete err: %s", res.String())
|
||||
}
|
||||
io.Copy(ioutil.Discard, res.Body)
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
@@ -522,11 +522,11 @@ func (c *esClientV7) updateEntry(ctx context.Context, index string, key string,
|
||||
return err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
defer io.Copy(ioutil.Discard, res.Body)
|
||||
if res.IsError() {
|
||||
err := fmt.Errorf("Update err: %s", res.String())
|
||||
return err
|
||||
return fmt.Errorf("Update err: %s", res.String())
|
||||
}
|
||||
io.Copy(ioutil.Discard, res.Body)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -549,11 +549,10 @@ func (c *esClientV7) addEntry(ctx context.Context, index string, eventData event
|
||||
return err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
defer io.Copy(ioutil.Discard, res.Body)
|
||||
if res.IsError() {
|
||||
err := fmt.Errorf("Add err: %s", res.String())
|
||||
return err
|
||||
return fmt.Errorf("Add err: %s", res.String())
|
||||
}
|
||||
io.Copy(ioutil.Discard, res.Body)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -29,6 +29,7 @@ import (
|
||||
"path/filepath"
|
||||
|
||||
"github.com/minio/minio/internal/event"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
xnet "github.com/minio/pkg/net"
|
||||
|
||||
sarama "github.com/Shopify/sarama"
|
||||
@@ -126,7 +127,7 @@ type KafkaTarget struct {
|
||||
producer sarama.SyncProducer
|
||||
config *sarama.Config
|
||||
store Store
|
||||
loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
|
||||
loggerOnce logger.LogOnce
|
||||
}
|
||||
|
||||
// ID - returns target ID.
|
||||
@@ -251,7 +252,7 @@ func (k KafkaArgs) pingBrokers() bool {
|
||||
}
|
||||
|
||||
// NewKafkaTarget - creates new Kafka target with auth credentials.
|
||||
func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*KafkaTarget, error) {
|
||||
func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*KafkaTarget, error) {
|
||||
config := sarama.NewConfig()
|
||||
|
||||
target := &KafkaTarget{
|
||||
@@ -263,7 +264,7 @@ func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnc
|
||||
if args.Version != "" {
|
||||
kafkaVersion, err := sarama.ParseKafkaVersion(args.Version)
|
||||
if err != nil {
|
||||
target.loggerOnce(context.Background(), err, target.ID())
|
||||
target.loggerOnce(context.Background(), err, target.ID().String())
|
||||
return target, err
|
||||
}
|
||||
config.Version = kafkaVersion
|
||||
@@ -276,7 +277,7 @@ func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnc
|
||||
|
||||
tlsConfig, err := saramatls.NewConfig(args.TLS.ClientTLSCert, args.TLS.ClientTLSKey)
|
||||
if err != nil {
|
||||
target.loggerOnce(context.Background(), err, target.ID())
|
||||
target.loggerOnce(context.Background(), err, target.ID().String())
|
||||
return target, err
|
||||
}
|
||||
|
||||
@@ -303,7 +304,7 @@ func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnc
|
||||
queueDir := filepath.Join(args.QueueDir, storePrefix+"-kafka-"+id)
|
||||
store = NewQueueStore(queueDir, args.QueueLimit)
|
||||
if oErr := store.Open(); oErr != nil {
|
||||
target.loggerOnce(context.Background(), oErr, target.ID())
|
||||
target.loggerOnce(context.Background(), oErr, target.ID().String())
|
||||
return target, oErr
|
||||
}
|
||||
target.store = store
|
||||
@@ -312,7 +313,7 @@ func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnc
|
||||
producer, err := sarama.NewSyncProducer(brokers, config)
|
||||
if err != nil {
|
||||
if store == nil || err != sarama.ErrOutOfBrokers {
|
||||
target.loggerOnce(context.Background(), err, target.ID())
|
||||
target.loggerOnce(context.Background(), err, target.ID().String())
|
||||
return target, err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,6 +31,7 @@ import (
|
||||
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
"github.com/minio/minio/internal/event"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
xnet "github.com/minio/pkg/net"
|
||||
)
|
||||
|
||||
@@ -111,7 +112,7 @@ type MQTTTarget struct {
|
||||
client mqtt.Client
|
||||
store Store
|
||||
quitCh chan struct{}
|
||||
loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})
|
||||
loggerOnce logger.LogOnce
|
||||
}
|
||||
|
||||
// ID - returns target ID.
|
||||
@@ -202,7 +203,7 @@ func (target *MQTTTarget) Close() error {
|
||||
}
|
||||
|
||||
// NewMQTTTarget - creates new MQTT target.
|
||||
func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*MQTTTarget, error) {
|
||||
func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*MQTTTarget, error) {
|
||||
if args.MaxReconnectInterval == 0 {
|
||||
// Default interval
|
||||
// https://github.com/eclipse/paho.mqtt.golang/blob/master/options.go#L115
|
||||
@@ -253,7 +254,7 @@ func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}, loggerOnce
|
||||
target.loggerOnce(context.Background(),
|
||||
fmt.Errorf("Previous connect failed with %w attempting a reconnect",
|
||||
token.Error()),
|
||||
target.ID())
|
||||
target.ID().String())
|
||||
time.Sleep(reconnectInterval * time.Second)
|
||||
token = client.Connect()
|
||||
goto retry
|
||||
@@ -270,7 +271,7 @@ func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}, loggerOnce
|
||||
queueDir := filepath.Join(args.QueueDir, storePrefix+"-mqtt-"+id)
|
||||
target.store = NewQueueStore(queueDir, args.QueueLimit)
|
||||
if err := target.store.Open(); err != nil {
|
||||
target.loggerOnce(context.Background(), err, target.ID())
|
||||
target.loggerOnce(context.Background(), err, target.ID().String())
|
||||
return target, err
|
||||
}
|
||||
|
||||
|
||||
@@ -32,6 +32,7 @@ import (
|
||||
|
||||
"github.com/go-sql-driver/mysql"
|
||||
"github.com/minio/minio/internal/event"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
xnet "github.com/minio/pkg/net"
|
||||
)
|
||||
|
||||
@@ -152,7 +153,7 @@ type MySQLTarget struct {
|
||||
db *sql.DB
|
||||
store Store
|
||||
firstPing bool
|
||||
loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
|
||||
loggerOnce logger.LogOnce
|
||||
}
|
||||
|
||||
// ID - returns target ID.
|
||||
@@ -333,7 +334,7 @@ func (target *MySQLTarget) executeStmts() error {
|
||||
}
|
||||
|
||||
// NewMySQLTarget - creates new MySQL target.
|
||||
func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*MySQLTarget, error) {
|
||||
func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*MySQLTarget, error) {
|
||||
if args.DSN == "" {
|
||||
config := mysql.Config{
|
||||
User: args.User,
|
||||
@@ -357,7 +358,7 @@ func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnc
|
||||
|
||||
db, err := sql.Open("mysql", args.DSN)
|
||||
if err != nil {
|
||||
target.loggerOnce(context.Background(), err, target.ID())
|
||||
target.loggerOnce(context.Background(), err, target.ID().String())
|
||||
return target, err
|
||||
}
|
||||
target.db = db
|
||||
@@ -373,7 +374,7 @@ func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnc
|
||||
queueDir := filepath.Join(args.QueueDir, storePrefix+"-mysql-"+id)
|
||||
store = NewQueueStore(queueDir, args.QueueLimit)
|
||||
if oErr := store.Open(); oErr != nil {
|
||||
target.loggerOnce(context.Background(), oErr, target.ID())
|
||||
target.loggerOnce(context.Background(), oErr, target.ID().String())
|
||||
return target, oErr
|
||||
}
|
||||
target.store = store
|
||||
@@ -382,12 +383,12 @@ func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnc
|
||||
err = target.db.Ping()
|
||||
if err != nil {
|
||||
if target.store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) {
|
||||
target.loggerOnce(context.Background(), err, target.ID())
|
||||
target.loggerOnce(context.Background(), err, target.ID().String())
|
||||
return target, err
|
||||
}
|
||||
} else {
|
||||
if err = target.executeStmts(); err != nil {
|
||||
target.loggerOnce(context.Background(), err, target.ID())
|
||||
target.loggerOnce(context.Background(), err, target.ID().String())
|
||||
return target, err
|
||||
}
|
||||
target.firstPing = true
|
||||
|
||||
@@ -28,6 +28,7 @@ import (
|
||||
"path/filepath"
|
||||
|
||||
"github.com/minio/minio/internal/event"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
xnet "github.com/minio/pkg/net"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/stan.go"
|
||||
@@ -217,7 +218,7 @@ type NATSTarget struct {
|
||||
stanConn stan.Conn
|
||||
jstream nats.JetStream
|
||||
store Store
|
||||
loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
|
||||
loggerOnce logger.LogOnce
|
||||
}
|
||||
|
||||
// ID - returns target ID.
|
||||
@@ -350,7 +351,7 @@ func (target *NATSTarget) Close() (err error) {
|
||||
}
|
||||
|
||||
// NewNATSTarget - creates new NATS target.
|
||||
func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*NATSTarget, error) {
|
||||
func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*NATSTarget, error) {
|
||||
var natsConn *nats.Conn
|
||||
var stanConn stan.Conn
|
||||
var jstream nats.JetStream
|
||||
@@ -369,14 +370,14 @@ func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce
|
||||
queueDir := filepath.Join(args.QueueDir, storePrefix+"-nats-"+id)
|
||||
store = NewQueueStore(queueDir, args.QueueLimit)
|
||||
if oErr := store.Open(); oErr != nil {
|
||||
target.loggerOnce(context.Background(), oErr, target.ID())
|
||||
target.loggerOnce(context.Background(), oErr, target.ID().String())
|
||||
return target, oErr
|
||||
}
|
||||
target.store = store
|
||||
}
|
||||
|
||||
if args.Streaming.Enable {
|
||||
target.loggerOnce(context.Background(), errors.New("NATS Streaming is deprecated please migrate to JetStream"), target.ID())
|
||||
target.loggerOnce(context.Background(), errors.New("NATS Streaming is deprecated please migrate to JetStream"), target.ID().String())
|
||||
|
||||
stanConn, err = args.connectStan()
|
||||
target.stanConn = stanConn
|
||||
@@ -387,7 +388,7 @@ func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce
|
||||
|
||||
if err != nil {
|
||||
if store == nil || err.Error() != nats.ErrNoServers.Error() {
|
||||
target.loggerOnce(context.Background(), err, target.ID())
|
||||
target.loggerOnce(context.Background(), err, target.ID().String())
|
||||
return target, err
|
||||
}
|
||||
}
|
||||
@@ -396,7 +397,7 @@ func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce
|
||||
jstream, err = target.natsConn.JetStream()
|
||||
if err != nil {
|
||||
if store == nil || err.Error() != nats.ErrNoServers.Error() {
|
||||
target.loggerOnce(context.Background(), err, target.ID())
|
||||
target.loggerOnce(context.Background(), err, target.ID().String())
|
||||
return target, err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,6 +29,7 @@ import (
|
||||
"github.com/nsqio/go-nsq"
|
||||
|
||||
"github.com/minio/minio/internal/event"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
xnet "github.com/minio/pkg/net"
|
||||
)
|
||||
|
||||
@@ -92,7 +93,7 @@ type NSQTarget struct {
|
||||
producer *nsq.Producer
|
||||
store Store
|
||||
config *nsq.Config
|
||||
loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
|
||||
loggerOnce logger.LogOnce
|
||||
}
|
||||
|
||||
// ID - returns target ID.
|
||||
@@ -188,7 +189,7 @@ func (target *NSQTarget) Close() (err error) {
|
||||
}
|
||||
|
||||
// NewNSQTarget - creates new NSQ target.
|
||||
func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*NSQTarget, error) {
|
||||
func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*NSQTarget, error) {
|
||||
config := nsq.NewConfig()
|
||||
if args.TLS.Enable {
|
||||
config.TlsV1 = true
|
||||
@@ -210,7 +211,7 @@ func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}, loggerOnce fu
|
||||
queueDir := filepath.Join(args.QueueDir, storePrefix+"-nsq-"+id)
|
||||
store = NewQueueStore(queueDir, args.QueueLimit)
|
||||
if oErr := store.Open(); oErr != nil {
|
||||
target.loggerOnce(context.Background(), oErr, target.ID())
|
||||
target.loggerOnce(context.Background(), oErr, target.ID().String())
|
||||
return target, oErr
|
||||
}
|
||||
target.store = store
|
||||
@@ -218,7 +219,7 @@ func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}, loggerOnce fu
|
||||
|
||||
producer, err := nsq.NewProducer(args.NSQDAddress.String(), config)
|
||||
if err != nil {
|
||||
target.loggerOnce(context.Background(), err, target.ID())
|
||||
target.loggerOnce(context.Background(), err, target.ID().String())
|
||||
return target, err
|
||||
}
|
||||
target.producer = producer
|
||||
@@ -226,7 +227,7 @@ func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}, loggerOnce fu
|
||||
if err := target.producer.Ping(); err != nil {
|
||||
// To treat "connection refused" errors as errNotConnected.
|
||||
if target.store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) {
|
||||
target.loggerOnce(context.Background(), err, target.ID())
|
||||
target.loggerOnce(context.Background(), err, target.ID().String())
|
||||
return target, err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,6 +33,7 @@ import (
|
||||
_ "github.com/lib/pq" // Register postgres driver
|
||||
|
||||
"github.com/minio/minio/internal/event"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
xnet "github.com/minio/pkg/net"
|
||||
)
|
||||
|
||||
@@ -145,7 +146,7 @@ type PostgreSQLTarget struct {
|
||||
store Store
|
||||
firstPing bool
|
||||
connString string
|
||||
loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
|
||||
loggerOnce logger.LogOnce
|
||||
}
|
||||
|
||||
// ID - returns target ID.
|
||||
@@ -329,7 +330,7 @@ func (target *PostgreSQLTarget) executeStmts() error {
|
||||
}
|
||||
|
||||
// NewPostgreSQLTarget - creates new PostgreSQL target.
|
||||
func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*PostgreSQLTarget, error) {
|
||||
func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*PostgreSQLTarget, error) {
|
||||
params := []string{args.ConnectionString}
|
||||
if args.ConnectionString == "" {
|
||||
params = []string{}
|
||||
@@ -376,7 +377,7 @@ func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{},
|
||||
queueDir := filepath.Join(args.QueueDir, storePrefix+"-postgresql-"+id)
|
||||
store = NewQueueStore(queueDir, args.QueueLimit)
|
||||
if oErr := store.Open(); oErr != nil {
|
||||
target.loggerOnce(context.Background(), oErr, target.ID())
|
||||
target.loggerOnce(context.Background(), oErr, target.ID().String())
|
||||
return target, oErr
|
||||
}
|
||||
target.store = store
|
||||
@@ -385,12 +386,12 @@ func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{},
|
||||
err = target.db.Ping()
|
||||
if err != nil {
|
||||
if target.store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) {
|
||||
target.loggerOnce(context.Background(), err, target.ID())
|
||||
target.loggerOnce(context.Background(), err, target.ID().String())
|
||||
return target, err
|
||||
}
|
||||
} else {
|
||||
if err = target.executeStmts(); err != nil {
|
||||
target.loggerOnce(context.Background(), err, target.ID())
|
||||
target.loggerOnce(context.Background(), err, target.ID().String())
|
||||
return target, err
|
||||
}
|
||||
target.firstPing = true
|
||||
|
||||
@@ -30,6 +30,7 @@ import (
|
||||
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"github.com/minio/minio/internal/event"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
xnet "github.com/minio/pkg/net"
|
||||
)
|
||||
|
||||
@@ -121,7 +122,7 @@ type RedisTarget struct {
|
||||
pool *redis.Pool
|
||||
store Store
|
||||
firstPing bool
|
||||
loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
|
||||
loggerOnce logger.LogOnce
|
||||
}
|
||||
|
||||
// ID - returns target ID.
|
||||
@@ -137,10 +138,8 @@ func (target *RedisTarget) HasQueueStore() bool {
|
||||
// IsActive - Return true if target is up and active
|
||||
func (target *RedisTarget) IsActive() (bool, error) {
|
||||
conn := target.pool.Get()
|
||||
defer func() {
|
||||
cErr := conn.Close()
|
||||
target.loggerOnce(context.Background(), cErr, target.ID())
|
||||
}()
|
||||
defer conn.Close()
|
||||
|
||||
_, pingErr := conn.Do("PING")
|
||||
if pingErr != nil {
|
||||
if IsConnRefusedErr(pingErr) {
|
||||
@@ -166,10 +165,7 @@ func (target *RedisTarget) Save(eventData event.Event) error {
|
||||
// send - sends an event to the redis.
|
||||
func (target *RedisTarget) send(eventData event.Event) error {
|
||||
conn := target.pool.Get()
|
||||
defer func() {
|
||||
cErr := conn.Close()
|
||||
target.loggerOnce(context.Background(), cErr, target.ID())
|
||||
}()
|
||||
defer conn.Close()
|
||||
|
||||
if target.args.Format == event.NamespaceFormat {
|
||||
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
|
||||
@@ -209,10 +205,8 @@ func (target *RedisTarget) send(eventData event.Event) error {
|
||||
// Send - reads an event from store and sends it to redis.
|
||||
func (target *RedisTarget) Send(eventKey string) error {
|
||||
conn := target.pool.Get()
|
||||
defer func() {
|
||||
cErr := conn.Close()
|
||||
target.loggerOnce(context.Background(), cErr, target.ID())
|
||||
}()
|
||||
defer conn.Close()
|
||||
|
||||
_, pingErr := conn.Do("PING")
|
||||
if pingErr != nil {
|
||||
if IsConnRefusedErr(pingErr) {
|
||||
@@ -258,7 +252,7 @@ func (target *RedisTarget) Close() error {
|
||||
}
|
||||
|
||||
// NewRedisTarget - creates new Redis target.
|
||||
func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}), test bool) (*RedisTarget, error) {
|
||||
func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}, loggerOnce logger.LogOnce, test bool) (*RedisTarget, error) {
|
||||
pool := &redis.Pool{
|
||||
MaxIdle: 3,
|
||||
IdleTimeout: 2 * 60 * time.Second,
|
||||
@@ -270,18 +264,14 @@ func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}, loggerOnc
|
||||
|
||||
if args.Password != "" {
|
||||
if _, err = conn.Do("AUTH", args.Password); err != nil {
|
||||
cErr := conn.Close()
|
||||
targetID := event.TargetID{ID: id, Name: "redis"}
|
||||
loggerOnce(context.Background(), cErr, targetID)
|
||||
conn.Close()
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Must be done after AUTH
|
||||
if _, err = conn.Do("CLIENT", "SETNAME", "MinIO"); err != nil {
|
||||
cErr := conn.Close()
|
||||
targetID := event.TargetID{ID: id, Name: "redis"}
|
||||
loggerOnce(context.Background(), cErr, targetID)
|
||||
conn.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -306,27 +296,24 @@ func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}, loggerOnc
|
||||
queueDir := filepath.Join(args.QueueDir, storePrefix+"-redis-"+id)
|
||||
store = NewQueueStore(queueDir, args.QueueLimit)
|
||||
if oErr := store.Open(); oErr != nil {
|
||||
target.loggerOnce(context.Background(), oErr, target.ID())
|
||||
target.loggerOnce(context.Background(), oErr, target.ID().String())
|
||||
return target, oErr
|
||||
}
|
||||
target.store = store
|
||||
}
|
||||
|
||||
conn := target.pool.Get()
|
||||
defer func() {
|
||||
cErr := conn.Close()
|
||||
target.loggerOnce(context.Background(), cErr, target.ID())
|
||||
}()
|
||||
defer conn.Close()
|
||||
|
||||
_, pingErr := conn.Do("PING")
|
||||
if pingErr != nil {
|
||||
if target.store == nil || !(IsConnRefusedErr(pingErr) || IsConnResetErr(pingErr)) {
|
||||
target.loggerOnce(context.Background(), pingErr, target.ID())
|
||||
target.loggerOnce(context.Background(), pingErr, target.ID().String())
|
||||
return target, pingErr
|
||||
}
|
||||
} else {
|
||||
if err := target.args.validateFormat(conn); err != nil {
|
||||
target.loggerOnce(context.Background(), err, target.ID())
|
||||
target.loggerOnce(context.Background(), err, target.ID().String())
|
||||
return target, err
|
||||
}
|
||||
target.firstPing = true
|
||||
|
||||
@@ -26,6 +26,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio/internal/event"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
)
|
||||
|
||||
const retryInterval = 3 * time.Second
|
||||
@@ -46,36 +47,34 @@ type Store interface {
|
||||
}
|
||||
|
||||
// replayEvents - Reads the events from the store and replays.
|
||||
func replayEvents(store Store, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), id event.TargetID) <-chan string {
|
||||
func replayEvents(store Store, doneCh <-chan struct{}, loggerOnce logger.LogOnce, id event.TargetID) <-chan string {
|
||||
eventKeyCh := make(chan string)
|
||||
|
||||
go func() {
|
||||
defer close(eventKeyCh)
|
||||
|
||||
retryTicker := time.NewTicker(retryInterval)
|
||||
defer retryTicker.Stop()
|
||||
defer close(eventKeyCh)
|
||||
|
||||
for {
|
||||
names, err := store.List()
|
||||
if err == nil {
|
||||
if err != nil {
|
||||
loggerOnce(context.Background(), fmt.Errorf("eventStore.List() failed with: %w", err), id.String())
|
||||
} else {
|
||||
for _, name := range names {
|
||||
select {
|
||||
case eventKeyCh <- strings.TrimSuffix(name, eventExt):
|
||||
// Get next key.
|
||||
// Get next key.
|
||||
case <-doneCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(names) < 2 {
|
||||
select {
|
||||
case <-retryTicker.C:
|
||||
if err != nil {
|
||||
loggerOnce(context.Background(),
|
||||
fmt.Errorf("store.List() failed '%w'", err), id)
|
||||
}
|
||||
case <-doneCh:
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-retryTicker.C:
|
||||
case <-doneCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
@@ -98,7 +97,7 @@ func IsConnResetErr(err error) bool {
|
||||
}
|
||||
|
||||
// sendEvents - Reads events from the store and re-plays.
|
||||
func sendEvents(target event.Target, eventKeyCh <-chan string, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})) {
|
||||
func sendEvents(target event.Target, eventKeyCh <-chan string, doneCh <-chan struct{}, loggerOnce logger.LogOnce) {
|
||||
retryTicker := time.NewTicker(retryInterval)
|
||||
defer retryTicker.Stop()
|
||||
|
||||
@@ -112,7 +111,7 @@ func sendEvents(target event.Target, eventKeyCh <-chan string, doneCh <-chan str
|
||||
if err != errNotConnected && !IsConnResetErr(err) {
|
||||
loggerOnce(context.Background(),
|
||||
fmt.Errorf("target.Send() failed with '%w'", err),
|
||||
target.ID())
|
||||
target.ID().String())
|
||||
}
|
||||
|
||||
// Retrying after 3secs back-off
|
||||
|
||||
@@ -35,6 +35,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio/internal/event"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
"github.com/minio/pkg/certs"
|
||||
xnet "github.com/minio/pkg/net"
|
||||
)
|
||||
@@ -94,7 +95,7 @@ type WebhookTarget struct {
|
||||
args WebhookArgs
|
||||
httpClient *http.Client
|
||||
store Store
|
||||
loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
|
||||
loggerOnce logger.LogOnce
|
||||
}
|
||||
|
||||
// ID - returns target ID.
|
||||
@@ -232,7 +233,7 @@ func (target *WebhookTarget) Close() error {
|
||||
}
|
||||
|
||||
// NewWebhookTarget - creates new Webhook target.
|
||||
func NewWebhookTarget(ctx context.Context, id string, args WebhookArgs, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), transport *http.Transport, test bool) (*WebhookTarget, error) {
|
||||
func NewWebhookTarget(ctx context.Context, id string, args WebhookArgs, loggerOnce logger.LogOnce, transport *http.Transport, test bool) (*WebhookTarget, error) {
|
||||
var store Store
|
||||
target := &WebhookTarget{
|
||||
id: event.TargetID{ID: id, Name: "webhook"},
|
||||
@@ -254,7 +255,7 @@ func NewWebhookTarget(ctx context.Context, id string, args WebhookArgs, loggerOn
|
||||
queueDir := filepath.Join(args.QueueDir, storePrefix+"-webhook-"+id)
|
||||
store = NewQueueStore(queueDir, args.QueueLimit)
|
||||
if err := store.Open(); err != nil {
|
||||
target.loggerOnce(context.Background(), err, target.ID())
|
||||
target.loggerOnce(context.Background(), err, target.ID().String())
|
||||
return target, err
|
||||
}
|
||||
target.store = store
|
||||
@@ -263,7 +264,7 @@ func NewWebhookTarget(ctx context.Context, id string, args WebhookArgs, loggerOn
|
||||
_, err := target.IsActive()
|
||||
if err != nil {
|
||||
if target.store == nil || err != errNotConnected {
|
||||
target.loggerOnce(ctx, err, target.ID())
|
||||
target.loggerOnce(ctx, err, target.ID().String())
|
||||
return target, err
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user