Set the maximum open connections limit in PG and MySQL target configs (#10558)

As the bulk/recursive delete will require multiple connections to open at an instance,
The default open connections limit will be reached which results in the following error

```FATAL:  sorry, too many clients already```

By setting the open connections to a reasonable value - `2`, We ensure that the max open connections
will not be exhausted and lie under bounds.

The queries are simple inserts/updates/deletes which is operational and sufficient with the
the maximum open connection limit is 2.

Fixes #10553

Allow user configuration for MaxOpenConnections
This commit is contained in:
Praveen raj Mani
2020-09-25 10:50:30 +05:30
committed by GitHub
parent 37a5d5d7a0
commit b880796aef
9 changed files with 198 additions and 104 deletions

View File

@@ -340,6 +340,12 @@ var (
Optional: true,
Type: "sentence",
},
config.HelpKV{
Key: target.PostgresMaxOpenConnections,
Description: "To set the maximum number of open connections to the database. The value is set to `2` by default.",
Optional: true,
Type: "number",
},
}
HelpMySQL = config.HelpKVS{
@@ -377,6 +383,12 @@ var (
Optional: true,
Type: "sentence",
},
config.HelpKV{
Key: target.MySQLMaxOpenConnections,
Description: "To set the maximum number of open connections to the database. The value is set to `2` by default.",
Optional: true,
Type: "number",
},
}
HelpNATS = config.HelpKVS{

View File

@@ -357,6 +357,10 @@ func SetNotifyPostgres(s config.Config, psqName string, cfg target.PostgreSQLArg
Key: target.PostgresQueueLimit,
Value: strconv.Itoa(int(cfg.QueueLimit)),
},
config.KV{
Key: target.PostgresMaxOpenConnections,
Value: strconv.Itoa(cfg.MaxOpenConnections),
},
}
return nil
@@ -554,6 +558,10 @@ func SetNotifyMySQL(s config.Config, sqlName string, cfg target.MySQLArgs) error
Key: target.MySQLQueueLimit,
Value: strconv.Itoa(int(cfg.QueueLimit)),
},
config.KV{
Key: target.MySQLMaxOpenConnections,
Value: strconv.Itoa(cfg.MaxOpenConnections),
},
}
return nil

View File

@@ -807,6 +807,10 @@ var (
Key: target.MySQLQueueLimit,
Value: "0",
},
config.KV{
Key: target.MySQLMaxOpenConnections,
Value: "2",
},
}
)
@@ -855,13 +859,25 @@ func GetNotifyMySQL(mysqlKVS map[string]config.KVS) (map[string]target.MySQLArgs
if k != config.Default {
queueDirEnv = queueDirEnv + config.Default + k
}
maxOpenConnectionsEnv := target.EnvMySQLMaxOpenConnections
if k != config.Default {
maxOpenConnectionsEnv = maxOpenConnectionsEnv + config.Default + k
}
maxOpenConnections, cErr := strconv.Atoi(env.Get(maxOpenConnectionsEnv, kv.Get(target.MySQLMaxOpenConnections)))
if cErr != nil {
return nil, cErr
}
mysqlArgs := target.MySQLArgs{
Enable: enabled,
Format: env.Get(formatEnv, kv.Get(target.MySQLFormat)),
DSN: env.Get(dsnStringEnv, kv.Get(target.MySQLDSNString)),
Table: env.Get(tableEnv, kv.Get(target.MySQLTable)),
QueueDir: env.Get(queueDirEnv, kv.Get(target.MySQLQueueDir)),
QueueLimit: queueLimit,
Enable: enabled,
Format: env.Get(formatEnv, kv.Get(target.MySQLFormat)),
DSN: env.Get(dsnStringEnv, kv.Get(target.MySQLDSNString)),
Table: env.Get(tableEnv, kv.Get(target.MySQLTable)),
QueueDir: env.Get(queueDirEnv, kv.Get(target.MySQLQueueDir)),
QueueLimit: queueLimit,
MaxOpenConnections: maxOpenConnections,
}
if err = mysqlArgs.Validate(); err != nil {
return nil, err
@@ -1235,6 +1251,10 @@ var (
Key: target.PostgresQueueLimit,
Value: "0",
},
config.KV{
Key: target.PostgresMaxOpenConnections,
Value: "2",
},
}
)
@@ -1285,13 +1305,24 @@ func GetNotifyPostgres(postgresKVS map[string]config.KVS) (map[string]target.Pos
queueDirEnv = queueDirEnv + config.Default + k
}
maxOpenConnectionsEnv := target.EnvPostgresMaxOpenConnections
if k != config.Default {
maxOpenConnectionsEnv = maxOpenConnectionsEnv + config.Default + k
}
maxOpenConnections, cErr := strconv.Atoi(env.Get(maxOpenConnectionsEnv, kv.Get(target.PostgresMaxOpenConnections)))
if cErr != nil {
return nil, cErr
}
psqlArgs := target.PostgreSQLArgs{
Enable: enabled,
Format: env.Get(formatEnv, kv.Get(target.PostgresFormat)),
ConnectionString: env.Get(connectionStringEnv, kv.Get(target.PostgresConnectionString)),
Table: env.Get(tableEnv, kv.Get(target.PostgresTable)),
QueueDir: env.Get(queueDirEnv, kv.Get(target.PostgresQueueDir)),
QueueLimit: uint64(queueLimit),
Enable: enabled,
Format: env.Get(formatEnv, kv.Get(target.PostgresFormat)),
ConnectionString: env.Get(connectionStringEnv, kv.Get(target.PostgresConnectionString)),
Table: env.Get(tableEnv, kv.Get(target.PostgresTable)),
QueueDir: env.Get(queueDirEnv, kv.Get(target.PostgresQueueDir)),
QueueLimit: uint64(queueLimit),
MaxOpenConnections: maxOpenConnections,
}
if err = psqlArgs.Validate(); err != nil {
return nil, err