minio/cmd/tier.go

587 lines
16 KiB
Go

// Copyright (c) 2015-2024 MinIO, Inc
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"bytes"
"context"
"encoding/base64"
"encoding/binary"
"fmt"
"math/rand"
"net/http"
"path"
"strings"
"sync"
"time"
"github.com/minio/madmin-go/v3"
"github.com/minio/minio/internal/crypto"
"github.com/minio/minio/internal/hash"
"github.com/minio/minio/internal/kms"
"github.com/minio/minio/internal/logger"
"github.com/prometheus/client_golang/prometheus"
)
//go:generate msgp -file $GOFILE
var (
errTierMissingCredentials = AdminError{
Code: "XMinioAdminTierMissingCredentials",
Message: "Specified remote credentials are empty",
StatusCode: http.StatusForbidden,
}
errTierBackendInUse = AdminError{
Code: "XMinioAdminTierBackendInUse",
Message: "Specified remote tier is already in use",
StatusCode: http.StatusConflict,
}
errTierTypeUnsupported = AdminError{
Code: "XMinioAdminTierTypeUnsupported",
Message: "Specified tier type is unsupported",
StatusCode: http.StatusBadRequest,
}
errTierBackendNotEmpty = AdminError{
Code: "XMinioAdminTierBackendNotEmpty",
Message: "Specified remote backend is not empty",
StatusCode: http.StatusBadRequest,
}
)
const (
tierConfigFile = "tier-config.bin"
tierConfigFormat = 1
tierConfigV1 = 1
tierConfigVersion = 2
)
// tierConfigPath refers to remote tier config object name
var tierConfigPath = path.Join(minioConfigPrefix, tierConfigFile)
const tierCfgRefreshAtHdr = "X-MinIO-TierCfg-RefreshedAt"
// TierConfigMgr holds the collection of remote tiers configured in this deployment.
type TierConfigMgr struct {
sync.RWMutex `msg:"-"`
drivercache map[string]WarmBackend `msg:"-"`
Tiers map[string]madmin.TierConfig `json:"tiers"`
lastRefreshedAt time.Time `msg:"-"`
}
type tierMetrics struct {
sync.RWMutex // protects requestsCount only
requestsCount map[string]struct {
success int64
failure int64
}
histogram *prometheus.HistogramVec
}
var globalTierMetrics = tierMetrics{
requestsCount: make(map[string]struct {
success int64
failure int64
}),
histogram: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "tier_ttlb_seconds",
Help: "Time taken by requests served by warm tier",
Buckets: []float64{0.01, 0.1, 1, 2, 5, 10, 60, 5 * 60, 15 * 60, 30 * 60},
}, []string{"tier"}),
}
func (t *tierMetrics) Observe(tier string, dur time.Duration) {
t.histogram.With(prometheus.Labels{"tier": tier}).Observe(dur.Seconds())
}
func (t *tierMetrics) logSuccess(tier string) {
t.Lock()
defer t.Unlock()
stat := t.requestsCount[tier]
stat.success++
t.requestsCount[tier] = stat
}
func (t *tierMetrics) logFailure(tier string) {
t.Lock()
defer t.Unlock()
stat := t.requestsCount[tier]
stat.failure++
t.requestsCount[tier] = stat
}
var (
// {minio_node}_{tier}_{ttlb_seconds_distribution}
tierTTLBMD = MetricDescription{
Namespace: nodeMetricNamespace,
Subsystem: tierSubsystem,
Name: ttlbDistribution,
Help: "Distribution of time to last byte for objects downloaded from warm tier",
Type: gaugeMetric,
}
// {minio_node}_{tier}_{requests_success}
tierRequestsSuccessMD = MetricDescription{
Namespace: nodeMetricNamespace,
Subsystem: tierSubsystem,
Name: tierRequestsSuccess,
Help: "Number of requests to download object from warm tier that were successful",
Type: counterMetric,
}
// {minio_node}_{tier}_{requests_failure}
tierRequestsFailureMD = MetricDescription{
Namespace: nodeMetricNamespace,
Subsystem: tierSubsystem,
Name: tierRequestsFailure,
Help: "Number of requests to download object from warm tier that failed",
Type: counterMetric,
}
)
func (t *tierMetrics) Report() []MetricV2 {
metrics := getHistogramMetrics(t.histogram, tierTTLBMD, true)
t.RLock()
defer t.RUnlock()
for tier, stat := range t.requestsCount {
metrics = append(metrics, MetricV2{
Description: tierRequestsSuccessMD,
Value: float64(stat.success),
VariableLabels: map[string]string{"tier": tier},
})
metrics = append(metrics, MetricV2{
Description: tierRequestsFailureMD,
Value: float64(stat.failure),
VariableLabels: map[string]string{"tier": tier},
})
}
return metrics
}
func (config *TierConfigMgr) refreshedAt() time.Time {
config.RLock()
defer config.RUnlock()
return config.lastRefreshedAt
}
// IsTierValid returns true if there exists a remote tier by name tierName,
// otherwise returns false.
func (config *TierConfigMgr) IsTierValid(tierName string) bool {
config.RLock()
defer config.RUnlock()
_, valid := config.isTierNameInUse(tierName)
return valid
}
// isTierNameInUse returns tier type and true if there exists a remote tier by
// name tierName, otherwise returns madmin.Unsupported and false. N B this
// function is meant for internal use, where the caller is expected to take
// appropriate locks.
func (config *TierConfigMgr) isTierNameInUse(tierName string) (madmin.TierType, bool) {
if t, ok := config.Tiers[tierName]; ok {
return t.Type, true
}
return madmin.Unsupported, false
}
// Add adds tier to config if it passes all validations.
func (config *TierConfigMgr) Add(ctx context.Context, tier madmin.TierConfig, ignoreInUse bool) error {
config.Lock()
defer config.Unlock()
// check if tier name is in all caps
tierName := tier.Name
if tierName != strings.ToUpper(tierName) {
return errTierNameNotUppercase
}
// check if tier name already in use
if _, exists := config.isTierNameInUse(tierName); exists {
return errTierAlreadyExists
}
d, err := newWarmBackend(ctx, tier)
if err != nil {
return err
}
if !ignoreInUse {
// Check if warmbackend is in use by other MinIO tenants
inUse, err := d.InUse(ctx)
if err != nil {
return err
}
if inUse {
return errTierBackendInUse
}
}
config.Tiers[tierName] = tier
config.drivercache[tierName] = d
return nil
}
// Remove removes tier if it is empty.
func (config *TierConfigMgr) Remove(ctx context.Context, tier string) error {
d, err := config.getDriver(tier)
if err != nil {
return err
}
if inuse, err := d.InUse(ctx); err != nil {
return err
} else if inuse {
return errTierBackendNotEmpty
}
config.Lock()
delete(config.Tiers, tier)
delete(config.drivercache, tier)
config.Unlock()
return nil
}
// Verify verifies if tier's config is valid by performing all supported
// operations on the corresponding warmbackend.
func (config *TierConfigMgr) Verify(ctx context.Context, tier string) error {
d, err := config.getDriver(tier)
if err != nil {
return err
}
return checkWarmBackend(ctx, d)
}
// Empty returns if tier targets are empty
func (config *TierConfigMgr) Empty() bool {
if config == nil {
return true
}
return len(config.ListTiers()) == 0
}
// TierType returns the type of tier
func (config *TierConfigMgr) TierType(name string) string {
config.RLock()
defer config.RUnlock()
cfg, ok := config.Tiers[name]
if !ok {
return "internal"
}
return cfg.Type.String()
}
// ListTiers lists remote tiers configured in this deployment.
func (config *TierConfigMgr) ListTiers() []madmin.TierConfig {
if config == nil {
return nil
}
config.RLock()
defer config.RUnlock()
var tierCfgs []madmin.TierConfig
for _, tier := range config.Tiers {
// This makes a local copy of tier config before
// passing a reference to it.
tier := tier.Clone()
tierCfgs = append(tierCfgs, tier)
}
return tierCfgs
}
// Edit replaces the credentials of the remote tier specified by tierName with creds.
func (config *TierConfigMgr) Edit(ctx context.Context, tierName string, creds madmin.TierCreds) error {
config.Lock()
defer config.Unlock()
// check if tier by this name exists
tierType, exists := config.isTierNameInUse(tierName)
if !exists {
return errTierNotFound
}
cfg := config.Tiers[tierName]
switch tierType {
case madmin.S3:
if creds.AWSRole {
cfg.S3.AWSRole = true
}
if creds.AWSRoleWebIdentityTokenFile != "" && creds.AWSRoleARN != "" {
cfg.S3.AWSRoleARN = creds.AWSRoleARN
cfg.S3.AWSRoleWebIdentityTokenFile = creds.AWSRoleWebIdentityTokenFile
}
if creds.AccessKey != "" && creds.SecretKey != "" {
cfg.S3.AccessKey = creds.AccessKey
cfg.S3.SecretKey = creds.SecretKey
}
case madmin.Azure:
if creds.SecretKey != "" {
cfg.Azure.AccountKey = creds.SecretKey
}
if creds.AzSP.TenantID != "" {
cfg.Azure.SPAuth.TenantID = creds.AzSP.TenantID
}
if creds.AzSP.ClientID != "" {
cfg.Azure.SPAuth.ClientID = creds.AzSP.ClientID
}
if creds.AzSP.ClientSecret != "" {
cfg.Azure.SPAuth.ClientSecret = creds.AzSP.ClientSecret
}
case madmin.GCS:
if creds.CredsJSON == nil {
return errTierMissingCredentials
}
cfg.GCS.Creds = base64.URLEncoding.EncodeToString(creds.CredsJSON)
case madmin.MinIO:
if creds.AccessKey == "" || creds.SecretKey == "" {
return errTierMissingCredentials
}
cfg.MinIO.AccessKey = creds.AccessKey
cfg.MinIO.SecretKey = creds.SecretKey
}
d, err := newWarmBackend(ctx, cfg)
if err != nil {
return err
}
config.Tiers[tierName] = cfg
config.drivercache[tierName] = d
return nil
}
// Bytes returns msgpack encoded config with format and version headers.
func (config *TierConfigMgr) Bytes() ([]byte, error) {
config.RLock()
defer config.RUnlock()
data := make([]byte, 4, config.Msgsize()+4)
// Initialize the header.
binary.LittleEndian.PutUint16(data[0:2], tierConfigFormat)
binary.LittleEndian.PutUint16(data[2:4], tierConfigVersion)
// Marshal the tier config
return config.MarshalMsg(data)
}
// getDriver returns a warmBackend interface object initialized with remote tier config matching tierName
func (config *TierConfigMgr) getDriver(tierName string) (d WarmBackend, err error) {
config.Lock()
defer config.Unlock()
var ok bool
// Lookup in-memory drivercache
d, ok = config.drivercache[tierName]
if ok {
return d, nil
}
// Initialize driver from tier config matching tierName
t, ok := config.Tiers[tierName]
if !ok {
return nil, errTierNotFound
}
d, err = newWarmBackend(context.TODO(), t)
if err != nil {
return nil, err
}
config.drivercache[tierName] = d
return d, nil
}
// configReader returns a PutObjReader and ObjectOptions needed to save config
// using a PutObject API. PutObjReader encrypts json encoded tier configurations
// if KMS is enabled, otherwise simply yields the json encoded bytes as is.
// Similarly, ObjectOptions value depends on KMS' status.
func (config *TierConfigMgr) configReader(ctx context.Context) (*PutObjReader, *ObjectOptions, error) {
b, err := config.Bytes()
if err != nil {
return nil, nil, err
}
payloadSize := int64(len(b))
br := bytes.NewReader(b)
hr, err := hash.NewReader(ctx, br, payloadSize, "", "", payloadSize)
if err != nil {
return nil, nil, err
}
if GlobalKMS == nil {
return NewPutObjReader(hr), &ObjectOptions{MaxParity: true}, nil
}
// Note: Local variables with names ek, oek, etc are named inline with
// acronyms defined here -
// https://github.com/minio/minio/blob/master/docs/security/README.md#acronyms
// Encrypt json encoded tier configurations
metadata := make(map[string]string)
encBr, oek, err := newEncryptReader(context.Background(), hr, crypto.S3, "", nil, minioMetaBucket, tierConfigPath, metadata, kms.Context{})
if err != nil {
return nil, nil, err
}
info := ObjectInfo{
Size: payloadSize,
}
encSize := info.EncryptedSize()
encHr, err := hash.NewReader(ctx, encBr, encSize, "", "", encSize)
if err != nil {
return nil, nil, err
}
pReader, err := NewPutObjReader(hr).WithEncryption(encHr, &oek)
if err != nil {
return nil, nil, err
}
opts := &ObjectOptions{
UserDefined: metadata,
MTime: UTCNow(),
MaxParity: true,
}
return pReader, opts, nil
}
// Reload updates config by reloading remote tier config from config store.
func (config *TierConfigMgr) Reload(ctx context.Context, objAPI ObjectLayer) error {
newConfig, err := loadTierConfig(ctx, objAPI)
switch err {
case nil:
break
case errConfigNotFound: // nothing to reload
// To maintain the invariance that lastRefreshedAt records the
// timestamp of last successful refresh
config.lastRefreshedAt = UTCNow()
return nil
default:
return err
}
config.Lock()
defer config.Unlock()
// Reset drivercache built using current config
for k := range config.drivercache {
delete(config.drivercache, k)
}
// Remove existing tier configs
for k := range config.Tiers {
delete(config.Tiers, k)
}
// Copy over the new tier configs
for tier, cfg := range newConfig.Tiers {
config.Tiers[tier] = cfg
}
config.lastRefreshedAt = UTCNow()
return nil
}
// Save saves tier configuration onto objAPI
func (config *TierConfigMgr) Save(ctx context.Context, objAPI ObjectLayer) error {
if objAPI == nil {
return errServerNotInitialized
}
pr, opts, err := globalTierConfigMgr.configReader(ctx)
if err != nil {
return err
}
_, err = objAPI.PutObject(ctx, minioMetaBucket, tierConfigPath, pr, *opts)
return err
}
// NewTierConfigMgr - creates new tier configuration manager,
func NewTierConfigMgr() *TierConfigMgr {
return &TierConfigMgr{
drivercache: make(map[string]WarmBackend),
Tiers: make(map[string]madmin.TierConfig),
}
}
func (config *TierConfigMgr) refreshTierConfig(ctx context.Context, objAPI ObjectLayer) {
const tierCfgRefresh = 15 * time.Minute
r := rand.New(rand.NewSource(time.Now().UnixNano()))
randInterval := func() time.Duration {
return time.Duration(r.Float64() * 5 * float64(time.Second))
}
// To avoid all MinIO nodes reading the tier config object at the same
// time.
t := time.NewTimer(tierCfgRefresh + randInterval())
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
err := config.Reload(ctx, objAPI)
if err != nil {
logger.LogIf(ctx, err)
}
}
t.Reset(tierCfgRefresh + randInterval())
}
}
// loadTierConfig loads remote tier configuration from objAPI.
func loadTierConfig(ctx context.Context, objAPI ObjectLayer) (*TierConfigMgr, error) {
if objAPI == nil {
return nil, errServerNotInitialized
}
data, err := readConfig(ctx, objAPI, tierConfigPath)
if err != nil {
return nil, err
}
if len(data) <= 4 {
return nil, fmt.Errorf("tierConfigInit: no data")
}
// Read header
switch format := binary.LittleEndian.Uint16(data[0:2]); format {
case tierConfigFormat:
default:
return nil, fmt.Errorf("tierConfigInit: unknown format: %d", format)
}
cfg := NewTierConfigMgr()
switch version := binary.LittleEndian.Uint16(data[2:4]); version {
case tierConfigV1, tierConfigVersion:
if _, decErr := cfg.UnmarshalMsg(data[4:]); decErr != nil {
return nil, decErr
}
default:
return nil, fmt.Errorf("tierConfigInit: unknown version: %d", version)
}
return cfg, nil
}
// Init initializes tier configuration reading from objAPI
func (config *TierConfigMgr) Init(ctx context.Context, objAPI ObjectLayer) error {
err := config.Reload(ctx, objAPI)
if globalIsDistErasure {
go config.refreshTierConfig(ctx, objAPI)
}
return err
}