From 428836d4e1952f176aa7e5c0515b047b6d812d64 Mon Sep 17 00:00:00 2001 From: Praveen raj Mani Date: Fri, 6 Sep 2019 02:25:48 +0530 Subject: [PATCH] Ignore "connection reset" errors while initializing the targets with queue store enabled (#8185) Fixes #8178 --- pkg/event/target/amqp.go | 2 +- pkg/event/target/mysql.go | 2 +- pkg/event/target/nsq.go | 2 +- pkg/event/target/postgresql.go | 2 +- pkg/event/target/redis.go | 2 +- pkg/event/target/store.go | 10 +++++++--- 6 files changed, 12 insertions(+), 8 deletions(-) diff --git a/pkg/event/target/amqp.go b/pkg/event/target/amqp.go index 4b72d4d64..bd10e5fc0 100644 --- a/pkg/event/target/amqp.go +++ b/pkg/event/target/amqp.go @@ -231,7 +231,7 @@ func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}) (*AMQPTarge conn, err = amqp.Dial(args.URL.String()) if err != nil { - if store == nil || !IsConnRefusedErr(err) { + if store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) { return nil, err } } diff --git a/pkg/event/target/mysql.go b/pkg/event/target/mysql.go index ad5611c86..3aa8ebafc 100644 --- a/pkg/event/target/mysql.go +++ b/pkg/event/target/mysql.go @@ -351,7 +351,7 @@ func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}) (*MySQLTa err = target.db.Ping() if err != nil { - if target.store == nil || !IsConnRefusedErr(err) { + if target.store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) { return nil, err } } else { diff --git a/pkg/event/target/nsq.go b/pkg/event/target/nsq.go index 1bb92cd28..68d647b9d 100644 --- a/pkg/event/target/nsq.go +++ b/pkg/event/target/nsq.go @@ -182,7 +182,7 @@ func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}) (*NSQTarget, if err := target.producer.Ping(); err != nil { // To treat "connection refused" errors as errNotConnected. - if target.store == nil || !IsConnRefusedErr(err) { + if target.store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) { return nil, err } } diff --git a/pkg/event/target/postgresql.go b/pkg/event/target/postgresql.go index 839a017fb..849166500 100644 --- a/pkg/event/target/postgresql.go +++ b/pkg/event/target/postgresql.go @@ -358,7 +358,7 @@ func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{}) err = target.db.Ping() if err != nil { - if target.store == nil || !IsConnRefusedErr(err) { + if target.store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) { return nil, err } } else { diff --git a/pkg/event/target/redis.go b/pkg/event/target/redis.go index 259b81ded..d0243ae7e 100644 --- a/pkg/event/target/redis.go +++ b/pkg/event/target/redis.go @@ -276,7 +276,7 @@ func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}) (*RedisTa _, pingErr := conn.Do("PING") if pingErr != nil { - if target.store == nil || !IsConnRefusedErr(pingErr) { + if target.store == nil || !(IsConnRefusedErr(pingErr) || IsConnResetErr(pingErr)) { return nil, pingErr } } else { diff --git a/pkg/event/target/store.go b/pkg/event/target/store.go index ae30683e1..5dd0ab5c6 100644 --- a/pkg/event/target/store.go +++ b/pkg/event/target/store.go @@ -93,8 +93,12 @@ func IsConnRefusedErr(err error) bool { return false } -// isConnResetErr - Checks for connection reset errors. -func isConnResetErr(err error) bool { +// IsConnResetErr - Checks for connection reset errors. +func IsConnResetErr(err error) bool { + if strings.Contains(err.Error(), "connection reset by peer") { + return true + } + // incase if error message is wrapped. if opErr, ok := err.(*net.OpError); ok { if syscallErr, ok := opErr.Err.(*os.SyscallError); ok { if syscallErr.Err == syscall.ECONNRESET { @@ -117,7 +121,7 @@ func sendEvents(target event.Target, eventKeyCh <-chan string, doneCh <-chan str break } - if err != errNotConnected && !isConnResetErr(err) { + if err != errNotConnected && !IsConnResetErr(err) { panic(fmt.Errorf("target.Send() failed with '%v'", err)) }