mirror of
https://github.com/minio/minio.git
synced 2025-11-10 14:09:48 -05:00
Move storageclass config handling into cmd/config/storageclass (#8360)
Continuation of the changes done in PR #8351 to refactor, add tests and move global handling into a more idiomatic style for Go as packages.
This commit is contained in:
committed by
kannappanr
parent
002ac82631
commit
3b8adf7528
@@ -26,7 +26,6 @@ import (
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
"github.com/minio/minio/pkg/event"
|
||||
xnet "github.com/minio/minio/pkg/net"
|
||||
"github.com/streadway/amqp"
|
||||
@@ -72,11 +71,12 @@ func (a *AMQPArgs) Validate() error {
|
||||
|
||||
// AMQPTarget - AMQP target
|
||||
type AMQPTarget struct {
|
||||
id event.TargetID
|
||||
args AMQPArgs
|
||||
conn *amqp.Connection
|
||||
connMutex sync.Mutex
|
||||
store Store
|
||||
id event.TargetID
|
||||
args AMQPArgs
|
||||
conn *amqp.Connection
|
||||
connMutex sync.Mutex
|
||||
store Store
|
||||
loggerOnce func(ctx context.Context, err error, id interface{})
|
||||
}
|
||||
|
||||
// ID - returns TargetID.
|
||||
@@ -174,7 +174,7 @@ func (target *AMQPTarget) Save(eventData event.Event) error {
|
||||
}
|
||||
defer func() {
|
||||
cErr := ch.Close()
|
||||
logger.LogOnceIf(context.Background(), cErr, target.ID())
|
||||
target.loggerOnce(context.Background(), cErr, target.ID())
|
||||
}()
|
||||
|
||||
return target.send(eventData, ch)
|
||||
@@ -188,7 +188,7 @@ func (target *AMQPTarget) Send(eventKey string) error {
|
||||
}
|
||||
defer func() {
|
||||
cErr := ch.Close()
|
||||
logger.LogOnceIf(context.Background(), cErr, target.ID())
|
||||
target.loggerOnce(context.Background(), cErr, target.ID())
|
||||
}()
|
||||
|
||||
eventData, eErr := target.store.Get(eventKey)
|
||||
@@ -215,7 +215,7 @@ func (target *AMQPTarget) Close() error {
|
||||
}
|
||||
|
||||
// NewAMQPTarget - creates new AMQP target.
|
||||
func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}) (*AMQPTarget, error) {
|
||||
func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{})) (*AMQPTarget, error) {
|
||||
var conn *amqp.Connection
|
||||
var err error
|
||||
|
||||
@@ -237,10 +237,11 @@ func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}) (*AMQPTarge
|
||||
}
|
||||
|
||||
target := &AMQPTarget{
|
||||
id: event.TargetID{ID: id, Name: "amqp"},
|
||||
args: args,
|
||||
conn: conn,
|
||||
store: store,
|
||||
id: event.TargetID{ID: id, Name: "amqp"},
|
||||
args: args,
|
||||
conn: conn,
|
||||
store: store,
|
||||
loggerOnce: loggerOnce,
|
||||
}
|
||||
|
||||
if target.store != nil {
|
||||
|
||||
@@ -28,7 +28,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
"github.com/minio/minio/pkg/event"
|
||||
xnet "github.com/minio/minio/pkg/net"
|
||||
)
|
||||
@@ -95,11 +94,12 @@ func (r RedisArgs) validateFormat(c redis.Conn) error {
|
||||
|
||||
// RedisTarget - Redis target.
|
||||
type RedisTarget struct {
|
||||
id event.TargetID
|
||||
args RedisArgs
|
||||
pool *redis.Pool
|
||||
store Store
|
||||
firstPing bool
|
||||
id event.TargetID
|
||||
args RedisArgs
|
||||
pool *redis.Pool
|
||||
store Store
|
||||
firstPing bool
|
||||
loggerOnce func(ctx context.Context, err error, id interface{})
|
||||
}
|
||||
|
||||
// ID - returns target ID.
|
||||
@@ -115,7 +115,7 @@ func (target *RedisTarget) Save(eventData event.Event) error {
|
||||
conn := target.pool.Get()
|
||||
defer func() {
|
||||
cErr := conn.Close()
|
||||
logger.LogOnceIf(context.Background(), cErr, target.ID())
|
||||
target.loggerOnce(context.Background(), cErr, target.ID())
|
||||
}()
|
||||
_, pingErr := conn.Do("PING")
|
||||
if pingErr != nil {
|
||||
@@ -132,7 +132,7 @@ func (target *RedisTarget) send(eventData event.Event) error {
|
||||
conn := target.pool.Get()
|
||||
defer func() {
|
||||
cErr := conn.Close()
|
||||
logger.LogOnceIf(context.Background(), cErr, target.ID())
|
||||
target.loggerOnce(context.Background(), cErr, target.ID())
|
||||
}()
|
||||
|
||||
if target.args.Format == event.NamespaceFormat {
|
||||
@@ -175,7 +175,7 @@ func (target *RedisTarget) Send(eventKey string) error {
|
||||
conn := target.pool.Get()
|
||||
defer func() {
|
||||
cErr := conn.Close()
|
||||
logger.LogOnceIf(context.Background(), cErr, target.ID())
|
||||
target.loggerOnce(context.Background(), cErr, target.ID())
|
||||
}()
|
||||
_, pingErr := conn.Do("PING")
|
||||
if pingErr != nil {
|
||||
@@ -222,7 +222,7 @@ func (target *RedisTarget) Close() error {
|
||||
}
|
||||
|
||||
// NewRedisTarget - creates new Redis target.
|
||||
func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}) (*RedisTarget, error) {
|
||||
func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{})) (*RedisTarget, error) {
|
||||
pool := &redis.Pool{
|
||||
MaxIdle: 3,
|
||||
IdleTimeout: 2 * 60 * time.Second,
|
||||
@@ -239,7 +239,7 @@ func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}) (*RedisTa
|
||||
if _, err = conn.Do("AUTH", args.Password); err != nil {
|
||||
cErr := conn.Close()
|
||||
targetID := event.TargetID{ID: id, Name: "redis"}
|
||||
logger.LogOnceIf(context.Background(), cErr, targetID.String())
|
||||
loggerOnce(context.Background(), cErr, targetID)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -262,16 +262,17 @@ func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}) (*RedisTa
|
||||
}
|
||||
|
||||
target := &RedisTarget{
|
||||
id: event.TargetID{ID: id, Name: "redis"},
|
||||
args: args,
|
||||
pool: pool,
|
||||
store: store,
|
||||
id: event.TargetID{ID: id, Name: "redis"},
|
||||
args: args,
|
||||
pool: pool,
|
||||
store: store,
|
||||
loggerOnce: loggerOnce,
|
||||
}
|
||||
|
||||
conn := target.pool.Get()
|
||||
defer func() {
|
||||
cErr := conn.Close()
|
||||
logger.LogOnceIf(context.Background(), cErr, target.ID())
|
||||
target.loggerOnce(context.Background(), cErr, target.ID())
|
||||
}()
|
||||
|
||||
_, pingErr := conn.Do("PING")
|
||||
|
||||
@@ -18,37 +18,41 @@ package openid
|
||||
|
||||
import (
|
||||
"crypto"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"io"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
jwtgo "github.com/dgrijalva/jwt-go"
|
||||
"github.com/minio/minio/pkg/env"
|
||||
xnet "github.com/minio/minio/pkg/net"
|
||||
)
|
||||
|
||||
// JWKSArgs - RSA authentication target arguments
|
||||
type JWKSArgs struct {
|
||||
URL *xnet.URL `json:"url"`
|
||||
publicKeys map[string]crypto.PublicKey
|
||||
URL *xnet.URL `json:"url"`
|
||||
publicKeys map[string]crypto.PublicKey
|
||||
transport *http.Transport
|
||||
closeRespFn func(io.ReadCloser)
|
||||
}
|
||||
|
||||
// PopulatePublicKey - populates a new publickey from the JWKS URL.
|
||||
func (r *JWKSArgs) PopulatePublicKey() error {
|
||||
insecureClient := &http.Client{Transport: newCustomHTTPTransport(true)}
|
||||
client := &http.Client{Transport: newCustomHTTPTransport(false)}
|
||||
if r.URL == nil {
|
||||
return nil
|
||||
}
|
||||
client := &http.Client{}
|
||||
if r.transport != nil {
|
||||
client.Transport = r.transport
|
||||
}
|
||||
resp, err := client.Get(r.URL.String())
|
||||
if err != nil {
|
||||
resp, err = insecureClient.Get(r.URL.String())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
defer r.closeRespFn(resp.Body)
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return errors.New(resp.Status)
|
||||
}
|
||||
@@ -133,27 +137,6 @@ func GetDefaultExpiration(dsecs string) (time.Duration, error) {
|
||||
return defaultExpiryDuration, nil
|
||||
}
|
||||
|
||||
// newCustomHTTPTransport returns a new http configuration
|
||||
// used while communicating with the cloud backends.
|
||||
// This sets the value for MaxIdleConnsPerHost from 2 (go default)
|
||||
// to 100.
|
||||
func newCustomHTTPTransport(insecure bool) *http.Transport {
|
||||
return &http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: 30 * time.Second,
|
||||
KeepAlive: 30 * time.Second,
|
||||
}).DialContext,
|
||||
MaxIdleConns: 1024,
|
||||
MaxIdleConnsPerHost: 1024,
|
||||
IdleConnTimeout: 30 * time.Second,
|
||||
TLSHandshakeTimeout: 10 * time.Second,
|
||||
ExpectContinueTimeout: 1 * time.Second,
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure},
|
||||
DisableCompression: true,
|
||||
}
|
||||
}
|
||||
|
||||
// Validate - validates the access token.
|
||||
func (p *JWT) Validate(token, dsecs string) (map[string]interface{}, error) {
|
||||
jp := new(jwtgo.Parser)
|
||||
@@ -211,6 +194,34 @@ func (p *JWT) ID() ID {
|
||||
return "jwt"
|
||||
}
|
||||
|
||||
// JWKS url
|
||||
const (
|
||||
EnvIAMJWKSURL = "MINIO_IAM_JWKS_URL"
|
||||
)
|
||||
|
||||
// LookupConfig lookup jwks from config, override with any ENVs.
|
||||
func LookupConfig(args JWKSArgs, transport *http.Transport, closeRespFn func(io.ReadCloser)) (JWKSArgs, error) {
|
||||
var urlStr string
|
||||
if args.URL != nil {
|
||||
urlStr = args.URL.String()
|
||||
}
|
||||
|
||||
jwksURL := env.Get(EnvIAMJWKSURL, urlStr)
|
||||
if jwksURL == "" {
|
||||
return args, nil
|
||||
}
|
||||
|
||||
u, err := xnet.ParseURL(jwksURL)
|
||||
if err != nil {
|
||||
return args, err
|
||||
}
|
||||
args.URL = u
|
||||
if err := args.PopulatePublicKey(); err != nil {
|
||||
return args, err
|
||||
}
|
||||
return args, nil
|
||||
}
|
||||
|
||||
// NewJWT - initialize new jwt authenticator.
|
||||
func NewJWT(args JWKSArgs) *JWT {
|
||||
return &JWT{
|
||||
|
||||
@@ -23,9 +23,16 @@ import (
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
|
||||
"github.com/minio/minio/pkg/env"
|
||||
xnet "github.com/minio/minio/pkg/net"
|
||||
)
|
||||
|
||||
// Env IAM OPA URL
|
||||
const (
|
||||
EnvIAMOPAURL = "MINIO_IAM_OPA_URL"
|
||||
EnvIAMOPAAuthToken = "MINIO_IAM_OPA_AUTHTOKEN"
|
||||
)
|
||||
|
||||
// OpaArgs opa general purpose policy engine configuration.
|
||||
type OpaArgs struct {
|
||||
URL *xnet.URL `json:"url"`
|
||||
@@ -82,10 +89,36 @@ type Opa struct {
|
||||
client *http.Client
|
||||
}
|
||||
|
||||
// LookupConfig lookup Opa from config, override with any ENVs.
|
||||
func LookupConfig(args OpaArgs, transport *http.Transport, closeRespFn func(io.ReadCloser)) (OpaArgs, error) {
|
||||
var urlStr string
|
||||
if args.URL != nil {
|
||||
urlStr = args.URL.String()
|
||||
}
|
||||
opaURL := env.Get(EnvIAMOPAURL, urlStr)
|
||||
if opaURL == "" {
|
||||
return args, nil
|
||||
}
|
||||
u, err := xnet.ParseURL(opaURL)
|
||||
if err != nil {
|
||||
return args, err
|
||||
}
|
||||
args = OpaArgs{
|
||||
URL: u,
|
||||
AuthToken: env.Get(EnvIAMOPAAuthToken, ""),
|
||||
Transport: transport,
|
||||
CloseRespFn: closeRespFn,
|
||||
}
|
||||
if err = args.Validate(); err != nil {
|
||||
return args, err
|
||||
}
|
||||
return args, nil
|
||||
}
|
||||
|
||||
// NewOpa - initializes opa policy engine connector.
|
||||
func NewOpa(args OpaArgs) *Opa {
|
||||
// No opa args.
|
||||
if args.URL == nil && args.AuthToken == "" {
|
||||
if args.URL == nil || args.URL.Scheme == "" && args.AuthToken == "" {
|
||||
return nil
|
||||
}
|
||||
return &Opa{
|
||||
|
||||
Reference in New Issue
Block a user