mirror of
https://github.com/minio/minio.git
synced 2024-12-26 23:25:54 -05:00
95bf4a57b6
Create new code paths for multiple subsystems in the code. This will make maintaining this easier later. Also introduce bugLogIf() for errors that should not happen in the first place.
220 lines
6.0 KiB
Go
220 lines
6.0 KiB
Go
// Copyright (c) 2015-2023 MinIO, Inc.
|
|
//
|
|
// This file is part of MinIO Object Storage stack
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package lambda
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"net/http"
|
|
|
|
"github.com/minio/minio/internal/config"
|
|
"github.com/minio/minio/internal/config/lambda/event"
|
|
"github.com/minio/minio/internal/config/lambda/target"
|
|
"github.com/minio/minio/internal/logger"
|
|
"github.com/minio/pkg/v2/env"
|
|
xnet "github.com/minio/pkg/v2/net"
|
|
)
|
|
|
|
// logSubsys is the subsystem name handed to the logger by the logging
// helpers in this file.
// NOTE(review): the value is "notify" although this is package lambda;
// confirm that sharing the tag is intentional.
const logSubsys = "notify"
|
|
|
|
func logOnceIf(ctx context.Context, err error, id string, errKind ...interface{}) {
|
|
logger.LogOnceIf(ctx, logSubsys, err, id, errKind...)
|
|
}
|
|
|
|
// ErrTargetsOffline is the sentinel error reported when a target says it is
// not active; TestSubSysLambdaTargets wraps it together with the target ID.
var ErrTargetsOffline = errors.New("one or more targets are offline. Please use `mc admin info --json` to check the offline targets")
|
|
|
|
// TestSubSysLambdaTargets - tests notification targets of given subsystem
|
|
func TestSubSysLambdaTargets(ctx context.Context, cfg config.Config, subSys string, transport *http.Transport) error {
|
|
if err := checkValidLambdaKeysForSubSys(subSys, cfg[subSys]); err != nil {
|
|
return err
|
|
}
|
|
|
|
targetList, err := fetchSubSysTargets(ctx, cfg, subSys, transport)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, target := range targetList {
|
|
defer target.Close()
|
|
}
|
|
|
|
for _, target := range targetList {
|
|
yes, err := target.IsActive()
|
|
if err == nil && !yes {
|
|
err = ErrTargetsOffline
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("error (%s): %w", target.ID(), err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func fetchSubSysTargets(ctx context.Context, cfg config.Config, subSys string, transport *http.Transport) (targets []event.Target, err error) {
|
|
if err := checkValidLambdaKeysForSubSys(subSys, cfg[subSys]); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if subSys == config.LambdaWebhookSubSys {
|
|
webhookTargets, err := GetLambdaWebhook(cfg[config.LambdaWebhookSubSys], transport)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for id, args := range webhookTargets {
|
|
if !args.Enable {
|
|
continue
|
|
}
|
|
t, err := target.NewWebhookTarget(ctx, id, args, logOnceIf, transport)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
targets = append(targets, t)
|
|
}
|
|
}
|
|
return targets, nil
|
|
}
|
|
|
|
// FetchEnabledTargets - Returns a set of configured TargetList
|
|
func FetchEnabledTargets(ctx context.Context, cfg config.Config, transport *http.Transport) (*event.TargetList, error) {
|
|
targetList := event.NewTargetList()
|
|
for _, subSys := range config.LambdaSubSystems.ToSlice() {
|
|
targets, err := fetchSubSysTargets(ctx, cfg, subSys, transport)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for _, t := range targets {
|
|
if err = targetList.Add(t); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
}
|
|
return targetList, nil
|
|
}
|
|
|
|
// DefaultLambdaKVS - default notification list of kvs.
|
|
var (
|
|
DefaultLambdaKVS = map[string]config.KVS{
|
|
config.LambdaWebhookSubSys: DefaultWebhookKVS,
|
|
}
|
|
)
|
|
|
|
// DefaultWebhookKVS - default KV for webhook config
|
|
var (
|
|
DefaultWebhookKVS = config.KVS{
|
|
config.KV{
|
|
Key: config.Enable,
|
|
Value: config.EnableOff,
|
|
},
|
|
config.KV{
|
|
Key: target.WebhookEndpoint,
|
|
Value: "",
|
|
},
|
|
config.KV{
|
|
Key: target.WebhookAuthToken,
|
|
Value: "",
|
|
},
|
|
config.KV{
|
|
Key: target.WebhookClientCert,
|
|
Value: "",
|
|
},
|
|
config.KV{
|
|
Key: target.WebhookClientKey,
|
|
Value: "",
|
|
},
|
|
}
|
|
)
|
|
|
|
func checkValidLambdaKeysForSubSys(subSys string, tgt map[string]config.KVS) error {
|
|
validKVS, ok := DefaultLambdaKVS[subSys]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
for tname, kv := range tgt {
|
|
subSysTarget := subSys
|
|
if tname != config.Default {
|
|
subSysTarget = subSys + config.SubSystemSeparator + tname
|
|
}
|
|
if v, ok := kv.Lookup(config.Enable); ok && v == config.EnableOn {
|
|
if err := config.CheckValidKeys(subSysTarget, kv, validKVS); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetLambdaWebhook - returns a map of registered notification 'webhook' targets
|
|
func GetLambdaWebhook(webhookKVS map[string]config.KVS, transport *http.Transport) (
|
|
map[string]target.WebhookArgs, error,
|
|
) {
|
|
webhookTargets := make(map[string]target.WebhookArgs)
|
|
for k, kv := range config.Merge(webhookKVS, target.EnvWebhookEnable, DefaultWebhookKVS) {
|
|
enableEnv := target.EnvWebhookEnable
|
|
if k != config.Default {
|
|
enableEnv = enableEnv + config.Default + k
|
|
}
|
|
enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable)))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if !enabled {
|
|
continue
|
|
}
|
|
urlEnv := target.EnvWebhookEndpoint
|
|
if k != config.Default {
|
|
urlEnv = urlEnv + config.Default + k
|
|
}
|
|
url, err := xnet.ParseHTTPURL(env.Get(urlEnv, kv.Get(target.WebhookEndpoint)))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
authEnv := target.EnvWebhookAuthToken
|
|
if k != config.Default {
|
|
authEnv = authEnv + config.Default + k
|
|
}
|
|
clientCertEnv := target.EnvWebhookClientCert
|
|
if k != config.Default {
|
|
clientCertEnv = clientCertEnv + config.Default + k
|
|
}
|
|
|
|
clientKeyEnv := target.EnvWebhookClientKey
|
|
if k != config.Default {
|
|
clientKeyEnv = clientKeyEnv + config.Default + k
|
|
}
|
|
|
|
webhookArgs := target.WebhookArgs{
|
|
Enable: enabled,
|
|
Endpoint: *url,
|
|
Transport: transport,
|
|
AuthToken: env.Get(authEnv, kv.Get(target.WebhookAuthToken)),
|
|
ClientCert: env.Get(clientCertEnv, kv.Get(target.WebhookClientCert)),
|
|
ClientKey: env.Get(clientKeyEnv, kv.Get(target.WebhookClientKey)),
|
|
}
|
|
if err = webhookArgs.Validate(); err != nil {
|
|
return nil, err
|
|
}
|
|
webhookTargets[k] = webhookArgs
|
|
}
|
|
return webhookTargets, nil
|
|
}
|