// Copyright (c) 2015-2024 MinIO, Inc // // This file is part of MinIO Object Storage stack // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Affero General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Affero General Public License for more details. // // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . package cmd import ( "bytes" "context" "encoding/base64" "encoding/binary" "fmt" "math/rand" "net/http" "path" "strings" "sync" "time" "github.com/minio/madmin-go/v3" "github.com/minio/minio/internal/crypto" "github.com/minio/minio/internal/hash" "github.com/minio/minio/internal/kms" "github.com/minio/minio/internal/logger" "github.com/prometheus/client_golang/prometheus" ) //go:generate msgp -file $GOFILE var ( errTierMissingCredentials = AdminError{ Code: "XMinioAdminTierMissingCredentials", Message: "Specified remote credentials are empty", StatusCode: http.StatusForbidden, } errTierBackendInUse = AdminError{ Code: "XMinioAdminTierBackendInUse", Message: "Specified remote tier is already in use", StatusCode: http.StatusConflict, } errTierTypeUnsupported = AdminError{ Code: "XMinioAdminTierTypeUnsupported", Message: "Specified tier type is unsupported", StatusCode: http.StatusBadRequest, } errTierBackendNotEmpty = AdminError{ Code: "XMinioAdminTierBackendNotEmpty", Message: "Specified remote backend is not empty", StatusCode: http.StatusBadRequest, } ) const ( tierConfigFile = "tier-config.bin" tierConfigFormat = 1 tierConfigV1 = 1 tierConfigVersion = 2 ) // tierConfigPath refers to remote tier config object name var tierConfigPath = path.Join(minioConfigPrefix, tierConfigFile) const tierCfgRefreshAtHdr = "X-MinIO-TierCfg-RefreshedAt" // TierConfigMgr holds the collection of remote tiers configured in this deployment. type TierConfigMgr struct { sync.RWMutex `msg:"-"` drivercache map[string]WarmBackend `msg:"-"` Tiers map[string]madmin.TierConfig `json:"tiers"` lastRefreshedAt time.Time `msg:"-"` } type tierMetrics struct { sync.RWMutex // protects requestsCount only requestsCount map[string]struct { success int64 failure int64 } histogram *prometheus.HistogramVec } var globalTierMetrics = tierMetrics{ requestsCount: make(map[string]struct { success int64 failure int64 }), histogram: prometheus.NewHistogramVec(prometheus.HistogramOpts{ Name: "tier_ttlb_seconds", Help: "Time taken by requests served by warm tier", Buckets: []float64{0.01, 0.1, 1, 2, 5, 10, 60, 5 * 60, 15 * 60, 30 * 60}, }, []string{"tier"}), } func (t *tierMetrics) Observe(tier string, dur time.Duration) { t.histogram.With(prometheus.Labels{"tier": tier}).Observe(dur.Seconds()) } func (t *tierMetrics) logSuccess(tier string) { t.Lock() defer t.Unlock() stat := t.requestsCount[tier] stat.success++ t.requestsCount[tier] = stat } func (t *tierMetrics) logFailure(tier string) { t.Lock() defer t.Unlock() stat := t.requestsCount[tier] stat.failure++ t.requestsCount[tier] = stat } var ( // {minio_node}_{tier}_{ttlb_seconds_distribution} tierTTLBMD = MetricDescription{ Namespace: nodeMetricNamespace, Subsystem: tierSubsystem, Name: ttlbDistribution, Help: "Distribution of time to last byte for objects downloaded from warm tier", Type: gaugeMetric, } // {minio_node}_{tier}_{requests_success} tierRequestsSuccessMD = MetricDescription{ Namespace: nodeMetricNamespace, Subsystem: tierSubsystem, Name: tierRequestsSuccess, Help: "Number of requests to download object from warm tier that were successful", Type: counterMetric, } // {minio_node}_{tier}_{requests_failure} tierRequestsFailureMD = MetricDescription{ Namespace: nodeMetricNamespace, Subsystem: tierSubsystem, Name: tierRequestsFailure, Help: "Number of requests to download object from warm tier that failed", Type: counterMetric, } ) func (t *tierMetrics) Report() []Metric { metrics := getHistogramMetrics(t.histogram, tierTTLBMD) t.RLock() defer t.RUnlock() for tier, stat := range t.requestsCount { metrics = append(metrics, Metric{ Description: tierRequestsSuccessMD, Value: float64(stat.success), VariableLabels: map[string]string{"tier": tier}, }) metrics = append(metrics, Metric{ Description: tierRequestsFailureMD, Value: float64(stat.failure), VariableLabels: map[string]string{"tier": tier}, }) } return metrics } func (config *TierConfigMgr) refreshedAt() time.Time { config.RLock() defer config.RUnlock() return config.lastRefreshedAt } // IsTierValid returns true if there exists a remote tier by name tierName, // otherwise returns false. func (config *TierConfigMgr) IsTierValid(tierName string) bool { config.RLock() defer config.RUnlock() _, valid := config.isTierNameInUse(tierName) return valid } // isTierNameInUse returns tier type and true if there exists a remote tier by // name tierName, otherwise returns madmin.Unsupported and false. N B this // function is meant for internal use, where the caller is expected to take // appropriate locks. func (config *TierConfigMgr) isTierNameInUse(tierName string) (madmin.TierType, bool) { if t, ok := config.Tiers[tierName]; ok { return t.Type, true } return madmin.Unsupported, false } // Add adds tier to config if it passes all validations. func (config *TierConfigMgr) Add(ctx context.Context, tier madmin.TierConfig, ignoreInUse bool) error { config.Lock() defer config.Unlock() // check if tier name is in all caps tierName := tier.Name if tierName != strings.ToUpper(tierName) { return errTierNameNotUppercase } // check if tier name already in use if _, exists := config.isTierNameInUse(tierName); exists { return errTierAlreadyExists } d, err := newWarmBackend(ctx, tier) if err != nil { return err } if !ignoreInUse { // Check if warmbackend is in use by other MinIO tenants inUse, err := d.InUse(ctx) if err != nil { return err } if inUse { return errTierBackendInUse } } config.Tiers[tierName] = tier config.drivercache[tierName] = d return nil } // Remove removes tier if it is empty. func (config *TierConfigMgr) Remove(ctx context.Context, tier string) error { d, err := config.getDriver(tier) if err != nil { return err } if inuse, err := d.InUse(ctx); err != nil { return err } else if inuse { return errTierBackendNotEmpty } config.Lock() delete(config.Tiers, tier) delete(config.drivercache, tier) config.Unlock() return nil } // Verify verifies if tier's config is valid by performing all supported // operations on the corresponding warmbackend. func (config *TierConfigMgr) Verify(ctx context.Context, tier string) error { d, err := config.getDriver(tier) if err != nil { return err } return checkWarmBackend(ctx, d) } // Empty returns if tier targets are empty func (config *TierConfigMgr) Empty() bool { if config == nil { return true } return len(config.ListTiers()) == 0 } // TierType returns the type of tier func (config *TierConfigMgr) TierType(name string) string { config.RLock() defer config.RUnlock() cfg, ok := config.Tiers[name] if !ok { return "internal" } return cfg.Type.String() } // ListTiers lists remote tiers configured in this deployment. func (config *TierConfigMgr) ListTiers() []madmin.TierConfig { if config == nil { return nil } config.RLock() defer config.RUnlock() var tierCfgs []madmin.TierConfig for _, tier := range config.Tiers { // This makes a local copy of tier config before // passing a reference to it. tier := tier.Clone() tierCfgs = append(tierCfgs, tier) } return tierCfgs } // Edit replaces the credentials of the remote tier specified by tierName with creds. func (config *TierConfigMgr) Edit(ctx context.Context, tierName string, creds madmin.TierCreds) error { config.Lock() defer config.Unlock() // check if tier by this name exists tierType, exists := config.isTierNameInUse(tierName) if !exists { return errTierNotFound } cfg := config.Tiers[tierName] switch tierType { case madmin.S3: if creds.AWSRole { cfg.S3.AWSRole = true } if creds.AWSRoleWebIdentityTokenFile != "" && creds.AWSRoleARN != "" { cfg.S3.AWSRoleARN = creds.AWSRoleARN cfg.S3.AWSRoleWebIdentityTokenFile = creds.AWSRoleWebIdentityTokenFile } if creds.AccessKey != "" && creds.SecretKey != "" { cfg.S3.AccessKey = creds.AccessKey cfg.S3.SecretKey = creds.SecretKey } case madmin.Azure: if creds.SecretKey != "" { cfg.Azure.AccountKey = creds.SecretKey } if creds.AzSP.TenantID != "" { cfg.Azure.SPAuth.TenantID = creds.AzSP.TenantID } if creds.AzSP.ClientID != "" { cfg.Azure.SPAuth.ClientID = creds.AzSP.ClientID } if creds.AzSP.ClientSecret != "" { cfg.Azure.SPAuth.ClientSecret = creds.AzSP.ClientSecret } case madmin.GCS: if creds.CredsJSON == nil { return errTierMissingCredentials } cfg.GCS.Creds = base64.URLEncoding.EncodeToString(creds.CredsJSON) case madmin.MinIO: if creds.AccessKey == "" || creds.SecretKey == "" { return errTierMissingCredentials } cfg.MinIO.AccessKey = creds.AccessKey cfg.MinIO.SecretKey = creds.SecretKey } d, err := newWarmBackend(ctx, cfg) if err != nil { return err } config.Tiers[tierName] = cfg config.drivercache[tierName] = d return nil } // Bytes returns msgpack encoded config with format and version headers. func (config *TierConfigMgr) Bytes() ([]byte, error) { config.RLock() defer config.RUnlock() data := make([]byte, 4, config.Msgsize()+4) // Initialize the header. binary.LittleEndian.PutUint16(data[0:2], tierConfigFormat) binary.LittleEndian.PutUint16(data[2:4], tierConfigVersion) // Marshal the tier config return config.MarshalMsg(data) } // getDriver returns a warmBackend interface object initialized with remote tier config matching tierName func (config *TierConfigMgr) getDriver(tierName string) (d WarmBackend, err error) { config.Lock() defer config.Unlock() var ok bool // Lookup in-memory drivercache d, ok = config.drivercache[tierName] if ok { return d, nil } // Initialize driver from tier config matching tierName t, ok := config.Tiers[tierName] if !ok { return nil, errTierNotFound } d, err = newWarmBackend(context.TODO(), t) if err != nil { return nil, err } config.drivercache[tierName] = d return d, nil } // configReader returns a PutObjReader and ObjectOptions needed to save config // using a PutObject API. PutObjReader encrypts json encoded tier configurations // if KMS is enabled, otherwise simply yields the json encoded bytes as is. // Similarly, ObjectOptions value depends on KMS' status. func (config *TierConfigMgr) configReader(ctx context.Context) (*PutObjReader, *ObjectOptions, error) { b, err := config.Bytes() if err != nil { return nil, nil, err } payloadSize := int64(len(b)) br := bytes.NewReader(b) hr, err := hash.NewReader(ctx, br, payloadSize, "", "", payloadSize) if err != nil { return nil, nil, err } if GlobalKMS == nil { return NewPutObjReader(hr), &ObjectOptions{MaxParity: true}, nil } // Note: Local variables with names ek, oek, etc are named inline with // acronyms defined here - // https://github.com/minio/minio/blob/master/docs/security/README.md#acronyms // Encrypt json encoded tier configurations metadata := make(map[string]string) encBr, oek, err := newEncryptReader(context.Background(), hr, crypto.S3, "", nil, minioMetaBucket, tierConfigPath, metadata, kms.Context{}) if err != nil { return nil, nil, err } info := ObjectInfo{ Size: payloadSize, } encSize := info.EncryptedSize() encHr, err := hash.NewReader(ctx, encBr, encSize, "", "", encSize) if err != nil { return nil, nil, err } pReader, err := NewPutObjReader(hr).WithEncryption(encHr, &oek) if err != nil { return nil, nil, err } opts := &ObjectOptions{ UserDefined: metadata, MTime: UTCNow(), MaxParity: true, } return pReader, opts, nil } // Reload updates config by reloading remote tier config from config store. func (config *TierConfigMgr) Reload(ctx context.Context, objAPI ObjectLayer) error { newConfig, err := loadTierConfig(ctx, objAPI) switch err { case nil: break case errConfigNotFound: // nothing to reload // To maintain the invariance that lastRefreshedAt records the // timestamp of last successful refresh config.lastRefreshedAt = UTCNow() return nil default: return err } config.Lock() defer config.Unlock() // Reset drivercache built using current config for k := range config.drivercache { delete(config.drivercache, k) } // Remove existing tier configs for k := range config.Tiers { delete(config.Tiers, k) } // Copy over the new tier configs for tier, cfg := range newConfig.Tiers { config.Tiers[tier] = cfg } config.lastRefreshedAt = UTCNow() return nil } // Save saves tier configuration onto objAPI func (config *TierConfigMgr) Save(ctx context.Context, objAPI ObjectLayer) error { if objAPI == nil { return errServerNotInitialized } pr, opts, err := globalTierConfigMgr.configReader(ctx) if err != nil { return err } _, err = objAPI.PutObject(ctx, minioMetaBucket, tierConfigPath, pr, *opts) return err } // NewTierConfigMgr - creates new tier configuration manager, func NewTierConfigMgr() *TierConfigMgr { return &TierConfigMgr{ drivercache: make(map[string]WarmBackend), Tiers: make(map[string]madmin.TierConfig), } } func (config *TierConfigMgr) refreshTierConfig(ctx context.Context, objAPI ObjectLayer) { const tierCfgRefresh = 15 * time.Minute r := rand.New(rand.NewSource(time.Now().UnixNano())) randInterval := func() time.Duration { return time.Duration(r.Float64() * 5 * float64(time.Second)) } // To avoid all MinIO nodes reading the tier config object at the same // time. t := time.NewTimer(tierCfgRefresh + randInterval()) defer t.Stop() for { select { case <-ctx.Done(): return case <-t.C: err := config.Reload(ctx, objAPI) if err != nil { logger.LogIf(ctx, err) } } t.Reset(tierCfgRefresh + randInterval()) } } // loadTierConfig loads remote tier configuration from objAPI. func loadTierConfig(ctx context.Context, objAPI ObjectLayer) (*TierConfigMgr, error) { if objAPI == nil { return nil, errServerNotInitialized } data, err := readConfig(ctx, objAPI, tierConfigPath) if err != nil { return nil, err } if len(data) <= 4 { return nil, fmt.Errorf("tierConfigInit: no data") } // Read header switch format := binary.LittleEndian.Uint16(data[0:2]); format { case tierConfigFormat: default: return nil, fmt.Errorf("tierConfigInit: unknown format: %d", format) } cfg := NewTierConfigMgr() switch version := binary.LittleEndian.Uint16(data[2:4]); version { case tierConfigV1, tierConfigVersion: if _, decErr := cfg.UnmarshalMsg(data[4:]); decErr != nil { return nil, decErr } default: return nil, fmt.Errorf("tierConfigInit: unknown version: %d", version) } return cfg, nil } // Init initializes tier configuration reading from objAPI func (config *TierConfigMgr) Init(ctx context.Context, objAPI ObjectLayer) error { err := config.Reload(ctx, objAPI) if globalIsDistErasure { go config.refreshTierConfig(ctx, objAPI) } return err }