mirror of
https://github.com/minio/minio.git
synced 2025-01-23 20:53:18 -05:00
9a4d003ac7
The middleware sets up tracing, throttling, gzipped responses, and the collection of API stats. Additionally, this change updates the names of handler functions in metric labels to match the name derived via Go reflection on the handler. The metric API labels are now stored in memory exactly as the handler name, so they are camel-cased, e.g. `GetObject` instead of `getobject`. For compatibility, the metric API label values are lowercased when the metrics are emitted.
587 lines
15 KiB
Go
587 lines
15 KiB
Go
// Copyright (c) 2015-2024 MinIO, Inc
|
|
//
|
|
// This file is part of MinIO Object Storage stack
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/base64"
|
|
"encoding/binary"
|
|
"fmt"
|
|
"math/rand"
|
|
"net/http"
|
|
"path"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/minio/madmin-go/v3"
|
|
"github.com/minio/minio/internal/crypto"
|
|
"github.com/minio/minio/internal/hash"
|
|
"github.com/minio/minio/internal/kms"
|
|
"github.com/minio/minio/internal/logger"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
)
|
|
|
|
//go:generate msgp -file $GOFILE
|
|
|
|
// Admin API errors specific to remote tier management.
var (
	// errTierMissingCredentials is returned by Edit when the credentials
	// required for the tier type were not supplied.
	errTierMissingCredentials = AdminError{
		Code:       "XMinioAdminTierMissingCredentials",
		Message:    "Specified remote credentials are empty",
		StatusCode: http.StatusForbidden,
	}

	// errTierBackendInUse is returned by Add when the backend reports that
	// it is already in use.
	errTierBackendInUse = AdminError{
		Code:       "XMinioAdminTierBackendInUse",
		Message:    "Specified remote tier is already in use",
		StatusCode: http.StatusConflict,
	}

	// errTierTypeUnsupported signals an unsupported tier type; it is not
	// referenced in this part of the file.
	errTierTypeUnsupported = AdminError{
		Code:       "XMinioAdminTierTypeUnsupported",
		Message:    "Specified tier type is unsupported",
		StatusCode: http.StatusBadRequest,
	}

	// errTierBackendNotEmpty is returned by Remove when the backend still
	// reports being in use.
	errTierBackendNotEmpty = AdminError{
		Code:       "XMinioAdminTierBackendNotEmpty",
		Message:    "Specified remote backend is not empty",
		StatusCode: http.StatusBadRequest,
	}
)
|
|
|
|
// Identifiers describing the stored tier config object. Bytes writes
// tierConfigFormat and tierConfigVersion as two little-endian uint16 header
// fields; loadTierConfig accepts tierConfigFormat as the format and either
// tierConfigV1 or tierConfigVersion as the version.
const (
	tierConfigFile    = "tier-config.bin"
	tierConfigFormat  = 1
	tierConfigV1      = 1
	tierConfigVersion = 2
)

// tierConfigPath refers to remote tier config object name
var tierConfigPath = path.Join(minioConfigPrefix, tierConfigFile)

