Reload users upon AddUser on peers (#6975)

Also migrate ReloadFormat to notification subsystem,
remove GetConfig() we do not use this API anymore
This commit is contained in:
Harshavardhana 2018-12-18 14:39:21 -08:00 committed by Dee Koder
parent 65ddff8899
commit 4f31a9a33b
11 changed files with 146 additions and 155 deletions

View File

@ -867,6 +867,14 @@ func (a adminAPIHandlers) SetUserStatus(w http.ResponseWriter, r *http.Request)
writeErrorResponseJSON(w, toAdminAPIErrCode(ctx, err), r.URL)
return
}
// Notify all other Minio peers to reload users
for host, err := range globalNotificationSys.LoadUsers() {
if err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", host.String())
logger.LogIf(ctx, err)
}
}
}
// AddUser - PUT /minio/admin/v1/add-user?accessKey=<access_key>
@ -927,6 +935,14 @@ func (a adminAPIHandlers) AddUser(w http.ResponseWriter, r *http.Request) {
writeErrorResponseJSON(w, toAdminAPIErrCode(ctx, err), r.URL)
return
}
// Notify all other Minio peers to reload users
for host, err := range globalNotificationSys.LoadUsers() {
if err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", host.String())
logger.LogIf(ctx, err)
}
}
}
// ListCannedPolicies - GET /minio/admin/v1/list-canned-policies
@ -992,6 +1008,14 @@ func (a adminAPIHandlers) RemoveCannedPolicy(w http.ResponseWriter, r *http.Requ
writeErrorResponseJSON(w, toAdminAPIErrCode(ctx, err), r.URL)
return
}
// Notify all other Minio peers to reload users
for host, err := range globalNotificationSys.LoadUsers() {
if err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", host.String())
logger.LogIf(ctx, err)
}
}
}
// AddCannedPolicy - PUT /minio/admin/v1/add-canned-policy?name=<policy_name>
@ -1049,6 +1073,14 @@ func (a adminAPIHandlers) AddCannedPolicy(w http.ResponseWriter, r *http.Request
writeErrorResponseJSON(w, toAdminAPIErrCode(ctx, err), r.URL)
return
}
// Notify all other Minio peers to reload users
for host, err := range globalNotificationSys.LoadUsers() {
if err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", host.String())
logger.LogIf(ctx, err)
}
}
}
// SetUserPolicy - PUT /minio/admin/v1/set-user-policy?accessKey=<access_key>&name=<policy_name>
@ -1088,6 +1120,14 @@ func (a adminAPIHandlers) SetUserPolicy(w http.ResponseWriter, r *http.Request)
if err := globalIAMSys.SetUserPolicy(accessKey, policyName); err != nil {
writeErrorResponseJSON(w, toAdminAPIErrCode(ctx, err), r.URL)
}
// Notify all other Minio peers to reload users
for host, err := range globalNotificationSys.LoadUsers() {
if err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", host.String())
logger.LogIf(ctx, err)
}
}
}
// SetConfigHandler - PUT /minio/admin/v1/config

View File

