mirror of
https://github.com/minio/minio.git
synced 2025-01-24 05:03:16 -05:00
fix: sts accounts map refresh and fewer list calls (#19376)
This fixes a bug where STS Accounts map accumulates accounts in memory and never removes expired accounts and the STS Policy mappings were not being refreshed. The STS purge routine now runs with every IAM credentials load instead of every 4th time. The listing of IAM files is now cached on every IAM load operation to prevent re-listing for STS accounts purging/reload. Additionally this change makes each server pick a time for IAM loading that is randomly distributed from a 10 minute interval - this is to prevent server from thundering while performing the IAM load. On average, IAM loading will happen between every 5-15min after the previous IAM load operation completes.
This commit is contained in:
parent
2eee744e34
commit
48deccdc40
@ -25,12 +25,12 @@ import (
|
||||
"path"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"unicode/utf8"
|
||||
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
"github.com/minio/madmin-go/v3"
|
||||
"github.com/minio/minio-go/v7/pkg/set"
|
||||
"github.com/minio/minio/internal/config"
|
||||
xioutil "github.com/minio/minio/internal/ioutil"
|
||||
"github.com/minio/minio/internal/kms"
|
||||
@ -45,6 +45,8 @@ type IAMObjectStore struct {
|
||||
|
||||
*iamCache
|
||||
|
||||
cachedIAMListing atomic.Value
|
||||
|
||||
usersSysType UsersSysType
|
||||
|
||||
objAPI ObjectLayer
|
||||
@ -393,53 +395,45 @@ func (iamOS *IAMObjectStore) loadMappedPolicies(ctx context.Context, userType IA
|
||||
}
|
||||
|
||||
var (
|
||||
usersListKey = "users/"
|
||||
svcAccListKey = "service-accounts/"
|
||||
groupsListKey = "groups/"
|
||||
policiesListKey = "policies/"
|
||||
stsListKey = "sts/"
|
||||
policyDBUsersListKey = "policydb/users/"
|
||||
policyDBSTSUsersListKey = "policydb/sts-users/"
|
||||
policyDBServiceAccountsListKey = "policydb/service-accounts/"
|
||||
policyDBGroupsListKey = "policydb/groups/"
|
||||
|
||||
// List of directories from which to read iam data into memory.
|
||||
allListKeys = []string{
|
||||
usersListKey,
|
||||
svcAccListKey,
|
||||
groupsListKey,
|
||||
policiesListKey,
|
||||
stsListKey,
|
||||
policyDBUsersListKey,
|
||||
policyDBSTSUsersListKey,
|
||||
policyDBServiceAccountsListKey,
|
||||
policyDBGroupsListKey,
|
||||
}
|
||||
|
||||
// List of directories to skip: we do not read STS directories for better
|
||||
// performance. STS credentials would be stored in memory when they are
|
||||
// first used.
|
||||
iamLoadSkipListKeySet = set.CreateStringSet(
|
||||
stsListKey,
|
||||
policyDBSTSUsersListKey,
|
||||
)
|
||||
usersListKey = "users/"
|
||||
svcAccListKey = "service-accounts/"
|
||||
groupsListKey = "groups/"
|
||||
policiesListKey = "policies/"
|
||||
stsListKey = "sts/"
|
||||
policyDBUsersListKey = "policydb/users/"
|
||||
policyDBSTSUsersListKey = "policydb/sts-users/"
|
||||
policyDBGroupsListKey = "policydb/groups/"
|
||||
)
|
||||
|
||||
// splitPath splits a path into a top-level directory and a child item. The
|
||||
// parent directory retains the trailing slash.
|
||||
func splitPath(s string) (string, string) {
|
||||
i := strings.Index(s, "/")
|
||||
if i == -1 {
|
||||
return s, ""
|
||||
}
|
||||
// Include the trailing slash in the parent directory.
|
||||
return s[:i+1], s[i+1:]
|
||||
}
|
||||
|
||||
func (iamOS *IAMObjectStore) listAllIAMConfigItems(ctx context.Context) (map[string][]string, error) {
|
||||
res := make(map[string][]string)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
for _, listKey := range allListKeys {
|
||||
if iamLoadSkipListKeySet.Contains(listKey) {
|
||||
for item := range listIAMConfigItems(ctx, iamOS.objAPI, iamConfigPrefix+SlashSeparator) {
|
||||
if item.Err != nil {
|
||||
return nil, item.Err
|
||||
}
|
||||
|
||||
listKey, trimmedItem := splitPath(item.Item)
|
||||
if listKey == iamFormatFile {
|
||||
continue
|
||||
}
|
||||
for item := range listIAMConfigItems(ctx, iamOS.objAPI, iamConfigPrefix+SlashSeparator+listKey) {
|
||||
if item.Err != nil {
|
||||
return nil, item.Err
|
||||
}
|
||||
res[listKey] = append(res[listKey], item.Item)
|
||||
}
|
||||
|
||||
res[listKey] = append(res[listKey], trimmedItem)
|
||||
}
|
||||
// Store the listing for later re-use.
|
||||
iamOS.cachedIAMListing.Store(res)
|
||||
return res, nil
|
||||
}
|
||||
|
||||
@ -450,22 +444,47 @@ func (iamOS *IAMObjectStore) PurgeExpiredSTS(ctx context.Context) error {
|
||||
}
|
||||
|
||||
bootstrapTraceMsg("purging expired STS credentials")
|
||||
|
||||
iamListing, ok := iamOS.cachedIAMListing.Load().(map[string][]string)
|
||||
if !ok {
|
||||
// There has been no store yet. This should never happen!
|
||||
logger.LogIf(GlobalContext, errors.New("WARNING: no cached IAM listing found"))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Scan STS users on disk and purge expired ones. We do not need to hold a
|
||||
// lock with store.lock() here.
|
||||
for item := range listIAMConfigItems(ctx, iamOS.objAPI, iamConfigPrefix+SlashSeparator+stsListKey) {
|
||||
if item.Err != nil {
|
||||
return item.Err
|
||||
}
|
||||
userName := path.Dir(item.Item)
|
||||
// loadUser() will delete expired user during the load - we do not need
|
||||
// to keep the loaded user around in memory, so we reinitialize the map
|
||||
// each time.
|
||||
m := map[string]UserIdentity{}
|
||||
if err := iamOS.loadUser(ctx, userName, stsUser, m); err != nil && err != errNoSuchUser {
|
||||
logger.LogIf(GlobalContext, fmt.Errorf("unable to load user during STS purge: %w (%s)", err, item.Item))
|
||||
stsAccountsFromStore := map[string]UserIdentity{}
|
||||
stsAccPoliciesFromStore := xsync.NewMapOf[string, MappedPolicy]()
|
||||
for _, item := range iamListing[stsListKey] {
|
||||
userName := path.Dir(item)
|
||||
// loadUser() will delete expired user during the load.
|
||||
err := iamOS.loadUser(ctx, userName, stsUser, stsAccountsFromStore)
|
||||
if err != nil && !errors.Is(err, errNoSuchUser) {
|
||||
logger.LogIf(GlobalContext,
|
||||
fmt.Errorf("unable to load user during STS purge: %w (%s)", err, item))
|
||||
}
|
||||
|
||||
}
|
||||
// Loading the STS policy mappings from disk ensures that stale entries
|
||||
// (removed during loadUser() in the loop above) are removed from memory.
|
||||
for _, item := range iamListing[policyDBSTSUsersListKey] {
|
||||
stsName := strings.TrimSuffix(item, ".json")
|
||||
err := iamOS.loadMappedPolicy(ctx, stsName, stsUser, false, stsAccPoliciesFromStore)
|
||||
if err != nil && !errors.Is(err, errNoSuchPolicy) {
|
||||
logger.LogIf(GlobalContext,
|
||||
fmt.Errorf("unable to load policies during STS purge: %w (%s)", err, item))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Store the newly populated map in the iam cache. This takes care of
|
||||
// removing stale entries from the existing map.
|
||||
iamOS.Lock()
|
||||
defer iamOS.Unlock()
|
||||
iamOS.iamCache.iamSTSAccountsMap = stsAccountsFromStore
|
||||
iamOS.iamCache.iamSTSPolicyMap = stsAccPoliciesFromStore
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -626,10 +645,8 @@ type itemOrErr struct {
|
||||
Err error
|
||||
}
|
||||
|
||||
// Lists files or dirs in the minioMetaBucket at the given path
|
||||
// prefix. If dirs is true, only directories are listed, otherwise
|
||||
// only objects are listed. All returned items have the pathPrefix
|
||||
// removed from their names.
|
||||
// Lists objects in the minioMetaBucket at the given path prefix. All returned
|
||||
// items have the pathPrefix removed from their names.
|
||||
func listIAMConfigItems(ctx context.Context, objAPI ObjectLayer, pathPrefix string) <-chan itemOrErr {
|
||||
ch := make(chan itemOrErr)
|
||||
|
||||
|
45
cmd/iam.go
45
cmd/iam.go
@ -362,13 +362,24 @@ func (sys *IAMSys) periodicRoutines(ctx context.Context, baseInterval time.Durat
|
||||
}
|
||||
|
||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
// Add a random interval of up to 20% of the base interval.
|
||||
randInterval := func() time.Duration {
|
||||
return time.Duration(r.Float64() * float64(baseInterval) * 0.2)
|
||||
|
||||
// Calculate the waitInterval between periodic refreshes so that each server
|
||||
// independently picks a (uniformly distributed) random time in an interval
|
||||
// of size = baseInterval.
|
||||
//
|
||||
// For example:
|
||||
//
|
||||
// - if baseInterval=10s, then 5s <= waitInterval() < 15s
|
||||
//
|
||||
// - if baseInterval=10m, then 5m <= waitInterval() < 15m
|
||||
waitInterval := func() time.Duration {
|
||||
// Calculate a random value such that 0 <= value < baseInterval
|
||||
randAmt := time.Duration(r.Float64() * float64(baseInterval))
|
||||
return baseInterval/2 + randAmt
|
||||
}
|
||||
|
||||
var maxDurationSecondsForLog float64 = 5
|
||||
timer := time.NewTimer(baseInterval + randInterval())
|
||||
timer := time.NewTimer(waitInterval())
|
||||
defer timer.Stop()
|
||||
|
||||
for {
|
||||
@ -386,21 +397,21 @@ func (sys *IAMSys) periodicRoutines(ctx context.Context, baseInterval time.Durat
|
||||
}
|
||||
}
|
||||
|
||||
// Purge expired STS credentials.
|
||||
purgeStart := time.Now()
|
||||
if err := sys.store.PurgeExpiredSTS(ctx); err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("Failure in periodic STS purge for IAM (took %.2fs): %v", time.Since(purgeStart).Seconds(), err))
|
||||
} else {
|
||||
took := time.Since(purgeStart).Seconds()
|
||||
if took > maxDurationSecondsForLog {
|
||||
// Log if we took a lot of time to load.
|
||||
logger.Info("IAM expired STS purge took %.2fs", took)
|
||||
}
|
||||
}
|
||||
|
||||
// The following actions are performed about once in 4 times that
|
||||
// IAM is refreshed:
|
||||
if r.Intn(4) == 0 {
|
||||
// Purge expired STS credentials.
|
||||
purgeStart := time.Now()
|
||||
if err := sys.store.PurgeExpiredSTS(ctx); err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("Failure in periodic STS purge for IAM (took %.2fs): %v", time.Since(purgeStart).Seconds(), err))
|
||||
} else {
|
||||
took := time.Since(purgeStart).Seconds()
|
||||
if took > maxDurationSecondsForLog {
|
||||
// Log if we took a lot of time to load.
|
||||
logger.Info("IAM expired STS purge took %.2fs", took)
|
||||
}
|
||||
}
|
||||
|
||||
// Poll and remove accounts for those users who were removed
|
||||
// from LDAP/OpenID.
|
||||
if sys.LDAPConfig.Enabled() {
|
||||
@ -412,7 +423,7 @@ func (sys *IAMSys) periodicRoutines(ctx context.Context, baseInterval time.Durat
|
||||
}
|
||||
}
|
||||
|
||||
timer.Reset(baseInterval + randInterval())
|
||||
timer.Reset(waitInterval())
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user