Avoid startup abort when a notify target is down (#6126)
The Minio server used to refuse to start whenever any configured notification target was down or unreachable. This PR changes that behavior so startup is no longer aborted in that case: after a restart or a set-config command, the user can still reach the Minio server and manage it with mc admin commands.
This commit is contained in:
parent: 42c5b64e4e
commit: be1700f595
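The heart of the change is the per-target error handling in getNotificationTargets: a target that is down or misconfigured is now logged and skipped instead of failing server startup. Below is a minimal sketch of that pattern for the AMQP backend, assuming the package context of the diff that follows (serverConfig, event.TargetList, target.NewAMQPTarget, logger.LogIf); the wrapper name buildAMQPTargets is hypothetical and not part of the change.

// A minimal sketch (not part of the diff) of the per-target behavior this
// change introduces, shown for the AMQP backend: a target that is down or
// misconfigured is logged and skipped instead of aborting server startup.
func buildAMQPTargets(config *serverConfig, targetList *event.TargetList) {
	for id, args := range config.Notify.AMQP {
		if !args.Enable {
			continue
		}
		newTarget, err := target.NewAMQPTarget(id, args)
		if err != nil {
			// Before this change the function returned the error, which aborted startup.
			logger.LogIf(context.Background(), err)
			continue
		}
		if err = targetList.Add(newTarget); err != nil {
			logger.LogIf(context.Background(), err)
			continue
		}
	}
}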
@@ -176,10 +176,7 @@ func prepareAdminXLTestBed() (*adminXLTestBed, error) {
	// Init global heal state
	initAllHealState(globalIsXL)

-	globalNotificationSys, err = NewNotificationSys(globalServerConfig, globalEndpoints)
-	if err != nil {
-		return nil, err
-	}
+	globalNotificationSys = NewNotificationSys(globalServerConfig, globalEndpoints)

	// Create new policy system.
	globalPolicySys = NewPolicySys()

@@ -1,650 +0,0 @@
/*
 * Minio Cloud Storage, (C) 2014, 2015, 2016, 2017, 2018 Minio, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cmd

import (
	"context"
	"encoding/json"
	"fmt"
	"net"
	"os"
	"path"
	"path/filepath"
	"sort"
	"sync"
	"time"

	"github.com/minio/minio-go/pkg/set"
	"github.com/minio/minio/cmd/logger"
)

const (
	// Admin service names
	signalServiceRPC  = "Admin.SignalService"
	reInitFormatRPC   = "Admin.ReInitFormat"
	listLocksRPC      = "Admin.ListLocks"
	serverInfoDataRPC = "Admin.ServerInfoData"
	getConfigRPC      = "Admin.GetConfig"
	writeTmpConfigRPC = "Admin.WriteTmpConfig"
	commitConfigRPC   = "Admin.CommitConfig"
)

// localAdminClient - represents admin operation to be executed locally.
type localAdminClient struct {
}

// remoteAdminClient - represents admin operation to be executed
// remotely, via RPC.
type remoteAdminClient struct {
	*AuthRPCClient
}

// adminCmdRunner - abstracts local and remote execution of admin
// commands like service stop and service restart.
type adminCmdRunner interface {
	SignalService(s serviceSignal) error
	ReInitFormat(dryRun bool) error
	ListLocks(bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error)
	ServerInfoData() (ServerInfoData, error)
	GetConfig() ([]byte, error)
	WriteTmpConfig(tmpFileName string, configBytes []byte) error
	CommitConfig(tmpFileName string) error
}

var errUnsupportedSignal = fmt.Errorf("unsupported signal: only restart and stop signals are supported")

// SignalService - sends a restart or stop signal to the local server
func (lc localAdminClient) SignalService(s serviceSignal) error {
	switch s {
	case serviceRestart, serviceStop:
		globalServiceSignalCh <- s
	default:
		return errUnsupportedSignal
	}
	return nil
}

// ReInitFormat - re-initialize disk format.
func (lc localAdminClient) ReInitFormat(dryRun bool) error {
	objectAPI := newObjectLayerFn()
	if objectAPI == nil {
		return errServerNotInitialized
	}
	return objectAPI.ReloadFormat(context.Background(), dryRun)
}

// ListLocks - Fetches lock information from local lock instrumentation.
func (lc localAdminClient) ListLocks(bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error) {
	// check if objectLayer is initialized, if not return.
	objectAPI := newObjectLayerFn()
	if objectAPI == nil {
		return nil, errServerNotInitialized
	}
	return objectAPI.ListLocks(context.Background(), bucket, prefix, duration)
}

func (rc remoteAdminClient) SignalService(s serviceSignal) (err error) {
	switch s {
	case serviceRestart, serviceStop:
		reply := AuthRPCReply{}
		err = rc.Call(signalServiceRPC, &SignalServiceArgs{Sig: s},
			&reply)
	default:
		err = errUnsupportedSignal
	}
	return err
}

// ReInitFormat - re-initialize disk format, remotely.
func (rc remoteAdminClient) ReInitFormat(dryRun bool) error {
	reply := AuthRPCReply{}
	return rc.Call(reInitFormatRPC, &ReInitFormatArgs{
		DryRun: dryRun,
	}, &reply)
}

// ListLocks - Sends list locks command to remote server via RPC.
func (rc remoteAdminClient) ListLocks(bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error) {
	listArgs := ListLocksQuery{
		Bucket:   bucket,
		Prefix:   prefix,
		Duration: duration,
	}
	var reply ListLocksReply
	if err := rc.Call(listLocksRPC, &listArgs, &reply); err != nil {
		return nil, err
	}
	return reply.VolLocks, nil
}

// ServerInfoData - Returns the server info of this server.
func (lc localAdminClient) ServerInfoData() (sid ServerInfoData, e error) {
	if globalBootTime.IsZero() {
		return sid, errServerNotInitialized
	}

	// Build storage info
	objLayer := newObjectLayerFn()
	if objLayer == nil {
		return sid, errServerNotInitialized
	}
	storage := objLayer.StorageInfo(context.Background())

	return ServerInfoData{
		StorageInfo: storage,
		ConnStats:   globalConnStats.toServerConnStats(),
		HTTPStats:   globalHTTPStats.toServerHTTPStats(),
		Properties: ServerProperties{
			Uptime:   UTCNow().Sub(globalBootTime),
			Version:  Version,
			CommitID: CommitID,
			SQSARN:   globalNotificationSys.GetARNList(),
			Region:   globalServerConfig.GetRegion(),
		},
	}, nil
}

// ServerInfo - returns the server info of the server to which the RPC call is made.
func (rc remoteAdminClient) ServerInfoData() (sid ServerInfoData, e error) {
	args := AuthRPCArgs{}
	reply := ServerInfoDataReply{}
	err := rc.Call(serverInfoDataRPC, &args, &reply)
	if err != nil {
		return sid, err
	}

	return reply.ServerInfoData, nil
}

// GetConfig - returns config.json of the local server.
func (lc localAdminClient) GetConfig() ([]byte, error) {
	if globalServerConfig == nil {
		return nil, fmt.Errorf("config not present")
	}

	return json.Marshal(globalServerConfig)
}

// GetConfig - returns config.json of the remote server.
func (rc remoteAdminClient) GetConfig() ([]byte, error) {
	args := AuthRPCArgs{}
	reply := ConfigReply{}
	if err := rc.Call(getConfigRPC, &args, &reply); err != nil {
		return nil, err
	}
	return reply.Config, nil
}

// WriteTmpConfig - writes config file content to a temporary file on
// the local server.
func (lc localAdminClient) WriteTmpConfig(tmpFileName string, configBytes []byte) error {
	return writeTmpConfigCommon(tmpFileName, configBytes)
}

// WriteTmpConfig - writes config file content to a temporary file on
// a remote node.
func (rc remoteAdminClient) WriteTmpConfig(tmpFileName string, configBytes []byte) error {
	wArgs := WriteConfigArgs{
		TmpFileName: tmpFileName,
		Buf:         configBytes,
	}

	err := rc.Call(writeTmpConfigRPC, &wArgs, &WriteConfigReply{})
	if err != nil {
		logger.LogIf(context.Background(), err)
		return err
	}

	return nil
}

// CommitConfig - Move the new config in tmpFileName onto config.json
// on a local node.
func (lc localAdminClient) CommitConfig(tmpFileName string) error {
	configFile := getConfigFile()
	tmpConfigFile := filepath.Join(getConfigDir(), tmpFileName)

	err := os.Rename(tmpConfigFile, configFile)
	reqInfo := (&logger.ReqInfo{}).AppendTags("tmpConfigFile", tmpConfigFile)
	reqInfo.AppendTags("configFile", configFile)
	ctx := logger.SetReqInfo(context.Background(), reqInfo)
	logger.LogIf(ctx, err)
	return err
}

// CommitConfig - Move the new config in tmpFileName onto config.json
// on a remote node.
func (rc remoteAdminClient) CommitConfig(tmpFileName string) error {
	cArgs := CommitConfigArgs{
		FileName: tmpFileName,
	}
	cReply := CommitConfigReply{}
	err := rc.Call(commitConfigRPC, &cArgs, &cReply)
	if err != nil {
		logger.LogIf(context.Background(), err)
		return err
	}

	return nil
}

// adminPeer - represents an entity that implements admin API RPCs.
type adminPeer struct {
	addr      string
	cmdRunner adminCmdRunner
	isLocal   bool
}

// type alias for a collection of adminPeer.
type adminPeers []adminPeer

// makeAdminPeers - helper function to construct a collection of adminPeer.
func makeAdminPeers(endpoints EndpointList) (adminPeerList adminPeers) {
	thisPeer := globalMinioAddr
	if globalMinioHost == "" {
		// When host is not explicitly provided simply
		// use the first IPv4.
		thisPeer = net.JoinHostPort(sortIPs(localIP4.ToSlice())[0], globalMinioPort)
	}
	adminPeerList = append(adminPeerList, adminPeer{
		thisPeer,
		localAdminClient{},
		true,
	})

	hostSet := set.CreateStringSet(globalMinioAddr)
	cred := globalServerConfig.GetCredential()
	serviceEndpoint := path.Join(minioReservedBucketPath, adminPath)
	for _, host := range GetRemotePeers(endpoints) {
		if hostSet.Contains(host) {
			continue
		}
		hostSet.Add(host)
		adminPeerList = append(adminPeerList, adminPeer{
			addr: host,
			cmdRunner: &remoteAdminClient{newAuthRPCClient(authConfig{
				accessKey:       cred.AccessKey,
				secretKey:       cred.SecretKey,
				serverAddr:      host,
				serviceEndpoint: serviceEndpoint,
				secureConn:      globalIsSSL,
				serviceName:     "Admin",
			})},
		})
	}

	return adminPeerList
}

// peersReInitFormat - reinitialize remote object layers to new format.
func peersReInitFormat(peers adminPeers, dryRun bool) error {
	errs := make([]error, len(peers))

	// Send ReInitFormat RPC call to all nodes.
	// for local adminPeer this is a no-op.
	wg := sync.WaitGroup{}
	for i, peer := range peers {
		wg.Add(1)
		go func(idx int, peer adminPeer) {
			defer wg.Done()
			if !peer.isLocal {
				errs[idx] = peer.cmdRunner.ReInitFormat(dryRun)
			}
		}(i, peer)
	}
	wg.Wait()
	return nil
}

// Initialize global adminPeer collection.
func initGlobalAdminPeers(endpoints EndpointList) {
	globalAdminPeers = makeAdminPeers(endpoints)
}

// invokeServiceCmd - Invoke Restart/Stop command.
func invokeServiceCmd(cp adminPeer, cmd serviceSignal) (err error) {
	switch cmd {
	case serviceRestart, serviceStop:
		err = cp.cmdRunner.SignalService(cmd)
	}
	return err
}

// sendServiceCmd - Invoke Restart command on remote peers
// adminPeer followed by on the local peer.
func sendServiceCmd(cps adminPeers, cmd serviceSignal) {
	// Send service command like stop or restart to all remote nodes and finally run on local node.
	errs := make([]error, len(cps))
	var wg sync.WaitGroup
	remotePeers := cps[1:]
	for i := range remotePeers {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			// we use idx+1 because remotePeers slice is 1 position shifted w.r.t cps
			errs[idx+1] = invokeServiceCmd(remotePeers[idx], cmd)
		}(i)
	}
	wg.Wait()
	errs[0] = invokeServiceCmd(cps[0], cmd)
}

// listPeerLocksInfo - fetch list of locks held on the given bucket,
// matching prefix held longer than duration from all peer servers.
func listPeerLocksInfo(peers adminPeers, bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error) {
	// Used to aggregate volume lock information from all nodes.
	allLocks := make([][]VolumeLockInfo, len(peers))
	errs := make([]error, len(peers))
	var wg sync.WaitGroup
	localPeer := peers[0]
	remotePeers := peers[1:]
	for i, remotePeer := range remotePeers {
		wg.Add(1)
		go func(idx int, remotePeer adminPeer) {
			defer wg.Done()
			// `remotePeers` is right-shifted by one position relative to `peers`
			allLocks[idx], errs[idx] = remotePeer.cmdRunner.ListLocks(bucket, prefix, duration)
		}(i+1, remotePeer)
	}
	wg.Wait()
	allLocks[0], errs[0] = localPeer.cmdRunner.ListLocks(bucket, prefix, duration)

	// Summarizing errors received for ListLocks RPC across all
	// nodes. N B the possible unavailability of quorum in errors
	// applies only to distributed setup.
	errCount, err := reduceErrs(errs, []error{})
	if err != nil {
		if errCount >= (len(peers)/2 + 1) {
			return nil, err
		}
		return nil, InsufficientReadQuorum{}
	}

	// Group lock information across nodes by (bucket, object)
	// pair. For readability only.
	paramLockMap := make(map[nsParam][]VolumeLockInfo)
	for _, nodeLocks := range allLocks {
		for _, lockInfo := range nodeLocks {
			param := nsParam{
				volume: lockInfo.Bucket,
				path:   lockInfo.Object,
			}
			paramLockMap[param] = append(paramLockMap[param], lockInfo)
		}
	}
	groupedLockInfos := []VolumeLockInfo{}
	for _, volLocks := range paramLockMap {
		groupedLockInfos = append(groupedLockInfos, volLocks...)
	}
	return groupedLockInfos, nil
}

// uptimeSlice - used to sort uptimes in chronological order.
type uptimeSlice []struct {
	err    error
	uptime time.Duration
}

func (ts uptimeSlice) Len() int {
	return len(ts)
}

func (ts uptimeSlice) Less(i, j int) bool {
	return ts[i].uptime < ts[j].uptime
}

func (ts uptimeSlice) Swap(i, j int) {
	ts[i], ts[j] = ts[j], ts[i]
}

// getPeerUptimes - returns the uptime since the last time read quorum
// was established on success. Otherwise returns errXLReadQuorum.
func getPeerUptimes(peers adminPeers) (time.Duration, error) {
	// In a single node Erasure or FS backend setup the uptime of
	// the setup is the uptime of the single minio server
	// instance.
	if !globalIsDistXL {
		return UTCNow().Sub(globalBootTime), nil
	}

	uptimes := make(uptimeSlice, len(peers))

	// Get up time of all servers.
	wg := sync.WaitGroup{}
	for i, peer := range peers {
		wg.Add(1)
		go func(idx int, peer adminPeer) {
			defer wg.Done()
			serverInfoData, rpcErr := peer.cmdRunner.ServerInfoData()
			uptimes[idx].uptime, uptimes[idx].err = serverInfoData.Properties.Uptime, rpcErr
		}(i, peer)
	}
	wg.Wait()

	// Sort uptimes in chronological order.
	sort.Sort(uptimes)

	// Pick the readQuorum'th uptime in chronological order. i.e,
	// the time at which read quorum was (re-)established.
	readQuorum := len(uptimes) / 2
	validCount := 0
	latestUptime := time.Duration(0)
	for _, uptime := range uptimes {
		if uptime.err != nil {
			logger.LogIf(context.Background(), uptime.err)
			continue
		}

		validCount++
		if validCount >= readQuorum {
			latestUptime = uptime.uptime
			break
		}
	}

	// Less than readQuorum "Admin.Uptime" RPC call returned
	// successfully, so read-quorum unavailable.
	if validCount < readQuorum {
		return time.Duration(0), InsufficientReadQuorum{}
	}

	return latestUptime, nil
}

// getPeerConfig - Fetches config.json from all nodes in the setup and
// returns the one that occurs in a majority of them.
func getPeerConfig(peers adminPeers) ([]byte, error) {
	if !globalIsDistXL {
		return peers[0].cmdRunner.GetConfig()
	}

	errs := make([]error, len(peers))
	configs := make([][]byte, len(peers))

	// Get config from all servers.
	wg := sync.WaitGroup{}
	for i, peer := range peers {
		wg.Add(1)
		go func(idx int, peer adminPeer) {
			defer wg.Done()
			configs[idx], errs[idx] = peer.cmdRunner.GetConfig()
		}(i, peer)
	}
	wg.Wait()

	// Find the maximally occurring config among peers in a
	// distributed setup.

	serverConfigs := make([]serverConfig, len(peers))
	for i, configBytes := range configs {
		if errs[i] != nil {
			continue
		}

		// Unmarshal the received config files.
		err := json.Unmarshal(configBytes, &serverConfigs[i])
		if err != nil {
			reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", peers[i].addr)
			ctx := logger.SetReqInfo(context.Background(), reqInfo)
			logger.LogIf(ctx, err)
			return nil, err
		}
	}

	configJSON, err := getValidServerConfig(serverConfigs, errs)
	if err != nil {
		logger.LogIf(context.Background(), err)
		return nil, err
	}

	// Return the config.json that was present quorum or more
	// number of disks.
	return json.Marshal(configJSON)
}

// getValidServerConfig - finds the server config that is present in
// quorum or more number of servers.
func getValidServerConfig(serverConfigs []serverConfig, errs []error) (scv serverConfig, e error) {
	// majority-based quorum
	quorum := len(serverConfigs)/2 + 1

	// Count the number of disks a config.json was found in.
	configCounter := make([]int, len(serverConfigs))

	// We group equal serverConfigs by the lowest index of the
	// same value; e.g, let us take the following serverConfigs
	// in a 4-node setup,
	// serverConfigs == [c1, c2, c1, c1]
	// configCounter == [3, 1, 0, 0]
	// c1, c2 are the only distinct values that appear. c1 is
	// identified by 0, the lowest index it appears in and c2 is
	// identified by 1. So, we need to find the number of times
	// each of these distinct values occur.

	// Invariants:

	// 1. At the beginning of the i-th iteration, the number of
	// unique configurations seen so far is equal to the number of
	// non-zero counter values in config[:i].

	// 2. At the beginning of the i-th iteration, the sum of
	// elements of configCounter[:i] is equal to the number of
	// non-error configurations seen so far.

	// For each of the serverConfig ...
	for i := range serverConfigs {
		// Skip nodes where getConfig failed.
		if errs[i] != nil {
			continue
		}
		// Check if it is equal to any of the configurations
		// seen so far. If j == i is reached then we have an
		// unseen configuration.
		for j := 0; j <= i; j++ {
			if j < i && configCounter[j] == 0 {
				// serverConfigs[j] is known to be
				// equal to a value that was already
				// seen. See example above for
				// clarity.
				continue
			} else if j < i && serverConfigs[i].ConfigDiff(&serverConfigs[j]) == "" {
				// serverConfigs[i] is equal to
				// serverConfigs[j], update
				// serverConfigs[j]'s counter since it
				// is the lower index.
				configCounter[j]++
				break
			} else if j == i {
				// serverConfigs[i] is equal to no
				// other value seen before. It is
				// unique so far.
				configCounter[i] = 1
				break
			} // else invariants specified above are violated.
		}
	}

	// We find the maximally occurring server config and check if
	// there is quorum.
	var configJSON serverConfig
	maxOccurrence := 0
	for i, count := range configCounter {
		if maxOccurrence < count {
			maxOccurrence = count
			configJSON = serverConfigs[i]
		}
	}

	// If quorum nodes don't agree.
	if maxOccurrence < quorum {
		return scv, errXLWriteQuorum
	}

	return configJSON, nil
}

// Write config contents into a temporary file on all nodes.
func writeTmpConfigPeers(peers adminPeers, tmpFileName string, configBytes []byte) []error {
	// For a single-node minio server setup.
	if !globalIsDistXL {
		err := peers[0].cmdRunner.WriteTmpConfig(tmpFileName, configBytes)
		return []error{err}
	}

	errs := make([]error, len(peers))

	// Write config into temporary file on all nodes.
	wg := sync.WaitGroup{}
	for i, peer := range peers {
		wg.Add(1)
		go func(idx int, peer adminPeer) {
			defer wg.Done()
			errs[idx] = peer.cmdRunner.WriteTmpConfig(tmpFileName, configBytes)
		}(i, peer)
	}
	wg.Wait()

	// Return bytes written and errors (if any) during writing
	// temporary config file.
	return errs
}

// Move config contents from the given temporary file onto config.json
// on all nodes.
func commitConfigPeers(peers adminPeers, tmpFileName string) []error {
	// For a single-node minio server setup.
	if !globalIsDistXL {
		return []error{peers[0].cmdRunner.CommitConfig(tmpFileName)}
	}

	errs := make([]error, len(peers))

	// Rename temporary config file into configDir/config.json on
	// all nodes.
	wg := sync.WaitGroup{}
	for i, peer := range peers {
		wg.Add(1)
		go func(idx int, peer adminPeer) {
			defer wg.Done()
			errs[idx] = peer.cmdRunner.CommitConfig(tmpFileName)
		}(i, peer)
	}
	wg.Wait()

	// Return errors (if any) received during rename.
	return errs
}

@@ -139,10 +139,8 @@ func testAdminCmdRunnerServerInfo(t *testing.T, client adminCmdRunner) {
	}()

	endpoints := new(EndpointList)
-	notificationSys, err := NewNotificationSys(globalServerConfig, *endpoints)
-	if err != nil {
-		t.Fatalf("unexpected error %v", err)
-	}
+	notificationSys := NewNotificationSys(globalServerConfig, *endpoints)

	testCases := []struct {
		bootTime time.Time

@@ -17,6 +17,7 @@
package cmd

import (
+	"context"
	"errors"
	"fmt"
	"reflect"
@@ -402,17 +403,19 @@ func loadConfig() error {
// * Add a new target in pkg/event/target package.
// * Add newly added target configuration to serverConfig.Notify.<TARGET_NAME>.
// * Handle the configuration in this function to create/add into TargetList.
-func getNotificationTargets(config *serverConfig) (*event.TargetList, error) {
+func getNotificationTargets(config *serverConfig) *event.TargetList {
	targetList := event.NewTargetList()

	for id, args := range config.Notify.AMQP {
		if args.Enable {
			newTarget, err := target.NewAMQPTarget(id, args)
			if err != nil {
-				return nil, err
+				logger.LogIf(context.Background(), err)
+				continue
			}
			if err = targetList.Add(newTarget); err != nil {
-				return nil, err
+				logger.LogIf(context.Background(), err)
+				continue
			}
		}
	}
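The same create-or-skip block is repeated in the hunks that follow for the Elasticsearch, Kafka, MQTT, MySQL, NATS, PostgreSQL, Redis and Webhook targets. A condensed sketch of that shared shape is below; the helper addNotificationTarget is hypothetical and does not appear in the diff.

// Hypothetical helper illustrating the pattern each backend block below follows:
// create the target, log and skip on failure, then register it in the list,
// again logging instead of failing startup.
func addNotificationTarget(list *event.TargetList, create func() (event.Target, error)) {
	newTarget, err := create()
	if err != nil {
		logger.LogIf(context.Background(), err) // target down or misconfigured: skip it
		return
	}
	if err = list.Add(newTarget); err != nil {
		logger.LogIf(context.Background(), err)
	}
}

// Assumed call site for one backend:
//	addNotificationTarget(targetList, func() (event.Target, error) {
//		return target.NewAMQPTarget(id, args)
//	})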
@@ -421,10 +424,14 @@ func getNotificationTargets(config *serverConfig) (*event.TargetList, error) {
		if args.Enable {
			newTarget, err := target.NewElasticsearchTarget(id, args)
			if err != nil {
-				return nil, err
+				logger.LogIf(context.Background(), err)
+				continue

			}
			if err = targetList.Add(newTarget); err != nil {
-				return nil, err
+				logger.LogIf(context.Background(), err)
+				continue

			}
		}
	}
@@ -433,10 +440,12 @@ func getNotificationTargets(config *serverConfig) (*event.TargetList, error) {
		if args.Enable {
			newTarget, err := target.NewKafkaTarget(id, args)
			if err != nil {
-				return nil, err
+				logger.LogIf(context.Background(), err)
+				continue
			}
			if err = targetList.Add(newTarget); err != nil {
-				return nil, err
+				logger.LogIf(context.Background(), err)
+				continue
			}
		}
	}
@@ -445,10 +454,12 @@ func getNotificationTargets(config *serverConfig) (*event.TargetList, error) {
		if args.Enable {
			newTarget, err := target.NewMQTTTarget(id, args)
			if err != nil {
-				return nil, err
+				logger.LogIf(context.Background(), err)
+				continue
			}
			if err = targetList.Add(newTarget); err != nil {
-				return nil, err
+				logger.LogIf(context.Background(), err)
+				continue
			}
		}
	}
@@ -457,10 +468,12 @@ func getNotificationTargets(config *serverConfig) (*event.TargetList, error) {
		if args.Enable {
			newTarget, err := target.NewMySQLTarget(id, args)
			if err != nil {
-				return nil, err
+				logger.LogIf(context.Background(), err)
+				continue
			}
			if err = targetList.Add(newTarget); err != nil {
-				return nil, err
+				logger.LogIf(context.Background(), err)
+				continue
			}
		}
	}
@@ -469,10 +482,12 @@ func getNotificationTargets(config *serverConfig) (*event.TargetList, error) {
		if args.Enable {
			newTarget, err := target.NewNATSTarget(id, args)
			if err != nil {
-				return nil, err
+				logger.LogIf(context.Background(), err)
+				continue
			}
			if err = targetList.Add(newTarget); err != nil {
-				return nil, err
+				logger.LogIf(context.Background(), err)
+				continue
			}
		}
	}
@@ -481,10 +496,12 @@ func getNotificationTargets(config *serverConfig) (*event.TargetList, error) {
		if args.Enable {
			newTarget, err := target.NewPostgreSQLTarget(id, args)
			if err != nil {
-				return nil, err
+				logger.LogIf(context.Background(), err)
+				continue
			}
			if err = targetList.Add(newTarget); err != nil {
-				return nil, err
+				logger.LogIf(context.Background(), err)
+				continue
			}
		}
	}
@@ -493,10 +510,12 @@ func getNotificationTargets(config *serverConfig) (*event.TargetList, error) {
		if args.Enable {
			newTarget, err := target.NewRedisTarget(id, args)
			if err != nil {
-				return nil, err
+				logger.LogIf(context.Background(), err)
+				continue
			}
			if err = targetList.Add(newTarget); err != nil {
-				return nil, err
+				logger.LogIf(context.Background(), err)
+				continue
			}
		}
	}
@@ -505,10 +524,11 @@ func getNotificationTargets(config *serverConfig) (*event.TargetList, error) {
		if args.Enable {
			newTarget := target.NewWebhookTarget(id, args)
			if err := targetList.Add(newTarget); err != nil {
-				return nil, err
+				logger.LogIf(context.Background(), err)
+				continue
			}
		}
	}

-	return targetList, nil
+	return targetList
}

cmd/fs-v1.go (10 changed lines)
@@ -151,16 +151,6 @@ func NewFSObjectLayer(fsPath string) (ObjectLayer, error) {
	// or cause changes on backend format.
	fs.fsFormatRlk = rlk

-	// Initialize notification system.
-	if err = globalNotificationSys.Init(fs); err != nil {
-		return nil, uiErrUnableToReadFromBackend(err).Msg("Unable to initialize notification system")
-	}
-
-	// Initialize policy system.
-	if err = globalPolicySys.Init(fs); err != nil {
-		return nil, uiErrUnableToReadFromBackend(err).Msg("Unable to initialize policy system")
-	}
-
	if !fs.diskMount {
		go fs.diskUsage(globalServiceDoneCh)
	}

@@ -170,8 +170,7 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
	initNSLock(false) // Enable local namespace lock.

	// Create new notification system.
-	globalNotificationSys, err = NewNotificationSys(globalServerConfig, EndpointList{})
-	logger.FatalIf(err, "Unable to create new notification system")
+	globalNotificationSys = NewNotificationSys(globalServerConfig, EndpointList{})

	// Create new policy system.
	globalPolicySys = NewPolicySys()

@@ -429,12 +429,8 @@ func (sys *NotificationSys) Send(args eventArgs) []event.TargetIDErr {
}

// NewNotificationSys - creates new notification system object.
-func NewNotificationSys(config *serverConfig, endpoints EndpointList) (*NotificationSys, error) {
-	targetList, err := getNotificationTargets(config)
-	if err != nil {
-		return nil, err
-	}
-
+func NewNotificationSys(config *serverConfig, endpoints EndpointList) *NotificationSys {
+	targetList := getNotificationTargets(config)
	peerRPCClientMap := makeRemoteRPCClients(endpoints)

	// bucketRulesMap/bucketRemoteTargetRulesMap are initialized by NotificationSys.Init()
@@ -443,7 +439,7 @@ func NewNotificationSys(config *serverConfig, endpoints EndpointList) (*Notifica
		bucketRulesMap:             make(map[string]event.RulesMap),
		bucketRemoteTargetRulesMap: make(map[string]map[event.TargetID]event.RulesMap),
		peerRPCClientMap:           peerRPCClientMap,
-	}, nil
+	}
}

type eventArgs struct {

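With the new signature, constructing the notification system cannot fail any more; errors now surface from Init, which serverMain calls only once the object layer is ready (see the serverMain hunks below). A minimal sketch of the resulting two-phase lifecycle, using the identifiers from this diff:

// Phase 1: construction never aborts, even if some notification targets are down.
globalNotificationSys = NewNotificationSys(globalServerConfig, globalEndpoints)

// Phase 2: initialization runs after the object layer exists and may still fail hard.
if err := globalNotificationSys.Init(newObjectLayerFn()); err != nil {
	logger.Fatal(err, "Unable to initialize notification system")
}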
@@ -271,15 +271,6 @@ func serverMain(ctx *cli.Context) {
		logger.Fatal(uiErrUnexpectedError(err), "Unable to configure one of server's RPC services")
	}

-	// Create new notification system.
-	globalNotificationSys, err = NewNotificationSys(globalServerConfig, globalEndpoints)
-	if err != nil {
-		logger.Fatal(err, "Unable to initialize the notification system")
-	}
-
-	// Create new policy system.
-	globalPolicySys = NewPolicySys()
-
	// Initialize Admin Peers inter-node communication only in distributed setup.
	initGlobalAdminPeers(globalEndpoints)

@@ -315,6 +306,25 @@ func serverMain(ctx *cli.Context) {
		initFederatorBackend(newObject)
	}

+	// Re-enable logging
+	logger.Disable = false
+
+	// Create new policy system.
+	globalPolicySys = NewPolicySys()
+
+	// Initialize policy system.
+	if err := globalPolicySys.Init(newObjectLayerFn()); err != nil {
+		logger.Fatal(err, "Unable to initialize policy system")
+	}
+
+	// Create new notification system.
+	globalNotificationSys = NewNotificationSys(globalServerConfig, globalEndpoints)
+
+	// Initialize notification system.
+	if err := globalNotificationSys.Init(newObjectLayerFn()); err != nil {
+		logger.Fatal(err, "Unable to initialize notification system")
+	}
+
	// Prints the formatted startup message once object layer is initialized.
	apiEndpoints := getAPIEndpoints(globalMinioAddr)
	printStartupMessage(apiEndpoints)
@@ -322,9 +332,6 @@ func serverMain(ctx *cli.Context) {
	// Set uptime time after object layer has initialized.
	globalBootTime = UTCNow()

-	// Re-enable logging
-	logger.Disable = false
-
	handleSignals()
}

@@ -353,10 +353,8 @@ func UnstartedTestServer(t TestErrHandler, instanceType string) TestServer {
	globalMinioHost = host
	globalMinioPort = port
	globalMinioAddr = getEndpointsLocalAddr(testServer.Disks)
-	globalNotificationSys, err = NewNotificationSys(globalServerConfig, testServer.Disks)
-	if err != nil {
-		t.Fatalf("Unable to create new notification system. %v", err)
-	}
+	globalNotificationSys = NewNotificationSys(globalServerConfig, testServer.Disks)

	// Create new policy system.
	globalPolicySys = NewPolicySys()
@@ -1720,9 +1718,7 @@ func newTestObjectLayer(endpoints EndpointList) (newObject ObjectLayer, err erro
	}

	// Create new notification system.
-	if globalNotificationSys, err = NewNotificationSys(globalServerConfig, endpoints); err != nil {
-		return nil, err
-	}
+	globalNotificationSys = NewNotificationSys(globalServerConfig, endpoints)

	// Create new policy system.
	globalPolicySys = NewPolicySys()

@@ -263,16 +263,6 @@ func newXLSets(endpoints EndpointList, format *formatXLV3, setCount int, drivesP
	// Connect disks right away.
	s.connectDisks()

-	// Initialize notification system.
-	if err := globalNotificationSys.Init(s); err != nil {
-		return nil, fmt.Errorf("Unable to initialize notification system. %v", err)
-	}
-
-	// Initialize policy system.
-	if err := globalPolicySys.Init(s); err != nil {
-		return nil, fmt.Errorf("Unable to initialize policy system. %v", err)
-	}
-
	// Start the disk monitoring and connect routine.
	go s.monitorAndConnectEndpoints(defaultMonitorConnectEndpointInterval)