@ -641,7 +641,12 @@ func (h *healSequence) healDiskFormat() error {
// Healing succeeded notify the peers to reload format and re-initialize disks.
// We will not notify peers only if healing succeeded.
if err == nil {
peersReInitFormat(globalAdminPeers, h.settings.DryRun)
for host, rerr := range globalNotificationSys.ReloadFormat(h.settings.DryRun) {
if rerr != nil {
logger.GetReqInfo(h.ctx).SetTags("peerAddress", host.String())
logger.LogIf(h.ctx, rerr)
}
}
}
// Push format heal result

View File

@ -45,29 +45,12 @@ func (rpcClient *AdminRPCClient) SignalService(signal serviceSignal) (err error)
return rpcClient.Call(adminServiceName+".SignalService", &args, &reply)
}
// ReInitFormat - re-initialize disk format, remotely.
func (rpcClient *AdminRPCClient) ReInitFormat(dryRun bool) error {
args := ReInitFormatArgs{DryRun: dryRun}
reply := VoidReply{}
return rpcClient.Call(adminServiceName+".ReInitFormat", &args, &reply)
}
// ServerInfo - returns the server info of the server to which the RPC call is made.
func (rpcClient *AdminRPCClient) ServerInfo() (sid ServerInfoData, err error) {
err = rpcClient.Call(adminServiceName+".ServerInfo", &AuthArgs{}, &sid)
return sid, err
}
// GetConfig - returns config.json of the remote server.
func (rpcClient *AdminRPCClient) GetConfig() ([]byte, error) {
args := AuthArgs{}
var reply []byte
err := rpcClient.Call(adminServiceName+".GetConfig", &args, &reply)
return reply, err
}
// StartProfiling - starts profiling in the remote server.
func (rpcClient *AdminRPCClient) StartProfiling(profiler string) error {
args := StartProfilingArgs{Profiler: profiler}
@ -125,9 +108,7 @@ func NewAdminRPCClient(host *xnet.Host) (*AdminRPCClient, error) {
// commands like service stop and service restart.
type adminCmdRunner interface {
SignalService(s serviceSignal) error
ReInitFormat(dryRun bool) error
ServerInfo() (ServerInfoData, error)
GetConfig() ([]byte, error)
StartProfiling(string) error
DownloadProfilingData() ([]byte, error)
}
@ -174,26 +155,6 @@ func makeAdminPeers(endpoints EndpointList) (adminPeerList adminPeers) {
return adminPeerList
}
// peersReInitFormat - reinitialize remote object layers to new format.
func peersReInitFormat(peers adminPeers, dryRun bool) error {
errs := make([]error, len(peers))
// Send ReInitFormat RPC call to all nodes.
// for local adminPeer this is a no-op.
wg := sync.WaitGroup{}
for i, peer := range peers {
wg.Add(1)
go func(idx int, peer adminPeer) {
defer wg.Done()
if !peer.isLocal {
errs[idx] = peer.cmdRunner.ReInitFormat(dryRun)
}
}(i, peer)
}
wg.Wait()
return nil
}
// Initialize global adminPeer collection.
func initGlobalAdminPeers(endpoints EndpointList) {
globalAdminPeers = makeAdminPeers(endpoints)

View File

@ -68,23 +68,6 @@ func (receiver *adminRPCReceiver) DownloadProfilingData(args *AuthArgs, reply *[
return
}
// GetConfig - returns the config.json of this server.
func (receiver *adminRPCReceiver) GetConfig(args *AuthArgs, reply *[]byte) (err error) {
*reply, err = receiver.local.GetConfig()
return err
}
// ReInitFormatArgs - provides dry-run information to re-initialize format.json
type ReInitFormatArgs struct {
AuthArgs
DryRun bool
}
// ReInitFormat - re-init 'format.json'
func (receiver *adminRPCReceiver) ReInitFormat(args *ReInitFormatArgs, reply *VoidReply) error {
return receiver.local.ReInitFormat(args.DryRun)
}
// NewAdminRPCServer - returns new admin RPC server.
func NewAdminRPCServer() (*xrpc.Server, error) {
rpcServer := xrpc.NewServer()

View File

@ -63,34 +63,6 @@ func testAdminCmdRunnerSignalService(t *testing.T, client adminCmdRunner) {
}
}
func testAdminCmdRunnerReInitFormat(t *testing.T, client adminCmdRunner) {
tmpGlobalObjectAPI := globalObjectAPI
defer func() {
globalObjectAPI = tmpGlobalObjectAPI
}()
testCases := []struct {
objectAPI ObjectLayer
dryRun bool
expectErr bool
}{
{&DummyObjectLayer{}, true, false},
{&DummyObjectLayer{}, false, false},
{nil, true, true},
{nil, false, true},
}
for i, testCase := range testCases {
globalObjectAPI = testCase.objectAPI
err := client.ReInitFormat(testCase.dryRun)
expectErr := (err != nil)
if expectErr != testCase.expectErr {
t.Fatalf("case %v: expected: %v, got: %v", i+1, testCase.expectErr, expectErr)
}
}
}
func testAdminCmdRunnerServerInfo(t *testing.T, client adminCmdRunner) {
tmpGlobalBootTime := globalBootTime
tmpGlobalObjectAPI := globalObjectAPI
@ -137,33 +109,6 @@ func testAdminCmdRunnerServerInfo(t *testing.T, client adminCmdRunner) {
}
}
func testAdminCmdRunnerGetConfig(t *testing.T, client adminCmdRunner) {
tmpGlobalServerConfig := globalServerConfig
defer func() {
globalServerConfig = tmpGlobalServerConfig
}()
config := newServerConfig()
testCases := []struct {
config *serverConfig
expectErr bool
}{
{globalServerConfig, false},
{config, false},
}
for i, testCase := range testCases {
globalServerConfig = testCase.config
_, err := client.GetConfig()
expectErr := (err != nil)
if expectErr != testCase.expectErr {
t.Fatalf("case %v: expected: %v, got: %v", i+1, testCase.expectErr, expectErr)
}
}
}
func newAdminRPCHTTPServerClient(t *testing.T) (*httptest.Server, *AdminRPCClient, *serverConfig) {
rpcServer, err := NewAdminRPCServer()
if err != nil {
@ -205,16 +150,6 @@ func TestAdminRPCClientSignalService(t *testing.T) {
testAdminCmdRunnerSignalService(t, rpcClient)
}
func TestAdminRPCClientReInitFormat(t *testing.T) {
httpServer, rpcClient, prevGlobalServerConfig := newAdminRPCHTTPServerClient(t)
defer httpServer.Close()
defer func() {
globalServerConfig = prevGlobalServerConfig
}()
testAdminCmdRunnerReInitFormat(t, rpcClient)
}
func TestAdminRPCClientServerInfo(t *testing.T) {
httpServer, rpcClient, prevGlobalServerConfig := newAdminRPCHTTPServerClient(t)
defer httpServer.Close()
@ -224,13 +159,3 @@ func TestAdminRPCClientServerInfo(t *testing.T) {
testAdminCmdRunnerServerInfo(t, rpcClient)
}
func TestAdminRPCClientGetConfig(t *testing.T) {
httpServer, rpcClient, prevGlobalServerConfig := newAdminRPCHTTPServerClient(t)
defer httpServer.Close()
defer func() {
globalServerConfig = prevGlobalServerConfig
}()
testAdminCmdRunnerGetConfig(t, rpcClient)
}

View File

@ -60,9 +60,9 @@ type IAMSys struct {
iamCannedPolicyMap map[string]iampolicy.Policy
}
// Load - load iam.json
// Load - loads iam subsystem
func (sys *IAMSys) Load(objAPI ObjectLayer) error {
return sys.Init(objAPI)
return sys.refresh(objAPI)
}
// Init - initializes config system from iam.json

View File

@ -18,9 +18,7 @@ package cmd
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"io/ioutil"
@ -76,15 +74,6 @@ func (lc localAdminClient) ServerInfo() (sid ServerInfoData, e error) {
}, nil
}
// GetConfig - returns config.json of the local server.
func (lc localAdminClient) GetConfig() ([]byte, error) {
if globalServerConfig == nil {
return nil, fmt.Errorf("config not present")
}
return json.Marshal(globalServerConfig)
}
// StartProfiling - starts profiling on the local server.
func (lc localAdminClient) StartProfiling(profiler string) error {
if globalProfiler != nil {

View File

@ -24,14 +24,6 @@ func TestLocalAdminClientSignalService(t *testing.T) {
testAdminCmdRunnerSignalService(t, &localAdminClient{})
}
func TestLocalAdminClientReInitFormat(t *testing.T) {
testAdminCmdRunnerReInitFormat(t, &localAdminClient{})
}
func TestLocalAdminClientServerInfo(t *testing.T) {
testAdminCmdRunnerServerInfo(t, &localAdminClient{})
}
func TestLocalAdminClientGetConfig(t *testing.T) {
testAdminCmdRunnerGetConfig(t, &localAdminClient{})
}

View File

@ -90,6 +90,60 @@ func (sys *NotificationSys) DeleteBucket(ctx context.Context, bucketName string)
}()
}
// ReloadFormat - calls ReloadFormat RPC call on all peers.
func (sys *NotificationSys) ReloadFormat(dryRun bool) map[xnet.Host]error {
errors := make(map[xnet.Host]error)
var wg sync.WaitGroup
for addr, client := range sys.peerRPCClientMap {
wg.Add(1)
go func(addr xnet.Host, client *PeerRPCClient) {
defer wg.Done()
// Try to load format in three attempts, before giving up.
for i := 0; i < 3; i++ {
err := client.ReloadFormat(dryRun)
if err == nil {
break
}
errors[addr] = err
// Wait for one second and no need wait after last attempt.
if i < 2 {
time.Sleep(1 * time.Second)
}
}
}(addr, client)
}
wg.Wait()
return errors
}
// LoadUsers - calls LoadUsers RPC call on all peers.
func (sys *NotificationSys) LoadUsers() map[xnet.Host]error {
errors := make(map[xnet.Host]error)
var wg sync.WaitGroup
for addr, client := range sys.peerRPCClientMap {
wg.Add(1)
go func(addr xnet.Host, client *PeerRPCClient) {
defer wg.Done()
// Try to load users in three attempts.
for i := 0; i < 3; i++ {
err := client.LoadUsers()
if err == nil {
break
}
errors[addr] = err
// Wait for one second and no need wait after last attempt.
if i < 2 {
time.Sleep(1 * time.Second)
}
}
}(addr, client)
}
wg.Wait()
return errors
}
// LoadCredentials - calls LoadCredentials RPC call on all peers.
func (sys *NotificationSys) LoadCredentials() map[xnet.Host]error {
errors := make(map[xnet.Host]error)
@ -98,7 +152,7 @@ func (sys *NotificationSys) LoadCredentials() map[xnet.Host]error {
wg.Add(1)
go func(addr xnet.Host, client *PeerRPCClient) {
defer wg.Done()
// Try to set credentials in three attempts.
// Try to load credentials in three attempts.
for i := 0; i < 3; i++ {
err := client.LoadCredentials()
if err == nil {

View File

@ -115,12 +115,30 @@ func (rpcClient *PeerRPCClient) SendEvent(bucketName string, targetID, remoteTar
return err
}
// ReloadFormat - calls reload format RPC.
func (rpcClient *PeerRPCClient) ReloadFormat(dryRun bool) error {
args := ReloadFormatArgs{
DryRun: dryRun,
}
reply := VoidReply{}
return rpcClient.Call(peerServiceName+".ReloadFormat", &args, &reply)
}
// LoadUsers - calls load users RPC.
func (rpcClient *PeerRPCClient) LoadUsers() error {
args := AuthArgs{}
reply := VoidReply{}
return rpcClient.Call(peerServiceName+".LoadUsers", &args, &reply)
}
// LoadCredentials - calls load credentials RPC.
func (rpcClient *PeerRPCClient) LoadCredentials() error {
args := AuthArgs{}
reply := VoidReply{}
return rpcClient.Call(peerServiceName+".SetCredentials", &args, &reply)
return rpcClient.Call(peerServiceName+".LoadCredentials", &args, &reply)
}
// NewPeerRPCClient - returns new peer RPC client.

View File

@ -192,6 +192,30 @@ func (receiver *peerRPCReceiver) SendEvent(args *SendEventArgs, reply *bool) err
return nil
}
// ReloadFormatArgs - send event RPC arguments.
type ReloadFormatArgs struct {
AuthArgs
DryRun bool
}
// ReloadFormat - handles reload format RPC call, reloads latest `format.json`
func (receiver *peerRPCReceiver) ReloadFormat(args *ReloadFormatArgs, reply *VoidReply) error {
objAPI := newObjectLayerFn()
if objAPI == nil {
return errServerNotInitialized
}
return objAPI.ReloadFormat(context.Background(), args.DryRun)
}
// LoadUsers - handles load users RPC call.
func (receiver *peerRPCReceiver) LoadUsers(args *AuthArgs, reply *VoidReply) error {
objAPI := newObjectLayerFn()
if objAPI == nil {
return errServerNotInitialized
}
return globalIAMSys.Load(objAPI)
}
// LoadCredentials - handles load credentials RPC call.
func (receiver *peerRPCReceiver) LoadCredentials(args *AuthArgs, reply *VoidReply) error {
objAPI := newObjectLayerFn()