Reload a specific user or policy on peers (#7705)

Fixes #7587
This commit is contained in:
Harshavardhana
2019-06-06 17:46:22 -07:00
committed by kannappanr
parent 975237cbf8
commit 6d89435356
7 changed files with 565 additions and 187 deletions

View File

@@ -25,6 +25,7 @@ import (
"time"
etcd "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/minio/minio-go/v6/pkg/set"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/auth"
@@ -60,6 +61,45 @@ type IAMSys struct {
iamCannedPolicyMap map[string]iampolicy.Policy
}
// LoadPolicy - reloads a specific canned policy from backend disks or etcd.
func (sys *IAMSys) LoadPolicy(objAPI ObjectLayer, policyName string) error {
if objAPI == nil {
return errInvalidArgument
}
sys.Lock()
defer sys.Unlock()
prefix := iamConfigPoliciesPrefix
if globalEtcdClient == nil {
return reloadPolicy(context.Background(), objAPI, prefix, policyName, sys.iamCannedPolicyMap)
}
// When etcd is set, we use watch APIs so this code is not needed.
return nil
}
// LoadUser - reloads a specific user from backend disks or etcd.
func (sys *IAMSys) LoadUser(objAPI ObjectLayer, accessKey string, temp bool) error {
if objAPI == nil {
return errInvalidArgument
}
sys.Lock()
defer sys.Unlock()
prefix := iamConfigUsersPrefix
if temp {
prefix = iamConfigSTSPrefix
}
if globalEtcdClient == nil {
return reloadUser(context.Background(), objAPI, prefix, accessKey, sys.iamUsersMap, sys.iamPolicyMap)
}
// When etcd is set, we use watch APIs so this code is not needed.
return nil
}
// Load - loads iam subsystem
func (sys *IAMSys) Load(objAPI ObjectLayer) error {
if globalEtcdClient != nil {
@@ -68,6 +108,105 @@ func (sys *IAMSys) Load(objAPI ObjectLayer) error {
return sys.refresh(objAPI)
}
func (sys *IAMSys) reloadFromEvent(event *etcd.Event) {
eventCreate := event.IsModify() || event.IsCreate()
eventDelete := event.Type == etcd.EventTypeDelete
usersPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigUsersPrefix)
stsPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigSTSPrefix)
policyPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigPoliciesPrefix)
ctx, cancel := context.WithTimeout(context.Background(),
defaultContextTimeout)
defer cancel()
switch {
case eventCreate:
switch {
case usersPrefix:
accessKey := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
iamConfigUsersPrefix))
reloadEtcdUser(ctx, iamConfigUsersPrefix, accessKey,
sys.iamUsersMap, sys.iamPolicyMap)
case stsPrefix:
accessKey := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
iamConfigSTSPrefix))
reloadEtcdUser(ctx, iamConfigSTSPrefix, accessKey,
sys.iamUsersMap, sys.iamPolicyMap)
case policyPrefix:
policyName := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
iamConfigPoliciesPrefix))
reloadEtcdPolicy(ctx, iamConfigPoliciesPrefix,
policyName, sys.iamCannedPolicyMap)
}
case eventDelete:
switch {
case usersPrefix:
accessKey := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
iamConfigUsersPrefix))
delete(sys.iamUsersMap, accessKey)
delete(sys.iamPolicyMap, accessKey)
case stsPrefix:
accessKey := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
iamConfigSTSPrefix))
delete(sys.iamUsersMap, accessKey)
delete(sys.iamPolicyMap, accessKey)
case policyPrefix:
policyName := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
iamConfigPoliciesPrefix))
delete(sys.iamCannedPolicyMap, policyName)
}
}
}
// Watch etcd entries for IAM
func (sys *IAMSys) watchIAMEtcd() {
watchEtcd := func() {
// Refresh IAMSys with etcd watch.
for {
watchCh := globalEtcdClient.Watch(context.Background(),
iamConfigPrefix, etcd.WithPrefix(), etcd.WithKeysOnly())
select {
case <-GlobalServiceDoneCh:
return
case watchResp, ok := <-watchCh:
if !ok {
time.Sleep(1 * time.Second)
continue
}
if err := watchResp.Err(); err != nil {
logger.LogIf(context.Background(), err)
// log and retry.
time.Sleep(1 * time.Second)
continue
}
for _, event := range watchResp.Events {
sys.Lock()
sys.reloadFromEvent(event)
sys.Unlock()
}
}
}
}
go watchEtcd()
}
func (sys *IAMSys) watchIAMDisk(objAPI ObjectLayer) {
watchDisk := func() {
ticker := time.NewTicker(globalRefreshIAMInterval)
defer ticker.Stop()
for {
select {
case <-GlobalServiceDoneCh:
return
case <-ticker.C:
sys.refresh(objAPI)
}
}
}
// Refresh IAMSys in background.
go watchDisk()
}
// Init - initializes config system from iam.json
func (sys *IAMSys) Init(objAPI ObjectLayer) error {
if objAPI == nil {
@@ -75,51 +214,9 @@ func (sys *IAMSys) Init(objAPI ObjectLayer) error {
}
if globalEtcdClient != nil {
defer func() {
go func() {
// Refresh IAMSys with etcd watch.
for {
watchCh := globalEtcdClient.Watch(context.Background(),
iamConfigPrefix, etcd.WithPrefix())
select {
case <-GlobalServiceDoneCh:
return
case watchResp, ok := <-watchCh:
if !ok {
time.Sleep(1 * time.Second)
continue
}
if err := watchResp.Err(); err != nil {
logger.LogIf(context.Background(), err)
// log and retry.
time.Sleep(1 * time.Second)
continue
}
for _, event := range watchResp.Events {
if event.IsModify() || event.IsCreate() || event.Type == etcd.EventTypeDelete {
sys.refreshEtcd()
}
}
}
}
}()
}()
defer sys.watchIAMEtcd()
} else {
defer func() {
// Refresh IAMSys in background.
go func() {
ticker := time.NewTicker(globalRefreshIAMInterval)
defer ticker.Stop()
for {
select {
case <-GlobalServiceDoneCh:
return
case <-ticker.C:
sys.refresh(objAPI)
}
}
}()
}()
defer sys.watchIAMDisk(objAPI)
}
doneCh := make(chan struct{})
@@ -148,8 +245,8 @@ func (sys *IAMSys) Init(objAPI ObjectLayer) error {
return nil
}
// DeleteCannedPolicy - deletes a canned policy.
func (sys *IAMSys) DeleteCannedPolicy(policyName string) error {
// DeletePolicy - deletes a canned policy from backend or etcd.
func (sys *IAMSys) DeletePolicy(policyName string) error {
objectAPI := newObjectLayerFn()
if objectAPI == nil {
return errServerNotInitialized
@@ -160,11 +257,17 @@ func (sys *IAMSys) DeleteCannedPolicy(policyName string) error {
}
var err error
configFile := pathJoin(iamConfigPoliciesPrefix, policyName, iamPolicyFile)
pFile := pathJoin(iamConfigPoliciesPrefix, policyName, iamPolicyFile)
if globalEtcdClient != nil {
err = deleteConfigEtcd(context.Background(), globalEtcdClient, configFile)
err = deleteConfigEtcd(context.Background(), globalEtcdClient, pFile)
} else {
err = deleteConfig(context.Background(), objectAPI, configFile)
err = deleteConfig(context.Background(), objectAPI, pFile)
}
switch err.(type) {
case ObjectNotFound:
// Ignore error if policy is already deleted.
err = nil
}
sys.Lock()
@@ -174,8 +277,8 @@ func (sys *IAMSys) DeleteCannedPolicy(policyName string) error {
return err
}
// ListCannedPolicies - lists all canned policies.
func (sys *IAMSys) ListCannedPolicies() (map[string][]byte, error) {
// ListPolicies - lists all canned policies.
func (sys *IAMSys) ListPolicies() (map[string][]byte, error) {
objectAPI := newObjectLayerFn()
if objectAPI == nil {
return nil, errServerNotInitialized
@@ -197,8 +300,8 @@ func (sys *IAMSys) ListCannedPolicies() (map[string][]byte, error) {
return cannedPolicyMap, nil
}
// SetCannedPolicy - sets a new canned policy.
func (sys *IAMSys) SetCannedPolicy(policyName string, p iampolicy.Policy) error {
// SetPolicy - sets a new canned policy.
func (sys *IAMSys) SetPolicy(policyName string, p iampolicy.Policy) error {
objectAPI := newObjectLayerFn()
if objectAPI == nil {
return errServerNotInitialized
@@ -279,11 +382,11 @@ func (sys *IAMSys) DeleteUser(accessKey string) error {
pFile := pathJoin(iamConfigUsersPrefix, accessKey, iamPolicyFile)
iFile := pathJoin(iamConfigUsersPrefix, accessKey, iamIdentityFile)
if globalEtcdClient != nil {
// It is okay to ingnore errors when deleting policy.json for the user.
_ = deleteConfigEtcd(context.Background(), globalEtcdClient, pFile)
// It is okay to ignore errors when deleting policy.json for the user.
deleteConfigEtcd(context.Background(), globalEtcdClient, pFile)
err = deleteConfigEtcd(context.Background(), globalEtcdClient, iFile)
} else {
// It is okay to ingnore errors when deleting policy.json for the user.
// It is okay to ignore errors when deleting policy.json for the user.
_ = deleteConfig(context.Background(), objectAPI, pFile)
err = deleteConfig(context.Background(), objectAPI, iFile)
}
@@ -291,7 +394,8 @@ func (sys *IAMSys) DeleteUser(accessKey string) error {
//
switch err.(type) {
case ObjectNotFound:
err = errNoSuchUser
// ignore if user is already deleted.
err = nil
}
sys.Lock()
@@ -567,21 +671,9 @@ func (sys *IAMSys) IsAllowed(args iampolicy.Args) bool {
var defaultContextTimeout = 30 * time.Second
// Similar to reloadUsers but updates users, policies maps from etcd server,
func reloadEtcdUsers(prefix string, usersMap map[string]auth.Credentials, policyMap map[string]string) error {
ctx, cancel := context.WithTimeout(context.Background(), defaultContextTimeout)
defer cancel()
r, err := globalEtcdClient.Get(ctx, prefix, etcd.WithPrefix(), etcd.WithKeysOnly())
if err != nil {
return err
}
// No users are created yet.
if r.Count == 0 {
return nil
}
func etcdKvsToSet(prefix string, kvs []*mvccpb.KeyValue) set.StringSet {
users := set.NewStringSet()
for _, kv := range r.Kvs {
for _, kv := range kvs {
// Extract user by stripping off the `prefix` value as suffix,
// then strip off the remaining basename to obtain the prefix
// value, usually in the following form.
@@ -595,46 +687,48 @@ func reloadEtcdUsers(prefix string, usersMap map[string]auth.Credentials, policy
users.Add(user)
}
}
return users
}
// Similar to reloadUsers but updates users, policies maps from etcd server,
func reloadEtcdUsers(prefix string, usersMap map[string]auth.Credentials, policyMap map[string]string) error {
ctx, cancel := context.WithTimeout(context.Background(), defaultContextTimeout)
defer cancel()
r, err := globalEtcdClient.Get(ctx, prefix, etcd.WithPrefix(), etcd.WithKeysOnly())
if err != nil {
return err
}
// No users are created yet.
if r.Count == 0 {
return nil
}
users := etcdKvsToSet(prefix, r.Kvs)
// Reload config and policies for all users.
for _, user := range users.ToSlice() {
idFile := pathJoin(prefix, user, iamIdentityFile)
pFile := pathJoin(prefix, user, iamPolicyFile)
cdata, cerr := readConfigEtcd(ctx, globalEtcdClient, idFile)
pdata, perr := readConfigEtcd(ctx, globalEtcdClient, pFile)
if cerr != nil && cerr != errConfigNotFound {
return cerr
}
if perr != nil && perr != errConfigNotFound {
return perr
}
if cerr == errConfigNotFound && perr == errConfigNotFound {
continue
}
if cerr == nil {
var cred auth.Credentials
if err = json.Unmarshal(cdata, &cred); err != nil {
return err
}
cred.AccessKey = user
if cred.IsExpired() {
deleteConfigEtcd(ctx, globalEtcdClient, idFile)
deleteConfigEtcd(ctx, globalEtcdClient, pFile)
continue
}
usersMap[cred.AccessKey] = cred
}
if perr == nil {
var policyName string
if err = json.Unmarshal(pdata, &policyName); err != nil {
return err
}
policyMap[user] = policyName
if err = reloadEtcdUser(ctx, prefix, user, usersMap, policyMap); err != nil {
return err
}
}
return nil
}
func reloadEtcdPolicy(ctx context.Context, prefix string, policyName string,
cannedPolicyMap map[string]iampolicy.Policy) error {
pFile := pathJoin(prefix, policyName, iamPolicyFile)
pdata, err := readConfigEtcd(ctx, globalEtcdClient, pFile)
if err != nil {
return err
}
var p iampolicy.Policy
if err = json.Unmarshal(pdata, &p); err != nil {
return err
}
cannedPolicyMap[policyName] = p
return nil
}
func reloadEtcdPolicies(prefix string, cannedPolicyMap map[string]iampolicy.Policy) error {
ctx, cancel := context.WithTimeout(context.Background(), defaultContextTimeout)
defer cancel()
@@ -647,38 +741,32 @@ func reloadEtcdPolicies(prefix string, cannedPolicyMap map[string]iampolicy.Poli
return nil
}
policies := set.NewStringSet()
for _, kv := range r.Kvs {
// Extract policy by stripping off the `prefix` value as suffix,
// then strip off the remaining basename to obtain the prefix
// value, usually in the following form.
//
// key := "config/iam/policies/newpolicy/identity.json"
// prefix := "config/iam/policies/"
// v := trim(trim(key, prefix), base(key)) == "newpolicy"
//
policyName := path.Clean(strings.TrimSuffix(strings.TrimPrefix(string(kv.Key), prefix), path.Base(string(kv.Key))))
if !policies.Contains(policyName) {
policies.Add(policyName)
}
}
policies := etcdKvsToSet(prefix, r.Kvs)
// Reload config and policies for all policys.
for _, policyName := range policies.ToSlice() {
pFile := pathJoin(prefix, policyName, iamPolicyFile)
pdata, perr := readConfigEtcd(ctx, globalEtcdClient, pFile)
if perr != nil {
return perr
}
var p iampolicy.Policy
if err = json.Unmarshal(pdata, &p); err != nil {
if err = reloadEtcdPolicy(ctx, prefix, policyName, cannedPolicyMap); err != nil {
return err
}
cannedPolicyMap[policyName] = p
}
return nil
}
func reloadPolicy(ctx context.Context, objectAPI ObjectLayer, prefix string,
policyName string, cannedPolicyMap map[string]iampolicy.Policy) error {
pFile := pathJoin(prefix, policyName, iamPolicyFile)
pdata, err := readConfig(context.Background(), objectAPI, pFile)
if err != nil {
return err
}
var p iampolicy.Policy
if err = json.Unmarshal(pdata, &p); err != nil {
return err
}
cannedPolicyMap[path.Base(prefix)] = p
return nil
}
func reloadPolicies(objectAPI ObjectLayer, prefix string, cannedPolicyMap map[string]iampolicy.Policy) error {
marker := ""
for {
@@ -690,16 +778,9 @@ func reloadPolicies(objectAPI ObjectLayer, prefix string, cannedPolicyMap map[st
}
marker = lo.NextMarker
for _, prefix := range lo.Prefixes {
pFile := pathJoin(prefix, iamPolicyFile)
pdata, perr := readConfig(context.Background(), objectAPI, pFile)
if perr != nil {
return perr
}
var p iampolicy.Policy
if err = json.Unmarshal(pdata, &p); err != nil {
if err = reloadPolicy(context.Background(), objectAPI, "", prefix, cannedPolicyMap); err != nil {
return err
}
cannedPolicyMap[path.Base(prefix)] = p
}
if !lo.IsTruncated {
break
@@ -709,6 +790,86 @@ func reloadPolicies(objectAPI ObjectLayer, prefix string, cannedPolicyMap map[st
}
func reloadEtcdUser(ctx context.Context, prefix string, accessKey string,
usersMap map[string]auth.Credentials, policyMap map[string]string) error {
idFile := pathJoin(prefix, accessKey, iamIdentityFile)
pFile := pathJoin(prefix, accessKey, iamPolicyFile)
cdata, cerr := readConfigEtcd(ctx, globalEtcdClient, idFile)
pdata, perr := readConfigEtcd(ctx, globalEtcdClient, pFile)
if cerr != nil && cerr != errConfigNotFound {
return cerr
}
if perr != nil && perr != errConfigNotFound {
return perr
}
if cerr == errConfigNotFound && perr == errConfigNotFound {
return nil
}
if cerr == nil {
var cred auth.Credentials
if err := json.Unmarshal(cdata, &cred); err != nil {
return err
}
cred.AccessKey = path.Base(accessKey)
if cred.IsExpired() {
// Delete expired identity.
deleteConfigEtcd(ctx, globalEtcdClient, idFile)
// Delete expired identity policy.
deleteConfigEtcd(ctx, globalEtcdClient, pFile)
return nil
}
usersMap[cred.AccessKey] = cred
}
if perr == nil {
var policyName string
if err := json.Unmarshal(pdata, &policyName); err != nil {
return err
}
policyMap[path.Base(accessKey)] = policyName
}
return nil
}
func reloadUser(ctx context.Context, objectAPI ObjectLayer, prefix string, accessKey string,
usersMap map[string]auth.Credentials, policyMap map[string]string) error {
idFile := pathJoin(prefix, accessKey, iamIdentityFile)
pFile := pathJoin(prefix, accessKey, iamPolicyFile)
cdata, cerr := readConfig(ctx, objectAPI, idFile)
pdata, perr := readConfig(ctx, objectAPI, pFile)
if cerr != nil && cerr != errConfigNotFound {
return cerr
}
if perr != nil && perr != errConfigNotFound {
return perr
}
if cerr == errConfigNotFound && perr == errConfigNotFound {
return nil
}
if cerr == nil {
var cred auth.Credentials
if err := json.Unmarshal(cdata, &cred); err != nil {
return err
}
cred.AccessKey = path.Base(accessKey)
if cred.IsExpired() {
// Delete expired identity.
objectAPI.DeleteObject(context.Background(), minioMetaBucket, idFile)
// Delete expired identity policy.
objectAPI.DeleteObject(context.Background(), minioMetaBucket, pFile)
return nil
}
usersMap[cred.AccessKey] = cred
}
if perr == nil {
var policyName string
if err := json.Unmarshal(pdata, &policyName); err != nil {
return err
}
policyMap[path.Base(accessKey)] = policyName
}
return nil
}
// reloadUsers reads an updates users, policies from object layer into user and policy maps.
func reloadUsers(objectAPI ObjectLayer, prefix string, usersMap map[string]auth.Credentials, policyMap map[string]string) error {
marker := ""
@@ -721,40 +882,9 @@ func reloadUsers(objectAPI ObjectLayer, prefix string, usersMap map[string]auth.
}
marker = lo.NextMarker
for _, prefix := range lo.Prefixes {
idFile := pathJoin(prefix, iamIdentityFile)
pFile := pathJoin(prefix, iamPolicyFile)
cdata, cerr := readConfig(context.Background(), objectAPI, idFile)
pdata, perr := readConfig(context.Background(), objectAPI, pFile)
if cerr != nil && cerr != errConfigNotFound {
return cerr
}
if perr != nil && perr != errConfigNotFound {
return perr
}
if cerr == errConfigNotFound && perr == errConfigNotFound {
continue
}
if cerr == nil {
var cred auth.Credentials
if err = json.Unmarshal(cdata, &cred); err != nil {
return err
}
cred.AccessKey = path.Base(prefix)
if cred.IsExpired() {
// Delete expired identity.
objectAPI.DeleteObject(context.Background(), minioMetaBucket, idFile)
// Delete expired identity policy.
objectAPI.DeleteObject(context.Background(), minioMetaBucket, pFile)
continue
}
usersMap[cred.AccessKey] = cred
}
if perr == nil {
var policyName string
if err = json.Unmarshal(pdata, &policyName); err != nil {
return err
}
policyMap[path.Base(prefix)] = policyName
// Prefix is empty because prefix is already part of the List output.
if err = reloadUser(context.Background(), objectAPI, "", prefix, usersMap, policyMap); err != nil {
return err
}
}
if !lo.IsTruncated {