mirror of
https://github.com/minio/minio.git
synced 2025-01-23 12:43:16 -05:00
28d526bc68
During startup until the object layer is initialized logger is disabled to provide for a cleaner UI error message. CriticalIf is disabled, use FatalIf instead. Also never call os.Exit(1) on running servers where you can return error to client in handlers.
552 lines
16 KiB
Go
552 lines
16 KiB
Go
/*
 * Minio Cloud Storage, (C) 2014, 2015, 2016, 2017, 2018 Minio, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/minio/minio/cmd/logger"
|
|
xnet "github.com/minio/minio/pkg/net"
|
|
)
|
|
|
|
// errUnsupportedSignal - error value describing a service signal other
// than restart or stop.
var errUnsupportedSignal = fmt.Errorf("unsupported signal: only restart and stop signals are supported")
|
|
|
|
// AdminRPCClient - admin RPC client talks to admin RPC server.
|
|
type AdminRPCClient struct {
|
|
*RPCClient
|
|
}
|
|
|
|
// SignalService - calls SignalService RPC.
|
|
func (rpcClient *AdminRPCClient) SignalService(signal serviceSignal) (err error) {
|
|
args := SignalServiceArgs{Sig: signal}
|
|
reply := VoidReply{}
|
|
|
|
return rpcClient.Call(adminServiceName+".SignalService", &args, &reply)
|
|
}
|
|
|
|
// ReInitFormat - re-initialize disk format, remotely.
|
|
func (rpcClient *AdminRPCClient) ReInitFormat(dryRun bool) error {
|
|
args := ReInitFormatArgs{DryRun: dryRun}
|
|
reply := VoidReply{}
|
|
|
|
return rpcClient.Call(adminServiceName+".ReInitFormat", &args, &reply)
|
|
}
|
|
|
|
// ListLocks - Sends list locks command to remote server via RPC.
|
|
func (rpcClient *AdminRPCClient) ListLocks(bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error) {
|
|
args := ListLocksQuery{
|
|
Bucket: bucket,
|
|
Prefix: prefix,
|
|
Duration: duration,
|
|
}
|
|
var reply []VolumeLockInfo
|
|
|
|
err := rpcClient.Call(adminServiceName+".ListLocks", &args, &reply)
|
|
return reply, err
|
|
}
|
|
|
|
// ServerInfo - returns the server info of the server to which the RPC call is made.
|
|
func (rpcClient *AdminRPCClient) ServerInfo() (sid ServerInfoData, err error) {
|
|
err = rpcClient.Call(adminServiceName+".ServerInfo", &AuthArgs{}, &sid)
|
|
return sid, err
|
|
}
|
|
|
|
// GetConfig - returns config.json of the remote server.
|
|
func (rpcClient *AdminRPCClient) GetConfig() ([]byte, error) {
|
|
args := AuthArgs{}
|
|
var reply []byte
|
|
|
|
err := rpcClient.Call(adminServiceName+".GetConfig", &args, &reply)
|
|
return reply, err
|
|
}
|
|
|
|
// WriteTmpConfig - writes config file content to a temporary file on a remote node.
|
|
func (rpcClient *AdminRPCClient) WriteTmpConfig(tmpFileName string, configBytes []byte) error {
|
|
args := WriteConfigArgs{
|
|
TmpFileName: tmpFileName,
|
|
Buf: configBytes,
|
|
}
|
|
reply := VoidReply{}
|
|
|
|
err := rpcClient.Call(adminServiceName+".WriteTmpConfig", &args, &reply)
|
|
logger.LogIf(context.Background(), err)
|
|
return err
|
|
}
|
|
|
|
// CommitConfig - Move the new config in tmpFileName onto config.json on a remote node.
|
|
func (rpcClient *AdminRPCClient) CommitConfig(tmpFileName string) error {
|
|
args := CommitConfigArgs{FileName: tmpFileName}
|
|
reply := VoidReply{}
|
|
|
|
err := rpcClient.Call(adminServiceName+".CommitConfig", &args, &reply)
|
|
logger.LogIf(context.Background(), err)
|
|
return err
|
|
}
|
|
|
|
// NewAdminRPCClient - returns new admin RPC client.
|
|
func NewAdminRPCClient(host *xnet.Host) (*AdminRPCClient, error) {
|
|
scheme := "http"
|
|
if globalIsSSL {
|
|
scheme = "https"
|
|
}
|
|
|
|
serviceURL := &xnet.URL{
|
|
Scheme: scheme,
|
|
Host: host.String(),
|
|
Path: adminServicePath,
|
|
}
|
|
|
|
var tlsConfig *tls.Config
|
|
if globalIsSSL {
|
|
tlsConfig = &tls.Config{
|
|
ServerName: host.Name,
|
|
RootCAs: globalRootCAs,
|
|
}
|
|
}
|
|
|
|
rpcClient, err := NewRPCClient(
|
|
RPCClientArgs{
|
|
NewAuthTokenFunc: newAuthToken,
|
|
RPCVersion: globalRPCAPIVersion,
|
|
ServiceName: adminServiceName,
|
|
ServiceURL: serviceURL,
|
|
TLSConfig: tlsConfig,
|
|
},
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &AdminRPCClient{rpcClient}, nil
|
|
}
|
|
|
|
// adminCmdRunner - abstracts local and remote execution of admin
|
|
// commands like service stop and service restart.
|
|
type adminCmdRunner interface {
|
|
SignalService(s serviceSignal) error
|
|
ReInitFormat(dryRun bool) error
|
|
ListLocks(bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error)
|
|
ServerInfo() (ServerInfoData, error)
|
|
GetConfig() ([]byte, error)
|
|
WriteTmpConfig(tmpFileName string, configBytes []byte) error
|
|
CommitConfig(tmpFileName string) error
|
|
}
|
|
|
|
// adminPeer - represents an entity that implements admin API RPCs.
|
|
type adminPeer struct {
|
|
addr string
|
|
cmdRunner adminCmdRunner
|
|
isLocal bool
|
|
}
|
|
|
|
// type alias for a collection of adminPeer.
|
|
type adminPeers []adminPeer
|
|
|
|
// makeAdminPeers - helper function to construct a collection of adminPeer.
|
|
func makeAdminPeers(endpoints EndpointList) (adminPeerList adminPeers) {
|
|
localAddr := GetLocalPeer(endpoints)
|
|
if strings.HasPrefix(localAddr, "127.0.0.1:") {
|
|
// Use first IPv4 instead of loopback address.
|
|
localAddr = net.JoinHostPort(sortIPs(localIP4.ToSlice())[0], globalMinioPort)
|
|
}
|
|
adminPeerList = append(adminPeerList, adminPeer{
|
|
addr: localAddr,
|
|
cmdRunner: localAdminClient{},
|
|
isLocal: true,
|
|
})
|
|
|
|
for _, hostStr := range GetRemotePeers(endpoints) {
|
|
host, err := xnet.ParseHost(hostStr)
|
|
logger.FatalIf(err, "Unable to parse Admin RPC Host", context.Background())
|
|
rpcClient, err := NewAdminRPCClient(host)
|
|
logger.FatalIf(err, "Unable to initialize Admin RPC Client", context.Background())
|
|
adminPeerList = append(adminPeerList, adminPeer{
|
|
addr: hostStr,
|
|
cmdRunner: rpcClient,
|
|
})
|
|
}
|
|
|
|
return adminPeerList
|
|
}
|
|
|
|
// peersReInitFormat - reinitialize remote object layers to new format.
|
|
func peersReInitFormat(peers adminPeers, dryRun bool) error {
|
|
errs := make([]error, len(peers))
|
|
|
|
// Send ReInitFormat RPC call to all nodes.
|
|
// for local adminPeer this is a no-op.
|
|
wg := sync.WaitGroup{}
|
|
for i, peer := range peers {
|
|
wg.Add(1)
|
|
go func(idx int, peer adminPeer) {
|
|
defer wg.Done()
|
|
if !peer.isLocal {
|
|
errs[idx] = peer.cmdRunner.ReInitFormat(dryRun)
|
|
}
|
|
}(i, peer)
|
|
}
|
|
wg.Wait()
|
|
return nil
|
|
}
|
|
|
|
// Initialize global adminPeer collection.
|
|
func initGlobalAdminPeers(endpoints EndpointList) {
|
|
globalAdminPeers = makeAdminPeers(endpoints)
|
|
}
|
|
|
|
// invokeServiceCmd - Invoke Restart/Stop command.
|
|
func invokeServiceCmd(cp adminPeer, cmd serviceSignal) (err error) {
|
|
switch cmd {
|
|
case serviceRestart, serviceStop:
|
|
err = cp.cmdRunner.SignalService(cmd)
|
|
}
|
|
return err
|
|
}
|
|
|
|
// sendServiceCmd - Invoke Restart command on remote peers
|
|
// adminPeer followed by on the local peer.
|
|
func sendServiceCmd(cps adminPeers, cmd serviceSignal) {
|
|
// Send service command like stop or restart to all remote nodes and finally run on local node.
|
|
errs := make([]error, len(cps))
|
|
var wg sync.WaitGroup
|
|
remotePeers := cps[1:]
|
|
for i := range remotePeers {
|
|
wg.Add(1)
|
|
go func(idx int) {
|
|
defer wg.Done()
|
|
// we use idx+1 because remotePeers slice is 1 position shifted w.r.t cps
|
|
errs[idx+1] = invokeServiceCmd(remotePeers[idx], cmd)
|
|
}(i)
|
|
}
|
|
wg.Wait()
|
|
errs[0] = invokeServiceCmd(cps[0], cmd)
|
|
}
|
|
|
|
// listPeerLocksInfo - fetch list of locks held on the given bucket,
|
|
// matching prefix held longer than duration from all peer servers.
|
|
func listPeerLocksInfo(peers adminPeers, bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error) {
|
|
// Used to aggregate volume lock information from all nodes.
|
|
allLocks := make([][]VolumeLockInfo, len(peers))
|
|
errs := make([]error, len(peers))
|
|
var wg sync.WaitGroup
|
|
localPeer := peers[0]
|
|
remotePeers := peers[1:]
|
|
for i, remotePeer := range remotePeers {
|
|
wg.Add(1)
|
|
go func(idx int, remotePeer adminPeer) {
|
|
defer wg.Done()
|
|
// `remotePeers` is right-shifted by one position relative to `peers`
|
|
allLocks[idx], errs[idx] = remotePeer.cmdRunner.ListLocks(bucket, prefix, duration)
|
|
}(i+1, remotePeer)
|
|
}
|
|
wg.Wait()
|
|
allLocks[0], errs[0] = localPeer.cmdRunner.ListLocks(bucket, prefix, duration)
|
|
|
|
// Summarizing errors received for ListLocks RPC across all
|
|
// nodes. N B the possible unavailability of quorum in errors
|
|
// applies only to distributed setup.
|
|
errCount, err := reduceErrs(errs, []error{})
|
|
if err != nil {
|
|
if errCount >= (len(peers)/2 + 1) {
|
|
return nil, err
|
|
}
|
|
return nil, InsufficientReadQuorum{}
|
|
}
|
|
|
|
// Group lock information across nodes by (bucket, object)
|
|
// pair. For readability only.
|
|
paramLockMap := make(map[nsParam][]VolumeLockInfo)
|
|
for _, nodeLocks := range allLocks {
|
|
for _, lockInfo := range nodeLocks {
|
|
param := nsParam{
|
|
volume: lockInfo.Bucket,
|
|
path: lockInfo.Object,
|
|
}
|
|
paramLockMap[param] = append(paramLockMap[param], lockInfo)
|
|
}
|
|
}
|
|
groupedLockInfos := []VolumeLockInfo{}
|
|
for _, volLocks := range paramLockMap {
|
|
groupedLockInfos = append(groupedLockInfos, volLocks...)
|
|
}
|
|
return groupedLockInfos, nil
|
|
}
|
|
|
|
// uptimeSlice - list of per-peer uptime results (an uptime plus the
// error of the call that produced it). It implements sort.Interface,
// ordering entries by ascending uptime.
type uptimeSlice []struct {
	err    error
	uptime time.Duration
}

// Len - number of entries in the list.
func (ts uptimeSlice) Len() int {
	count := len(ts)
	return count
}

// Less - reports whether entry i has a strictly shorter uptime than
// entry j.
func (ts uptimeSlice) Less(i, j int) bool {
	return ts[j].uptime > ts[i].uptime
}

// Swap - exchanges entries i and j.
func (ts uptimeSlice) Swap(i, j int) {
	held := ts[i]
	ts[i] = ts[j]
	ts[j] = held
}
|
|
|
|
// getPeerUptimes - returns the uptime since the last time read quorum
|
|
// was established on success. Otherwise returns errXLReadQuorum.
|
|
func getPeerUptimes(peers adminPeers) (time.Duration, error) {
|
|
// In a single node Erasure or FS backend setup the uptime of
|
|
// the setup is the uptime of the single minio server
|
|
// instance.
|
|
if !globalIsDistXL {
|
|
return UTCNow().Sub(globalBootTime), nil
|
|
}
|
|
|
|
uptimes := make(uptimeSlice, len(peers))
|
|
|
|
// Get up time of all servers.
|
|
wg := sync.WaitGroup{}
|
|
for i, peer := range peers {
|
|
wg.Add(1)
|
|
go func(idx int, peer adminPeer) {
|
|
defer wg.Done()
|
|
serverInfoData, rpcErr := peer.cmdRunner.ServerInfo()
|
|
uptimes[idx].uptime, uptimes[idx].err = serverInfoData.Properties.Uptime, rpcErr
|
|
}(i, peer)
|
|
}
|
|
wg.Wait()
|
|
|
|
// Sort uptimes in chronological order.
|
|
sort.Sort(uptimes)
|
|
|
|
// Pick the readQuorum'th uptime in chronological order. i.e,
|
|
// the time at which read quorum was (re-)established.
|
|
readQuorum := len(uptimes) / 2
|
|
validCount := 0
|
|
latestUptime := time.Duration(0)
|
|
for _, uptime := range uptimes {
|
|
if uptime.err != nil {
|
|
logger.LogIf(context.Background(), uptime.err)
|
|
continue
|
|
}
|
|
|
|
validCount++
|
|
if validCount >= readQuorum {
|
|
latestUptime = uptime.uptime
|
|
break
|
|
}
|
|
}
|
|
|
|
// Less than readQuorum "Admin.Uptime" RPC call returned
|
|
// successfully, so read-quorum unavailable.
|
|
if validCount < readQuorum {
|
|
return time.Duration(0), InsufficientReadQuorum{}
|
|
}
|
|
|
|
return latestUptime, nil
|
|
}
|
|
|
|
// getPeerConfig - Fetches config.json from all nodes in the setup and
|
|
// returns the one that occurs in a majority of them.
|
|
func getPeerConfig(peers adminPeers) ([]byte, error) {
|
|
if !globalIsDistXL {
|
|
return peers[0].cmdRunner.GetConfig()
|
|
}
|
|
|
|
errs := make([]error, len(peers))
|
|
configs := make([][]byte, len(peers))
|
|
|
|
// Get config from all servers.
|
|
wg := sync.WaitGroup{}
|
|
for i, peer := range peers {
|
|
wg.Add(1)
|
|
go func(idx int, peer adminPeer) {
|
|
defer wg.Done()
|
|
configs[idx], errs[idx] = peer.cmdRunner.GetConfig()
|
|
}(i, peer)
|
|
}
|
|
wg.Wait()
|
|
|
|
// Find the maximally occurring config among peers in a
|
|
// distributed setup.
|
|
|
|
serverConfigs := make([]serverConfig, len(peers))
|
|
for i, configBytes := range configs {
|
|
if errs[i] != nil {
|
|
continue
|
|
}
|
|
|
|
// Unmarshal the received config files.
|
|
err := json.Unmarshal(configBytes, &serverConfigs[i])
|
|
if err != nil {
|
|
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", peers[i].addr)
|
|
ctx := logger.SetReqInfo(context.Background(), reqInfo)
|
|
logger.LogIf(ctx, err)
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
configJSON, err := getValidServerConfig(serverConfigs, errs)
|
|
if err != nil {
|
|
logger.LogIf(context.Background(), err)
|
|
return nil, err
|
|
}
|
|
|
|
// Return the config.json that was present quorum or more
|
|
// number of disks.
|
|
return json.Marshal(configJSON)
|
|
}
|
|
|
|
// getValidServerConfig - finds the server config that is present in
|
|
// quorum or more number of servers.
|
|
func getValidServerConfig(serverConfigs []serverConfig, errs []error) (scv serverConfig, e error) {
|
|
// majority-based quorum
|
|
quorum := len(serverConfigs)/2 + 1
|
|
|
|
// Count the number of disks a config.json was found in.
|
|
configCounter := make([]int, len(serverConfigs))
|
|
|
|
// We group equal serverConfigs by the lowest index of the
|
|
// same value; e.g, let us take the following serverConfigs
|
|
// in a 4-node setup,
|
|
// serverConfigs == [c1, c2, c1, c1]
|
|
// configCounter == [3, 1, 0, 0]
|
|
// c1, c2 are the only distinct values that appear. c1 is
|
|
// identified by 0, the lowest index it appears in and c2 is
|
|
// identified by 1. So, we need to find the number of times
|
|
// each of these distinct values occur.
|
|
|
|
// Invariants:
|
|
|
|
// 1. At the beginning of the i-th iteration, the number of
|
|
// unique configurations seen so far is equal to the number of
|
|
// non-zero counter values in config[:i].
|
|
|
|
// 2. At the beginning of the i-th iteration, the sum of
|
|
// elements of configCounter[:i] is equal to the number of
|
|
// non-error configurations seen so far.
|
|
|
|
// For each of the serverConfig ...
|
|
for i := range serverConfigs {
|
|
// Skip nodes where getConfig failed.
|
|
if errs[i] != nil {
|
|
continue
|
|
}
|
|
// Check if it is equal to any of the configurations
|
|
// seen so far. If j == i is reached then we have an
|
|
// unseen configuration.
|
|
for j := 0; j <= i; j++ {
|
|
if j < i && configCounter[j] == 0 {
|
|
// serverConfigs[j] is known to be
|
|
// equal to a value that was already
|
|
// seen. See example above for
|
|
// clarity.
|
|
continue
|
|
} else if j < i && serverConfigs[i].ConfigDiff(&serverConfigs[j]) == "" {
|
|
// serverConfigs[i] is equal to
|
|
// serverConfigs[j], update
|
|
// serverConfigs[j]'s counter since it
|
|
// is the lower index.
|
|
configCounter[j]++
|
|
break
|
|
} else if j == i {
|
|
// serverConfigs[i] is equal to no
|
|
// other value seen before. It is
|
|
// unique so far.
|
|
configCounter[i] = 1
|
|
break
|
|
} // else invariants specified above are violated.
|
|
}
|
|
}
|
|
|
|
// We find the maximally occurring server config and check if
|
|
// there is quorum.
|
|
var configJSON serverConfig
|
|
maxOccurrence := 0
|
|
for i, count := range configCounter {
|
|
if maxOccurrence < count {
|
|
maxOccurrence = count
|
|
configJSON = serverConfigs[i]
|
|
}
|
|
}
|
|
|
|
// If quorum nodes don't agree.
|
|
if maxOccurrence < quorum {
|
|
return scv, errXLWriteQuorum
|
|
}
|
|
|
|
return configJSON, nil
|
|
}
|
|
|
|
// Write config contents into a temporary file on all nodes.
|
|
func writeTmpConfigPeers(peers adminPeers, tmpFileName string, configBytes []byte) []error {
|
|
// For a single-node minio server setup.
|
|
if !globalIsDistXL {
|
|
err := peers[0].cmdRunner.WriteTmpConfig(tmpFileName, configBytes)
|
|
return []error{err}
|
|
}
|
|
|
|
errs := make([]error, len(peers))
|
|
|
|
// Write config into temporary file on all nodes.
|
|
wg := sync.WaitGroup{}
|
|
for i, peer := range peers {
|
|
wg.Add(1)
|
|
go func(idx int, peer adminPeer) {
|
|
defer wg.Done()
|
|
errs[idx] = peer.cmdRunner.WriteTmpConfig(tmpFileName, configBytes)
|
|
}(i, peer)
|
|
}
|
|
wg.Wait()
|
|
|
|
// Return bytes written and errors (if any) during writing
|
|
// temporary config file.
|
|
return errs
|
|
}
|
|
|
|
// Move config contents from the given temporary file onto config.json
|
|
// on all nodes.
|
|
func commitConfigPeers(peers adminPeers, tmpFileName string) []error {
|
|
// For a single-node minio server setup.
|
|
if !globalIsDistXL {
|
|
return []error{peers[0].cmdRunner.CommitConfig(tmpFileName)}
|
|
}
|
|
|
|
errs := make([]error, len(peers))
|
|
|
|
// Rename temporary config file into configDir/config.json on
|
|
// all nodes.
|
|
wg := sync.WaitGroup{}
|
|
for i, peer := range peers {
|
|
wg.Add(1)
|
|
go func(idx int, peer adminPeer) {
|
|
defer wg.Done()
|
|
errs[idx] = peer.cmdRunner.CommitConfig(tmpFileName)
|
|
}(i, peer)
|
|
}
|
|
wg.Wait()
|
|
|
|
// Return errors (if any) received during rename.
|
|
return errs
|
|
}
|