// tierCfgRefreshAtHdr is not used in this part of the file.
// NOTE(review): presumably the HTTP header carrying refreshedAt() between
// nodes - confirm against the callers.
const tierCfgRefreshAtHdr = "X-MinIO-TierCfg-RefreshedAt"
|
|
|
|
// TierConfigMgr holds the collection of remote tiers configured in this deployment.
// Every field except Tiers is tagged `msg:"-"`; the type is processed by the
// msgp generator (see the go:generate directive in this file).
type TierConfigMgr struct {
	// Embedded lock taken by the methods of this type around access to the
	// fields below.
	sync.RWMutex `msg:"-"`

	// drivercache maps a tier name to an already constructed WarmBackend.
	drivercache map[string]WarmBackend `msg:"-"`

	// Tiers maps a tier name to its configuration.
	Tiers map[string]madmin.TierConfig `json:"tiers"`

	// lastRefreshedAt is set by Reload whenever it completes without error.
	lastRefreshedAt time.Time `msg:"-"`
}
|
|
|
|
type tierMetrics struct {
|
|
sync.RWMutex // protects requestsCount only
|
|
requestsCount map[string]struct {
|
|
success int64
|
|
failure int64
|
|
}
|
|
histogram *prometheus.HistogramVec
|
|
}
|
|
|
|
var globalTierMetrics = tierMetrics{
|
|
requestsCount: make(map[string]struct {
|
|
success int64
|
|
failure int64
|
|
}),
|
|
histogram: prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
|
Name: "tier_ttlb_seconds",
|
|
Help: "Time taken by requests served by warm tier",
|
|
Buckets: []float64{0.01, 0.1, 1, 2, 5, 10, 60, 5 * 60, 15 * 60, 30 * 60},
|
|
}, []string{"tier"}),
|
|
}
|
|
|
|
func (t *tierMetrics) Observe(tier string, dur time.Duration) {
|
|
t.histogram.With(prometheus.Labels{"tier": tier}).Observe(dur.Seconds())
|
|
}
|
|
|
|
func (t *tierMetrics) logSuccess(tier string) {
|
|
t.Lock()
|
|
defer t.Unlock()
|
|
|
|
stat := t.requestsCount[tier]
|
|
stat.success++
|
|
t.requestsCount[tier] = stat
|
|
}
|
|
|
|
func (t *tierMetrics) logFailure(tier string) {
|
|
t.Lock()
|
|
defer t.Unlock()
|
|
|
|
stat := t.requestsCount[tier]
|
|
stat.failure++
|
|
t.requestsCount[tier] = stat
|
|
}
|
|
|
|
// Descriptions of the warm tier metrics assembled by tierMetrics.Report.
var (
	// {minio_node}_{tier}_{ttlb_seconds_distribution}
	// Passed to getHistogramMetrics together with the duration histogram.
	tierTTLBMD = MetricDescription{
		Namespace: nodeMetricNamespace,
		Subsystem: tierSubsystem,
		Name:      ttlbDistribution,
		Help:      "Distribution of time to last byte for objects downloaded from warm tier",
		Type:      gaugeMetric,
	}

	// {minio_node}_{tier}_{requests_success}
	// Reported once per tier with the success counter as its value.
	tierRequestsSuccessMD = MetricDescription{
		Namespace: nodeMetricNamespace,
		Subsystem: tierSubsystem,
		Name:      tierRequestsSuccess,
		Help:      "Number of requests to download object from warm tier that were successful",
		Type:      counterMetric,
	}
	// {minio_node}_{tier}_{requests_failure}
	// Reported once per tier with the failure counter as its value.
	tierRequestsFailureMD = MetricDescription{
		Namespace: nodeMetricNamespace,
		Subsystem: tierSubsystem,
		Name:      tierRequestsFailure,
		Help:      "Number of requests to download object from warm tier that failed",
		Type:      counterMetric,
	}
)
|
|
|
|
func (t *tierMetrics) Report() []Metric {
|
|
metrics := getHistogramMetrics(t.histogram, tierTTLBMD, true)
|
|
t.RLock()
|
|
defer t.RUnlock()
|
|
for tier, stat := range t.requestsCount {
|
|
metrics = append(metrics, Metric{
|
|
Description: tierRequestsSuccessMD,
|
|
Value: float64(stat.success),
|
|
VariableLabels: map[string]string{"tier": tier},
|
|
})
|
|
metrics = append(metrics, Metric{
|
|
Description: tierRequestsFailureMD,
|
|
Value: float64(stat.failure),
|
|
VariableLabels: map[string]string{"tier": tier},
|
|
})
|
|
}
|
|
return metrics
|
|
}
|
|
|
|
func (config *TierConfigMgr) refreshedAt() time.Time {
|
|
config.RLock()
|
|
defer config.RUnlock()
|
|
return config.lastRefreshedAt
|
|
}
|
|
|
|
// IsTierValid returns true if there exists a remote tier by name tierName,
|
|
// otherwise returns false.
|
|
func (config *TierConfigMgr) IsTierValid(tierName string) bool {
|
|
config.RLock()
|
|
defer config.RUnlock()
|
|
_, valid := config.isTierNameInUse(tierName)
|
|
return valid
|
|
}
|
|
|
|
// isTierNameInUse returns tier type and true if there exists a remote tier by
|
|
// name tierName, otherwise returns madmin.Unsupported and false. N B this
|
|
// function is meant for internal use, where the caller is expected to take
|
|
// appropriate locks.
|
|
func (config *TierConfigMgr) isTierNameInUse(tierName string) (madmin.TierType, bool) {
|
|
if t, ok := config.Tiers[tierName]; ok {
|
|
return t.Type, true
|
|
}
|
|
return madmin.Unsupported, false
|
|
}
|
|
|
|
// Add adds tier to config if it passes all validations.
|
|
func (config *TierConfigMgr) Add(ctx context.Context, tier madmin.TierConfig, ignoreInUse bool) error {
|
|
config.Lock()
|
|
defer config.Unlock()
|
|
|
|
// check if tier name is in all caps
|
|
tierName := tier.Name
|
|
if tierName != strings.ToUpper(tierName) {
|
|
return errTierNameNotUppercase
|
|
}
|
|
|
|
// check if tier name already in use
|
|
if _, exists := config.isTierNameInUse(tierName); exists {
|
|
return errTierAlreadyExists
|
|
}
|
|
|
|
d, err := newWarmBackend(ctx, tier)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if !ignoreInUse {
|
|
// Check if warmbackend is in use by other MinIO tenants
|
|
inUse, err := d.InUse(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if inUse {
|
|
return errTierBackendInUse
|
|
}
|
|
}
|
|
|
|
config.Tiers[tierName] = tier
|
|
config.drivercache[tierName] = d
|
|
|
|
return nil
|
|
}
|
|
|
|
// Remove removes tier if it is empty.
|
|
func (config *TierConfigMgr) Remove(ctx context.Context, tier string) error {
|
|
d, err := config.getDriver(tier)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if inuse, err := d.InUse(ctx); err != nil {
|
|
return err
|
|
} else if inuse {
|
|
return errTierBackendNotEmpty
|
|
}
|
|
config.Lock()
|
|
delete(config.Tiers, tier)
|
|
delete(config.drivercache, tier)
|
|
config.Unlock()
|
|
return nil
|
|
}
|
|
|
|
// Verify verifies if tier's config is valid by performing all supported
|
|
// operations on the corresponding warmbackend.
|
|
func (config *TierConfigMgr) Verify(ctx context.Context, tier string) error {
|
|
d, err := config.getDriver(tier)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return checkWarmBackend(ctx, d)
|
|
}
|
|
|
|
// Empty returns if tier targets are empty
|
|
func (config *TierConfigMgr) Empty() bool {
|
|
if config == nil {
|
|
return true
|
|
}
|
|
return len(config.ListTiers()) == 0
|
|
}
|
|
|
|
// TierType returns the type of tier
|
|
func (config *TierConfigMgr) TierType(name string) string {
|
|
config.RLock()
|
|
defer config.RUnlock()
|
|
|
|
cfg, ok := config.Tiers[name]
|
|
if !ok {
|
|
return "internal"
|
|
}
|
|
return cfg.Type.String()
|
|
}
|
|
|
|
// ListTiers lists remote tiers configured in this deployment.
|
|
func (config *TierConfigMgr) ListTiers() []madmin.TierConfig {
|
|
if config == nil {
|
|
return nil
|
|
}
|
|
|
|
config.RLock()
|
|
defer config.RUnlock()
|
|
|
|
var tierCfgs []madmin.TierConfig
|
|
for _, tier := range config.Tiers {
|
|
// This makes a local copy of tier config before
|
|
// passing a reference to it.
|
|
tier := tier.Clone()
|
|
tierCfgs = append(tierCfgs, tier)
|
|
}
|
|
return tierCfgs
|
|
}
|
|
|
|
// Edit replaces the credentials of the remote tier specified by tierName with creds.
|
|
func (config *TierConfigMgr) Edit(ctx context.Context, tierName string, creds madmin.TierCreds) error {
|
|
config.Lock()
|
|
defer config.Unlock()
|
|
|
|
// check if tier by this name exists
|
|
tierType, exists := config.isTierNameInUse(tierName)
|
|
if !exists {
|
|
return errTierNotFound
|
|
}
|
|
|
|
cfg := config.Tiers[tierName]
|
|
switch tierType {
|
|
case madmin.S3:
|
|
if creds.AWSRole {
|
|
cfg.S3.AWSRole = true
|
|
}
|
|
if creds.AWSRoleWebIdentityTokenFile != "" && creds.AWSRoleARN != "" {
|
|
cfg.S3.AWSRoleARN = creds.AWSRoleARN
|
|
cfg.S3.AWSRoleWebIdentityTokenFile = creds.AWSRoleWebIdentityTokenFile
|
|
}
|
|
if creds.AccessKey != "" && creds.SecretKey != "" {
|
|
cfg.S3.AccessKey = creds.AccessKey
|
|
cfg.S3.SecretKey = creds.SecretKey
|
|
}
|
|
case madmin.Azure:
|
|
if creds.SecretKey != "" {
|
|
cfg.Azure.AccountKey = creds.SecretKey
|
|
}
|
|
if creds.AzSP.TenantID != "" {
|
|
cfg.Azure.SPAuth.TenantID = creds.AzSP.TenantID
|
|
}
|
|
if creds.AzSP.ClientID != "" {
|
|
cfg.Azure.SPAuth.ClientID = creds.AzSP.ClientID
|
|
}
|
|
if creds.AzSP.ClientSecret != "" {
|
|
cfg.Azure.SPAuth.ClientSecret = creds.AzSP.ClientSecret
|
|
}
|
|
case madmin.GCS:
|
|
if creds.CredsJSON == nil {
|
|
return errTierMissingCredentials
|
|
}
|
|
cfg.GCS.Creds = base64.URLEncoding.EncodeToString(creds.CredsJSON)
|
|
case madmin.MinIO:
|
|
if creds.AccessKey == "" || creds.SecretKey == "" {
|
|
return errTierMissingCredentials
|
|
}
|
|
cfg.MinIO.AccessKey = creds.AccessKey
|
|
cfg.MinIO.SecretKey = creds.SecretKey
|
|
}
|
|
|
|
d, err := newWarmBackend(ctx, cfg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
config.Tiers[tierName] = cfg
|
|
config.drivercache[tierName] = d
|
|
return nil
|
|
}
|
|
|
|
// Bytes returns msgpack encoded config with format and version headers.
|
|
func (config *TierConfigMgr) Bytes() ([]byte, error) {
|
|
config.RLock()
|
|
defer config.RUnlock()
|
|
data := make([]byte, 4, config.Msgsize()+4)
|
|
|
|
// Initialize the header.
|
|
binary.LittleEndian.PutUint16(data[0:2], tierConfigFormat)
|
|
binary.LittleEndian.PutUint16(data[2:4], tierConfigVersion)
|
|
|
|
// Marshal the tier config
|
|
return config.MarshalMsg(data)
|
|
}
|
|
|
|
// getDriver returns a warmBackend interface object initialized with remote tier config matching tierName
|
|
func (config *TierConfigMgr) getDriver(tierName string) (d WarmBackend, err error) {
|
|
config.Lock()
|
|
defer config.Unlock()
|
|
|
|
var ok bool
|
|
// Lookup in-memory drivercache
|
|
d, ok = config.drivercache[tierName]
|
|
if ok {
|
|
return d, nil
|
|
}
|
|
|
|
// Initialize driver from tier config matching tierName
|
|
t, ok := config.Tiers[tierName]
|
|
if !ok {
|
|
return nil, errTierNotFound
|
|
}
|
|
d, err = newWarmBackend(context.TODO(), t)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
config.drivercache[tierName] = d
|
|
return d, nil
|
|
}
|
|
|
|
// configReader returns a PutObjReader and ObjectOptions needed to save config
|
|
// using a PutObject API. PutObjReader encrypts json encoded tier configurations
|
|
// if KMS is enabled, otherwise simply yields the json encoded bytes as is.
|
|
// Similarly, ObjectOptions value depends on KMS' status.
|
|
func (config *TierConfigMgr) configReader(ctx context.Context) (*PutObjReader, *ObjectOptions, error) {
|
|
b, err := config.Bytes()
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
payloadSize := int64(len(b))
|
|
br := bytes.NewReader(b)
|
|
hr, err := hash.NewReader(ctx, br, payloadSize, "", "", payloadSize)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
if GlobalKMS == nil {
|
|
return NewPutObjReader(hr), &ObjectOptions{MaxParity: true}, nil
|
|
}
|
|
|
|
// Note: Local variables with names ek, oek, etc are named inline with
|
|
// acronyms defined here -
|
|
// https://github.com/minio/minio/blob/master/docs/security/README.md#acronyms
|
|
|
|
// Encrypt json encoded tier configurations
|
|
metadata := make(map[string]string)
|
|
encBr, oek, err := newEncryptReader(context.Background(), hr, crypto.S3, "", nil, minioMetaBucket, tierConfigPath, metadata, kms.Context{})
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
info := ObjectInfo{
|
|
Size: payloadSize,
|
|
}
|
|
encSize := info.EncryptedSize()
|
|
encHr, err := hash.NewReader(ctx, encBr, encSize, "", "", encSize)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
pReader, err := NewPutObjReader(hr).WithEncryption(encHr, &oek)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
opts := &ObjectOptions{
|
|
UserDefined: metadata,
|
|
MTime: UTCNow(),
|
|
MaxParity: true,
|
|
}
|
|
|
|
return pReader, opts, nil
|
|
}
|
|
|
|
// Reload updates config by reloading remote tier config from config store.
|
|
func (config *TierConfigMgr) Reload(ctx context.Context, objAPI ObjectLayer) error {
|
|
newConfig, err := loadTierConfig(ctx, objAPI)
|
|
switch err {
|
|
case nil:
|
|
break
|
|
case errConfigNotFound: // nothing to reload
|
|
// To maintain the invariance that lastRefreshedAt records the
|
|
// timestamp of last successful refresh
|
|
config.lastRefreshedAt = UTCNow()
|
|
return nil
|
|
default:
|
|
return err
|
|
}
|
|
|
|
config.Lock()
|
|
defer config.Unlock()
|
|
// Reset drivercache built using current config
|
|
for k := range config.drivercache {
|
|
delete(config.drivercache, k)
|
|
}
|
|
// Remove existing tier configs
|
|
for k := range config.Tiers {
|
|
delete(config.Tiers, k)
|
|
}
|
|
// Copy over the new tier configs
|
|
for tier, cfg := range newConfig.Tiers {
|
|
config.Tiers[tier] = cfg
|
|
}
|
|
config.lastRefreshedAt = UTCNow()
|
|
return nil
|
|
}
|
|
|
|
// Save saves tier configuration onto objAPI
|
|
func (config *TierConfigMgr) Save(ctx context.Context, objAPI ObjectLayer) error {
|
|
if objAPI == nil {
|
|
return errServerNotInitialized
|
|
}
|
|
|
|
pr, opts, err := globalTierConfigMgr.configReader(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = objAPI.PutObject(ctx, minioMetaBucket, tierConfigPath, pr, *opts)
|
|
return err
|
|
}
|
|
|
|
// NewTierConfigMgr - creates new tier configuration manager,
|
|
func NewTierConfigMgr() *TierConfigMgr {
|
|
return &TierConfigMgr{
|
|
drivercache: make(map[string]WarmBackend),
|
|
Tiers: make(map[string]madmin.TierConfig),
|
|
}
|
|
}
|
|
|
|
func (config *TierConfigMgr) refreshTierConfig(ctx context.Context, objAPI ObjectLayer) {
|
|
const tierCfgRefresh = 15 * time.Minute
|
|
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
|
randInterval := func() time.Duration {
|
|
return time.Duration(r.Float64() * 5 * float64(time.Second))
|
|
}
|
|
|
|
// To avoid all MinIO nodes reading the tier config object at the same
|
|
// time.
|
|
t := time.NewTimer(tierCfgRefresh + randInterval())
|
|
defer t.Stop()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-t.C:
|
|
err := config.Reload(ctx, objAPI)
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
}
|
|
t.Reset(tierCfgRefresh + randInterval())
|
|
}
|
|
}
|
|
|
|
// loadTierConfig loads remote tier configuration from objAPI.
|
|
func loadTierConfig(ctx context.Context, objAPI ObjectLayer) (*TierConfigMgr, error) {
|
|
if objAPI == nil {
|
|
return nil, errServerNotInitialized
|
|
}
|
|
|
|
data, err := readConfig(ctx, objAPI, tierConfigPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(data) <= 4 {
|
|
return nil, fmt.Errorf("tierConfigInit: no data")
|
|
}
|
|
|
|
// Read header
|
|
switch format := binary.LittleEndian.Uint16(data[0:2]); format {
|
|
case tierConfigFormat:
|
|
default:
|
|
return nil, fmt.Errorf("tierConfigInit: unknown format: %d", format)
|
|
}
|
|
|
|
cfg := NewTierConfigMgr()
|
|
switch version := binary.LittleEndian.Uint16(data[2:4]); version {
|
|
case tierConfigV1, tierConfigVersion:
|
|
if _, decErr := cfg.UnmarshalMsg(data[4:]); decErr != nil {
|
|
return nil, decErr
|
|
}
|
|
default:
|
|
return nil, fmt.Errorf("tierConfigInit: unknown version: %d", version)
|
|
}
|
|
|
|
return cfg, nil
|
|
}
|
|
|
|
// Init initializes tier configuration reading from objAPI
|
|
func (config *TierConfigMgr) Init(ctx context.Context, objAPI ObjectLayer) error {
|
|
err := config.Reload(ctx, objAPI)
|
|
if globalIsDistErasure {
|
|
go config.refreshTierConfig(ctx, objAPI)
|
|
}
|
|
return err
|
|
}
|