mirror of
https://github.com/minio/minio.git
synced 2024-12-24 06:05:55 -05:00
30dc11a931
In FS or single-node XL mode, there is no need to save listener configuration to persistent storage. As there is only one server, if it is restarted, any connected listenBucketAPI clients were disconnected and will have to reconnect - so there is nothing to actually store. This incidentally solves #3052 by avoiding the problem.
223 lines
6.1 KiB
Go
223 lines
6.1 KiB
Go
/*
|
|
* Minio Cloud Storage, (C) 2014-2016 Minio, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/rpc"
|
|
"path"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type s3Peers struct {
|
|
// A map of peer server address (in `host:port` format) to RPC
|
|
// client connections.
|
|
rpcClients map[string]*AuthRPCClient
|
|
|
|
mutex *sync.RWMutex
|
|
|
|
// Is single-node?
|
|
isDistXL bool
|
|
|
|
// Slice of all peer addresses (in `host:port` format).
|
|
peers []string
|
|
}
|
|
|
|
func initGlobalS3Peers(eps []storageEndPoint) {
|
|
// Get list of de-duplicated peers.
|
|
peers := getAllPeers(eps)
|
|
|
|
// Initialize global state.
|
|
globalS3Peers = s3Peers{
|
|
rpcClients: make(map[string]*AuthRPCClient),
|
|
mutex: &sync.RWMutex{},
|
|
}
|
|
|
|
// Initialize each peer connection.
|
|
for _, peer := range peers {
|
|
globalS3Peers.InitS3PeerClient(peer)
|
|
}
|
|
|
|
// Save new peers
|
|
globalS3Peers.peers = peers
|
|
|
|
// store if this is a distributed setup or not.
|
|
globalS3Peers.isDistXL = len(globalS3Peers.peers) > 1
|
|
}
|
|
|
|
func (s3p *s3Peers) GetPeers() []string {
|
|
return s3p.peers
|
|
}
|
|
|
|
func (s3p *s3Peers) GetPeerClient(peer string) *AuthRPCClient {
|
|
// Take a read lock
|
|
s3p.mutex.RLock()
|
|
defer s3p.mutex.RUnlock()
|
|
return s3p.rpcClients[peer]
|
|
}
|
|
|
|
// Initializes a new RPC connection (or closes and re-opens if it
|
|
// already exists) to a peer. Note that peer address is in `host:port`
|
|
// format.
|
|
func (s3p *s3Peers) InitS3PeerClient(peer string) {
|
|
// Take a write lock
|
|
s3p.mutex.Lock()
|
|
defer s3p.mutex.Unlock()
|
|
|
|
if s3p.rpcClients[peer] != nil {
|
|
_ = s3p.rpcClients[peer].Close()
|
|
delete(s3p.rpcClients, peer)
|
|
}
|
|
authCfg := &authConfig{
|
|
accessKey: serverConfig.GetCredential().AccessKeyID,
|
|
secretKey: serverConfig.GetCredential().SecretAccessKey,
|
|
address: peer,
|
|
secureConn: isSSL(),
|
|
path: path.Join(reservedBucket, s3Path),
|
|
loginMethod: "S3.LoginHandler",
|
|
}
|
|
s3p.rpcClients[peer] = newAuthClient(authCfg)
|
|
}
|
|
|
|
func (s3p *s3Peers) Close() error {
|
|
// Take a write lock
|
|
s3p.mutex.Lock()
|
|
defer s3p.mutex.Unlock()
|
|
|
|
for _, v := range s3p.rpcClients {
|
|
if err := v.Close(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
s3p.rpcClients = nil
|
|
s3p.peers = nil
|
|
return nil
|
|
}
|
|
|
|
// Returns the network addresses of all Minio servers in the cluster in `host:port` format.
|
|
func getAllPeers(eps []storageEndPoint) (peers []string) {
|
|
if eps == nil {
|
|
return nil
|
|
}
|
|
peers = []string{globalMinioAddr} // Starts with a default peer.
|
|
for _, ep := range eps {
|
|
// Rest of the peers configured.
|
|
if ep.host != "" {
|
|
peers = append(peers, fmt.Sprintf("%s:%d", ep.host, ep.port))
|
|
}
|
|
}
|
|
return peers
|
|
}
|
|
|
|
// Make RPC calls with the given method and arguments to all the given
|
|
// peers (in parallel), and collects the results. Since the methods
|
|
// intended for use here, have only a success or failure response, we
|
|
// do not return/inspect the `reply` parameter in the RPC call. The
|
|
// function attempts to connect to a peer only once, and returns a map
|
|
// of peer address to error response. If the error is nil, it means
|
|
// the RPC succeeded.
|
|
func (s3p *s3Peers) SendRPC(peers []string, method string, args interface {
|
|
SetToken(token string)
|
|
SetTimestamp(tstamp time.Time)
|
|
}) map[string]error {
|
|
|
|
// peer error responses array
|
|
errArr := make([]error, len(peers))
|
|
|
|
// Start a wait group and make RPC requests to peers.
|
|
var wg sync.WaitGroup
|
|
for i, target := range peers {
|
|
wg.Add(1)
|
|
go func(ix int, target string) {
|
|
defer wg.Done()
|
|
reply := &GenericReply{}
|
|
// Get RPC client object safely.
|
|
client := s3p.GetPeerClient(target)
|
|
var err error
|
|
if client == nil {
|
|
err = fmt.Errorf("Requested client was not initialized - %v",
|
|
target)
|
|
} else {
|
|
err = client.Call(method, args, reply)
|
|
// Check for network errors and try
|
|
// again just once.
|
|
if err != nil {
|
|
if err.Error() == rpc.ErrShutdown.Error() {
|
|
err = client.Call(method, args, reply)
|
|
}
|
|
}
|
|
}
|
|
errArr[ix] = err
|
|
}(i, target)
|
|
}
|
|
|
|
// Wait for requests to complete.
|
|
wg.Wait()
|
|
|
|
// Map of errors
|
|
errsMap := make(map[string]error)
|
|
for i, errVal := range errArr {
|
|
if errVal != nil {
|
|
errsMap[peers[i]] = errVal
|
|
}
|
|
}
|
|
|
|
return errsMap
|
|
}
|
|
|
|
// S3PeersUpdateBucketNotification - Sends Update Bucket notification
|
|
// request to all peers. Currently we log an error and continue.
|
|
func S3PeersUpdateBucketNotification(bucket string, ncfg *notificationConfig) {
|
|
setBNPArgs := &SetBNPArgs{Bucket: bucket, NCfg: ncfg}
|
|
peers := globalS3Peers.GetPeers()
|
|
errsMap := globalS3Peers.SendRPC(peers, "S3.SetBucketNotificationPeer",
|
|
setBNPArgs)
|
|
for peer, err := range errsMap {
|
|
errorIf(err, "Error sending peer update bucket notification to %s - %v", peer, err)
|
|
}
|
|
}
|
|
|
|
// S3PeersUpdateBucketListener - Sends Update Bucket listeners request
|
|
// to all peers. Currently we log an error and continue.
|
|
func S3PeersUpdateBucketListener(bucket string, lcfg []listenerConfig) {
|
|
setBLPArgs := &SetBLPArgs{Bucket: bucket, LCfg: lcfg}
|
|
peers := globalS3Peers.GetPeers()
|
|
errsMap := globalS3Peers.SendRPC(peers, "S3.SetBucketListenerPeer",
|
|
setBLPArgs)
|
|
for peer, err := range errsMap {
|
|
errorIf(err, "Error sending peer update bucket listener to %s - %v", peer, err)
|
|
}
|
|
}
|
|
|
|
// S3PeersUpdateBucketPolicy - Sends update bucket policy request to
|
|
// all peers. Currently we log an error and continue.
|
|
func S3PeersUpdateBucketPolicy(bucket string, pCh policyChange) {
|
|
byts, err := json.Marshal(pCh)
|
|
if err != nil {
|
|
errorIf(err, "Failed to marshal policyChange - this is a BUG!")
|
|
return
|
|
}
|
|
setBPPArgs := &SetBPPArgs{Bucket: bucket, PChBytes: byts}
|
|
peers := globalS3Peers.GetPeers()
|
|
errsMap := globalS3Peers.SendRPC(peers, "S3.SetBucketPolicyPeer", setBPPArgs)
|
|
for peer, err := range errsMap {
|
|
errorIf(err, "Error sending peer update bucket policy to %s - %v", peer, err)
|
|
}
|
|
}
|