diff --git a/cmd/admin-handlers-users.go b/cmd/admin-handlers-users.go index 7b8047eb7..b0b85ac3f 100644 --- a/cmd/admin-handlers-users.go +++ b/cmd/admin-handlers-users.go @@ -38,6 +38,7 @@ import ( "github.com/minio/minio/internal/logger" "github.com/minio/mux" "github.com/minio/pkg/v2/policy" + "github.com/puzpuzpuz/xsync/v3" ) // RemoveUser - DELETE /minio/admin/v3/remove-user?accessKey= @@ -1936,13 +1937,13 @@ func (a adminAPIHandlers) ExportIAM(w http.ResponseWriter, r *http.Request) { return } case userPolicyMappingsFile: - userPolicyMap := make(map[string]MappedPolicy) + userPolicyMap := xsync.NewMapOf[string, MappedPolicy]() err := globalIAMSys.store.loadMappedPolicies(ctx, regUser, false, userPolicyMap) if err != nil { writeErrorResponse(ctx, w, exportError(ctx, err, iamFile, ""), r.URL) return } - userPolData, err := json.Marshal(userPolicyMap) + userPolData, err := json.Marshal(mappedPoliciesToMap(userPolicyMap)) if err != nil { writeErrorResponse(ctx, w, exportError(ctx, err, iamFile, ""), r.URL) return @@ -1953,13 +1954,13 @@ func (a adminAPIHandlers) ExportIAM(w http.ResponseWriter, r *http.Request) { return } case groupPolicyMappingsFile: - groupPolicyMap := make(map[string]MappedPolicy) + groupPolicyMap := xsync.NewMapOf[string, MappedPolicy]() err := globalIAMSys.store.loadMappedPolicies(ctx, regUser, true, groupPolicyMap) if err != nil { writeErrorResponse(ctx, w, exportError(ctx, err, iamFile, ""), r.URL) return } - grpPolData, err := json.Marshal(groupPolicyMap) + grpPolData, err := json.Marshal(mappedPoliciesToMap(groupPolicyMap)) if err != nil { writeErrorResponse(ctx, w, exportError(ctx, err, iamFile, ""), r.URL) return @@ -1970,13 +1971,13 @@ func (a adminAPIHandlers) ExportIAM(w http.ResponseWriter, r *http.Request) { return } case stsUserPolicyMappingsFile: - userPolicyMap := make(map[string]MappedPolicy) + userPolicyMap := xsync.NewMapOf[string, MappedPolicy]() err := globalIAMSys.store.loadMappedPolicies(ctx, stsUser, false, userPolicyMap) if err != nil { writeErrorResponse(ctx, w, exportError(ctx, err, iamFile, ""), r.URL) return } - userPolData, err := json.Marshal(userPolicyMap) + userPolData, err := json.Marshal(mappedPoliciesToMap(userPolicyMap)) if err != nil { writeErrorResponse(ctx, w, exportError(ctx, err, iamFile, ""), r.URL) return @@ -1986,13 +1987,13 @@ func (a adminAPIHandlers) ExportIAM(w http.ResponseWriter, r *http.Request) { return } case stsGroupPolicyMappingsFile: - groupPolicyMap := make(map[string]MappedPolicy) + groupPolicyMap := xsync.NewMapOf[string, MappedPolicy]() err := globalIAMSys.store.loadMappedPolicies(ctx, stsUser, true, groupPolicyMap) if err != nil { writeErrorResponse(ctx, w, exportError(ctx, err, iamFile, ""), r.URL) return } - grpPolData, err := json.Marshal(groupPolicyMap) + grpPolData, err := json.Marshal(mappedPoliciesToMap(groupPolicyMap)) if err != nil { writeErrorResponse(ctx, w, exportError(ctx, err, iamFile, ""), r.URL) return diff --git a/cmd/iam-etcd-store.go b/cmd/iam-etcd-store.go index 22e7f6ca6..52dff016c 100644 --- a/cmd/iam-etcd-store.go +++ b/cmd/iam-etcd-store.go @@ -31,6 +31,7 @@ import ( "github.com/minio/minio/internal/config" "github.com/minio/minio/internal/kms" "github.com/minio/minio/internal/logger" + "github.com/puzpuzpuz/xsync/v3" "go.etcd.io/etcd/api/v3/mvccpb" etcd "go.etcd.io/etcd/client/v3" ) @@ -325,11 +326,11 @@ func (ies *IAMEtcdStore) loadGroups(ctx context.Context, m map[string]GroupInfo) return nil } -func (ies *IAMEtcdStore) loadMappedPolicyWithRetry(ctx context.Context, name string, userType IAMUserType, isGroup bool, m map[string]MappedPolicy, _ int) error { +func (ies *IAMEtcdStore) loadMappedPolicyWithRetry(ctx context.Context, name string, userType IAMUserType, isGroup bool, m *xsync.MapOf[string, MappedPolicy], retries int) error { return ies.loadMappedPolicy(ctx, name, userType, isGroup, m) } -func (ies *IAMEtcdStore) loadMappedPolicy(ctx context.Context, name string, userType IAMUserType, isGroup bool, m map[string]MappedPolicy) error { +func (ies *IAMEtcdStore) loadMappedPolicy(ctx context.Context, name string, userType IAMUserType, isGroup bool, m *xsync.MapOf[string, MappedPolicy]) error { var p MappedPolicy err := ies.loadIAMConfig(ctx, &p, getMappedPolicyPath(name, userType, isGroup)) if err != nil { @@ -338,11 +339,11 @@ func (ies *IAMEtcdStore) loadMappedPolicy(ctx context.Context, name string, user } return err } - m[name] = p + m.Store(name, p) return nil } -func getMappedPolicy(ctx context.Context, kv *mvccpb.KeyValue, userType IAMUserType, isGroup bool, m map[string]MappedPolicy, basePrefix string) error { +func getMappedPolicy(kv *mvccpb.KeyValue, m *xsync.MapOf[string, MappedPolicy], basePrefix string) error { var p MappedPolicy err := getIAMConfig(&p, kv.Value, string(kv.Key)) if err != nil { @@ -352,11 +353,11 @@ func getMappedPolicy(ctx context.Context, kv *mvccpb.KeyValue, userType IAMUserT return err } name := extractPathPrefixAndSuffix(string(kv.Key), basePrefix, ".json") - m[name] = p + m.Store(name, p) return nil } -func (ies *IAMEtcdStore) loadMappedPolicies(ctx context.Context, userType IAMUserType, isGroup bool, m map[string]MappedPolicy) error { +func (ies *IAMEtcdStore) loadMappedPolicies(ctx context.Context, userType IAMUserType, isGroup bool, m *xsync.MapOf[string, MappedPolicy]) error { cctx, cancel := context.WithTimeout(ctx, defaultContextTimeout) defer cancel() var basePrefix string @@ -381,7 +382,7 @@ func (ies *IAMEtcdStore) loadMappedPolicies(ctx context.Context, userType IAMUse // Parse all policies mapping to create the proper data model for _, kv := range r.Kvs { - if err = getMappedPolicy(ctx, kv, userType, isGroup, m, basePrefix); err != nil && !errors.Is(err, errNoSuchPolicy) { + if err = getMappedPolicy(kv, m, basePrefix); err != nil && !errors.Is(err, errNoSuchPolicy) { return err } } diff --git a/cmd/iam-object-store.go b/cmd/iam-object-store.go index 4c44a0115..b4e354606 100644 --- a/cmd/iam-object-store.go +++ b/cmd/iam-object-store.go @@ -35,6 +35,7 @@ import ( xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/kms" "github.com/minio/minio/internal/logger" + "github.com/puzpuzpuz/xsync/v3" ) // IAMObjectStore implements IAMStorageAPI @@ -325,9 +326,7 @@ func (iamOS *IAMObjectStore) loadGroups(ctx context.Context, m map[string]GroupI return nil } -func (iamOS *IAMObjectStore) loadMappedPolicyWithRetry(ctx context.Context, name string, userType IAMUserType, isGroup bool, - m map[string]MappedPolicy, retries int, -) error { +func (iamOS *IAMObjectStore) loadMappedPolicyWithRetry(ctx context.Context, name string, userType IAMUserType, isGroup bool, m *xsync.MapOf[string, MappedPolicy], retries int) error { for { retry: var p MappedPolicy @@ -344,14 +343,12 @@ func (iamOS *IAMObjectStore) loadMappedPolicyWithRetry(ctx context.Context, name goto retry } - m[name] = p + m.Store(name, p) return nil } } -func (iamOS *IAMObjectStore) loadMappedPolicy(ctx context.Context, name string, userType IAMUserType, isGroup bool, - m map[string]MappedPolicy, -) error { +func (iamOS *IAMObjectStore) loadMappedPolicy(ctx context.Context, name string, userType IAMUserType, isGroup bool, m *xsync.MapOf[string, MappedPolicy]) error { var p MappedPolicy err := iamOS.loadIAMConfig(ctx, &p, getMappedPolicyPath(name, userType, isGroup)) if err != nil { @@ -361,11 +358,11 @@ func (iamOS *IAMObjectStore) loadMappedPolicy(ctx context.Context, name string, return err } - m[name] = p + m.Store(name, p) return nil } -func (iamOS *IAMObjectStore) loadMappedPolicies(ctx context.Context, userType IAMUserType, isGroup bool, m map[string]MappedPolicy) error { +func (iamOS *IAMObjectStore) loadMappedPolicies(ctx context.Context, userType IAMUserType, isGroup bool, m *xsync.MapOf[string, MappedPolicy]) error { var basePath string if isGroup { basePath = iamConfigPolicyDBGroupsPrefix diff --git a/cmd/iam-store.go b/cmd/iam-store.go index ec43ca8f9..2e4e100d5 100644 --- a/cmd/iam-store.go +++ b/cmd/iam-store.go @@ -35,6 +35,7 @@ import ( "github.com/minio/minio/internal/jwt" "github.com/minio/minio/internal/logger" "github.com/minio/pkg/v2/policy" + "github.com/puzpuzpuz/xsync/v3" ) const ( @@ -179,6 +180,16 @@ type MappedPolicy struct { UpdatedAt time.Time `json:"updatedAt,omitempty"` } +// mappedPoliciesToMap copies the map of mapped policies to a regular map. +func mappedPoliciesToMap(m *xsync.MapOf[string, MappedPolicy]) map[string]MappedPolicy { + policies := make(map[string]MappedPolicy, m.Size()) + m.Range(func(k string, v MappedPolicy) bool { + policies[k] = v + return true + }) + return policies +} + // converts a mapped policy into a slice of distinct policies func (mp MappedPolicy) toSlice() []string { var policies []string @@ -277,32 +288,32 @@ type iamCache struct { // map of regular username to credentials iamUsersMap map[string]UserIdentity // map of regular username to policy names - iamUserPolicyMap map[string]MappedPolicy + iamUserPolicyMap *xsync.MapOf[string, MappedPolicy] // STS accounts are loaded on demand and not via the periodic IAM reload. // map of STS access key to credentials iamSTSAccountsMap map[string]UserIdentity // map of STS access key to policy names - iamSTSPolicyMap map[string]MappedPolicy + iamSTSPolicyMap *xsync.MapOf[string, MappedPolicy] // map of group names to group info iamGroupsMap map[string]GroupInfo // map of user names to groups they are a member of iamUserGroupMemberships map[string]set.StringSet // map of group names to policy names - iamGroupPolicyMap map[string]MappedPolicy + iamGroupPolicyMap *xsync.MapOf[string, MappedPolicy] } func newIamCache() *iamCache { return &iamCache{ iamPolicyDocsMap: map[string]PolicyDoc{}, iamUsersMap: map[string]UserIdentity{}, - iamUserPolicyMap: map[string]MappedPolicy{}, + iamUserPolicyMap: xsync.NewMapOf[string, MappedPolicy](), iamSTSAccountsMap: map[string]UserIdentity{}, - iamSTSPolicyMap: map[string]MappedPolicy{}, + iamSTSPolicyMap: xsync.NewMapOf[string, MappedPolicy](), iamGroupsMap: map[string]GroupInfo{}, iamUserGroupMemberships: map[string]set.StringSet{}, - iamGroupPolicyMap: map[string]MappedPolicy{}, + iamGroupPolicyMap: xsync.NewMapOf[string, MappedPolicy](), } } @@ -375,14 +386,14 @@ func (c *iamCache) policyDBGet(store *IAMStoreSys, name string, isGroup bool) ([ } } - policy, ok := c.iamGroupPolicyMap[name] + policy, ok := c.iamGroupPolicyMap.Load(name) if ok { return policy.toSlice(), policy.UpdatedAt, nil } if err := store.loadMappedPolicyWithRetry(context.TODO(), name, regUser, true, c.iamGroupPolicyMap, 3); err != nil && !errors.Is(err, errNoSuchPolicy) { return nil, time.Time{}, err } - policy = c.iamGroupPolicyMap[name] + policy, _ = c.iamGroupPolicyMap.Load(name) return policy.toSlice(), policy.UpdatedAt, nil } @@ -398,23 +409,23 @@ func (c *iamCache) policyDBGet(store *IAMStoreSys, name string, isGroup bool) ([ // For internal IDP regular/service account user accounts, the policy // mapping is iamUserPolicyMap. For STS accounts, the parent user would be // passed here and we lookup the mapping in iamSTSPolicyMap. - mp, ok := c.iamUserPolicyMap[name] + mp, ok := c.iamUserPolicyMap.Load(name) if !ok { if err := store.loadMappedPolicyWithRetry(context.TODO(), name, regUser, false, c.iamUserPolicyMap, 3); err != nil && !errors.Is(err, errNoSuchPolicy) { return nil, time.Time{}, err } - mp, ok = c.iamUserPolicyMap[name] + mp, ok = c.iamUserPolicyMap.Load(name) if !ok { - // Since user "name" could be a parent user of an STS account, we lookup + // Since user "name" could be a parent user of an STS account, we look up // mappings for those too. - mp, ok = c.iamSTSPolicyMap[name] + mp, ok = c.iamSTSPolicyMap.Load(name) if !ok { // Attempt to load parent user mapping for STS accounts if err := store.loadMappedPolicyWithRetry(context.TODO(), name, stsUser, false, c.iamSTSPolicyMap, 3); err != nil && !errors.Is(err, errNoSuchPolicy) { return nil, time.Time{}, err } - mp = c.iamSTSPolicyMap[name] + mp, _ = c.iamSTSPolicyMap.Load(name) } } } @@ -442,12 +453,12 @@ func (c *iamCache) policyDBGet(store *IAMStoreSys, name string, isGroup bool) ([ } } - policy, ok := c.iamGroupPolicyMap[group] - if ok { + policy, ok := c.iamGroupPolicyMap.Load(group) + if !ok { if err := store.loadMappedPolicyWithRetry(context.TODO(), group, regUser, true, c.iamGroupPolicyMap, 3); err != nil && !errors.Is(err, errNoSuchPolicy) { return nil, time.Time{}, err } - policy = c.iamGroupPolicyMap[group] + policy, _ = c.iamGroupPolicyMap.Load(group) } policies = append(policies, policy.toSlice()...) @@ -491,9 +502,9 @@ type IAMStorageAPI interface { loadUsers(ctx context.Context, userType IAMUserType, m map[string]UserIdentity) error loadGroup(ctx context.Context, group string, m map[string]GroupInfo) error loadGroups(ctx context.Context, m map[string]GroupInfo) error - loadMappedPolicy(ctx context.Context, name string, userType IAMUserType, isGroup bool, m map[string]MappedPolicy) error - loadMappedPolicyWithRetry(ctx context.Context, name string, userType IAMUserType, isGroup bool, m map[string]MappedPolicy, retries int) error - loadMappedPolicies(ctx context.Context, userType IAMUserType, isGroup bool, m map[string]MappedPolicy) error + loadMappedPolicy(ctx context.Context, name string, userType IAMUserType, isGroup bool, m *xsync.MapOf[string, MappedPolicy]) error + loadMappedPolicyWithRetry(ctx context.Context, name string, userType IAMUserType, isGroup bool, m *xsync.MapOf[string, MappedPolicy], retries int) error + loadMappedPolicies(ctx context.Context, userType IAMUserType, isGroup bool, m *xsync.MapOf[string, MappedPolicy]) error saveIAMConfig(ctx context.Context, item interface{}, path string, opts ...options) error loadIAMConfig(ctx context.Context, item interface{}, path string) error deleteIAMConfig(ctx context.Context, path string) error @@ -618,9 +629,10 @@ func (store *IAMStoreSys) LoadIAMCache(ctx context.Context, firstTime bool) erro // here is to account for STS policy mapping changes that should apply // for service accounts derived from such STS accounts (i.e. LDAP STS // accounts). - for k, v := range newCache.iamSTSPolicyMap { - cache.iamSTSPolicyMap[k] = v - } + newCache.iamSTSPolicyMap.Range(func(k string, v MappedPolicy) bool { + cache.iamSTSPolicyMap.Store(k, v) + return true + }) cache.updatedAt = time.Now() } @@ -659,12 +671,10 @@ func (store *IAMStoreSys) GetMappedPolicy(name string, isGroup bool) (MappedPoli defer store.runlock() if isGroup { - v, ok := cache.iamGroupPolicyMap[name] + v, ok := cache.iamGroupPolicyMap.Load(name) return v, ok } - - v, ok := cache.iamUserPolicyMap[name] - return v, ok + return cache.iamUserPolicyMap.Load(name) } // GroupNotificationHandler - updates in-memory cache on notification of @@ -683,7 +693,7 @@ func (store *IAMStoreSys) GroupNotificationHandler(ctx context.Context, group st // group does not exist - so remove from memory. cache.removeGroupFromMembershipsMap(group) delete(cache.iamGroupsMap, group) - delete(cache.iamGroupPolicyMap, group) + cache.iamGroupPolicyMap.Delete(group) cache.updatedAt = time.Now() return nil @@ -862,7 +872,7 @@ func (store *IAMStoreSys) RemoveUsersFromGroup(ctx context.Context, group string // Delete from server memory delete(cache.iamGroupsMap, group) - delete(cache.iamGroupPolicyMap, group) + cache.iamGroupPolicyMap.Delete(group) cache.updatedAt = time.Now() return cache.updatedAt, nil } @@ -954,16 +964,17 @@ func (store *IAMStoreSys) ListGroups(ctx context.Context) (res []string, err err } if store.getUsersSysType() == LDAPUsersSysType { - m := map[string]MappedPolicy{} + m := xsync.NewMapOf[string, MappedPolicy]() err = store.loadMappedPolicies(ctx, stsUser, true, m) if err != nil { return } cache.iamGroupPolicyMap = m cache.updatedAt = time.Now() - for k := range cache.iamGroupPolicyMap { + cache.iamGroupPolicyMap.Range(func(k string, v MappedPolicy) bool { res = append(res, k) - } + return true + }) } return @@ -981,9 +992,10 @@ func (store *IAMStoreSys) listGroups(ctx context.Context) (res []string, err err } if store.getUsersSysType() == LDAPUsersSysType { - for k := range cache.iamGroupPolicyMap { + cache.iamGroupPolicyMap.Range(func(k string, _ MappedPolicy) bool { res = append(res, k) - } + return true + }) } return } @@ -1006,14 +1018,14 @@ func (store *IAMStoreSys) PolicyDBUpdate(ctx context.Context, name string, isGro var mp MappedPolicy if !isGroup { if userType == stsUser { - stsMap := map[string]MappedPolicy{} + stsMap := xsync.NewMapOf[string, MappedPolicy]() // Attempt to load parent user mapping for STS accounts store.loadMappedPolicy(context.TODO(), name, stsUser, false, stsMap) - mp = stsMap[name] + mp, _ = stsMap.Load(name) } else { - mp = cache.iamUserPolicyMap[name] + mp, _ = cache.iamUserPolicyMap.Load(name) } } else { if store.getUsersSysType() == MinIOUsersSysType { @@ -1028,7 +1040,7 @@ func (store *IAMStoreSys) PolicyDBUpdate(ctx context.Context, name string, isGro return } } - mp = cache.iamGroupPolicyMap[name] + mp, _ = cache.iamGroupPolicyMap.Load(name) } // Compute net policy change effect and updated policy mapping @@ -1071,12 +1083,12 @@ func (store *IAMStoreSys) PolicyDBUpdate(ctx context.Context, name string, isGro } if !isGroup { if userType == stsUser { - delete(cache.iamSTSPolicyMap, name) + cache.iamSTSPolicyMap.Delete(name) } else { - delete(cache.iamUserPolicyMap, name) + cache.iamUserPolicyMap.Delete(name) } } else { - delete(cache.iamGroupPolicyMap, name) + cache.iamGroupPolicyMap.Delete(name) } } else { @@ -1085,12 +1097,12 @@ func (store *IAMStoreSys) PolicyDBUpdate(ctx context.Context, name string, isGro } if !isGroup { if userType == stsUser { - cache.iamSTSPolicyMap[name] = newPolicyMapping + cache.iamSTSPolicyMap.Store(name, newPolicyMapping) } else { - cache.iamUserPolicyMap[name] = newPolicyMapping + cache.iamUserPolicyMap.Store(name, newPolicyMapping) } } else { - cache.iamGroupPolicyMap[name] = newPolicyMapping + cache.iamGroupPolicyMap.Store(name, newPolicyMapping) } } @@ -1125,12 +1137,12 @@ func (store *IAMStoreSys) PolicyDBSet(ctx context.Context, name, policy string, } if !isGroup { if userType == stsUser { - delete(cache.iamSTSPolicyMap, name) + cache.iamSTSPolicyMap.Delete(name) } else { - delete(cache.iamUserPolicyMap, name) + cache.iamUserPolicyMap.Delete(name) } } else { - delete(cache.iamGroupPolicyMap, name) + cache.iamGroupPolicyMap.Delete(name) } cache.updatedAt = time.Now() return cache.updatedAt, nil @@ -1149,12 +1161,12 @@ func (store *IAMStoreSys) PolicyDBSet(ctx context.Context, name, policy string, } if !isGroup { if userType == stsUser { - cache.iamSTSPolicyMap[name] = mp + cache.iamSTSPolicyMap.Store(name, mp) } else { - cache.iamUserPolicyMap[name] = mp + cache.iamUserPolicyMap.Store(name, mp) } } else { - cache.iamGroupPolicyMap[name] = mp + cache.iamGroupPolicyMap.Store(name, mp) } cache.updatedAt = time.Now() return mp.UpdatedAt, nil @@ -1178,33 +1190,35 @@ func (store *IAMStoreSys) PolicyNotificationHandler(ctx context.Context, policy delete(cache.iamPolicyDocsMap, policy) // update user policy map - for u, mp := range cache.iamUserPolicyMap { + cache.iamUserPolicyMap.Range(func(u string, mp MappedPolicy) bool { pset := mp.policySet() if !pset.Contains(policy) { - continue + return true } if store.getUsersSysType() == MinIOUsersSysType { _, ok := cache.iamUsersMap[u] if !ok { // happens when account is deleted or // expired. - delete(cache.iamUserPolicyMap, u) - continue + cache.iamUserPolicyMap.Delete(u) + return true } } pset.Remove(policy) - cache.iamUserPolicyMap[u] = newMappedPolicy(strings.Join(pset.ToSlice(), ",")) - } + cache.iamUserPolicyMap.Store(u, newMappedPolicy(strings.Join(pset.ToSlice(), ","))) + return true + }) // update group policy map - for g, mp := range cache.iamGroupPolicyMap { + cache.iamGroupPolicyMap.Range(func(g string, mp MappedPolicy) bool { pset := mp.policySet() if !pset.Contains(policy) { - continue + return true } pset.Remove(policy) - cache.iamGroupPolicyMap[g] = newMappedPolicy(strings.Join(pset.ToSlice(), ",")) - } + cache.iamGroupPolicyMap.Store(g, newMappedPolicy(strings.Join(pset.ToSlice(), ","))) + return true + }) cache.updatedAt = time.Now() return nil @@ -1230,26 +1244,28 @@ func (store *IAMStoreSys) DeletePolicy(ctx context.Context, policy string, isFro // we do allow deletion. users := []string{} groups := []string{} - for u, mp := range cache.iamUserPolicyMap { + cache.iamUserPolicyMap.Range(func(u string, mp MappedPolicy) bool { pset := mp.policySet() if store.getUsersSysType() == MinIOUsersSysType { if _, ok := cache.iamUsersMap[u]; !ok { // This case can happen when a temporary account is // deleted or expired - remove it from userPolicyMap. - delete(cache.iamUserPolicyMap, u) - continue + cache.iamUserPolicyMap.Delete(u) + return true } } if pset.Contains(policy) { users = append(users, u) } - } - for g, mp := range cache.iamGroupPolicyMap { + return true + }) + cache.iamGroupPolicyMap.Range(func(g string, mp MappedPolicy) bool { pset := mp.policySet() if pset.Contains(policy) { groups = append(groups, g) } - } + return true + }) if len(users) != 0 || len(groups) != 0 { return errPolicyInUse } @@ -1466,11 +1482,11 @@ func (store *IAMStoreSys) GetBucketUsers(bucket string) (map[string]madmin.UserI continue } var policies []string - mp, ok := cache.iamUserPolicyMap[k] + mp, ok := cache.iamUserPolicyMap.Load(k) if ok { policies = append(policies, mp.Policies) for _, group := range cache.iamUserGroupMemberships[k].ToSlice() { - if nmp, ok := cache.iamGroupPolicyMap[group]; ok { + if nmp, ok := cache.iamGroupPolicyMap.Load(group); ok { policies = append(policies, nmp.Policies) } } @@ -1505,8 +1521,9 @@ func (store *IAMStoreSys) GetUsers() map[string]madmin.UserInfo { if v.IsTemp() || v.IsServiceAccount() { continue } + pl, _ := cache.iamUserPolicyMap.Load(k) result[k] = madmin.UserInfo{ - PolicyName: cache.iamUserPolicyMap[k].Policies, + PolicyName: pl.Policies, Status: func() madmin.AccountStatus { if v.IsValid() { return madmin.AccountEnabled @@ -1514,7 +1531,7 @@ func (store *IAMStoreSys) GetUsers() map[string]madmin.UserInfo { return madmin.AccountDisabled }(), MemberOf: cache.iamUserGroupMemberships[k].ToSlice(), - UpdatedAt: cache.iamUserPolicyMap[k].UpdatedAt, + UpdatedAt: pl.UpdatedAt, } } @@ -1527,12 +1544,14 @@ func (store *IAMStoreSys) GetUsersWithMappedPolicies() map[string]string { defer store.runlock() result := make(map[string]string) - for k, v := range cache.iamUserPolicyMap { + cache.iamUserPolicyMap.Range(func(k string, v MappedPolicy) bool { result[k] = v.Policies - } - for k, v := range cache.iamSTSPolicyMap { + return true + }) + cache.iamSTSPolicyMap.Range(func(k string, v MappedPolicy) bool { result[k] = v.Policies - } + return true + }) return result } @@ -1561,14 +1580,14 @@ func (store *IAMStoreSys) GetUserInfo(name string) (u madmin.UserInfo, err error break } } - mappedPolicy, ok := cache.iamUserPolicyMap[name] + mappedPolicy, ok := cache.iamUserPolicyMap.Load(name) if !ok { - mappedPolicy, ok = cache.iamSTSPolicyMap[name] + mappedPolicy, ok = cache.iamSTSPolicyMap.Load(name) } if !ok { // Attempt to load parent user mapping for STS accounts store.loadMappedPolicy(context.TODO(), name, stsUser, false, cache.iamSTSPolicyMap) - mappedPolicy, ok = cache.iamSTSPolicyMap[name] + mappedPolicy, ok = cache.iamSTSPolicyMap.Load(name) if !ok { return u, errNoSuchUser } @@ -1589,9 +1608,9 @@ func (store *IAMStoreSys) GetUserInfo(name string) (u madmin.UserInfo, err error if cred.IsTemp() || cred.IsServiceAccount() { return u, errIAMActionNotAllowed } - + pl, _ := cache.iamUserPolicyMap.Load(name) return madmin.UserInfo{ - PolicyName: cache.iamUserPolicyMap[name].Policies, + PolicyName: pl.Policies, Status: func() madmin.AccountStatus { if cred.IsValid() { return madmin.AccountEnabled @@ -1599,7 +1618,7 @@ func (store *IAMStoreSys) GetUserInfo(name string) (u madmin.UserInfo, err error return madmin.AccountDisabled }(), MemberOf: cache.iamUserGroupMemberships[name].ToSlice(), - UpdatedAt: cache.iamUserPolicyMap[name].UpdatedAt, + UpdatedAt: pl.UpdatedAt, }, nil } @@ -1612,7 +1631,7 @@ func (store *IAMStoreSys) PolicyMappingNotificationHandler(ctx context.Context, cache := store.lock() defer store.unlock() - var m map[string]MappedPolicy + var m *xsync.MapOf[string, MappedPolicy] switch { case isGroup: m = cache.iamGroupPolicyMap @@ -1623,7 +1642,7 @@ func (store *IAMStoreSys) PolicyMappingNotificationHandler(ctx context.Context, if errors.Is(err, errNoSuchPolicy) { // This means that the policy mapping was deleted, so we update // the cache. - delete(m, userOrGroup) + m.Delete(userOrGroup) cache.updatedAt = time.Now() err = nil @@ -1688,7 +1707,7 @@ func (store *IAMStoreSys) UserNotificationHandler(ctx context.Context, accessKey } // 3. Delete any mapped policy - delete(cache.iamUserPolicyMap, accessKey) + cache.iamUserPolicyMap.Delete(accessKey) return nil } @@ -1782,7 +1801,7 @@ func (store *IAMStoreSys) DeleteUser(ctx context.Context, accessKey string, user // It is ok to ignore deletion error on the mapped policy store.deleteMappedPolicy(ctx, accessKey, userType, false) - delete(cache.iamUserPolicyMap, accessKey) + cache.iamUserPolicyMap.Delete(accessKey) err := store.deleteUserIdentity(ctx, accessKey, userType) if err == errNoSuchUser { @@ -1822,7 +1841,7 @@ func (store *IAMStoreSys) SetTempUser(ctx context.Context, accessKey string, cre return time.Time{}, err } - cache.iamSTSPolicyMap[cred.ParentUser] = mp + cache.iamSTSPolicyMap.Store(cred.ParentUser, mp) } u := newUserIdentity(cred) @@ -1859,7 +1878,7 @@ func (store *IAMStoreSys) DeleteUsers(ctx context.Context, users []string) error if usersToDelete.Contains(user) || usersToDelete.Contains(cred.ParentUser) { // Delete this user account and its policy mapping store.deleteMappedPolicy(ctx, user, userType, false) - delete(cache.iamUserPolicyMap, user) + cache.iamUserPolicyMap.Delete(user) // we are only logging errors, not handling them. err := store.deleteUserIdentity(ctx, user, userType) @@ -1961,13 +1980,13 @@ func (store *IAMStoreSys) listUserPolicyMappings(cache *iamCache, users []string ) []madmin.UserPolicyEntities { var r []madmin.UserPolicyEntities usersSet := set.CreateStringSet(users...) - for user, mappedPolicy := range cache.iamUserPolicyMap { + cache.iamUserPolicyMap.Range(func(user string, mappedPolicy MappedPolicy) bool { if userPredicate != nil && !userPredicate(user) { - continue + return true } if !usersSet.IsEmpty() && !usersSet.Contains(user) { - continue + return true } ps := mappedPolicy.toSlice() @@ -1976,17 +1995,18 @@ func (store *IAMStoreSys) listUserPolicyMappings(cache *iamCache, users []string User: user, Policies: ps, }) - } + return true + }) - stsMap := map[string]MappedPolicy{} + stsMap := xsync.NewMapOf[string, MappedPolicy]() for _, user := range users { // Attempt to load parent user mapping for STS accounts store.loadMappedPolicy(context.TODO(), user, stsUser, false, stsMap) } - for user, mappedPolicy := range stsMap { + stsMap.Range(func(user string, mappedPolicy MappedPolicy) bool { if userPredicate != nil && !userPredicate(user) { - continue + return true } ps := mappedPolicy.toSlice() @@ -1995,7 +2015,8 @@ func (store *IAMStoreSys) listUserPolicyMappings(cache *iamCache, users []string User: user, Policies: ps, }) - } + return true + }) sort.Slice(r, func(i, j int) bool { return r[i].User < r[j].User @@ -2010,13 +2031,13 @@ func (store *IAMStoreSys) listGroupPolicyMappings(cache *iamCache, groups []stri ) []madmin.GroupPolicyEntities { var r []madmin.GroupPolicyEntities groupsSet := set.CreateStringSet(groups...) - for group, mappedPolicy := range cache.iamGroupPolicyMap { + cache.iamGroupPolicyMap.Range(func(group string, mappedPolicy MappedPolicy) bool { if groupPredicate != nil && !groupPredicate(group) { - continue + return true } if !groupsSet.IsEmpty() && !groupsSet.Contains(group) { - continue + return true } ps := mappedPolicy.toSlice() @@ -2025,7 +2046,8 @@ func (store *IAMStoreSys) listGroupPolicyMappings(cache *iamCache, groups []stri Group: group, Policies: ps, }) - } + return true + }) sort.Slice(r, func(i, j int) bool { return r[i].Group < r[j].Group @@ -2041,9 +2063,9 @@ func (store *IAMStoreSys) listPolicyMappings(cache *iamCache, policies []string, queryPolSet := set.CreateStringSet(policies...) policyToUsersMap := make(map[string]set.StringSet) - for user, mappedPolicy := range cache.iamUserPolicyMap { + cache.iamUserPolicyMap.Range(func(user string, mappedPolicy MappedPolicy) bool { if userPredicate != nil && !userPredicate(user) { - continue + return true } commonPolicySet := mappedPolicy.policySet() @@ -2059,7 +2081,8 @@ func (store *IAMStoreSys) listPolicyMappings(cache *iamCache, policies []string, policyToUsersMap[policy] = s } } - } + return true + }) if iamOS, ok := store.IAMStorageAPI.(*IAMObjectStore); ok { for item := range listIAMConfigItems(context.Background(), iamOS.objAPI, iamConfigPrefix+SlashSeparator+policyDBSTSUsersListKey) { @@ -2088,9 +2111,9 @@ func (store *IAMStoreSys) listPolicyMappings(cache *iamCache, policies []string, } policyToGroupsMap := make(map[string]set.StringSet) - for group, mappedPolicy := range cache.iamGroupPolicyMap { + cache.iamGroupPolicyMap.Range(func(group string, mappedPolicy MappedPolicy) bool { if groupPredicate != nil && !groupPredicate(group) { - continue + return true } commonPolicySet := mappedPolicy.policySet() @@ -2106,7 +2129,8 @@ func (store *IAMStoreSys) listPolicyMappings(cache *iamCache, policies []string, policyToGroupsMap[policy] = s } } - } + return true + }) m := make(map[string]madmin.PolicyEntities, len(policyToGroupsMap)) for policy, groups := range policyToGroupsMap { @@ -2564,13 +2588,15 @@ func (store *IAMStoreSys) LoadUser(ctx context.Context, accessKey string) { // Load any associated policy definitions if !stsAccountFound { - for _, policy := range cache.iamUserPolicyMap[accessKey].toSlice() { + pols, _ := cache.iamUserPolicyMap.Load(accessKey) + for _, policy := range pols.toSlice() { if _, found = cache.iamPolicyDocsMap[policy]; !found { store.loadPolicyDocWithRetry(ctx, policy, cache.iamPolicyDocsMap, 3) } } } else { - for _, policy := range cache.iamSTSPolicyMap[stsUserCred.Credentials.AccessKey].toSlice() { + pols, _ := cache.iamSTSPolicyMap.Load(stsUserCred.Credentials.AccessKey) + for _, policy := range pols.toSlice() { if _, found = cache.iamPolicyDocsMap[policy]; !found { store.loadPolicyDocWithRetry(ctx, policy, cache.iamPolicyDocsMap, 3) } diff --git a/cmd/site-replication.go b/cmd/site-replication.go index 6b30c59ee..12cb3ed04 100644 --- a/cmd/site-replication.go +++ b/cmd/site-replication.go @@ -46,6 +46,7 @@ import ( sreplication "github.com/minio/minio/internal/bucket/replication" "github.com/minio/minio/internal/logger" "github.com/minio/pkg/v2/policy" + "github.com/puzpuzpuz/xsync/v3" ) const ( @@ -2044,26 +2045,28 @@ func (c *SiteReplicationSys) syncToAllPeers(ctx context.Context, addOpts madmin. // Followed by group policy mapping { // Replicate policy mappings on local to all peers. - groupPolicyMap := make(map[string]MappedPolicy) + groupPolicyMap := xsync.NewMapOf[string, MappedPolicy]() errG := globalIAMSys.store.loadMappedPolicies(ctx, unknownIAMUserType, true, groupPolicyMap) if errG != nil { return errSRBackendIssue(errG) } - for group, mp := range groupPolicyMap { - err := c.IAMChangeHook(ctx, madmin.SRIAMItem{ + var err error + groupPolicyMap.Range(func(k string, mp MappedPolicy) bool { + err = c.IAMChangeHook(ctx, madmin.SRIAMItem{ Type: madmin.SRIAMItemPolicyMapping, PolicyMapping: &madmin.SRPolicyMapping{ - UserOrGroup: group, + UserOrGroup: k, UserType: int(unknownIAMUserType), IsGroup: true, Policy: mp.Policies, }, UpdatedAt: mp.UpdatedAt, }) - if err != nil { - return errSRIAMError(err) - } + return err == nil + }) + if err != nil { + return errSRIAMError(err) } } @@ -2128,14 +2131,14 @@ func (c *SiteReplicationSys) syncToAllPeers(ctx context.Context, addOpts madmin. // Followed by policy mapping for the userAccounts we previously synced. { // Replicate policy mappings on local to all peers. - userPolicyMap := make(map[string]MappedPolicy) + userPolicyMap := xsync.NewMapOf[string, MappedPolicy]() errU := globalIAMSys.store.loadMappedPolicies(ctx, regUser, false, userPolicyMap) if errU != nil { return errSRBackendIssue(errU) } - - for user, mp := range userPolicyMap { - err := c.IAMChangeHook(ctx, madmin.SRIAMItem{ + var err error + userPolicyMap.Range(func(user string, mp MappedPolicy) bool { + err = c.IAMChangeHook(ctx, madmin.SRIAMItem{ Type: madmin.SRIAMItemPolicyMapping, PolicyMapping: &madmin.SRPolicyMapping{ UserOrGroup: user, @@ -2145,23 +2148,25 @@ func (c *SiteReplicationSys) syncToAllPeers(ctx context.Context, addOpts madmin. }, UpdatedAt: mp.UpdatedAt, }) - if err != nil { - return errSRIAMError(err) - } + return err == nil + }) + if err != nil { + return errSRIAMError(err) } } // and finally followed by policy mappings for for STS users. { // Replicate policy mappings on local to all peers. - stsPolicyMap := make(map[string]MappedPolicy) + stsPolicyMap := xsync.NewMapOf[string, MappedPolicy]() errU := globalIAMSys.store.loadMappedPolicies(ctx, stsUser, false, stsPolicyMap) if errU != nil { return errSRBackendIssue(errU) } - for user, mp := range stsPolicyMap { - err := c.IAMChangeHook(ctx, madmin.SRIAMItem{ + var err error + stsPolicyMap.Range(func(user string, mp MappedPolicy) bool { + err = c.IAMChangeHook(ctx, madmin.SRIAMItem{ Type: madmin.SRIAMItemPolicyMapping, PolicyMapping: &madmin.SRPolicyMapping{ UserOrGroup: user, @@ -2171,9 +2176,10 @@ func (c *SiteReplicationSys) syncToAllPeers(ctx context.Context, addOpts madmin. }, UpdatedAt: mp.UpdatedAt, }) - if err != nil { - return errSRIAMError(err) - } + return err == nil + }) + if err != nil { + return errSRIAMError(err) } } @@ -3783,12 +3789,12 @@ func (c *SiteReplicationSys) SiteReplicationMetaInfo(ctx context.Context, objAPI if opts.Users || opts.Entity == madmin.SRUserEntity { // Replicate policy mappings on local to all peers. - userPolicyMap := make(map[string]MappedPolicy) - stsPolicyMap := make(map[string]MappedPolicy) - svcPolicyMap := make(map[string]MappedPolicy) + userPolicyMap := xsync.NewMapOf[string, MappedPolicy]() + stsPolicyMap := xsync.NewMapOf[string, MappedPolicy]() + svcPolicyMap := xsync.NewMapOf[string, MappedPolicy]() if opts.Entity == madmin.SRUserEntity { if mp, ok := globalIAMSys.store.GetMappedPolicy(opts.EntityValue, false); ok { - userPolicyMap[opts.EntityValue] = mp + userPolicyMap.Store(opts.EntityValue, mp) } } else { stsErr := globalIAMSys.store.loadMappedPolicies(ctx, stsUser, false, stsPolicyMap) @@ -3804,34 +3810,23 @@ func (c *SiteReplicationSys) SiteReplicationMetaInfo(ctx context.Context, objAPI return info, errSRBackendIssue(svcErr) } } - info.UserPolicies = make(map[string]madmin.SRPolicyMapping, len(userPolicyMap)) - for user, mp := range userPolicyMap { - info.UserPolicies[user] = madmin.SRPolicyMapping{ - IsGroup: false, - UserOrGroup: user, - UserType: int(regUser), - Policy: mp.Policies, - UpdatedAt: mp.UpdatedAt, - } - } - for stsU, mp := range stsPolicyMap { - info.UserPolicies[stsU] = madmin.SRPolicyMapping{ - IsGroup: false, - UserOrGroup: stsU, - UserType: int(stsUser), - Policy: mp.Policies, - UpdatedAt: mp.UpdatedAt, - } - } - for svcU, mp := range svcPolicyMap { - info.UserPolicies[svcU] = madmin.SRPolicyMapping{ - IsGroup: false, - UserOrGroup: svcU, - UserType: int(svcUser), - Policy: mp.Policies, - UpdatedAt: mp.UpdatedAt, - } + info.UserPolicies = make(map[string]madmin.SRPolicyMapping, userPolicyMap.Size()) + addPolicy := func(t IAMUserType, mp *xsync.MapOf[string, MappedPolicy]) { + mp.Range(func(k string, mp MappedPolicy) bool { + info.UserPolicies[k] = madmin.SRPolicyMapping{ + IsGroup: false, + UserOrGroup: k, + UserType: int(t), + Policy: mp.Policies, + UpdatedAt: mp.UpdatedAt, + } + return true + }) } + addPolicy(regUser, userPolicyMap) + addPolicy(stsUser, stsPolicyMap) + addPolicy(svcUser, svcPolicyMap) + info.UserInfoMap = make(map[string]madmin.UserInfo) if opts.Entity == madmin.SRUserEntity { if ui, err := globalIAMSys.GetUserInfo(ctx, opts.EntityValue); err == nil { @@ -3875,10 +3870,10 @@ func (c *SiteReplicationSys) SiteReplicationMetaInfo(ctx context.Context, objAPI if opts.Groups || opts.Entity == madmin.SRGroupEntity { // Replicate policy mappings on local to all peers. - groupPolicyMap := make(map[string]MappedPolicy) + groupPolicyMap := xsync.NewMapOf[string, MappedPolicy]() if opts.Entity == madmin.SRGroupEntity { if mp, ok := globalIAMSys.store.GetMappedPolicy(opts.EntityValue, true); ok { - groupPolicyMap[opts.EntityValue] = mp + groupPolicyMap.Store(opts.EntityValue, mp) } } else { stsErr := globalIAMSys.store.loadMappedPolicies(ctx, stsUser, true, groupPolicyMap) @@ -3891,15 +3886,16 @@ func (c *SiteReplicationSys) SiteReplicationMetaInfo(ctx context.Context, objAPI } } - info.GroupPolicies = make(map[string]madmin.SRPolicyMapping, len(c.state.Peers)) - for group, mp := range groupPolicyMap { + info.GroupPolicies = make(map[string]madmin.SRPolicyMapping, groupPolicyMap.Size()) + groupPolicyMap.Range(func(group string, mp MappedPolicy) bool { info.GroupPolicies[group] = madmin.SRPolicyMapping{ IsGroup: true, UserOrGroup: group, Policy: mp.Policies, UpdatedAt: mp.UpdatedAt, } - } + return true + }) info.GroupDescMap = make(map[string]madmin.GroupDesc) if opts.Entity == madmin.SRGroupEntity { if gd, err := globalIAMSys.GetGroupDescription(opts.EntityValue); err == nil { diff --git a/go.mod b/go.mod index 42eea31f4..3d117024b 100644 --- a/go.mod +++ b/go.mod @@ -219,6 +219,7 @@ require ( github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect github.com/pquerna/cachecontrol v0.2.0 // indirect github.com/prometheus/prom2json v1.3.3 // indirect + github.com/puzpuzpuz/xsync/v3 v3.1.0 // indirect github.com/rivo/tview v0.0.0-20240307173318-e804876934a1 // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/rjeczalik/notify v0.9.3 // indirect diff --git a/go.sum b/go.sum index bee67f0d1..1542f1482 100644 --- a/go.sum +++ b/go.sum @@ -588,6 +588,8 @@ github.com/prometheus/procfs v0.13.0 h1:GqzLlQyfsPbaEHaQkO7tbDlriv/4o5Hudv6OXHGK github.com/prometheus/procfs v0.13.0/go.mod h1:cd4PFCR54QLnGKPaKGA6l+cfuNXtht43ZKY6tow0Y1g= github.com/prometheus/prom2json v1.3.3 h1:IYfSMiZ7sSOfliBoo89PcufjWO4eAR0gznGcETyaUgo= github.com/prometheus/prom2json v1.3.3/go.mod h1:Pv4yIPktEkK7btWsrUTWDDDrnpUrAELaOCj+oFwlgmc= +github.com/puzpuzpuz/xsync/v3 v3.1.0 h1:EewKT7/LNac5SLiEblJeUu8z5eERHrmRLnMQL2d7qX4= +github.com/puzpuzpuz/xsync/v3 v3.1.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA= github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo= github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= diff --git a/internal/grid/connection.go b/internal/grid/connection.go index 6a6979c74..6bb2da408 100644 --- a/internal/grid/connection.go +++ b/internal/grid/connection.go @@ -42,6 +42,7 @@ import ( xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/pubsub" + "github.com/puzpuzpuz/xsync/v3" "github.com/tinylib/msgp/msgp" "github.com/zeebo/xxh3" ) @@ -75,10 +76,10 @@ type Connection struct { ctx context.Context // Active mux connections. - outgoing *lockedClientMap + outgoing *xsync.MapOf[uint64, *muxClient] // Incoming streams - inStream *lockedServerMap + inStream *xsync.MapOf[uint64, *muxServer] // outQueue is the output queue outQueue chan []byte @@ -205,8 +206,8 @@ func newConnection(o connectionParams) *Connection { Local: o.local, id: o.id, ctx: o.ctx, - outgoing: &lockedClientMap{m: make(map[uint64]*muxClient, 1000)}, - inStream: &lockedServerMap{m: make(map[uint64]*muxServer, 1000)}, + outgoing: xsync.NewMapOfPresized[uint64, *muxClient](1000), + inStream: xsync.NewMapOfPresized[uint64, *muxServer](1000), outQueue: make(chan []byte, defaultOutQueue), dialer: o.dial, side: ws.StateServerSide, diff --git a/internal/grid/grid.go b/internal/grid/grid.go index 5034e1a8e..2652ce053 100644 --- a/internal/grid/grid.go +++ b/internal/grid/grid.go @@ -146,142 +146,3 @@ func bytesOrLength(b []byte) string { } return fmt.Sprint(b) } - -type lockedClientMap struct { - m map[uint64]*muxClient - mu sync.Mutex -} - -func (m *lockedClientMap) Load(id uint64) (*muxClient, bool) { - m.mu.Lock() - v, ok := m.m[id] - m.mu.Unlock() - return v, ok -} - -func (m *lockedClientMap) LoadAndDelete(id uint64) (*muxClient, bool) { - m.mu.Lock() - v, ok := m.m[id] - if ok { - delete(m.m, id) - } - m.mu.Unlock() - return v, ok -} - -func (m *lockedClientMap) Size() int { - m.mu.Lock() - v := len(m.m) - m.mu.Unlock() - return v -} - -func (m *lockedClientMap) Delete(id uint64) { - m.mu.Lock() - delete(m.m, id) - m.mu.Unlock() -} - -func (m *lockedClientMap) Range(fn func(key uint64, value *muxClient) bool) { - m.mu.Lock() - defer m.mu.Unlock() - for k, v := range m.m { - if !fn(k, v) { - break - } - } -} - -func (m *lockedClientMap) Clear() { - m.mu.Lock() - m.m = map[uint64]*muxClient{} - m.mu.Unlock() -} - -func (m *lockedClientMap) LoadOrStore(id uint64, v *muxClient) (*muxClient, bool) { - m.mu.Lock() - v2, ok := m.m[id] - if ok { - m.mu.Unlock() - return v2, true - } - m.m[id] = v - m.mu.Unlock() - return v, false -} - -type lockedServerMap struct { - m map[uint64]*muxServer - mu sync.Mutex -} - -func (m *lockedServerMap) Load(id uint64) (*muxServer, bool) { - m.mu.Lock() - v, ok := m.m[id] - m.mu.Unlock() - return v, ok -} - -func (m *lockedServerMap) LoadAndDelete(id uint64) (*muxServer, bool) { - m.mu.Lock() - v, ok := m.m[id] - if ok { - delete(m.m, id) - } - m.mu.Unlock() - return v, ok -} - -func (m *lockedServerMap) Size() int { - m.mu.Lock() - v := len(m.m) - m.mu.Unlock() - return v -} - -func (m *lockedServerMap) Delete(id uint64) { - m.mu.Lock() - delete(m.m, id) - m.mu.Unlock() -} - -func (m *lockedServerMap) Range(fn func(key uint64, value *muxServer) bool) { - m.mu.Lock() - for k, v := range m.m { - if !fn(k, v) { - break - } - } - m.mu.Unlock() -} - -func (m *lockedServerMap) Clear() { - m.mu.Lock() - m.m = map[uint64]*muxServer{} - m.mu.Unlock() -} - -func (m *lockedServerMap) LoadOrStore(id uint64, v *muxServer) (*muxServer, bool) { - m.mu.Lock() - v2, ok := m.m[id] - if ok { - m.mu.Unlock() - return v2, true - } - m.m[id] = v - m.mu.Unlock() - return v, false -} - -func (m *lockedServerMap) LoadOrCompute(id uint64, fn func() *muxServer) (*muxServer, bool) { - m.mu.Lock() - v2, ok := m.m[id] - if ok { - m.mu.Unlock() - return v2, true - } - v := fn() - m.m[id] = v - m.mu.Unlock() - return v, false -}