2021-04-18 12:41:13 -07:00
|
|
|
// Copyright (c) 2015-2021 MinIO, Inc.
|
|
|
|
//
|
|
|
|
// This file is part of MinIO Object Storage stack
|
|
|
|
//
|
|
|
|
// This program is free software: you can redistribute it and/or modify
|
|
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
|
|
// (at your option) any later version.
|
|
|
|
//
|
|
|
|
// This program is distributed in the hope that it will be useful
|
|
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
// GNU Affero General Public License for more details.
|
|
|
|
//
|
|
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
2019-10-22 22:59:13 -07:00
|
|
|
|
|
|
|
package notify
|
|
|
|
|
|
|
|
import (
|
2020-04-14 23:49:25 +05:30
|
|
|
"context"
|
2019-10-22 22:59:13 -07:00
|
|
|
"crypto/tls"
|
|
|
|
"crypto/x509"
|
2020-04-14 23:49:25 +05:30
|
|
|
"errors"
|
2019-12-12 06:53:50 -08:00
|
|
|
"net/http"
|
2019-10-22 22:59:13 -07:00
|
|
|
"strconv"
|
|
|
|
"strings"
|
|
|
|
"time"
|
|
|
|
|
2021-06-01 14:59:40 -07:00
|
|
|
"github.com/minio/minio/internal/config"
|
|
|
|
"github.com/minio/minio/internal/event"
|
|
|
|
"github.com/minio/minio/internal/event/target"
|
|
|
|
"github.com/minio/minio/internal/logger"
|
2021-05-28 15:17:01 -07:00
|
|
|
"github.com/minio/pkg/env"
|
2021-06-14 14:54:37 -07:00
|
|
|
xnet "github.com/minio/pkg/net"
|
2019-10-22 22:59:13 -07:00
|
|
|
)
|
|
|
|
|
|
|
|
const (
|
|
|
|
formatNamespace = "namespace"
|
|
|
|
)
|
|
|
|
|
2020-04-14 23:49:25 +05:30
|
|
|
// ErrTargetsOffline - Indicates single/multiple target failures.
|
|
|
|
var ErrTargetsOffline = errors.New("one or more targets are offline. Please use `mc admin info --json` to check the offline targets")
|
|
|
|
|
2019-10-22 22:59:13 -07:00
|
|
|
// TestNotificationTargets is similar to GetNotificationTargets()
|
|
|
|
// avoids explicit registration.
|
certs: refactor cert manager to support multiple certificates (#10207)
This commit refactors the certificate management implementation
in the `certs` package such that multiple certificates can be
specified at the same time. Therefore, the following layout of
the `certs/` directory is expected:
```
certs/
│
├─ public.crt
├─ private.key
├─ CAs/ // CAs directory is ignored
│ │
│ ...
│
├─ example.com/
│ │
│ ├─ public.crt
│ └─ private.key
└─ foobar.org/
│
├─ public.crt
└─ private.key
...
```
However, directory names like `example.com` are just for human
readability/organization and don't have any meaning w.r.t whether
a particular certificate is served or not. This decision is made based
on the SNI sent by the client and the SAN of the certificate.
***
The `Manager` will pick a certificate based on the client trying
to establish a TLS connection. In particular, it looks at the client
hello (i.e. SNI) to determine which host the client tries to access.
If the manager can find a certificate that matches the SNI it
returns this certificate to the client.
However, the client may choose to not send an SNI or tries to access
a server directly via IP (`https://<ip>:<port>`). In this case, we
cannot use the SNI to determine which certificate to serve. However,
we also should not pick "the first" certificate that would be accepted
by the client (based on crypto. parameters - like a signature algorithm)
because it may be an internal certificate that contains internal hostnames.
We would disclose internal infrastructure details doing so.
Therefore, the `Manager` returns the "default" certificate when the
client does not specify an SNI. The default certificate the top-level
`public.crt` - i.e. `certs/public.crt`.
This approach has some consequences:
- It's the operator's responsibility to ensure that the top-level
`public.crt` does not disclose any information (i.e. hostnames)
that are not publicly visible. However, this was the case in the
past already.
- Any other `public.crt` - except for the top-level one - must not
contain any IP SAN. The reason for this restriction is that the
Manager cannot match a SNI to an IP b/c the SNI is the server host
name. The entire purpose of SNI is to indicate which host the client
tries to connect to when multiple hosts run on the same IP. So, a
client will not set the SNI to an IP.
If we would allow IP SANs in a lower-level `public.crt` a user would
expect that it is possible to connect to MinIO directly via IP address
and that the MinIO server would pick "the right" certificate. However,
the MinIO server cannot determine which certificate to serve, and
therefore always picks the "default" one. This may lead to all sorts
of confusing errors like:
"It works if I use `https:instance.minio.local` but not when I use
`https://10.0.2.1`.
These consequences/limitations should be pointed out / explained in our
docs in an appropriate way. However, the support for multiple
certificates should not have any impact on how deployment with a single
certificate function today.
Co-authored-by: Harshavardhana <harsha@minio.io>
2020-09-04 08:33:37 +02:00
|
|
|
func TestNotificationTargets(ctx context.Context, cfg config.Config, transport *http.Transport, targetIDs []event.TargetID) error {
|
2019-12-13 12:36:45 -08:00
|
|
|
test := true
|
2020-04-14 23:49:25 +05:30
|
|
|
returnOnTargetError := true
|
certs: refactor cert manager to support multiple certificates (#10207)
This commit refactors the certificate management implementation
in the `certs` package such that multiple certificates can be
specified at the same time. Therefore, the following layout of
the `certs/` directory is expected:
```
certs/
│
├─ public.crt
├─ private.key
├─ CAs/ // CAs directory is ignored
│ │
│ ...
│
├─ example.com/
│ │
│ ├─ public.crt
│ └─ private.key
└─ foobar.org/
│
├─ public.crt
└─ private.key
...
```
However, directory names like `example.com` are just for human
readability/organization and don't have any meaning w.r.t whether
a particular certificate is served or not. This decision is made based
on the SNI sent by the client and the SAN of the certificate.
***
The `Manager` will pick a certificate based on the client trying
to establish a TLS connection. In particular, it looks at the client
hello (i.e. SNI) to determine which host the client tries to access.
If the manager can find a certificate that matches the SNI it
returns this certificate to the client.
However, the client may choose to not send an SNI or tries to access
a server directly via IP (`https://<ip>:<port>`). In this case, we
cannot use the SNI to determine which certificate to serve. However,
we also should not pick "the first" certificate that would be accepted
by the client (based on crypto. parameters - like a signature algorithm)
because it may be an internal certificate that contains internal hostnames.
We would disclose internal infrastructure details doing so.
Therefore, the `Manager` returns the "default" certificate when the
client does not specify an SNI. The default certificate the top-level
`public.crt` - i.e. `certs/public.crt`.
This approach has some consequences:
- It's the operator's responsibility to ensure that the top-level
`public.crt` does not disclose any information (i.e. hostnames)
that are not publicly visible. However, this was the case in the
past already.
- Any other `public.crt` - except for the top-level one - must not
contain any IP SAN. The reason for this restriction is that the
Manager cannot match a SNI to an IP b/c the SNI is the server host
name. The entire purpose of SNI is to indicate which host the client
tries to connect to when multiple hosts run on the same IP. So, a
client will not set the SNI to an IP.
If we would allow IP SANs in a lower-level `public.crt` a user would
expect that it is possible to connect to MinIO directly via IP address
and that the MinIO server would pick "the right" certificate. However,
the MinIO server cannot determine which certificate to serve, and
therefore always picks the "default" one. This may lead to all sorts
of confusing errors like:
"It works if I use `https:instance.minio.local` but not when I use
`https://10.0.2.1`.
These consequences/limitations should be pointed out / explained in our
docs in an appropriate way. However, the support for multiple
certificates should not have any impact on how deployment with a single
certificate function today.
Co-authored-by: Harshavardhana <harsha@minio.io>
2020-09-04 08:33:37 +02:00
|
|
|
targets, err := RegisterNotificationTargets(ctx, cfg, transport, targetIDs, test, returnOnTargetError)
|
2020-02-07 23:35:56 +01:00
|
|
|
if err == nil {
|
|
|
|
// Close all targets since we are only testing connections.
|
|
|
|
for _, t := range targets.TargetMap() {
|
|
|
|
_ = t.Close()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-10-22 22:59:13 -07:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2022-02-09 00:06:41 +05:30
|
|
|
// TestSubSysNotificationTargets - tests notification targets of given subsystem
|
|
|
|
func TestSubSysNotificationTargets(ctx context.Context, cfg config.Config, transport *http.Transport, targetIDs []event.TargetID, subSys string) error {
|
|
|
|
if err := checkValidNotificationKeysForSubSys(subSys, cfg[subSys]); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
targetList := event.NewTargetList()
|
|
|
|
targetsOffline, err := fetchSubSysTargets(ctx, cfg, transport, true, true, subSys, targetList)
|
|
|
|
if err == nil {
|
|
|
|
// Close all targets since we are only testing connections.
|
|
|
|
for _, t := range targetList.TargetMap() {
|
|
|
|
_ = t.Close()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if targetsOffline {
|
|
|
|
return ErrTargetsOffline
|
|
|
|
}
|
|
|
|
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2019-10-22 22:59:13 -07:00
|
|
|
// GetNotificationTargets registers and initializes all notification
|
|
|
|
// targets, returns error if any.
|
certs: refactor cert manager to support multiple certificates (#10207)
This commit refactors the certificate management implementation
in the `certs` package such that multiple certificates can be
specified at the same time. Therefore, the following layout of
the `certs/` directory is expected:
```
certs/
│
├─ public.crt
├─ private.key
├─ CAs/ // CAs directory is ignored
│ │
│ ...
│
├─ example.com/
│ │
│ ├─ public.crt
│ └─ private.key
└─ foobar.org/
│
├─ public.crt
└─ private.key
...
```
However, directory names like `example.com` are just for human
readability/organization and don't have any meaning w.r.t whether
a particular certificate is served or not. This decision is made based
on the SNI sent by the client and the SAN of the certificate.
***
The `Manager` will pick a certificate based on the client trying
to establish a TLS connection. In particular, it looks at the client
hello (i.e. SNI) to determine which host the client tries to access.
If the manager can find a certificate that matches the SNI it
returns this certificate to the client.
However, the client may choose to not send an SNI or tries to access
a server directly via IP (`https://<ip>:<port>`). In this case, we
cannot use the SNI to determine which certificate to serve. However,
we also should not pick "the first" certificate that would be accepted
by the client (based on crypto. parameters - like a signature algorithm)
because it may be an internal certificate that contains internal hostnames.
We would disclose internal infrastructure details doing so.
Therefore, the `Manager` returns the "default" certificate when the
client does not specify an SNI. The default certificate the top-level
`public.crt` - i.e. `certs/public.crt`.
This approach has some consequences:
- It's the operator's responsibility to ensure that the top-level
`public.crt` does not disclose any information (i.e. hostnames)
that are not publicly visible. However, this was the case in the
past already.
- Any other `public.crt` - except for the top-level one - must not
contain any IP SAN. The reason for this restriction is that the
Manager cannot match a SNI to an IP b/c the SNI is the server host
name. The entire purpose of SNI is to indicate which host the client
tries to connect to when multiple hosts run on the same IP. So, a
client will not set the SNI to an IP.
If we would allow IP SANs in a lower-level `public.crt` a user would
expect that it is possible to connect to MinIO directly via IP address
and that the MinIO server would pick "the right" certificate. However,
the MinIO server cannot determine which certificate to serve, and
therefore always picks the "default" one. This may lead to all sorts
of confusing errors like:
"It works if I use `https:instance.minio.local` but not when I use
`https://10.0.2.1`.
These consequences/limitations should be pointed out / explained in our
docs in an appropriate way. However, the support for multiple
certificates should not have any impact on how deployment with a single
certificate function today.
Co-authored-by: Harshavardhana <harsha@minio.io>
2020-09-04 08:33:37 +02:00
|
|
|
func GetNotificationTargets(ctx context.Context, cfg config.Config, transport *http.Transport, test bool) (*event.TargetList, error) {
|
2020-04-14 23:49:25 +05:30
|
|
|
returnOnTargetError := false
|
certs: refactor cert manager to support multiple certificates (#10207)
This commit refactors the certificate management implementation
in the `certs` package such that multiple certificates can be
specified at the same time. Therefore, the following layout of
the `certs/` directory is expected:
```
certs/
│
├─ public.crt
├─ private.key
├─ CAs/ // CAs directory is ignored
│ │
│ ...
│
├─ example.com/
│ │
│ ├─ public.crt
│ └─ private.key
└─ foobar.org/
│
├─ public.crt
└─ private.key
...
```
However, directory names like `example.com` are just for human
readability/organization and don't have any meaning w.r.t whether
a particular certificate is served or not. This decision is made based
on the SNI sent by the client and the SAN of the certificate.
***
The `Manager` will pick a certificate based on the client trying
to establish a TLS connection. In particular, it looks at the client
hello (i.e. SNI) to determine which host the client tries to access.
If the manager can find a certificate that matches the SNI it
returns this certificate to the client.
However, the client may choose to not send an SNI or tries to access
a server directly via IP (`https://<ip>:<port>`). In this case, we
cannot use the SNI to determine which certificate to serve. However,
we also should not pick "the first" certificate that would be accepted
by the client (based on crypto. parameters - like a signature algorithm)
because it may be an internal certificate that contains internal hostnames.
We would disclose internal infrastructure details doing so.
Therefore, the `Manager` returns the "default" certificate when the
client does not specify an SNI. The default certificate the top-level
`public.crt` - i.e. `certs/public.crt`.
This approach has some consequences:
- It's the operator's responsibility to ensure that the top-level
`public.crt` does not disclose any information (i.e. hostnames)
that are not publicly visible. However, this was the case in the
past already.
- Any other `public.crt` - except for the top-level one - must not
contain any IP SAN. The reason for this restriction is that the
Manager cannot match a SNI to an IP b/c the SNI is the server host
name. The entire purpose of SNI is to indicate which host the client
tries to connect to when multiple hosts run on the same IP. So, a
client will not set the SNI to an IP.
If we would allow IP SANs in a lower-level `public.crt` a user would
expect that it is possible to connect to MinIO directly via IP address
and that the MinIO server would pick "the right" certificate. However,
the MinIO server cannot determine which certificate to serve, and
therefore always picks the "default" one. This may lead to all sorts
of confusing errors like:
"It works if I use `https:instance.minio.local` but not when I use
`https://10.0.2.1`.
These consequences/limitations should be pointed out / explained in our
docs in an appropriate way. However, the support for multiple
certificates should not have any impact on how deployment with a single
certificate function today.
Co-authored-by: Harshavardhana <harsha@minio.io>
2020-09-04 08:33:37 +02:00
|
|
|
return RegisterNotificationTargets(ctx, cfg, transport, nil, test, returnOnTargetError)
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
// RegisterNotificationTargets - returns TargetList which contains enabled targets in serverConfig.
|
|
|
|
// A new notification target is added like below
|
|
|
|
// * Add a new target in pkg/event/target package.
|
|
|
|
// * Add newly added target configuration to serverConfig.Notify.<TARGET_NAME>.
|
|
|
|
// * Handle the configuration in this function to create/add into TargetList.
|
certs: refactor cert manager to support multiple certificates (#10207)
This commit refactors the certificate management implementation
in the `certs` package such that multiple certificates can be
specified at the same time. Therefore, the following layout of
the `certs/` directory is expected:
```
certs/
│
├─ public.crt
├─ private.key
├─ CAs/ // CAs directory is ignored
│ │
│ ...
│
├─ example.com/
│ │
│ ├─ public.crt
│ └─ private.key
└─ foobar.org/
│
├─ public.crt
└─ private.key
...
```
However, directory names like `example.com` are just for human
readability/organization and don't have any meaning w.r.t whether
a particular certificate is served or not. This decision is made based
on the SNI sent by the client and the SAN of the certificate.
***
The `Manager` will pick a certificate based on the client trying
to establish a TLS connection. In particular, it looks at the client
hello (i.e. SNI) to determine which host the client tries to access.
If the manager can find a certificate that matches the SNI it
returns this certificate to the client.
However, the client may choose to not send an SNI or tries to access
a server directly via IP (`https://<ip>:<port>`). In this case, we
cannot use the SNI to determine which certificate to serve. However,
we also should not pick "the first" certificate that would be accepted
by the client (based on crypto. parameters - like a signature algorithm)
because it may be an internal certificate that contains internal hostnames.
We would disclose internal infrastructure details doing so.
Therefore, the `Manager` returns the "default" certificate when the
client does not specify an SNI. The default certificate the top-level
`public.crt` - i.e. `certs/public.crt`.
This approach has some consequences:
- It's the operator's responsibility to ensure that the top-level
`public.crt` does not disclose any information (i.e. hostnames)
that are not publicly visible. However, this was the case in the
past already.
- Any other `public.crt` - except for the top-level one - must not
contain any IP SAN. The reason for this restriction is that the
Manager cannot match a SNI to an IP b/c the SNI is the server host
name. The entire purpose of SNI is to indicate which host the client
tries to connect to when multiple hosts run on the same IP. So, a
client will not set the SNI to an IP.
If we would allow IP SANs in a lower-level `public.crt` a user would
expect that it is possible to connect to MinIO directly via IP address
and that the MinIO server would pick "the right" certificate. However,
the MinIO server cannot determine which certificate to serve, and
therefore always picks the "default" one. This may lead to all sorts
of confusing errors like:
"It works if I use `https:instance.minio.local` but not when I use
`https://10.0.2.1`.
These consequences/limitations should be pointed out / explained in our
docs in an appropriate way. However, the support for multiple
certificates should not have any impact on how deployment with a single
certificate function today.
Co-authored-by: Harshavardhana <harsha@minio.io>
2020-09-04 08:33:37 +02:00
|
|
|
func RegisterNotificationTargets(ctx context.Context, cfg config.Config, transport *http.Transport, targetIDs []event.TargetID, test bool, returnOnTargetError bool) (*event.TargetList, error) {
|
|
|
|
targetList, err := FetchRegisteredTargets(ctx, cfg, transport, test, returnOnTargetError)
|
2020-04-14 23:49:25 +05:30
|
|
|
if err != nil {
|
|
|
|
return targetList, err
|
|
|
|
}
|
|
|
|
|
|
|
|
if test {
|
|
|
|
// Verify if user is trying to disable already configured
|
|
|
|
// notification targets, based on their target IDs
|
|
|
|
for _, targetID := range targetIDs {
|
|
|
|
if !targetList.Exists(targetID) {
|
|
|
|
return nil, config.Errorf(
|
2022-01-27 18:28:16 -08:00
|
|
|
"Unable to disable currently configured targets '%v'",
|
2020-04-14 23:49:25 +05:30
|
|
|
targetID)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return targetList, nil
|
|
|
|
}
|
|
|
|
|
2022-02-09 00:06:41 +05:30
|
|
|
func fetchSubSysTargets(ctx context.Context, cfg config.Config,
|
|
|
|
transport *http.Transport, test bool, returnOnTargetError bool,
|
|
|
|
subSys string, targetList *event.TargetList) (targetsOffline bool, err error) {
|
|
|
|
targetsOffline = false
|
|
|
|
if err := checkValidNotificationKeysForSubSys(subSys, cfg[subSys]); err != nil {
|
|
|
|
return targetsOffline, err
|
2019-10-30 23:39:09 -07:00
|
|
|
}
|
|
|
|
|
2022-02-09 00:06:41 +05:30
|
|
|
switch subSys {
|
|
|
|
case config.NotifyAMQPSubSys:
|
|
|
|
amqpTargets, err := GetNotifyAMQP(cfg[config.NotifyAMQPSubSys])
|
2019-10-22 22:59:13 -07:00
|
|
|
if err != nil {
|
2022-02-09 00:06:41 +05:30
|
|
|
return targetsOffline, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
2022-02-09 00:06:41 +05:30
|
|
|
for id, args := range amqpTargets {
|
|
|
|
if !args.Enable {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
newTarget, err := target.NewAMQPTarget(id, args, ctx.Done(), logger.LogOnceIf, test)
|
|
|
|
if err != nil {
|
|
|
|
targetsOffline = true
|
|
|
|
if returnOnTargetError {
|
|
|
|
return targetsOffline, err
|
|
|
|
}
|
|
|
|
_ = newTarget.Close()
|
2020-04-14 23:49:25 +05:30
|
|
|
}
|
2022-03-02 21:35:48 -08:00
|
|
|
if err = targetList.Add(newTarget); err != nil {
|
|
|
|
logger.LogIf(context.Background(), err)
|
|
|
|
if returnOnTargetError {
|
|
|
|
return targetsOffline, err
|
|
|
|
}
|
|
|
|
}
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
2022-02-09 00:06:41 +05:30
|
|
|
case config.NotifyESSubSys:
|
|
|
|
esTargets, err := GetNotifyES(cfg[config.NotifyESSubSys], transport)
|
2019-10-22 22:59:13 -07:00
|
|
|
if err != nil {
|
2022-02-09 00:06:41 +05:30
|
|
|
return targetsOffline, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
2022-02-09 00:06:41 +05:30
|
|
|
for id, args := range esTargets {
|
|
|
|
if !args.Enable {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
newTarget, err := target.NewElasticsearchTarget(id, args, ctx.Done(), logger.LogOnceIf, test)
|
|
|
|
if err != nil {
|
|
|
|
targetsOffline = true
|
|
|
|
if returnOnTargetError {
|
|
|
|
return targetsOffline, err
|
|
|
|
}
|
|
|
|
_ = newTarget.Close()
|
|
|
|
}
|
|
|
|
if err = targetList.Add(newTarget); err != nil {
|
|
|
|
logger.LogIf(context.Background(), err)
|
|
|
|
if returnOnTargetError {
|
|
|
|
return targetsOffline, err
|
|
|
|
}
|
2020-04-14 23:49:25 +05:30
|
|
|
}
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
2022-02-09 00:06:41 +05:30
|
|
|
case config.NotifyKafkaSubSys:
|
|
|
|
kafkaTargets, err := GetNotifyKafka(cfg[config.NotifyKafkaSubSys])
|
2019-10-22 22:59:13 -07:00
|
|
|
if err != nil {
|
2022-02-09 00:06:41 +05:30
|
|
|
return targetsOffline, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
2022-02-09 00:06:41 +05:30
|
|
|
for id, args := range kafkaTargets {
|
|
|
|
if !args.Enable {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
args.TLS.RootCAs = transport.TLSClientConfig.RootCAs
|
|
|
|
newTarget, err := target.NewKafkaTarget(id, args, ctx.Done(), logger.LogOnceIf, test)
|
|
|
|
if err != nil {
|
|
|
|
targetsOffline = true
|
|
|
|
if returnOnTargetError {
|
|
|
|
return targetsOffline, err
|
|
|
|
}
|
|
|
|
_ = newTarget.Close()
|
|
|
|
}
|
|
|
|
if err = targetList.Add(newTarget); err != nil {
|
|
|
|
logger.LogIf(context.Background(), err)
|
|
|
|
if returnOnTargetError {
|
|
|
|
return targetsOffline, err
|
|
|
|
}
|
2020-04-14 23:49:25 +05:30
|
|
|
}
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
2022-02-09 00:06:41 +05:30
|
|
|
case config.NotifyMQTTSubSys:
|
|
|
|
mqttTargets, err := GetNotifyMQTT(cfg[config.NotifyMQTTSubSys], transport.TLSClientConfig.RootCAs)
|
2019-10-22 22:59:13 -07:00
|
|
|
if err != nil {
|
2022-02-09 00:06:41 +05:30
|
|
|
return targetsOffline, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
2022-02-09 00:06:41 +05:30
|
|
|
for id, args := range mqttTargets {
|
|
|
|
if !args.Enable {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
args.RootCAs = transport.TLSClientConfig.RootCAs
|
|
|
|
newTarget, err := target.NewMQTTTarget(id, args, ctx.Done(), logger.LogOnceIf, test)
|
|
|
|
if err != nil {
|
|
|
|
targetsOffline = true
|
|
|
|
if returnOnTargetError {
|
|
|
|
return targetsOffline, err
|
|
|
|
}
|
|
|
|
_ = newTarget.Close()
|
|
|
|
}
|
|
|
|
if err = targetList.Add(newTarget); err != nil {
|
|
|
|
logger.LogIf(context.Background(), err)
|
|
|
|
if returnOnTargetError {
|
|
|
|
return targetsOffline, err
|
|
|
|
}
|
2020-04-14 23:49:25 +05:30
|
|
|
}
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
2022-02-09 00:06:41 +05:30
|
|
|
case config.NotifyMySQLSubSys:
|
|
|
|
mysqlTargets, err := GetNotifyMySQL(cfg[config.NotifyMySQLSubSys])
|
2019-10-22 22:59:13 -07:00
|
|
|
if err != nil {
|
2022-02-09 00:06:41 +05:30
|
|
|
return targetsOffline, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
2022-02-09 00:06:41 +05:30
|
|
|
for id, args := range mysqlTargets {
|
|
|
|
if !args.Enable {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
newTarget, err := target.NewMySQLTarget(id, args, ctx.Done(), logger.LogOnceIf, test)
|
|
|
|
if err != nil {
|
|
|
|
targetsOffline = true
|
|
|
|
if returnOnTargetError {
|
|
|
|
return targetsOffline, err
|
|
|
|
}
|
|
|
|
_ = newTarget.Close()
|
|
|
|
}
|
|
|
|
if err = targetList.Add(newTarget); err != nil {
|
|
|
|
logger.LogIf(context.Background(), err)
|
|
|
|
if returnOnTargetError {
|
|
|
|
return targetsOffline, err
|
|
|
|
}
|
2020-04-14 23:49:25 +05:30
|
|
|
}
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
2022-02-09 00:06:41 +05:30
|
|
|
case config.NotifyNATSSubSys:
|
|
|
|
natsTargets, err := GetNotifyNATS(cfg[config.NotifyNATSSubSys], transport.TLSClientConfig.RootCAs)
|
2019-10-22 22:59:13 -07:00
|
|
|
if err != nil {
|
2022-02-09 00:06:41 +05:30
|
|
|
return targetsOffline, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
2022-02-09 00:06:41 +05:30
|
|
|
for id, args := range natsTargets {
|
|
|
|
if !args.Enable {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
newTarget, err := target.NewNATSTarget(id, args, ctx.Done(), logger.LogOnceIf, test)
|
|
|
|
if err != nil {
|
|
|
|
targetsOffline = true
|
|
|
|
if returnOnTargetError {
|
|
|
|
return targetsOffline, err
|
|
|
|
}
|
|
|
|
_ = newTarget.Close()
|
|
|
|
}
|
|
|
|
if err = targetList.Add(newTarget); err != nil {
|
|
|
|
logger.LogIf(context.Background(), err)
|
|
|
|
if returnOnTargetError {
|
|
|
|
return targetsOffline, err
|
|
|
|
}
|
2020-04-14 23:49:25 +05:30
|
|
|
}
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
2022-02-09 00:06:41 +05:30
|
|
|
case config.NotifyNSQSubSys:
|
|
|
|
nsqTargets, err := GetNotifyNSQ(cfg[config.NotifyNSQSubSys])
|
2019-10-22 22:59:13 -07:00
|
|
|
if err != nil {
|
2022-02-09 00:06:41 +05:30
|
|
|
return targetsOffline, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
2022-02-09 00:06:41 +05:30
|
|
|
for id, args := range nsqTargets {
|
|
|
|
if !args.Enable {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
newTarget, err := target.NewNSQTarget(id, args, ctx.Done(), logger.LogOnceIf, test)
|
|
|
|
if err != nil {
|
|
|
|
targetsOffline = true
|
|
|
|
if returnOnTargetError {
|
|
|
|
return targetsOffline, err
|
|
|
|
}
|
|
|
|
_ = newTarget.Close()
|
|
|
|
}
|
|
|
|
if err = targetList.Add(newTarget); err != nil {
|
|
|
|
logger.LogIf(context.Background(), err)
|
|
|
|
if returnOnTargetError {
|
|
|
|
return targetsOffline, err
|
|
|
|
}
|
2020-04-14 23:49:25 +05:30
|
|
|
}
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
2022-02-09 00:06:41 +05:30
|
|
|
case config.NotifyPostgresSubSys:
|
|
|
|
postgresTargets, err := GetNotifyPostgres(cfg[config.NotifyPostgresSubSys])
|
2019-10-22 22:59:13 -07:00
|
|
|
if err != nil {
|
2022-02-09 00:06:41 +05:30
|
|
|
return targetsOffline, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
2022-02-09 00:06:41 +05:30
|
|
|
for id, args := range postgresTargets {
|
|
|
|
if !args.Enable {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
newTarget, err := target.NewPostgreSQLTarget(id, args, ctx.Done(), logger.LogOnceIf, test)
|
|
|
|
if err != nil {
|
|
|
|
targetsOffline = true
|
|
|
|
if returnOnTargetError {
|
|
|
|
return targetsOffline, err
|
|
|
|
}
|
|
|
|
_ = newTarget.Close()
|
|
|
|
}
|
|
|
|
if err = targetList.Add(newTarget); err != nil {
|
|
|
|
logger.LogIf(context.Background(), err)
|
|
|
|
if returnOnTargetError {
|
|
|
|
return targetsOffline, err
|
|
|
|
}
|
2020-04-14 23:49:25 +05:30
|
|
|
}
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
2022-02-09 00:06:41 +05:30
|
|
|
case config.NotifyRedisSubSys:
|
|
|
|
redisTargets, err := GetNotifyRedis(cfg[config.NotifyRedisSubSys])
|
2019-10-22 22:59:13 -07:00
|
|
|
if err != nil {
|
2022-02-09 00:06:41 +05:30
|
|
|
return targetsOffline, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
2022-02-09 00:06:41 +05:30
|
|
|
for id, args := range redisTargets {
|
|
|
|
if !args.Enable {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
newTarget, err := target.NewRedisTarget(id, args, ctx.Done(), logger.LogOnceIf, test)
|
|
|
|
if err != nil {
|
|
|
|
targetsOffline = true
|
|
|
|
if returnOnTargetError {
|
|
|
|
return targetsOffline, err
|
|
|
|
}
|
|
|
|
_ = newTarget.Close()
|
|
|
|
}
|
|
|
|
if err = targetList.Add(newTarget); err != nil {
|
|
|
|
logger.LogIf(context.Background(), err)
|
|
|
|
if returnOnTargetError {
|
|
|
|
return targetsOffline, err
|
|
|
|
}
|
2020-04-14 23:49:25 +05:30
|
|
|
}
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
2022-02-09 00:06:41 +05:30
|
|
|
case config.NotifyWebhookSubSys:
|
|
|
|
webhookTargets, err := GetNotifyWebhook(cfg[config.NotifyWebhookSubSys], transport)
|
2019-10-22 22:59:13 -07:00
|
|
|
if err != nil {
|
2022-02-09 00:06:41 +05:30
|
|
|
return targetsOffline, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
2022-02-09 00:06:41 +05:30
|
|
|
for id, args := range webhookTargets {
|
|
|
|
if !args.Enable {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
newTarget, err := target.NewWebhookTarget(ctx, id, args, logger.LogOnceIf, transport, test)
|
|
|
|
if err != nil {
|
|
|
|
targetsOffline = true
|
|
|
|
if returnOnTargetError {
|
|
|
|
return targetsOffline, err
|
|
|
|
}
|
|
|
|
_ = newTarget.Close()
|
|
|
|
}
|
|
|
|
if err = targetList.Add(newTarget); err != nil {
|
|
|
|
logger.LogIf(context.Background(), err)
|
|
|
|
if returnOnTargetError {
|
|
|
|
return targetsOffline, err
|
|
|
|
}
|
2020-04-14 23:49:25 +05:30
|
|
|
}
|
2019-12-13 12:36:45 -08:00
|
|
|
}
|
2022-02-09 00:06:41 +05:30
|
|
|
|
2019-12-13 12:36:45 -08:00
|
|
|
}
|
2022-02-09 00:06:41 +05:30
|
|
|
return targetsOffline, nil
|
|
|
|
}
|
2019-12-13 12:36:45 -08:00
|
|
|
|
2022-02-09 00:06:41 +05:30
|
|
|
// FetchRegisteredTargets - Returns a set of configured TargetList
|
|
|
|
// If `returnOnTargetError` is set to true, The function returns when a target initialization fails
|
|
|
|
// Else, the function will return a complete TargetList irrespective of errors
|
|
|
|
func FetchRegisteredTargets(ctx context.Context, cfg config.Config, transport *http.Transport, test bool, returnOnTargetError bool) (_ *event.TargetList, err error) {
|
|
|
|
targetList := event.NewTargetList()
|
|
|
|
var targetsOffline bool
|
|
|
|
|
|
|
|
defer func() {
|
|
|
|
// Automatically close all connections to targets when an error occur.
|
|
|
|
// Close all the targets if returnOnTargetError is set
|
|
|
|
// Else, close only the failed targets
|
|
|
|
if err != nil && returnOnTargetError {
|
|
|
|
for _, t := range targetList.TargetMap() {
|
|
|
|
_ = t.Close()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
for _, subSys := range config.NotifySubSystems.ToSlice() {
|
|
|
|
if targetsOffline, err = fetchSubSysTargets(ctx, cfg, transport, test, returnOnTargetError, subSys, targetList); err != nil {
|
|
|
|
return targetList, err
|
|
|
|
}
|
|
|
|
if targetsOffline {
|
|
|
|
return targetList, ErrTargetsOffline
|
|
|
|
}
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
return targetList, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// DefaultNotificationKVS - default notification list of kvs.
|
|
|
|
var (
|
|
|
|
DefaultNotificationKVS = map[string]config.KVS{
|
|
|
|
config.NotifyAMQPSubSys: DefaultAMQPKVS,
|
|
|
|
config.NotifyKafkaSubSys: DefaultKafkaKVS,
|
|
|
|
config.NotifyMQTTSubSys: DefaultMQTTKVS,
|
|
|
|
config.NotifyMySQLSubSys: DefaultMySQLKVS,
|
|
|
|
config.NotifyNATSSubSys: DefaultNATSKVS,
|
|
|
|
config.NotifyNSQSubSys: DefaultNSQKVS,
|
|
|
|
config.NotifyPostgresSubSys: DefaultPostgresKVS,
|
|
|
|
config.NotifyRedisSubSys: DefaultRedisKVS,
|
|
|
|
config.NotifyWebhookSubSys: DefaultWebhookKVS,
|
|
|
|
config.NotifyESSubSys: DefaultESKVS,
|
|
|
|
}
|
|
|
|
)
|
|
|
|
|
2022-02-09 00:06:41 +05:30
|
|
|
func checkValidNotificationKeysForSubSys(subSys string, tgt map[string]config.KVS) error {
|
|
|
|
validKVS, ok := DefaultNotificationKVS[subSys]
|
|
|
|
if !ok {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
for tname, kv := range tgt {
|
|
|
|
subSysTarget := subSys
|
|
|
|
if tname != config.Default {
|
|
|
|
subSysTarget = subSys + config.SubSystemSeparator + tname
|
|
|
|
}
|
|
|
|
if v, ok := kv.Lookup(config.Enable); ok && v == config.EnableOn {
|
|
|
|
if err := config.CheckValidKeys(subSysTarget, kv, validKVS); err != nil {
|
|
|
|
return err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// DefaultKakfaKVS - default KV for kafka target
|
|
|
|
var (
|
|
|
|
DefaultKafkaKVS = config.KVS{
|
2019-11-20 15:10:24 -08:00
|
|
|
config.KV{
|
2019-12-04 15:32:37 -08:00
|
|
|
Key: config.Enable,
|
|
|
|
Value: config.EnableOff,
|
2019-11-20 15:10:24 -08:00
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.KafkaTopic,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.KafkaBrokers,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.KafkaSASLUsername,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.KafkaSASLPassword,
|
|
|
|
Value: "",
|
|
|
|
},
|
2020-03-20 18:10:27 +00:00
|
|
|
config.KV{
|
|
|
|
Key: target.KafkaSASLMechanism,
|
|
|
|
Value: "plain",
|
|
|
|
},
|
2019-12-06 02:31:46 +03:00
|
|
|
config.KV{
|
|
|
|
Key: target.KafkaClientTLSCert,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.KafkaClientTLSKey,
|
|
|
|
Value: "",
|
|
|
|
},
|
2019-11-20 15:10:24 -08:00
|
|
|
config.KV{
|
|
|
|
Key: target.KafkaTLSClientAuth,
|
|
|
|
Value: "0",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.KafkaSASL,
|
2019-12-04 15:32:37 -08:00
|
|
|
Value: config.EnableOff,
|
2019-11-20 15:10:24 -08:00
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.KafkaTLS,
|
2019-12-04 15:32:37 -08:00
|
|
|
Value: config.EnableOff,
|
2019-11-20 15:10:24 -08:00
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.KafkaTLSSkipVerify,
|
2019-12-04 15:32:37 -08:00
|
|
|
Value: config.EnableOff,
|
2019-11-20 15:10:24 -08:00
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.KafkaQueueLimit,
|
|
|
|
Value: "0",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.KafkaQueueDir,
|
|
|
|
Value: "",
|
|
|
|
},
|
2020-02-17 07:56:34 +05:30
|
|
|
config.KV{
|
|
|
|
Key: target.KafkaVersion,
|
|
|
|
Value: "",
|
|
|
|
},
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
)
|
|
|
|
|
|
|
|
// GetNotifyKafka - returns a map of registered notification 'kafka' targets
|
2019-10-30 23:39:09 -07:00
|
|
|
func GetNotifyKafka(kafkaKVS map[string]config.KVS) (map[string]target.KafkaArgs, error) {
|
2019-10-22 22:59:13 -07:00
|
|
|
kafkaTargets := make(map[string]target.KafkaArgs)
|
2021-07-13 09:39:13 -07:00
|
|
|
for k, kv := range config.Merge(kafkaKVS, target.EnvKafkaEnable, DefaultKafkaKVS) {
|
2019-12-04 15:32:37 -08:00
|
|
|
enableEnv := target.EnvKafkaEnable
|
2019-10-22 22:59:13 -07:00
|
|
|
if k != config.Default {
|
2019-12-04 15:32:37 -08:00
|
|
|
enableEnv = enableEnv + config.Default + k
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
2019-12-04 15:32:37 -08:00
|
|
|
enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
|
2019-10-22 22:59:13 -07:00
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
if !enabled {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
var brokers []xnet.Host
|
|
|
|
brokersEnv := target.EnvKafkaBrokers
|
|
|
|
if k != config.Default {
|
|
|
|
brokersEnv = brokersEnv + config.Default + k
|
|
|
|
}
|
|
|
|
kafkaBrokers := env.Get(brokersEnv, kv.Get(target.KafkaBrokers))
|
2019-10-30 23:39:09 -07:00
|
|
|
if len(kafkaBrokers) == 0 {
|
2019-12-14 17:27:57 -08:00
|
|
|
return nil, config.Errorf("kafka 'brokers' cannot be empty")
|
2019-10-30 23:39:09 -07:00
|
|
|
}
|
2019-10-22 22:59:13 -07:00
|
|
|
for _, s := range strings.Split(kafkaBrokers, config.ValueSeparator) {
|
|
|
|
var host *xnet.Host
|
|
|
|
host, err = xnet.ParseHost(s)
|
|
|
|
if err != nil {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
brokers = append(brokers, *host)
|
|
|
|
}
|
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
queueLimitEnv := target.EnvKafkaQueueLimit
|
|
|
|
if k != config.Default {
|
|
|
|
queueLimitEnv = queueLimitEnv + config.Default + k
|
|
|
|
}
|
|
|
|
queueLimit, err := strconv.ParseUint(env.Get(queueLimitEnv, kv.Get(target.KafkaQueueLimit)), 10, 64)
|
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
clientAuthEnv := target.EnvKafkaTLSClientAuth
|
|
|
|
if k != config.Default {
|
|
|
|
clientAuthEnv = clientAuthEnv + config.Default + k
|
|
|
|
}
|
|
|
|
clientAuth, err := strconv.Atoi(env.Get(clientAuthEnv, kv.Get(target.KafkaTLSClientAuth)))
|
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
topicEnv := target.EnvKafkaTopic
|
|
|
|
if k != config.Default {
|
|
|
|
topicEnv = topicEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
|
|
|
queueDirEnv := target.EnvKafkaQueueDir
|
|
|
|
if k != config.Default {
|
|
|
|
queueDirEnv = queueDirEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
2020-02-17 07:56:34 +05:30
|
|
|
versionEnv := target.EnvKafkaVersion
|
|
|
|
if k != config.Default {
|
|
|
|
versionEnv = versionEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
2019-10-22 22:59:13 -07:00
|
|
|
kafkaArgs := target.KafkaArgs{
|
|
|
|
Enable: enabled,
|
|
|
|
Brokers: brokers,
|
|
|
|
Topic: env.Get(topicEnv, kv.Get(target.KafkaTopic)),
|
|
|
|
QueueDir: env.Get(queueDirEnv, kv.Get(target.KafkaQueueDir)),
|
|
|
|
QueueLimit: queueLimit,
|
2020-02-17 07:56:34 +05:30
|
|
|
Version: env.Get(versionEnv, kv.Get(target.KafkaVersion)),
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
2019-11-13 17:38:05 -08:00
|
|
|
tlsEnableEnv := target.EnvKafkaTLS
|
2019-10-22 22:59:13 -07:00
|
|
|
if k != config.Default {
|
|
|
|
tlsEnableEnv = tlsEnableEnv + config.Default + k
|
|
|
|
}
|
|
|
|
tlsSkipVerifyEnv := target.EnvKafkaTLSSkipVerify
|
|
|
|
if k != config.Default {
|
|
|
|
tlsSkipVerifyEnv = tlsSkipVerifyEnv + config.Default + k
|
|
|
|
}
|
2019-12-06 02:31:46 +03:00
|
|
|
|
|
|
|
tlsClientTLSCertEnv := target.EnvKafkaClientTLSCert
|
|
|
|
if k != config.Default {
|
|
|
|
tlsClientTLSCertEnv = tlsClientTLSCertEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
|
|
|
tlsClientTLSKeyEnv := target.EnvKafkaClientTLSKey
|
|
|
|
if k != config.Default {
|
|
|
|
tlsClientTLSKeyEnv = tlsClientTLSKeyEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
2019-12-04 15:32:37 -08:00
|
|
|
kafkaArgs.TLS.Enable = env.Get(tlsEnableEnv, kv.Get(target.KafkaTLS)) == config.EnableOn
|
|
|
|
kafkaArgs.TLS.SkipVerify = env.Get(tlsSkipVerifyEnv, kv.Get(target.KafkaTLSSkipVerify)) == config.EnableOn
|
2019-10-22 22:59:13 -07:00
|
|
|
kafkaArgs.TLS.ClientAuth = tls.ClientAuthType(clientAuth)
|
|
|
|
|
2019-12-06 02:31:46 +03:00
|
|
|
kafkaArgs.TLS.ClientTLSCert = env.Get(tlsClientTLSCertEnv, kv.Get(target.KafkaClientTLSCert))
|
|
|
|
kafkaArgs.TLS.ClientTLSKey = env.Get(tlsClientTLSKeyEnv, kv.Get(target.KafkaClientTLSKey))
|
|
|
|
|
2019-10-22 22:59:13 -07:00
|
|
|
saslEnableEnv := target.EnvKafkaSASLEnable
|
|
|
|
if k != config.Default {
|
|
|
|
saslEnableEnv = saslEnableEnv + config.Default + k
|
|
|
|
}
|
|
|
|
saslUsernameEnv := target.EnvKafkaSASLUsername
|
|
|
|
if k != config.Default {
|
|
|
|
saslUsernameEnv = saslUsernameEnv + config.Default + k
|
|
|
|
}
|
|
|
|
saslPasswordEnv := target.EnvKafkaSASLPassword
|
|
|
|
if k != config.Default {
|
|
|
|
saslPasswordEnv = saslPasswordEnv + config.Default + k
|
|
|
|
}
|
2020-03-20 18:10:27 +00:00
|
|
|
saslMechanismEnv := target.EnvKafkaSASLMechanism
|
|
|
|
if k != config.Default {
|
|
|
|
saslMechanismEnv = saslMechanismEnv + config.Default + k
|
|
|
|
}
|
2019-12-04 15:32:37 -08:00
|
|
|
kafkaArgs.SASL.Enable = env.Get(saslEnableEnv, kv.Get(target.KafkaSASL)) == config.EnableOn
|
2019-10-22 22:59:13 -07:00
|
|
|
kafkaArgs.SASL.User = env.Get(saslUsernameEnv, kv.Get(target.KafkaSASLUsername))
|
|
|
|
kafkaArgs.SASL.Password = env.Get(saslPasswordEnv, kv.Get(target.KafkaSASLPassword))
|
2020-03-20 18:10:27 +00:00
|
|
|
kafkaArgs.SASL.Mechanism = env.Get(saslMechanismEnv, kv.Get(target.KafkaSASLMechanism))
|
2019-10-22 22:59:13 -07:00
|
|
|
|
|
|
|
if err = kafkaArgs.Validate(); err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
kafkaTargets[k] = kafkaArgs
|
|
|
|
}
|
|
|
|
|
2019-10-30 23:39:09 -07:00
|
|
|
return kafkaTargets, nil
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
// DefaultMQTTKVS - default MQTT config
|
|
|
|
var (
|
|
|
|
DefaultMQTTKVS = config.KVS{
|
2019-11-20 15:10:24 -08:00
|
|
|
config.KV{
|
2019-12-04 15:32:37 -08:00
|
|
|
Key: config.Enable,
|
|
|
|
Value: config.EnableOff,
|
2019-11-20 15:10:24 -08:00
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.MqttBroker,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.MqttTopic,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.MqttPassword,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.MqttUsername,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.MqttQoS,
|
|
|
|
Value: "0",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.MqttKeepAliveInterval,
|
|
|
|
Value: "0s",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.MqttReconnectInterval,
|
|
|
|
Value: "0s",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.MqttQueueDir,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.MqttQueueLimit,
|
|
|
|
Value: "0",
|
|
|
|
},
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
)
|
|
|
|
|
|
|
|
// GetNotifyMQTT - returns a map of registered notification 'mqtt' targets
|
2019-10-30 23:39:09 -07:00
|
|
|
func GetNotifyMQTT(mqttKVS map[string]config.KVS, rootCAs *x509.CertPool) (map[string]target.MQTTArgs, error) {
|
2019-10-22 22:59:13 -07:00
|
|
|
mqttTargets := make(map[string]target.MQTTArgs)
|
2021-07-13 09:39:13 -07:00
|
|
|
for k, kv := range config.Merge(mqttKVS, target.EnvMQTTEnable, DefaultMQTTKVS) {
|
2019-12-04 15:32:37 -08:00
|
|
|
enableEnv := target.EnvMQTTEnable
|
2019-10-22 22:59:13 -07:00
|
|
|
if k != config.Default {
|
2019-12-04 15:32:37 -08:00
|
|
|
enableEnv = enableEnv + config.Default + k
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
2019-12-04 15:32:37 -08:00
|
|
|
enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
|
2019-10-22 22:59:13 -07:00
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
if !enabled {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
brokerEnv := target.EnvMQTTBroker
|
|
|
|
if k != config.Default {
|
|
|
|
brokerEnv = brokerEnv + config.Default + k
|
|
|
|
}
|
2019-11-22 13:46:05 -08:00
|
|
|
|
2019-10-22 22:59:13 -07:00
|
|
|
brokerURL, err := xnet.ParseURL(env.Get(brokerEnv, kv.Get(target.MqttBroker)))
|
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
reconnectIntervalEnv := target.EnvMQTTReconnectInterval
|
|
|
|
if k != config.Default {
|
|
|
|
reconnectIntervalEnv = reconnectIntervalEnv + config.Default + k
|
|
|
|
}
|
|
|
|
reconnectInterval, err := time.ParseDuration(env.Get(reconnectIntervalEnv,
|
|
|
|
kv.Get(target.MqttReconnectInterval)))
|
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
keepAliveIntervalEnv := target.EnvMQTTKeepAliveInterval
|
|
|
|
if k != config.Default {
|
|
|
|
keepAliveIntervalEnv = keepAliveIntervalEnv + config.Default + k
|
|
|
|
}
|
|
|
|
keepAliveInterval, err := time.ParseDuration(env.Get(keepAliveIntervalEnv,
|
|
|
|
kv.Get(target.MqttKeepAliveInterval)))
|
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
queueLimitEnv := target.EnvMQTTQueueLimit
|
|
|
|
if k != config.Default {
|
|
|
|
queueLimitEnv = queueLimitEnv + config.Default + k
|
|
|
|
}
|
|
|
|
queueLimit, err := strconv.ParseUint(env.Get(queueLimitEnv, kv.Get(target.MqttQueueLimit)), 10, 64)
|
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
qosEnv := target.EnvMQTTQoS
|
|
|
|
if k != config.Default {
|
|
|
|
qosEnv = qosEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
|
|
|
// Parse uint8 value
|
|
|
|
qos, err := strconv.ParseUint(env.Get(qosEnv, kv.Get(target.MqttQoS)), 10, 8)
|
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
topicEnv := target.EnvMQTTTopic
|
|
|
|
if k != config.Default {
|
|
|
|
topicEnv = topicEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
|
|
|
usernameEnv := target.EnvMQTTUsername
|
|
|
|
if k != config.Default {
|
|
|
|
usernameEnv = usernameEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
|
|
|
passwordEnv := target.EnvMQTTPassword
|
|
|
|
if k != config.Default {
|
|
|
|
passwordEnv = passwordEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
|
|
|
queueDirEnv := target.EnvMQTTQueueDir
|
|
|
|
if k != config.Default {
|
|
|
|
queueDirEnv = queueDirEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
|
|
|
mqttArgs := target.MQTTArgs{
|
|
|
|
Enable: enabled,
|
|
|
|
Broker: *brokerURL,
|
|
|
|
Topic: env.Get(topicEnv, kv.Get(target.MqttTopic)),
|
|
|
|
QoS: byte(qos),
|
|
|
|
User: env.Get(usernameEnv, kv.Get(target.MqttUsername)),
|
|
|
|
Password: env.Get(passwordEnv, kv.Get(target.MqttPassword)),
|
|
|
|
MaxReconnectInterval: reconnectInterval,
|
|
|
|
KeepAlive: keepAliveInterval,
|
|
|
|
RootCAs: rootCAs,
|
|
|
|
QueueDir: env.Get(queueDirEnv, kv.Get(target.MqttQueueDir)),
|
|
|
|
QueueLimit: queueLimit,
|
|
|
|
}
|
|
|
|
|
|
|
|
if err = mqttArgs.Validate(); err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
mqttTargets[k] = mqttArgs
|
|
|
|
}
|
2019-10-30 23:39:09 -07:00
|
|
|
return mqttTargets, nil
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
// DefaultMySQLKVS - default KV for MySQL
|
|
|
|
var (
|
|
|
|
DefaultMySQLKVS = config.KVS{
|
2019-11-20 15:10:24 -08:00
|
|
|
config.KV{
|
2019-12-04 15:32:37 -08:00
|
|
|
Key: config.Enable,
|
|
|
|
Value: config.EnableOff,
|
2019-11-20 15:10:24 -08:00
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.MySQLFormat,
|
|
|
|
Value: formatNamespace,
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.MySQLDSNString,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.MySQLTable,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.MySQLQueueDir,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.MySQLQueueLimit,
|
|
|
|
Value: "0",
|
|
|
|
},
|
2020-09-25 10:50:30 +05:30
|
|
|
config.KV{
|
|
|
|
Key: target.MySQLMaxOpenConnections,
|
|
|
|
Value: "2",
|
|
|
|
},
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
)
|
|
|
|
|
|
|
|
// GetNotifyMySQL - returns a map of registered notification 'mysql' targets
|
2019-10-30 23:39:09 -07:00
|
|
|
func GetNotifyMySQL(mysqlKVS map[string]config.KVS) (map[string]target.MySQLArgs, error) {
|
2019-10-22 22:59:13 -07:00
|
|
|
mysqlTargets := make(map[string]target.MySQLArgs)
|
2021-07-13 09:39:13 -07:00
|
|
|
for k, kv := range config.Merge(mysqlKVS, target.EnvMySQLEnable, DefaultMySQLKVS) {
|
2019-12-04 15:32:37 -08:00
|
|
|
enableEnv := target.EnvMySQLEnable
|
2019-10-22 22:59:13 -07:00
|
|
|
if k != config.Default {
|
2019-12-04 15:32:37 -08:00
|
|
|
enableEnv = enableEnv + config.Default + k
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
2019-12-04 15:32:37 -08:00
|
|
|
enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
|
2019-10-22 22:59:13 -07:00
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
if !enabled {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
queueLimitEnv := target.EnvMySQLQueueLimit
|
|
|
|
if k != config.Default {
|
|
|
|
queueLimitEnv = queueLimitEnv + config.Default + k
|
|
|
|
}
|
|
|
|
queueLimit, err := strconv.ParseUint(env.Get(queueLimitEnv, kv.Get(target.MySQLQueueLimit)), 10, 64)
|
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
formatEnv := target.EnvMySQLFormat
|
|
|
|
if k != config.Default {
|
|
|
|
formatEnv = formatEnv + config.Default + k
|
|
|
|
}
|
2020-04-09 21:45:17 -07:00
|
|
|
|
2019-10-22 22:59:13 -07:00
|
|
|
dsnStringEnv := target.EnvMySQLDSNString
|
|
|
|
if k != config.Default {
|
|
|
|
dsnStringEnv = dsnStringEnv + config.Default + k
|
|
|
|
}
|
2020-04-09 21:45:17 -07:00
|
|
|
|
2019-10-22 22:59:13 -07:00
|
|
|
tableEnv := target.EnvMySQLTable
|
|
|
|
if k != config.Default {
|
|
|
|
tableEnv = tableEnv + config.Default + k
|
|
|
|
}
|
2020-04-09 21:45:17 -07:00
|
|
|
|
2019-10-22 22:59:13 -07:00
|
|
|
queueDirEnv := target.EnvMySQLQueueDir
|
|
|
|
if k != config.Default {
|
|
|
|
queueDirEnv = queueDirEnv + config.Default + k
|
|
|
|
}
|
2020-09-25 10:50:30 +05:30
|
|
|
|
|
|
|
maxOpenConnectionsEnv := target.EnvMySQLMaxOpenConnections
|
|
|
|
if k != config.Default {
|
|
|
|
maxOpenConnectionsEnv = maxOpenConnectionsEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
|
|
|
maxOpenConnections, cErr := strconv.Atoi(env.Get(maxOpenConnectionsEnv, kv.Get(target.MySQLMaxOpenConnections)))
|
|
|
|
if cErr != nil {
|
|
|
|
return nil, cErr
|
|
|
|
}
|
|
|
|
|
2019-10-22 22:59:13 -07:00
|
|
|
mysqlArgs := target.MySQLArgs{
|
2020-09-25 10:50:30 +05:30
|
|
|
Enable: enabled,
|
|
|
|
Format: env.Get(formatEnv, kv.Get(target.MySQLFormat)),
|
|
|
|
DSN: env.Get(dsnStringEnv, kv.Get(target.MySQLDSNString)),
|
|
|
|
Table: env.Get(tableEnv, kv.Get(target.MySQLTable)),
|
|
|
|
QueueDir: env.Get(queueDirEnv, kv.Get(target.MySQLQueueDir)),
|
|
|
|
QueueLimit: queueLimit,
|
|
|
|
MaxOpenConnections: maxOpenConnections,
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
if err = mysqlArgs.Validate(); err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
mysqlTargets[k] = mysqlArgs
|
|
|
|
}
|
2019-10-30 23:39:09 -07:00
|
|
|
return mysqlTargets, nil
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
// DefaultNATSKVS - NATS KV for nats config.
|
|
|
|
var (
|
|
|
|
DefaultNATSKVS = config.KVS{
|
2019-11-20 15:10:24 -08:00
|
|
|
config.KV{
|
2019-12-04 15:32:37 -08:00
|
|
|
Key: config.Enable,
|
|
|
|
Value: config.EnableOff,
|
2019-11-20 15:10:24 -08:00
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.NATSAddress,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.NATSSubject,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.NATSUsername,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.NATSPassword,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.NATSToken,
|
|
|
|
Value: "",
|
|
|
|
},
|
2019-12-06 13:53:51 -08:00
|
|
|
config.KV{
|
|
|
|
Key: target.NATSTLS,
|
|
|
|
Value: config.EnableOff,
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.NATSTLSSkipVerify,
|
|
|
|
Value: config.EnableOff,
|
|
|
|
},
|
2019-11-20 15:10:24 -08:00
|
|
|
config.KV{
|
|
|
|
Key: target.NATSCertAuthority,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.NATSClientCert,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.NATSClientKey,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.NATSPingInterval,
|
|
|
|
Value: "0",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.NATSStreaming,
|
2019-12-04 15:32:37 -08:00
|
|
|
Value: config.EnableOff,
|
2019-11-20 15:10:24 -08:00
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.NATSStreamingAsync,
|
2019-12-04 15:32:37 -08:00
|
|
|
Value: config.EnableOff,
|
2019-11-20 15:10:24 -08:00
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.NATSStreamingMaxPubAcksInFlight,
|
|
|
|
Value: "0",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.NATSStreamingClusterID,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.NATSQueueDir,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.NATSQueueLimit,
|
|
|
|
Value: "0",
|
|
|
|
},
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
)
|
|
|
|
|
|
|
|
// GetNotifyNATS - returns a map of registered notification 'nats' targets
|
2019-12-06 13:53:51 -08:00
|
|
|
func GetNotifyNATS(natsKVS map[string]config.KVS, rootCAs *x509.CertPool) (map[string]target.NATSArgs, error) {
|
2019-10-22 22:59:13 -07:00
|
|
|
natsTargets := make(map[string]target.NATSArgs)
|
2021-07-13 09:39:13 -07:00
|
|
|
for k, kv := range config.Merge(natsKVS, target.EnvNATSEnable, DefaultNATSKVS) {
|
2019-12-04 15:32:37 -08:00
|
|
|
enableEnv := target.EnvNATSEnable
|
2019-10-22 22:59:13 -07:00
|
|
|
if k != config.Default {
|
2019-12-04 15:32:37 -08:00
|
|
|
enableEnv = enableEnv + config.Default + k
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
2019-12-04 15:32:37 -08:00
|
|
|
enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
|
2019-10-22 22:59:13 -07:00
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
if !enabled {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
addressEnv := target.EnvNATSAddress
|
|
|
|
if k != config.Default {
|
|
|
|
addressEnv = addressEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
|
|
|
address, err := xnet.ParseHost(env.Get(addressEnv, kv.Get(target.NATSAddress)))
|
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
pingIntervalEnv := target.EnvNATSPingInterval
|
|
|
|
if k != config.Default {
|
|
|
|
pingIntervalEnv = pingIntervalEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
|
|
|
pingInterval, err := strconv.ParseInt(env.Get(pingIntervalEnv, kv.Get(target.NATSPingInterval)), 10, 64)
|
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
queueLimitEnv := target.EnvNATSQueueLimit
|
|
|
|
if k != config.Default {
|
|
|
|
queueLimitEnv = queueLimitEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
|
|
|
queueLimit, err := strconv.ParseUint(env.Get(queueLimitEnv, kv.Get(target.NATSQueueLimit)), 10, 64)
|
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
2019-12-06 13:53:51 -08:00
|
|
|
tlsEnv := target.EnvNATSTLS
|
|
|
|
if k != config.Default {
|
|
|
|
tlsEnv = tlsEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
|
|
|
tlsSkipVerifyEnv := target.EnvNATSTLSSkipVerify
|
2019-10-22 22:59:13 -07:00
|
|
|
if k != config.Default {
|
2019-12-06 13:53:51 -08:00
|
|
|
tlsSkipVerifyEnv = tlsSkipVerifyEnv + config.Default + k
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
subjectEnv := target.EnvNATSSubject
|
|
|
|
if k != config.Default {
|
|
|
|
subjectEnv = subjectEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
|
|
|
usernameEnv := target.EnvNATSUsername
|
|
|
|
if k != config.Default {
|
|
|
|
usernameEnv = usernameEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
|
|
|
passwordEnv := target.EnvNATSPassword
|
|
|
|
if k != config.Default {
|
|
|
|
passwordEnv = passwordEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
|
|
|
tokenEnv := target.EnvNATSToken
|
|
|
|
if k != config.Default {
|
|
|
|
tokenEnv = tokenEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
|
|
|
queueDirEnv := target.EnvNATSQueueDir
|
|
|
|
if k != config.Default {
|
|
|
|
queueDirEnv = queueDirEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
2019-11-15 12:13:23 -05:00
|
|
|
certAuthorityEnv := target.EnvNATSCertAuthority
|
|
|
|
if k != config.Default {
|
|
|
|
certAuthorityEnv = certAuthorityEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
|
|
|
clientCertEnv := target.EnvNATSClientCert
|
|
|
|
if k != config.Default {
|
|
|
|
clientCertEnv = clientCertEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
|
|
|
clientKeyEnv := target.EnvNATSClientKey
|
|
|
|
if k != config.Default {
|
|
|
|
clientKeyEnv = clientKeyEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
2019-10-22 22:59:13 -07:00
|
|
|
natsArgs := target.NATSArgs{
|
2019-11-15 12:13:23 -05:00
|
|
|
Enable: true,
|
|
|
|
Address: *address,
|
|
|
|
Subject: env.Get(subjectEnv, kv.Get(target.NATSSubject)),
|
|
|
|
Username: env.Get(usernameEnv, kv.Get(target.NATSUsername)),
|
|
|
|
Password: env.Get(passwordEnv, kv.Get(target.NATSPassword)),
|
|
|
|
CertAuthority: env.Get(certAuthorityEnv, kv.Get(target.NATSCertAuthority)),
|
|
|
|
ClientCert: env.Get(clientCertEnv, kv.Get(target.NATSClientCert)),
|
|
|
|
ClientKey: env.Get(clientKeyEnv, kv.Get(target.NATSClientKey)),
|
|
|
|
Token: env.Get(tokenEnv, kv.Get(target.NATSToken)),
|
2019-12-06 13:53:51 -08:00
|
|
|
TLS: env.Get(tlsEnv, kv.Get(target.NATSTLS)) == config.EnableOn,
|
|
|
|
TLSSkipVerify: env.Get(tlsSkipVerifyEnv, kv.Get(target.NATSTLSSkipVerify)) == config.EnableOn,
|
2019-11-15 12:13:23 -05:00
|
|
|
PingInterval: pingInterval,
|
|
|
|
QueueDir: env.Get(queueDirEnv, kv.Get(target.NATSQueueDir)),
|
|
|
|
QueueLimit: queueLimit,
|
2019-12-06 13:53:51 -08:00
|
|
|
RootCAs: rootCAs,
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
2019-11-13 17:38:05 -08:00
|
|
|
streamingEnableEnv := target.EnvNATSStreaming
|
2019-10-22 22:59:13 -07:00
|
|
|
if k != config.Default {
|
|
|
|
streamingEnableEnv = streamingEnableEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
2019-12-04 15:32:37 -08:00
|
|
|
streamingEnabled := env.Get(streamingEnableEnv, kv.Get(target.NATSStreaming)) == config.EnableOn
|
2019-10-22 22:59:13 -07:00
|
|
|
if streamingEnabled {
|
|
|
|
asyncEnv := target.EnvNATSStreamingAsync
|
|
|
|
if k != config.Default {
|
|
|
|
asyncEnv = asyncEnv + config.Default + k
|
|
|
|
}
|
|
|
|
maxPubAcksInflightEnv := target.EnvNATSStreamingMaxPubAcksInFlight
|
|
|
|
if k != config.Default {
|
|
|
|
maxPubAcksInflightEnv = maxPubAcksInflightEnv + config.Default + k
|
|
|
|
}
|
|
|
|
maxPubAcksInflight, err := strconv.Atoi(env.Get(maxPubAcksInflightEnv,
|
|
|
|
kv.Get(target.NATSStreamingMaxPubAcksInFlight)))
|
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
clusterIDEnv := target.EnvNATSStreamingClusterID
|
|
|
|
if k != config.Default {
|
|
|
|
clusterIDEnv = clusterIDEnv + config.Default + k
|
|
|
|
}
|
|
|
|
natsArgs.Streaming.Enable = streamingEnabled
|
|
|
|
natsArgs.Streaming.ClusterID = env.Get(clusterIDEnv, kv.Get(target.NATSStreamingClusterID))
|
2019-12-04 15:32:37 -08:00
|
|
|
natsArgs.Streaming.Async = env.Get(asyncEnv, kv.Get(target.NATSStreamingAsync)) == config.EnableOn
|
2019-10-22 22:59:13 -07:00
|
|
|
natsArgs.Streaming.MaxPubAcksInflight = maxPubAcksInflight
|
|
|
|
}
|
|
|
|
|
|
|
|
if err = natsArgs.Validate(); err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
natsTargets[k] = natsArgs
|
|
|
|
}
|
2019-10-30 23:39:09 -07:00
|
|
|
return natsTargets, nil
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
// DefaultNSQKVS - NSQ KV for config
|
|
|
|
var (
|
|
|
|
DefaultNSQKVS = config.KVS{
|
2019-11-20 15:10:24 -08:00
|
|
|
config.KV{
|
2019-12-04 15:32:37 -08:00
|
|
|
Key: config.Enable,
|
|
|
|
Value: config.EnableOff,
|
2019-11-20 15:10:24 -08:00
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.NSQAddress,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.NSQTopic,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.NSQTLS,
|
2019-12-04 15:32:37 -08:00
|
|
|
Value: config.EnableOff,
|
2019-11-20 15:10:24 -08:00
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.NSQTLSSkipVerify,
|
2019-12-04 15:32:37 -08:00
|
|
|
Value: config.EnableOff,
|
2019-11-20 15:10:24 -08:00
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.NSQQueueDir,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.NSQQueueLimit,
|
|
|
|
Value: "0",
|
|
|
|
},
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
)
|
|
|
|
|
|
|
|
// GetNotifyNSQ - returns a map of registered notification 'nsq' targets
|
2019-10-30 23:39:09 -07:00
|
|
|
func GetNotifyNSQ(nsqKVS map[string]config.KVS) (map[string]target.NSQArgs, error) {
|
2019-10-22 22:59:13 -07:00
|
|
|
nsqTargets := make(map[string]target.NSQArgs)
|
2021-07-13 09:39:13 -07:00
|
|
|
for k, kv := range config.Merge(nsqKVS, target.EnvNSQEnable, DefaultNSQKVS) {
|
2019-12-04 15:32:37 -08:00
|
|
|
enableEnv := target.EnvNSQEnable
|
2019-10-22 22:59:13 -07:00
|
|
|
if k != config.Default {
|
2019-12-04 15:32:37 -08:00
|
|
|
enableEnv = enableEnv + config.Default + k
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
2019-12-04 15:32:37 -08:00
|
|
|
enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
|
2019-10-22 22:59:13 -07:00
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
if !enabled {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
addressEnv := target.EnvNSQAddress
|
|
|
|
if k != config.Default {
|
|
|
|
addressEnv = addressEnv + config.Default + k
|
|
|
|
}
|
|
|
|
nsqdAddress, err := xnet.ParseHost(env.Get(addressEnv, kv.Get(target.NSQAddress)))
|
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
2019-11-13 17:38:05 -08:00
|
|
|
tlsEnableEnv := target.EnvNSQTLS
|
2019-10-22 22:59:13 -07:00
|
|
|
if k != config.Default {
|
|
|
|
tlsEnableEnv = tlsEnableEnv + config.Default + k
|
|
|
|
}
|
|
|
|
tlsSkipVerifyEnv := target.EnvNSQTLSSkipVerify
|
|
|
|
if k != config.Default {
|
|
|
|
tlsSkipVerifyEnv = tlsSkipVerifyEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
|
|
|
queueLimitEnv := target.EnvNSQQueueLimit
|
|
|
|
if k != config.Default {
|
|
|
|
queueLimitEnv = queueLimitEnv + config.Default + k
|
|
|
|
}
|
|
|
|
queueLimit, err := strconv.ParseUint(env.Get(queueLimitEnv, kv.Get(target.NSQQueueLimit)), 10, 64)
|
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
topicEnv := target.EnvNSQTopic
|
|
|
|
if k != config.Default {
|
|
|
|
topicEnv = topicEnv + config.Default + k
|
|
|
|
}
|
|
|
|
queueDirEnv := target.EnvNSQQueueDir
|
|
|
|
if k != config.Default {
|
|
|
|
queueDirEnv = queueDirEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
|
|
|
nsqArgs := target.NSQArgs{
|
|
|
|
Enable: enabled,
|
|
|
|
NSQDAddress: *nsqdAddress,
|
|
|
|
Topic: env.Get(topicEnv, kv.Get(target.NSQTopic)),
|
|
|
|
QueueDir: env.Get(queueDirEnv, kv.Get(target.NSQQueueDir)),
|
|
|
|
QueueLimit: queueLimit,
|
|
|
|
}
|
2019-12-04 15:32:37 -08:00
|
|
|
nsqArgs.TLS.Enable = env.Get(tlsEnableEnv, kv.Get(target.NSQTLS)) == config.EnableOn
|
|
|
|
nsqArgs.TLS.SkipVerify = env.Get(tlsSkipVerifyEnv, kv.Get(target.NSQTLSSkipVerify)) == config.EnableOn
|
2019-10-22 22:59:13 -07:00
|
|
|
|
|
|
|
if err = nsqArgs.Validate(); err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
nsqTargets[k] = nsqArgs
|
|
|
|
}
|
2019-10-30 23:39:09 -07:00
|
|
|
return nsqTargets, nil
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
// DefaultPostgresKVS - default Postgres KV for server config.
|
|
|
|
var (
|
|
|
|
DefaultPostgresKVS = config.KVS{
|
2019-11-20 15:10:24 -08:00
|
|
|
config.KV{
|
2019-12-04 15:32:37 -08:00
|
|
|
Key: config.Enable,
|
|
|
|
Value: config.EnableOff,
|
2019-11-20 15:10:24 -08:00
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.PostgresFormat,
|
|
|
|
Value: formatNamespace,
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.PostgresConnectionString,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.PostgresTable,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.PostgresQueueDir,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.PostgresQueueLimit,
|
|
|
|
Value: "0",
|
|
|
|
},
|
2020-09-25 10:50:30 +05:30
|
|
|
config.KV{
|
|
|
|
Key: target.PostgresMaxOpenConnections,
|
|
|
|
Value: "2",
|
|
|
|
},
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
)
|
|
|
|
|
|
|
|
// GetNotifyPostgres - returns a map of registered notification 'postgres' targets
|
2019-10-30 23:39:09 -07:00
|
|
|
func GetNotifyPostgres(postgresKVS map[string]config.KVS) (map[string]target.PostgreSQLArgs, error) {
|
2019-10-22 22:59:13 -07:00
|
|
|
psqlTargets := make(map[string]target.PostgreSQLArgs)
|
2021-07-13 09:39:13 -07:00
|
|
|
for k, kv := range config.Merge(postgresKVS, target.EnvPostgresEnable, DefaultPostgresKVS) {
|
2019-12-04 15:32:37 -08:00
|
|
|
enableEnv := target.EnvPostgresEnable
|
2019-10-22 22:59:13 -07:00
|
|
|
if k != config.Default {
|
2019-12-04 15:32:37 -08:00
|
|
|
enableEnv = enableEnv + config.Default + k
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
2019-12-04 15:32:37 -08:00
|
|
|
enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
|
2019-10-22 22:59:13 -07:00
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
if !enabled {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
queueLimitEnv := target.EnvPostgresQueueLimit
|
|
|
|
if k != config.Default {
|
|
|
|
queueLimitEnv = queueLimitEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
|
|
|
queueLimit, err := strconv.Atoi(env.Get(queueLimitEnv, kv.Get(target.PostgresQueueLimit)))
|
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
formatEnv := target.EnvPostgresFormat
|
|
|
|
if k != config.Default {
|
|
|
|
formatEnv = formatEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
|
|
|
connectionStringEnv := target.EnvPostgresConnectionString
|
|
|
|
if k != config.Default {
|
|
|
|
connectionStringEnv = connectionStringEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
|
|
|
tableEnv := target.EnvPostgresTable
|
|
|
|
if k != config.Default {
|
|
|
|
tableEnv = tableEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
|
|
|
queueDirEnv := target.EnvPostgresQueueDir
|
|
|
|
if k != config.Default {
|
|
|
|
queueDirEnv = queueDirEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
2020-09-25 10:50:30 +05:30
|
|
|
maxOpenConnectionsEnv := target.EnvPostgresMaxOpenConnections
|
|
|
|
if k != config.Default {
|
|
|
|
maxOpenConnectionsEnv = maxOpenConnectionsEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
|
|
|
maxOpenConnections, cErr := strconv.Atoi(env.Get(maxOpenConnectionsEnv, kv.Get(target.PostgresMaxOpenConnections)))
|
|
|
|
if cErr != nil {
|
|
|
|
return nil, cErr
|
|
|
|
}
|
|
|
|
|
2019-10-22 22:59:13 -07:00
|
|
|
psqlArgs := target.PostgreSQLArgs{
|
2020-09-25 10:50:30 +05:30
|
|
|
Enable: enabled,
|
|
|
|
Format: env.Get(formatEnv, kv.Get(target.PostgresFormat)),
|
|
|
|
ConnectionString: env.Get(connectionStringEnv, kv.Get(target.PostgresConnectionString)),
|
|
|
|
Table: env.Get(tableEnv, kv.Get(target.PostgresTable)),
|
|
|
|
QueueDir: env.Get(queueDirEnv, kv.Get(target.PostgresQueueDir)),
|
|
|
|
QueueLimit: uint64(queueLimit),
|
|
|
|
MaxOpenConnections: maxOpenConnections,
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
if err = psqlArgs.Validate(); err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
psqlTargets[k] = psqlArgs
|
|
|
|
}
|
2019-10-30 23:39:09 -07:00
|
|
|
|
|
|
|
return psqlTargets, nil
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
// DefaultRedisKVS - default KV for redis config
|
|
|
|
var (
|
|
|
|
DefaultRedisKVS = config.KVS{
|
2019-11-20 15:10:24 -08:00
|
|
|
config.KV{
|
2019-12-04 15:32:37 -08:00
|
|
|
Key: config.Enable,
|
|
|
|
Value: config.EnableOff,
|
2019-11-20 15:10:24 -08:00
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.RedisFormat,
|
|
|
|
Value: formatNamespace,
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.RedisAddress,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.RedisKey,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.RedisPassword,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.RedisQueueDir,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.RedisQueueLimit,
|
|
|
|
Value: "0",
|
|
|
|
},
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
)
|
|
|
|
|
|
|
|
// GetNotifyRedis - returns a map of registered notification 'redis' targets
|
2019-10-30 23:39:09 -07:00
|
|
|
func GetNotifyRedis(redisKVS map[string]config.KVS) (map[string]target.RedisArgs, error) {
|
2019-10-22 22:59:13 -07:00
|
|
|
redisTargets := make(map[string]target.RedisArgs)
|
2021-07-13 09:39:13 -07:00
|
|
|
for k, kv := range config.Merge(redisKVS, target.EnvRedisEnable, DefaultRedisKVS) {
|
2019-12-04 15:32:37 -08:00
|
|
|
enableEnv := target.EnvRedisEnable
|
2019-10-22 22:59:13 -07:00
|
|
|
if k != config.Default {
|
2019-12-04 15:32:37 -08:00
|
|
|
enableEnv = enableEnv + config.Default + k
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
2019-12-04 15:32:37 -08:00
|
|
|
enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
|
2019-10-22 22:59:13 -07:00
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
if !enabled {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
addressEnv := target.EnvRedisAddress
|
|
|
|
if k != config.Default {
|
|
|
|
addressEnv = addressEnv + config.Default + k
|
|
|
|
}
|
|
|
|
addr, err := xnet.ParseHost(env.Get(addressEnv, kv.Get(target.RedisAddress)))
|
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
queueLimitEnv := target.EnvRedisQueueLimit
|
|
|
|
if k != config.Default {
|
|
|
|
queueLimitEnv = queueLimitEnv + config.Default + k
|
|
|
|
}
|
|
|
|
queueLimit, err := strconv.Atoi(env.Get(queueLimitEnv, kv.Get(target.RedisQueueLimit)))
|
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
formatEnv := target.EnvRedisFormat
|
|
|
|
if k != config.Default {
|
|
|
|
formatEnv = formatEnv + config.Default + k
|
|
|
|
}
|
|
|
|
passwordEnv := target.EnvRedisPassword
|
|
|
|
if k != config.Default {
|
|
|
|
passwordEnv = passwordEnv + config.Default + k
|
|
|
|
}
|
|
|
|
keyEnv := target.EnvRedisKey
|
|
|
|
if k != config.Default {
|
|
|
|
keyEnv = keyEnv + config.Default + k
|
|
|
|
}
|
|
|
|
queueDirEnv := target.EnvRedisQueueDir
|
|
|
|
if k != config.Default {
|
|
|
|
queueDirEnv = queueDirEnv + config.Default + k
|
|
|
|
}
|
|
|
|
redisArgs := target.RedisArgs{
|
|
|
|
Enable: enabled,
|
|
|
|
Format: env.Get(formatEnv, kv.Get(target.RedisFormat)),
|
|
|
|
Addr: *addr,
|
|
|
|
Password: env.Get(passwordEnv, kv.Get(target.RedisPassword)),
|
|
|
|
Key: env.Get(keyEnv, kv.Get(target.RedisKey)),
|
|
|
|
QueueDir: env.Get(queueDirEnv, kv.Get(target.RedisQueueDir)),
|
|
|
|
QueueLimit: uint64(queueLimit),
|
|
|
|
}
|
|
|
|
if err = redisArgs.Validate(); err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
redisTargets[k] = redisArgs
|
|
|
|
}
|
2019-10-30 23:39:09 -07:00
|
|
|
return redisTargets, nil
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
// DefaultWebhookKVS - default KV for webhook config
|
|
|
|
var (
|
|
|
|
DefaultWebhookKVS = config.KVS{
|
2019-11-20 15:10:24 -08:00
|
|
|
config.KV{
|
2019-12-04 15:32:37 -08:00
|
|
|
Key: config.Enable,
|
|
|
|
Value: config.EnableOff,
|
2019-11-20 15:10:24 -08:00
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.WebhookEndpoint,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.WebhookAuthToken,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.WebhookQueueLimit,
|
|
|
|
Value: "0",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.WebhookQueueDir,
|
|
|
|
Value: "",
|
|
|
|
},
|
2020-06-08 18:25:44 +05:30
|
|
|
config.KV{
|
|
|
|
Key: target.WebhookClientCert,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.WebhookClientKey,
|
|
|
|
Value: "",
|
|
|
|
},
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
)
|
|
|
|
|
|
|
|
// GetNotifyWebhook - returns a map of registered notification 'webhook' targets
|
2019-12-12 06:53:50 -08:00
|
|
|
func GetNotifyWebhook(webhookKVS map[string]config.KVS, transport *http.Transport) (
|
|
|
|
map[string]target.WebhookArgs, error) {
|
2019-10-22 22:59:13 -07:00
|
|
|
webhookTargets := make(map[string]target.WebhookArgs)
|
2021-07-13 09:39:13 -07:00
|
|
|
for k, kv := range config.Merge(webhookKVS, target.EnvWebhookEnable, DefaultWebhookKVS) {
|
2019-12-04 15:32:37 -08:00
|
|
|
enableEnv := target.EnvWebhookEnable
|
2019-10-22 22:59:13 -07:00
|
|
|
if k != config.Default {
|
2019-12-04 15:32:37 -08:00
|
|
|
enableEnv = enableEnv + config.Default + k
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
2019-12-04 15:32:37 -08:00
|
|
|
enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
|
2019-10-22 22:59:13 -07:00
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
if !enabled {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
urlEnv := target.EnvWebhookEndpoint
|
|
|
|
if k != config.Default {
|
|
|
|
urlEnv = urlEnv + config.Default + k
|
|
|
|
}
|
2019-10-30 23:39:09 -07:00
|
|
|
url, err := xnet.ParseHTTPURL(env.Get(urlEnv, kv.Get(target.WebhookEndpoint)))
|
2019-10-22 22:59:13 -07:00
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
queueLimitEnv := target.EnvWebhookQueueLimit
|
|
|
|
if k != config.Default {
|
|
|
|
queueLimitEnv = queueLimitEnv + config.Default + k
|
|
|
|
}
|
|
|
|
queueLimit, err := strconv.Atoi(env.Get(queueLimitEnv, kv.Get(target.WebhookQueueLimit)))
|
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
queueDirEnv := target.EnvWebhookQueueDir
|
|
|
|
if k != config.Default {
|
|
|
|
queueDirEnv = queueDirEnv + config.Default + k
|
|
|
|
}
|
|
|
|
authEnv := target.EnvWebhookAuthToken
|
|
|
|
if k != config.Default {
|
|
|
|
authEnv = authEnv + config.Default + k
|
|
|
|
}
|
2020-06-08 18:25:44 +05:30
|
|
|
clientCertEnv := target.EnvWebhookClientCert
|
|
|
|
if k != config.Default {
|
|
|
|
clientCertEnv = clientCertEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
|
|
|
clientKeyEnv := target.EnvWebhookClientKey
|
|
|
|
if k != config.Default {
|
|
|
|
clientKeyEnv = clientKeyEnv + config.Default + k
|
|
|
|
}
|
2019-10-22 22:59:13 -07:00
|
|
|
|
|
|
|
webhookArgs := target.WebhookArgs{
|
|
|
|
Enable: enabled,
|
|
|
|
Endpoint: *url,
|
2019-12-12 06:53:50 -08:00
|
|
|
Transport: transport,
|
2019-10-22 22:59:13 -07:00
|
|
|
AuthToken: env.Get(authEnv, kv.Get(target.WebhookAuthToken)),
|
|
|
|
QueueDir: env.Get(queueDirEnv, kv.Get(target.WebhookQueueDir)),
|
|
|
|
QueueLimit: uint64(queueLimit),
|
2020-06-08 18:25:44 +05:30
|
|
|
ClientCert: env.Get(clientCertEnv, kv.Get(target.WebhookClientCert)),
|
|
|
|
ClientKey: env.Get(clientKeyEnv, kv.Get(target.WebhookClientKey)),
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
if err = webhookArgs.Validate(); err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
webhookTargets[k] = webhookArgs
|
|
|
|
}
|
2019-10-30 23:39:09 -07:00
|
|
|
return webhookTargets, nil
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
// DefaultESKVS - default KV config for Elasticsearch target
|
|
|
|
var (
|
|
|
|
DefaultESKVS = config.KVS{
|
2019-11-20 15:10:24 -08:00
|
|
|
config.KV{
|
2019-12-04 15:32:37 -08:00
|
|
|
Key: config.Enable,
|
|
|
|
Value: config.EnableOff,
|
2019-11-20 15:10:24 -08:00
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.ElasticURL,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.ElasticFormat,
|
|
|
|
Value: formatNamespace,
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.ElasticIndex,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.ElasticQueueDir,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.ElasticQueueLimit,
|
|
|
|
Value: "0",
|
|
|
|
},
|
2020-08-23 22:13:48 +05:30
|
|
|
config.KV{
|
|
|
|
Key: target.ElasticUsername,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.ElasticPassword,
|
|
|
|
Value: "",
|
|
|
|
},
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
)
|
|
|
|
|
|
|
|
// GetNotifyES - returns a map of registered notification 'elasticsearch' targets
|
2020-08-23 22:13:48 +05:30
|
|
|
func GetNotifyES(esKVS map[string]config.KVS, transport *http.Transport) (map[string]target.ElasticsearchArgs, error) {
|
2019-10-22 22:59:13 -07:00
|
|
|
esTargets := make(map[string]target.ElasticsearchArgs)
|
2021-07-13 09:39:13 -07:00
|
|
|
for k, kv := range config.Merge(esKVS, target.EnvElasticEnable, DefaultESKVS) {
|
2019-12-04 15:32:37 -08:00
|
|
|
enableEnv := target.EnvElasticEnable
|
2019-10-22 22:59:13 -07:00
|
|
|
if k != config.Default {
|
2019-12-04 15:32:37 -08:00
|
|
|
enableEnv = enableEnv + config.Default + k
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
2019-12-04 15:32:37 -08:00
|
|
|
enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
|
2019-10-22 22:59:13 -07:00
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
if !enabled {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
urlEnv := target.EnvElasticURL
|
|
|
|
if k != config.Default {
|
|
|
|
urlEnv = urlEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
2019-10-30 23:39:09 -07:00
|
|
|
url, err := xnet.ParseHTTPURL(env.Get(urlEnv, kv.Get(target.ElasticURL)))
|
2019-10-22 22:59:13 -07:00
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
queueLimitEnv := target.EnvElasticQueueLimit
|
|
|
|
if k != config.Default {
|
|
|
|
queueLimitEnv = queueLimitEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
|
|
|
queueLimit, err := strconv.Atoi(env.Get(queueLimitEnv, kv.Get(target.ElasticQueueLimit)))
|
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
formatEnv := target.EnvElasticFormat
|
|
|
|
if k != config.Default {
|
|
|
|
formatEnv = formatEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
|
|
|
indexEnv := target.EnvElasticIndex
|
|
|
|
if k != config.Default {
|
|
|
|
indexEnv = indexEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
|
|
|
queueDirEnv := target.EnvElasticQueueDir
|
|
|
|
if k != config.Default {
|
|
|
|
queueDirEnv = queueDirEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
2020-08-23 22:13:48 +05:30
|
|
|
usernameEnv := target.EnvElasticUsername
|
|
|
|
if k != config.Default {
|
|
|
|
usernameEnv = usernameEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
|
|
|
passwordEnv := target.EnvElasticPassword
|
|
|
|
if k != config.Default {
|
|
|
|
passwordEnv = passwordEnv + config.Default + k
|
|
|
|
}
|
|
|
|
|
2019-10-22 22:59:13 -07:00
|
|
|
esArgs := target.ElasticsearchArgs{
|
|
|
|
Enable: enabled,
|
|
|
|
Format: env.Get(formatEnv, kv.Get(target.ElasticFormat)),
|
|
|
|
URL: *url,
|
|
|
|
Index: env.Get(indexEnv, kv.Get(target.ElasticIndex)),
|
|
|
|
QueueDir: env.Get(queueDirEnv, kv.Get(target.ElasticQueueDir)),
|
|
|
|
QueueLimit: uint64(queueLimit),
|
2020-08-23 22:13:48 +05:30
|
|
|
Transport: transport,
|
|
|
|
Username: env.Get(usernameEnv, kv.Get(target.ElasticUsername)),
|
|
|
|
Password: env.Get(passwordEnv, kv.Get(target.ElasticPassword)),
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
if err = esArgs.Validate(); err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
esTargets[k] = esArgs
|
|
|
|
}
|
2019-10-30 23:39:09 -07:00
|
|
|
return esTargets, nil
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
// DefaultAMQPKVS - default KV for AMQP config
|
|
|
|
var (
|
|
|
|
DefaultAMQPKVS = config.KVS{
|
2019-11-20 15:10:24 -08:00
|
|
|
config.KV{
|
2019-12-04 15:32:37 -08:00
|
|
|
Key: config.Enable,
|
|
|
|
Value: config.EnableOff,
|
2019-11-20 15:10:24 -08:00
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.AmqpURL,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.AmqpExchange,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.AmqpExchangeType,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.AmqpRoutingKey,
|
|
|
|
Value: "",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.AmqpMandatory,
|
2019-12-04 15:32:37 -08:00
|
|
|
Value: config.EnableOff,
|
2019-11-20 15:10:24 -08:00
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.AmqpDurable,
|
2019-12-04 15:32:37 -08:00
|
|
|
Value: config.EnableOff,
|
2019-11-20 15:10:24 -08:00
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.AmqpNoWait,
|
2019-12-04 15:32:37 -08:00
|
|
|
Value: config.EnableOff,
|
2019-11-20 15:10:24 -08:00
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.AmqpInternal,
|
2019-12-04 15:32:37 -08:00
|
|
|
Value: config.EnableOff,
|
2019-11-20 15:10:24 -08:00
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.AmqpAutoDeleted,
|
2019-12-04 15:32:37 -08:00
|
|
|
Value: config.EnableOff,
|
2019-11-20 15:10:24 -08:00
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.AmqpDeliveryMode,
|
|
|
|
Value: "0",
|
|
|
|
},
|
2021-06-14 13:28:44 -07:00
|
|
|
config.KV{
|
|
|
|
Key: target.AmqpPublisherConfirms,
|
|
|
|
Value: config.EnableOff,
|
|
|
|
},
|
2019-11-20 15:10:24 -08:00
|
|
|
config.KV{
|
|
|
|
Key: target.AmqpQueueLimit,
|
|
|
|
Value: "0",
|
|
|
|
},
|
|
|
|
config.KV{
|
|
|
|
Key: target.AmqpQueueDir,
|
|
|
|
Value: "",
|
|
|
|
},
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
)
|
|
|
|
|
|
|
|
// GetNotifyAMQP - returns a map of registered notification 'amqp' targets
|
2019-10-30 23:39:09 -07:00
|
|
|
func GetNotifyAMQP(amqpKVS map[string]config.KVS) (map[string]target.AMQPArgs, error) {
|
2019-10-22 22:59:13 -07:00
|
|
|
amqpTargets := make(map[string]target.AMQPArgs)
|
2021-07-13 09:39:13 -07:00
|
|
|
for k, kv := range config.Merge(amqpKVS, target.EnvAMQPEnable, DefaultAMQPKVS) {
|
2019-12-04 15:32:37 -08:00
|
|
|
enableEnv := target.EnvAMQPEnable
|
2019-10-22 22:59:13 -07:00
|
|
|
if k != config.Default {
|
2019-12-04 15:32:37 -08:00
|
|
|
enableEnv = enableEnv + config.Default + k
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
2019-12-04 15:32:37 -08:00
|
|
|
enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
|
2019-10-22 22:59:13 -07:00
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
if !enabled {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
urlEnv := target.EnvAMQPURL
|
|
|
|
if k != config.Default {
|
|
|
|
urlEnv = urlEnv + config.Default + k
|
|
|
|
}
|
|
|
|
url, err := xnet.ParseURL(env.Get(urlEnv, kv.Get(target.AmqpURL)))
|
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
deliveryModeEnv := target.EnvAMQPDeliveryMode
|
|
|
|
if k != config.Default {
|
|
|
|
deliveryModeEnv = deliveryModeEnv + config.Default + k
|
|
|
|
}
|
|
|
|
deliveryMode, err := strconv.Atoi(env.Get(deliveryModeEnv, kv.Get(target.AmqpDeliveryMode)))
|
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
exchangeEnv := target.EnvAMQPExchange
|
|
|
|
if k != config.Default {
|
|
|
|
exchangeEnv = exchangeEnv + config.Default + k
|
|
|
|
}
|
|
|
|
routingKeyEnv := target.EnvAMQPRoutingKey
|
|
|
|
if k != config.Default {
|
|
|
|
routingKeyEnv = routingKeyEnv + config.Default + k
|
|
|
|
}
|
|
|
|
exchangeTypeEnv := target.EnvAMQPExchangeType
|
|
|
|
if k != config.Default {
|
|
|
|
exchangeTypeEnv = exchangeTypeEnv + config.Default + k
|
|
|
|
}
|
|
|
|
mandatoryEnv := target.EnvAMQPMandatory
|
|
|
|
if k != config.Default {
|
|
|
|
mandatoryEnv = mandatoryEnv + config.Default + k
|
|
|
|
}
|
|
|
|
immediateEnv := target.EnvAMQPImmediate
|
|
|
|
if k != config.Default {
|
|
|
|
immediateEnv = immediateEnv + config.Default + k
|
|
|
|
}
|
|
|
|
durableEnv := target.EnvAMQPDurable
|
|
|
|
if k != config.Default {
|
|
|
|
durableEnv = durableEnv + config.Default + k
|
|
|
|
}
|
|
|
|
internalEnv := target.EnvAMQPInternal
|
|
|
|
if k != config.Default {
|
|
|
|
internalEnv = internalEnv + config.Default + k
|
|
|
|
}
|
|
|
|
noWaitEnv := target.EnvAMQPNoWait
|
|
|
|
if k != config.Default {
|
|
|
|
noWaitEnv = noWaitEnv + config.Default + k
|
|
|
|
}
|
|
|
|
autoDeletedEnv := target.EnvAMQPAutoDeleted
|
|
|
|
if k != config.Default {
|
|
|
|
autoDeletedEnv = autoDeletedEnv + config.Default + k
|
|
|
|
}
|
2021-06-14 13:28:44 -07:00
|
|
|
publisherConfirmsEnv := target.EnvAMQPPublisherConfirms
|
|
|
|
if k != config.Default {
|
|
|
|
publisherConfirmsEnv = publisherConfirmsEnv + config.Default + k
|
|
|
|
}
|
2019-10-22 22:59:13 -07:00
|
|
|
queueDirEnv := target.EnvAMQPQueueDir
|
|
|
|
if k != config.Default {
|
|
|
|
queueDirEnv = queueDirEnv + config.Default + k
|
|
|
|
}
|
|
|
|
queueLimitEnv := target.EnvAMQPQueueLimit
|
|
|
|
if k != config.Default {
|
|
|
|
queueLimitEnv = queueLimitEnv + config.Default + k
|
|
|
|
}
|
|
|
|
queueLimit, err := strconv.ParseUint(env.Get(queueLimitEnv, kv.Get(target.AmqpQueueLimit)), 10, 64)
|
|
|
|
if err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
amqpArgs := target.AMQPArgs{
|
2021-06-14 13:28:44 -07:00
|
|
|
Enable: enabled,
|
|
|
|
URL: *url,
|
|
|
|
Exchange: env.Get(exchangeEnv, kv.Get(target.AmqpExchange)),
|
|
|
|
RoutingKey: env.Get(routingKeyEnv, kv.Get(target.AmqpRoutingKey)),
|
|
|
|
ExchangeType: env.Get(exchangeTypeEnv, kv.Get(target.AmqpExchangeType)),
|
|
|
|
DeliveryMode: uint8(deliveryMode),
|
|
|
|
Mandatory: env.Get(mandatoryEnv, kv.Get(target.AmqpMandatory)) == config.EnableOn,
|
|
|
|
Immediate: env.Get(immediateEnv, kv.Get(target.AmqpImmediate)) == config.EnableOn,
|
|
|
|
Durable: env.Get(durableEnv, kv.Get(target.AmqpDurable)) == config.EnableOn,
|
|
|
|
Internal: env.Get(internalEnv, kv.Get(target.AmqpInternal)) == config.EnableOn,
|
|
|
|
NoWait: env.Get(noWaitEnv, kv.Get(target.AmqpNoWait)) == config.EnableOn,
|
|
|
|
AutoDeleted: env.Get(autoDeletedEnv, kv.Get(target.AmqpAutoDeleted)) == config.EnableOn,
|
|
|
|
PublisherConfirms: env.Get(publisherConfirmsEnv, kv.Get(target.AmqpPublisherConfirms)) == config.EnableOn,
|
|
|
|
QueueDir: env.Get(queueDirEnv, kv.Get(target.AmqpQueueDir)),
|
|
|
|
QueueLimit: queueLimit,
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
if err = amqpArgs.Validate(); err != nil {
|
2019-10-30 23:39:09 -07:00
|
|
|
return nil, err
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|
|
|
|
amqpTargets[k] = amqpArgs
|
|
|
|
}
|
2019-10-30 23:39:09 -07:00
|
|
|
return amqpTargets, nil
|
2019-10-22 22:59:13 -07:00
|
|
|
}
|