Add bucket metadata state client/handler (Fixes #3022) (#3152)

- Adds an interface to update in-memory bucket metadata state called
  BucketMetaState - this interface has functions to:
     - update bucket notification configuration,
     - bucket listener configuration,
     - bucket policy configuration, and
     - send bucket event

- This interface is implemented by `localBMS` a type for manipulating
  local node in-memory bucket metadata, and by `remoteBMS` a type for
  manipulating remote node in-memory bucket metadata.

- The remote node interface, makes an RPC call, but the local node
  interface does not - it updates in-memory bucket state directly.

- Rename mkPeersFromEndpoints to makeS3Peers and refactored it.

- Use arrayslice instead of map in s3Peers struct

- `s3Peers.SendUpdate` now receives an arrayslice of peer indexes to
  send the request to, with a special nil value slice indicating that
  all peers should be sent the update.

- `s3Peers.SendUpdate` now returns an arrayslice of errors, representing
  errors from peers when sending an update. The array positions
  correspond to peer array s3Peers.peers

Improve globalS3Peers:

- Make isDistXL a global `globalIsDistXL` and remove from s3Peers

- Make globalS3Peers an array of (address, bucket-meta-state) pairs.

- Fix code and tests.
This commit is contained in:
Aditya Manthramurthy 2016-11-07 15:09:24 -05:00 committed by Harshavardhana
parent 33c771bb3e
commit 85a5c358d8
19 changed files with 386 additions and 252 deletions

View File

@ -78,8 +78,11 @@ func (br *browserPeerAPIHandlers) SetAuthPeer(args SetAuthPeerArgs, reply *Gener
// Sends SetAuthPeer RPCs to all peers in the Minio cluster // Sends SetAuthPeer RPCs to all peers in the Minio cluster
func updateCredsOnPeers(creds credential) map[string]error { func updateCredsOnPeers(creds credential) map[string]error {
// Get list of peers (from globalS3Peers) // Get list of peer addresses (from globalS3Peers)
peers := globalS3Peers.GetPeers() peers := []string{}
for _, p := range globalS3Peers {
peers = append(peers, p.addr)
}
// Array of errors for each peer // Array of errors for each peer
errs := make([]error, len(peers)) errs := make([]error, len(peers))

152
cmd/bucket-metadata.go Normal file
View File

@ -0,0 +1,152 @@
/*
* Minio Cloud Storage, (C) 2014-2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"encoding/json"
"net/rpc"
)
// BucketMetaState - Interface to update bucket metadata in-memory
// state.
type BucketMetaState interface {
// Updates bucket notification
UpdateBucketNotification(args *SetBNPArgs) error
// Updates bucket listener
UpdateBucketListener(args *SetBLPArgs) error
// Updates bucket policy
UpdateBucketPolicy(args *SetBPPArgs) error
// Sends event
SendEvent(args *EventArgs) error
}
// Type that implements BucketMetaState for local node.
type localBMS struct {
ObjectAPI func() ObjectLayer
}
// localBMS.UpdateBucketNotification - updates in-memory global bucket
// notification info.
func (lc *localBMS) UpdateBucketNotification(args *SetBNPArgs) error {
// check if object layer is available.
objAPI := lc.ObjectAPI()
if objAPI == nil {
return errServerNotInitialized
}
globalEventNotifier.SetBucketNotificationConfig(args.Bucket, args.NCfg)
return nil
}
// localBMS.UpdateBucketListener - updates in-memory global bucket
// listeners info.
func (lc *localBMS) UpdateBucketListener(args *SetBLPArgs) error {
// check if object layer is available.
objAPI := lc.ObjectAPI()
if objAPI == nil {
return errServerNotInitialized
}
// Update in-memory notification config.
return globalEventNotifier.SetBucketListenerConfig(args.Bucket, args.LCfg)
}
// localBMS.UpdateBucketPolicy - updates in-memory global bucket
// policy info.
func (lc *localBMS) UpdateBucketPolicy(args *SetBPPArgs) error {
// check if object layer is available.
objAPI := lc.ObjectAPI()
if objAPI == nil {
return errServerNotInitialized
}
var pCh policyChange
if err := json.Unmarshal(args.PChBytes, &pCh); err != nil {
return err
}
return globalBucketPolicies.SetBucketPolicy(args.Bucket, pCh)
}
// localBMS.SendEvent - sends event to local event notifier via
// `globalEventNotifier`
func (lc *localBMS) SendEvent(args *EventArgs) error {
// check if object layer is available.
objAPI := lc.ObjectAPI()
if objAPI == nil {
return errServerNotInitialized
}
return globalEventNotifier.SendListenerEvent(args.Arn, args.Event)
}
// Type that implements BucketMetaState for remote node.
type remoteBMS struct {
*AuthRPCClient
}
// remoteBMS.UpdateBucketNotification - sends bucket notification
// change to remote peer via RPC call.
func (rc *remoteBMS) UpdateBucketNotification(args *SetBNPArgs) error {
reply := GenericReply{}
err := rc.Call("S3.SetBucketNotificationPeer", args, &reply)
// Check for network error and retry once.
if err != nil && err.Error() == rpc.ErrShutdown.Error() {
err = rc.Call("S3.SetBucketNotificationPeer", args, &reply)
}
return err
}
// remoteBMS.UpdateBucketListener - sends bucket listener change to
// remote peer via RPC call.
func (rc *remoteBMS) UpdateBucketListener(args *SetBLPArgs) error {
reply := GenericReply{}
err := rc.Call("S3.SetBucketListenerPeer", args, &reply)
// Check for network error and retry once.
if err != nil && err.Error() == rpc.ErrShutdown.Error() {
err = rc.Call("S3.SetBucketListenerPeer", args, &reply)
}
return err
}
// remoteBMS.UpdateBucketPolicy - sends bucket policy change to remote
// peer via RPC call.
func (rc *remoteBMS) UpdateBucketPolicy(args *SetBPPArgs) error {
reply := GenericReply{}
err := rc.Call("S3.SetBucketPolicyPeer", args, &reply)
// Check for network error and retry once.
if err != nil && err.Error() == rpc.ErrShutdown.Error() {
err = rc.Call("S3.SetBucketPolicyPeer", args, &reply)
}
return err
}
// remoteBMS.SendEvent - sends event for bucket listener to remote
// peer via RPC call.
func (rc *remoteBMS) SendEvent(args *EventArgs) error {
reply := GenericReply{}
err := rc.Call("S3.Event", args, &reply)
// Check for network error and retry once.
if err != nil && err.Error() == rpc.ErrShutdown.Error() {
err = rc.Call("S3.Event", args, &reply)
}
return err
}

View File

@ -380,7 +380,7 @@ func AddBucketListenerConfig(bucket string, lcfg *listenerConfig, objAPI ObjectL
defer nsMutex.Unlock(bucket, "", opsID) defer nsMutex.Unlock(bucket, "", opsID)
// update persistent config if dist XL // update persistent config if dist XL
if globalS3Peers.isDistXL { if globalIsDistXL {
err := persistListenerConfig(bucket, listenerCfgs, objAPI) err := persistListenerConfig(bucket, listenerCfgs, objAPI)
if err != nil { if err != nil {
errorIf(err, "Error persisting listener config when adding a listener.") errorIf(err, "Error persisting listener config when adding a listener.")
@ -422,7 +422,7 @@ func RemoveBucketListenerConfig(bucket string, lcfg *listenerConfig, objAPI Obje
defer nsMutex.Unlock(bucket, "", opsID) defer nsMutex.Unlock(bucket, "", opsID)
// update persistent config if dist XL // update persistent config if dist XL
if globalS3Peers.isDistXL { if globalIsDistXL {
err := persistListenerConfig(bucket, updatedLcfgs, objAPI) err := persistListenerConfig(bucket, updatedLcfgs, objAPI)
if err != nil { if err != nil {
errorIf(err, "Error persisting listener config when removing a listener.") errorIf(err, "Error persisting listener config when removing a listener.")

View File

@ -30,7 +30,7 @@ const (
// Initializes remote control clients for making remote requests. // Initializes remote control clients for making remote requests.
func initRemoteControlClients(srvCmdConfig serverCmdConfig) []*AuthRPCClient { func initRemoteControlClients(srvCmdConfig serverCmdConfig) []*AuthRPCClient {
if !srvCmdConfig.isDistXL { if !globalIsDistXL {
return nil return nil
} }
// Initialize auth rpc clients. // Initialize auth rpc clients.
@ -72,7 +72,7 @@ func registerControlRPCRouter(mux *router.Router, srvCmdConfig serverCmdConfig)
// Initialize Control. // Initialize Control.
ctrlHandlers := &controlAPIHandlers{ ctrlHandlers := &controlAPIHandlers{
ObjectAPI: newObjectLayerFn, ObjectAPI: newObjectLayerFn,
IsXL: srvCmdConfig.isDistXL || len(srvCmdConfig.storageDisks) > 1, IsXL: globalIsDistXL || len(srvCmdConfig.storageDisks) > 1,
RemoteControls: initRemoteControlClients(srvCmdConfig), RemoteControls: initRemoteControlClients(srvCmdConfig),
LocalNode: getLocalAddress(srvCmdConfig), LocalNode: getLocalAddress(srvCmdConfig),
StorageDisks: srvCmdConfig.storageDisks, StorageDisks: srvCmdConfig.storageDisks,

View File

@ -30,20 +30,20 @@ func TestInitRemoteControlClients(t *testing.T) {
defer removeAll(rootPath) defer removeAll(rootPath)
testCases := []struct { testCases := []struct {
isDistXL bool
srvCmdConfig serverCmdConfig srvCmdConfig serverCmdConfig
totalClients int totalClients int
}{ }{
// Test - 1 no allocation if server config is not distributed XL. // Test - 1 no allocation if server config is not distributed XL.
{ {
srvCmdConfig: serverCmdConfig{ isDistXL: false,
isDistXL: false, srvCmdConfig: serverCmdConfig{},
},
totalClients: 0, totalClients: 0,
}, },
// Test - 2 two clients allocated with 4 disks with 2 disks on same node each. // Test - 2 two clients allocated with 4 disks with 2 disks on same node each.
{ {
isDistXL: true,
srvCmdConfig: serverCmdConfig{ srvCmdConfig: serverCmdConfig{
isDistXL: true,
endpoints: []*url.URL{{ endpoints: []*url.URL{{
Scheme: "http", Scheme: "http",
Host: "10.1.10.1:9000", Host: "10.1.10.1:9000",
@ -63,8 +63,8 @@ func TestInitRemoteControlClients(t *testing.T) {
}, },
// Test - 3 4 clients allocated with 4 disks with 1 disk on each node. // Test - 3 4 clients allocated with 4 disks with 1 disk on each node.
{ {
isDistXL: true,
srvCmdConfig: serverCmdConfig{ srvCmdConfig: serverCmdConfig{
isDistXL: true,
endpoints: []*url.URL{{ endpoints: []*url.URL{{
Scheme: "http", Scheme: "http",
Host: "10.1.10.1:9000", Path: "/mnt/disk1", Host: "10.1.10.1:9000", Path: "/mnt/disk1",
@ -85,6 +85,7 @@ func TestInitRemoteControlClients(t *testing.T) {
// Evaluate and validate all test cases. // Evaluate and validate all test cases.
for i, testCase := range testCases { for i, testCase := range testCases {
globalIsDistXL = testCase.isDistXL
rclients := initRemoteControlClients(testCase.srvCmdConfig) rclients := initRemoteControlClients(testCase.srvCmdConfig)
if len(rclients) != testCase.totalClients { if len(rclients) != testCase.totalClients {
t.Errorf("Test %d, Expected %d, got %d RPC clients.", i+1, testCase.totalClients, len(rclients)) t.Errorf("Test %d, Expected %d, got %d RPC clients.", i+1, testCase.totalClients, len(rclients))

View File

@ -350,7 +350,7 @@ func loadListenerConfig(bucket string, objAPI ObjectLayer) ([]listenerConfig, er
// in single node mode, there are no peers, so in this case // in single node mode, there are no peers, so in this case
// there is no configuration to load, as any previously // there is no configuration to load, as any previously
// connected listen clients have been disconnected // connected listen clients have been disconnected
if !globalS3Peers.isDistXL { if !globalIsDistXL {
return nil, nil return nil, nil
} }

View File

@ -291,7 +291,7 @@ func TestInitEventNotifier(t *testing.T) {
// needed to load listener config from disk for testing (in // needed to load listener config from disk for testing (in
// single peer mode, the listener config is ingored, but here // single peer mode, the listener config is ingored, but here
// we want to test the loading from disk too.) // we want to test the loading from disk too.)
globalS3Peers.isDistXL = true globalIsDistXL = true
// test event notifier init // test event notifier init
if err := initEventNotifier(obj); err != nil { if err := initEventNotifier(obj); err != nil {
@ -366,7 +366,7 @@ func TestListenBucketNotification(t *testing.T) {
// needed to load listener config from disk for testing (in // needed to load listener config from disk for testing (in
// single peer mode, the listener config is ingored, but here // single peer mode, the listener config is ingored, but here
// we want to test the loading from disk too.) // we want to test the loading from disk too.)
globalS3Peers.isDistXL = true globalIsDistXL = true
// Init event notifier // Init event notifier
if err := initEventNotifier(obj); err != nil { if err := initEventNotifier(obj); err != nil {

View File

@ -41,7 +41,8 @@ const (
) )
var ( var (
globalQuiet = false // Quiet flag set via command line globalQuiet = false // Quiet flag set via command line
globalIsDistXL = false // "Is Distributed?" flag.
// Add new global flags here. // Add new global flags here.

View File

@ -447,13 +447,14 @@ func TestLockServers(t *testing.T) {
} }
globalMinioHost = "" globalMinioHost = ""
testCases := []struct { testCases := []struct {
isDistXL bool
srvCmdConfig serverCmdConfig srvCmdConfig serverCmdConfig
totalLockServers int totalLockServers int
}{ }{
// Test - 1 one lock server initialized. // Test - 1 one lock server initialized.
{ {
isDistXL: true,
srvCmdConfig: serverCmdConfig{ srvCmdConfig: serverCmdConfig{
isDistXL: true,
endpoints: []*url.URL{{ endpoints: []*url.URL{{
Scheme: "http", Scheme: "http",
Host: "localhost:9000", Host: "localhost:9000",
@ -476,8 +477,8 @@ func TestLockServers(t *testing.T) {
}, },
// Test - 2 two servers possible, 1 ignored. // Test - 2 two servers possible, 1 ignored.
{ {
isDistXL: true,
srvCmdConfig: serverCmdConfig{ srvCmdConfig: serverCmdConfig{
isDistXL: true,
endpoints: []*url.URL{{ endpoints: []*url.URL{{
Scheme: "http", Scheme: "http",
Host: "localhost:9000", Host: "localhost:9000",
@ -507,6 +508,7 @@ func TestLockServers(t *testing.T) {
// Validates lock server initialization. // Validates lock server initialization.
for i, testCase := range testCases { for i, testCase := range testCases {
globalIsDistXL = testCase.isDistXL
lockServers := newLockServers(testCase.srvCmdConfig) lockServers := newLockServers(testCase.srvCmdConfig)
if len(lockServers) != testCase.totalLockServers { if len(lockServers) != testCase.totalLockServers {
t.Fatalf("Test %d: Expected total %d, got %d", i+1, testCase.totalLockServers, len(lockServers)) t.Fatalf("Test %d: Expected total %d, got %d", i+1, testCase.totalLockServers, len(lockServers))

View File

@ -27,6 +27,7 @@ import (
type listenerConn struct { type listenerConn struct {
TargetAddr string TargetAddr string
ListenerARN string ListenerARN string
BMSClient BucketMetaState
} }
type listenerLogger struct { type listenerLogger struct {
@ -35,7 +36,8 @@ type listenerLogger struct {
} }
func newListenerLogger(listenerArn, targetAddr string) (*listenerLogger, error) { func newListenerLogger(listenerArn, targetAddr string) (*listenerLogger, error) {
if globalS3Peers.GetPeerClient(targetAddr) == nil { bmsClient := globalS3Peers.GetPeerClient(targetAddr)
if bmsClient == nil {
return nil, fmt.Errorf( return nil, fmt.Errorf(
"Peer %s was not initialized - bug!", "Peer %s was not initialized - bug!",
targetAddr, targetAddr,
@ -44,6 +46,7 @@ func newListenerLogger(listenerArn, targetAddr string) (*listenerLogger, error)
lc := listenerConn{ lc := listenerConn{
TargetAddr: targetAddr, TargetAddr: targetAddr,
ListenerARN: listenerArn, ListenerARN: listenerArn,
BMSClient: bmsClient,
} }
lcLog := logrus.New() lcLog := logrus.New()
@ -66,21 +69,14 @@ func (lc listenerConn) Fire(entry *logrus.Entry) error {
return nil return nil
} }
// Fetch peer client object
client := globalS3Peers.GetPeerClient(lc.TargetAddr)
if client == nil {
return fmt.Errorf("Target %s client RPC object not available!", lc.TargetAddr)
}
// Send Event RPC call and return error // Send Event RPC call and return error
arg := EventArgs{Event: notificationEvent, Arn: lc.ListenerARN} arg := EventArgs{Event: notificationEvent, Arn: lc.ListenerARN}
reply := GenericReply{} err := lc.BMSClient.SendEvent(&arg)
err := client.Call("S3.Event", &arg, &reply)
// In case connection is shutdown, retry once. // In case connection is shutdown, retry once.
if err != nil { if err != nil {
if err.Error() == rpc.ErrShutdown.Error() { if err.Error() == rpc.ErrShutdown.Error() {
err = client.Call("S3.Event", &arg, &reply) err = lc.BMSClient.SendEvent(&arg)
} }
} }
return err return err

View File

@ -83,7 +83,7 @@ func configureServerHandler(srvCmdConfig serverCmdConfig) (http.Handler, error)
mux := router.NewRouter() mux := router.NewRouter()
// Initialize distributed NS lock. // Initialize distributed NS lock.
if srvCmdConfig.isDistXL { if globalIsDistXL {
// Register storage rpc router only if its a distributed setup. // Register storage rpc router only if its a distributed setup.
err := registerStorageRPCRouters(mux, srvCmdConfig) err := registerStorageRPCRouters(mux, srvCmdConfig)
if err != nil { if err != nil {

View File

@ -19,179 +19,168 @@ package cmd
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/rpc"
"net/url" "net/url"
"path" "path"
"sync" "sync"
"time"
) )
type s3Peers struct { // s3Peer structs contains the address of a peer in the cluster, and
// A map of peer server address (in `host:port` format) to RPC // its BucketMetaState interface objects.
// client connections. type s3Peer struct {
rpcClients map[string]*AuthRPCClient // address in `host:port` format
addr string
mutex *sync.RWMutex // BucketMetaState client interface
bmsClient BucketMetaState
// Is single-node?
isDistXL bool
// Slice of all peer addresses (in `host:port` format).
peers []string
} }
func initGlobalS3Peers(eps []*url.URL) { // type representing all peers in the cluster
// Get list of de-duplicated peers. type s3Peers []s3Peer
peers := getAllPeers(eps)
// Initialize global state. // makeS3Peers makes an s3Peers struct value from the given urls
globalS3Peers = s3Peers{ // slice. The urls slice is assumed to be non-empty and free of nil
rpcClients: make(map[string]*AuthRPCClient), // values.
mutex: &sync.RWMutex{}, func makeS3Peers(eps []*url.URL) s3Peers {
} var ret []s3Peer
// Initialize each peer connection. // map to store peers that are already added to ret
for _, peer := range peers { seenAddr := make(map[string]bool)
globalS3Peers.InitS3PeerClient(peer)
}
// Save new peers // add local (self) as peer in the array
globalS3Peers.peers = peers ret = append(ret, s3Peer{
globalMinioAddr,
&localBMS{ObjectAPI: newObjectLayerFn},
})
seenAddr[globalMinioAddr] = true
// store if this is a distributed setup or not. // iterate over endpoints to find new remote peers and add
globalS3Peers.isDistXL = len(globalS3Peers.peers) > 1 // them to ret.
} for _, ep := range eps {
if ep.Host == "" {
continue
}
func (s3p *s3Peers) GetPeers() []string { // Check if the remote host has been added already
return s3p.peers if !seenAddr[ep.Host] {
} cfg := authConfig{
accessKey: serverConfig.GetCredential().AccessKeyID,
secretKey: serverConfig.GetCredential().SecretAccessKey,
address: ep.Host,
secureConn: isSSL(),
path: path.Join(reservedBucket, s3Path),
loginMethod: "S3.LoginHandler",
}
func (s3p *s3Peers) GetPeerClient(peer string) *AuthRPCClient { ret = append(ret, s3Peer{
// Take a read lock ep.Host,
s3p.mutex.RLock() &remoteBMS{newAuthClient(&cfg)},
defer s3p.mutex.RUnlock() })
return s3p.rpcClients[peer] seenAddr[ep.Host] = true
} }
}
// Initializes a new RPC connection (or closes and re-opens if it
// already exists) to a peer. Note that peer address is in `host:port` return ret
// format. }
func (s3p *s3Peers) InitS3PeerClient(peer string) {
// Take a write lock // initGlobalS3Peers - initialize globalS3Peers by passing in
s3p.mutex.Lock() // endpoints - intended to be called early in program start-up.
defer s3p.mutex.Unlock() func initGlobalS3Peers(eps []*url.URL) {
globalS3Peers = makeS3Peers(eps)
if s3p.rpcClients[peer] != nil { }
_ = s3p.rpcClients[peer].Close()
delete(s3p.rpcClients, peer) // GetPeerClient - fetch BucketMetaState interface by peer address
} func (s3p s3Peers) GetPeerClient(peer string) BucketMetaState {
authCfg := &authConfig{ for _, p := range s3p {
accessKey: serverConfig.GetCredential().AccessKeyID, if p.addr == peer {
secretKey: serverConfig.GetCredential().SecretAccessKey, return p.bmsClient
address: peer,
secureConn: isSSL(),
path: path.Join(reservedBucket, s3Path),
loginMethod: "S3.LoginHandler",
}
s3p.rpcClients[peer] = newAuthClient(authCfg)
}
func (s3p *s3Peers) Close() error {
// Take a write lock
s3p.mutex.Lock()
defer s3p.mutex.Unlock()
for _, v := range s3p.rpcClients {
if err := v.Close(); err != nil {
return err
} }
} }
s3p.rpcClients = nil
s3p.peers = nil
return nil return nil
} }
// Returns the network addresses of all Minio servers in the cluster in `host:port` format. // SendUpdate sends bucket metadata updates to all given peer
func getAllPeers(eps []*url.URL) (peers []string) { // indices. The update calls are sent in parallel, and errors are
if eps == nil { // returned per peer in an array. The returned error arrayslice is
return nil // always as long as s3p.peers.addr.
} //
peers = []string{globalMinioAddr} // Starts with a default peer. // The input peerIndex slice can be nil if the update is to be sent to
for _, ep := range eps { // all peers. This is the common case.
if ep == nil { //
return nil // The updates are sent via a type implementing the BucketMetaState
} // interface. This makes sure that the local node is directly updated,
// Rest of the peers configured. // and remote nodes are updated via RPC calls.
peers = append(peers, ep.Host) func (s3p s3Peers) SendUpdate(peerIndex []int, args interface{}) []error {
}
return peers
}
// Make RPC calls with the given method and arguments to all the given // peer error array
// peers (in parallel), and collects the results. Since the methods errs := make([]error, len(s3p))
// intended for use here, have only a success or failure response, we
// do not return/inspect the `reply` parameter in the RPC call. The
// function attempts to connect to a peer only once, and returns a map
// of peer address to error response. If the error is nil, it means
// the RPC succeeded.
func (s3p *s3Peers) SendRPC(peers []string, method string, args interface {
SetToken(token string)
SetTimestamp(tstamp time.Time)
}) map[string]error {
// peer error responses array
errArr := make([]error, len(peers))
// Start a wait group and make RPC requests to peers. // Start a wait group and make RPC requests to peers.
var wg sync.WaitGroup var wg sync.WaitGroup
for i, target := range peers {
wg.Add(1) // Function that sends update to peer at `index`
go func(ix int, target string) { sendUpdateToPeer := func(index int) {
defer wg.Done() defer wg.Done()
reply := &GenericReply{} var err error
// Get RPC client object safely. // Get BMS client for peer at `index`. The index is
client := s3p.GetPeerClient(target) // already checked for being within array bounds.
var err error client := s3p[index].bmsClient
if client == nil {
err = fmt.Errorf("Requested client was not initialized - %v", // Make the appropriate bucket metadata update
target) // according to the argument type
} else { switch v := args.(type) {
err = client.Call(method, args, reply) case *SetBNPArgs:
// Check for network errors and try err = client.UpdateBucketNotification(v)
// again just once.
if err != nil { case *SetBLPArgs:
if err.Error() == rpc.ErrShutdown.Error() { err = client.UpdateBucketListener(v)
err = client.Call(method, args, reply)
} case *SetBPPArgs:
} err = client.UpdateBucketPolicy(v)
}
errArr[ix] = err default:
}(i, target) err = fmt.Errorf("Unknown arg in BucketMetaState updater - %v", args)
}
errs[index] = err
} }
// Wait for requests to complete. // Special (but common) case of peerIndex == nil, implies send
wg.Wait() // update to all peers.
if peerIndex == nil {
// Map of errors for idx := 0; idx < len(s3p); idx++ {
errsMap := make(map[string]error) wg.Add(1)
for i, errVal := range errArr { go sendUpdateToPeer(idx)
if errVal != nil { }
errsMap[peers[i]] = errVal } else {
// Send update only to given peer indices.
for _, idx := range peerIndex {
// check idx is in array bounds.
if !(idx >= 0 && idx < len(s3p)) {
errorIf(
fmt.Errorf("Bad peer index %d input to SendUpdate()", idx),
"peerIndex out of bounds",
)
continue
}
wg.Add(1)
go sendUpdateToPeer(idx)
} }
} }
return errsMap // Wait for requests to complete and return
wg.Wait()
return errs
} }
// S3PeersUpdateBucketNotification - Sends Update Bucket notification // S3PeersUpdateBucketNotification - Sends Update Bucket notification
// request to all peers. Currently we log an error and continue. // request to all peers. Currently we log an error and continue.
func S3PeersUpdateBucketNotification(bucket string, ncfg *notificationConfig) { func S3PeersUpdateBucketNotification(bucket string, ncfg *notificationConfig) {
setBNPArgs := &SetBNPArgs{Bucket: bucket, NCfg: ncfg} setBNPArgs := &SetBNPArgs{Bucket: bucket, NCfg: ncfg}
peers := globalS3Peers.GetPeers() errs := globalS3Peers.SendUpdate(nil, setBNPArgs)
errsMap := globalS3Peers.SendRPC(peers, "S3.SetBucketNotificationPeer", for idx, err := range errs {
setBNPArgs) errorIf(
for peer, err := range errsMap { err,
errorIf(err, "Error sending peer update bucket notification to %s - %v", peer, err) "Error sending update bucket notification to %s - %v",
globalS3Peers[idx].addr, err,
)
} }
} }
@ -199,11 +188,13 @@ func S3PeersUpdateBucketNotification(bucket string, ncfg *notificationConfig) {
// to all peers. Currently we log an error and continue. // to all peers. Currently we log an error and continue.
func S3PeersUpdateBucketListener(bucket string, lcfg []listenerConfig) { func S3PeersUpdateBucketListener(bucket string, lcfg []listenerConfig) {
setBLPArgs := &SetBLPArgs{Bucket: bucket, LCfg: lcfg} setBLPArgs := &SetBLPArgs{Bucket: bucket, LCfg: lcfg}
peers := globalS3Peers.GetPeers() errs := globalS3Peers.SendUpdate(nil, setBLPArgs)
errsMap := globalS3Peers.SendRPC(peers, "S3.SetBucketListenerPeer", for idx, err := range errs {
setBLPArgs) errorIf(
for peer, err := range errsMap { err,
errorIf(err, "Error sending peer update bucket listener to %s - %v", peer, err) "Error sending update bucket listener to %s - %v",
globalS3Peers[idx].addr, err,
)
} }
} }
@ -216,9 +207,12 @@ func S3PeersUpdateBucketPolicy(bucket string, pCh policyChange) {
return return
} }
setBPPArgs := &SetBPPArgs{Bucket: bucket, PChBytes: byts} setBPPArgs := &SetBPPArgs{Bucket: bucket, PChBytes: byts}
peers := globalS3Peers.GetPeers() errs := globalS3Peers.SendUpdate(nil, setBPPArgs)
errsMap := globalS3Peers.SendRPC(peers, "S3.SetBucketPolicyPeer", setBPPArgs) for idx, err := range errs {
for peer, err := range errsMap { errorIf(
errorIf(err, "Error sending peer update bucket policy to %s - %v", peer, err) err,
"Error sending update bucket policy to %s - %v",
globalS3Peers[idx].addr, err,
)
} }
} }

View File

@ -22,24 +22,42 @@ import (
"testing" "testing"
) )
// Validates getAllPeers, fetches all peers based on list of storage endpoints. // Validates makeS3Peers, fetches all peers based on list of storage
func TestGetAllPeers(t *testing.T) { // endpoints.
func TestMakeS3Peers(t *testing.T) {
// Initialize configuration
root, err := newTestConfig("us-east-1")
if err != nil {
t.Fatalf("%s", err)
}
defer removeAll(root)
// test cases
testCases := []struct { testCases := []struct {
eps []*url.URL gMinioAddr string
peers []string eps []*url.URL
peers []string
}{ }{
{nil, nil}, {":9000", []*url.URL{{Path: "/mnt/disk1"}}, []string{":9000"}},
{[]*url.URL{nil}, nil}, {":9000", []*url.URL{{Host: "localhost:9001"}}, []string{":9000", "localhost:9001"}},
{[]*url.URL{{Path: "/mnt/disk1"}}, []string{globalMinioAddr, ""}}, {"m1:9000", []*url.URL{{Host: "m1:9000"}, {Host: "m2:9000"}, {Host: "m3:9000"}}, []string{"m1:9000", "m2:9000", "m3:9000"}},
{[]*url.URL{{Host: "localhost:9001"}}, []string{globalMinioAddr,
"localhost:9001",
}},
} }
getPeersHelper := func(s3p s3Peers) []string {
r := []string{}
for _, p := range s3p {
r = append(r, p.addr)
}
return r
}
// execute tests
for i, testCase := range testCases { for i, testCase := range testCases {
peers := getAllPeers(testCase.eps) globalMinioAddr = testCase.gMinioAddr
if !reflect.DeepEqual(testCase.peers, peers) { s3peers := makeS3Peers(testCase.eps)
t.Errorf("Test %d: Expected %s, got %s", i+1, testCase.peers, peers) referencePeers := getPeersHelper(s3peers)
if !reflect.DeepEqual(testCase.peers, referencePeers) {
t.Errorf("Test %d: Expected %v, got %v", i+1, testCase.peers, referencePeers)
} }
} }
} }

View File

@ -27,12 +27,14 @@ const (
) )
type s3PeerAPIHandlers struct { type s3PeerAPIHandlers struct {
ObjectAPI func() ObjectLayer *localBMS
} }
func registerS3PeerRPCRouter(mux *router.Router) error { func registerS3PeerRPCRouter(mux *router.Router) error {
s3PeerHandlers := &s3PeerAPIHandlers{ s3PeerHandlers := &s3PeerAPIHandlers{
ObjectAPI: newObjectLayerFn, &localBMS{
ObjectAPI: newObjectLayerFn,
},
} }
s3PeerRPCServer := rpc.NewServer() s3PeerRPCServer := rpc.NewServer()

View File

@ -16,10 +16,7 @@
package cmd package cmd
import ( import "time"
"encoding/json"
"time"
)
func (s3 *s3PeerAPIHandlers) LoginHandler(args *RPCLoginArgs, reply *RPCLoginReply) error { func (s3 *s3PeerAPIHandlers) LoginHandler(args *RPCLoginArgs, reply *RPCLoginReply) error {
jwt, err := newJWT(defaultInterNodeJWTExpiry) jwt, err := newJWT(defaultInterNodeJWTExpiry)
@ -57,16 +54,7 @@ func (s3 *s3PeerAPIHandlers) SetBucketNotificationPeer(args *SetBNPArgs, reply *
return errInvalidToken return errInvalidToken
} }
// check if object layer is available. return s3.UpdateBucketNotification(args)
objAPI := s3.ObjectAPI()
if objAPI == nil {
return errServerNotInitialized
}
// Update in-memory notification config.
globalEventNotifier.SetBucketNotificationConfig(args.Bucket, args.NCfg)
return nil
} }
// SetBLPArgs - Arguments collection to SetBucketListenerPeer RPC call // SetBLPArgs - Arguments collection to SetBucketListenerPeer RPC call
@ -80,20 +68,13 @@ type SetBLPArgs struct {
LCfg []listenerConfig LCfg []listenerConfig
} }
func (s3 *s3PeerAPIHandlers) SetBucketListenerPeer(args SetBLPArgs, reply *GenericReply) error { func (s3 *s3PeerAPIHandlers) SetBucketListenerPeer(args *SetBLPArgs, reply *GenericReply) error {
// check auth // check auth
if !isRPCTokenValid(args.Token) { if !isRPCTokenValid(args.Token) {
return errInvalidToken return errInvalidToken
} }
// check if object layer is available. return s3.UpdateBucketListener(args)
objAPI := s3.ObjectAPI()
if objAPI == nil {
return errServerNotInitialized
}
// Update in-memory notification config.
return globalEventNotifier.SetBucketListenerConfig(args.Bucket, args.LCfg)
} }
// EventArgs - Arguments collection for Event RPC call // EventArgs - Arguments collection for Event RPC call
@ -115,13 +96,7 @@ func (s3 *s3PeerAPIHandlers) Event(args *EventArgs, reply *GenericReply) error {
return errInvalidToken return errInvalidToken
} }
// check if object layer is available. return s3.SendEvent(args)
objAPI := s3.ObjectAPI()
if objAPI == nil {
return errServerNotInitialized
}
return globalEventNotifier.SendListenerEvent(args.Arn, args.Event)
} }
// SetBPPArgs - Arguments collection for SetBucketPolicyPeer RPC call // SetBPPArgs - Arguments collection for SetBucketPolicyPeer RPC call
@ -136,22 +111,11 @@ type SetBPPArgs struct {
} }
// tell receiving server to update a bucket policy // tell receiving server to update a bucket policy
func (s3 *s3PeerAPIHandlers) SetBucketPolicyPeer(args SetBPPArgs, reply *GenericReply) error { func (s3 *s3PeerAPIHandlers) SetBucketPolicyPeer(args *SetBPPArgs, reply *GenericReply) error {
// check auth // check auth
if !isRPCTokenValid(args.Token) { if !isRPCTokenValid(args.Token) {
return errInvalidToken return errInvalidToken
} }
// check if object layer is available. return s3.UpdateBucketPolicy(args)
objAPI := s3.ObjectAPI()
if objAPI == nil {
return errServerNotInitialized
}
var pCh policyChange
if err := json.Unmarshal(args.PChBytes, &pCh); err != nil {
return err
}
return globalBucketPolicies.SetBucketPolicy(args.Bucket, pCh)
} }

View File

@ -103,7 +103,6 @@ type serverCmdConfig struct {
serverAddr string serverAddr string
endpoints []*url.URL endpoints []*url.URL
ignoredEndpoints []*url.URL ignoredEndpoints []*url.URL
isDistXL bool // True only if its distributed XL.
storageDisks []StorageAPI storageDisks []StorageAPI
} }
@ -266,17 +265,16 @@ func checkSufficientDisks(eps []*url.URL) error {
} }
// Returns if slice of disks is a distributed setup. // Returns if slice of disks is a distributed setup.
func isDistributedSetup(eps []*url.URL) (isDist bool) { func isDistributedSetup(eps []*url.URL) bool {
// Validate if one the disks is not local. // Validate if one the disks is not local.
for _, ep := range eps { for _, ep := range eps {
if !isLocalStorage(ep) { if !isLocalStorage(ep) {
// One or more disks supplied as arguments are not // One or more disks supplied as arguments are
// attached to the local node. // not attached to the local node.
isDist = true return true
break
} }
} }
return isDist return false
} }
// We just exit for invalid endpoints. // We just exit for invalid endpoints.
@ -446,7 +444,7 @@ func serverMain(c *cli.Context) {
firstDisk := isLocalStorage(endpoints[0]) firstDisk := isLocalStorage(endpoints[0])
// Check if endpoints are part of distributed setup. // Check if endpoints are part of distributed setup.
isDistXL := isDistributedSetup(endpoints) globalIsDistXL = isDistributedSetup(endpoints)
// Configure server. // Configure server.
srvConfig := serverCmdConfig{ srvConfig := serverCmdConfig{
@ -454,7 +452,6 @@ func serverMain(c *cli.Context) {
endpoints: endpoints, endpoints: endpoints,
ignoredEndpoints: ignoredEndpoints, ignoredEndpoints: ignoredEndpoints,
storageDisks: storageDisks, storageDisks: storageDisks,
isDistXL: isDistXL,
} }
// Configure server. // Configure server.
@ -462,12 +459,12 @@ func serverMain(c *cli.Context) {
fatalIf(err, "Unable to configure one of server's RPC services.") fatalIf(err, "Unable to configure one of server's RPC services.")
// Set nodes for dsync for distributed setup. // Set nodes for dsync for distributed setup.
if isDistXL { if globalIsDistXL {
fatalIf(initDsyncNodes(endpoints), "Unable to initialize distributed locking") fatalIf(initDsyncNodes(endpoints), "Unable to initialize distributed locking")
} }
// Initialize name space lock. // Initialize name space lock.
initNSLock(isDistXL) initNSLock(globalIsDistXL)
// Initialize a new HTTP server. // Initialize a new HTTP server.
apiServer := NewServerMux(serverAddr, handler) apiServer := NewServerMux(serverAddr, handler)

View File

@ -411,8 +411,10 @@ func StartTestPeersRPCServer(t TestErrHandler, instanceType string) TestServer {
// Run TestServer. // Run TestServer.
testRPCServer.Server = httptest.NewServer(mux) testRPCServer.Server = httptest.NewServer(mux)
// Set as non-distributed.
globalIsDistXL = false
// initialize remainder of serverCmdConfig // initialize remainder of serverCmdConfig
srvCfg.isDistXL = false
testRPCServer.SrvCmdCfg = srvCfg testRPCServer.SrvCmdCfg = srvCfg
return testRPCServer return testRPCServer

View File

@ -81,7 +81,7 @@ func checkDuplicateEndpoints(endpoints []*url.URL) error {
// Find local node through the command line arguments. Returns in `host:port` format. // Find local node through the command line arguments. Returns in `host:port` format.
func getLocalAddress(srvCmdConfig serverCmdConfig) string { func getLocalAddress(srvCmdConfig serverCmdConfig) string {
if !srvCmdConfig.isDistXL { if !globalIsDistXL {
return srvCmdConfig.serverAddr return srvCmdConfig.serverAddr
} }
for _, ep := range srvCmdConfig.endpoints { for _, ep := range srvCmdConfig.endpoints {

View File

@ -229,13 +229,14 @@ func TestLocalAddress(t *testing.T) {
globalMinioPort = "9000" globalMinioPort = "9000"
globalMinioHost = "" globalMinioHost = ""
testCases := []struct { testCases := []struct {
isDistXL bool
srvCmdConfig serverCmdConfig srvCmdConfig serverCmdConfig
localAddr string localAddr string
}{ }{
// Test 1 - local address is found. // Test 1 - local address is found.
{ {
isDistXL: true,
srvCmdConfig: serverCmdConfig{ srvCmdConfig: serverCmdConfig{
isDistXL: true,
endpoints: []*url.URL{{ endpoints: []*url.URL{{
Scheme: "http", Scheme: "http",
Host: "localhost:9000", Host: "localhost:9000",
@ -258,9 +259,9 @@ func TestLocalAddress(t *testing.T) {
}, },
// Test 2 - local address is everything. // Test 2 - local address is everything.
{ {
isDistXL: false,
srvCmdConfig: serverCmdConfig{ srvCmdConfig: serverCmdConfig{
serverAddr: net.JoinHostPort("", globalMinioPort), serverAddr: net.JoinHostPort("", globalMinioPort),
isDistXL: false,
endpoints: []*url.URL{{ endpoints: []*url.URL{{
Path: "/mnt/disk1", Path: "/mnt/disk1",
}, { }, {
@ -275,8 +276,8 @@ func TestLocalAddress(t *testing.T) {
}, },
// Test 3 - local address is not found. // Test 3 - local address is not found.
{ {
isDistXL: true,
srvCmdConfig: serverCmdConfig{ srvCmdConfig: serverCmdConfig{
isDistXL: true,
endpoints: []*url.URL{{ endpoints: []*url.URL{{
Scheme: "http", Scheme: "http",
Host: "1.1.1.1:9000", Host: "1.1.1.1:9000",
@ -301,9 +302,9 @@ func TestLocalAddress(t *testing.T) {
// name is specified in the --address option on the // name is specified in the --address option on the
// server command line. // server command line.
{ {
isDistXL: false,
srvCmdConfig: serverCmdConfig{ srvCmdConfig: serverCmdConfig{
serverAddr: "play.minio.io:9000", serverAddr: "play.minio.io:9000",
isDistXL: false,
endpoints: []*url.URL{{ endpoints: []*url.URL{{
Path: "/mnt/disk1", Path: "/mnt/disk1",
}, { }, {
@ -320,6 +321,7 @@ func TestLocalAddress(t *testing.T) {
// Validates fetching local address. // Validates fetching local address.
for i, testCase := range testCases { for i, testCase := range testCases {
globalIsDistXL = testCase.isDistXL
localAddr := getLocalAddress(testCase.srvCmdConfig) localAddr := getLocalAddress(testCase.srvCmdConfig)
if localAddr != testCase.localAddr { if localAddr != testCase.localAddr {
t.Fatalf("Test %d: Expected %s, got %s", i+1, testCase.localAddr, localAddr) t.Fatalf("Test %d: Expected %s, got %s", i+1, testCase.localAddr, localAddr)