minio/cmd/notification.go
Shireesh Anjal 289b22d911
fix: pool number not added for one server (#11670)
The previous code was iterating over replies from peers and assigning
pool numbers to them, thus missing to add it for the local server.

Fixed by iterating over the server properties of all the servers
including the local one.
2021-03-01 08:09:43 -08:00

1501 lines
40 KiB
Go

/*
* MinIO Cloud Storage, (C) 2018, 2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"sort"
"strings"
"sync"
"time"
"github.com/cespare/xxhash/v2"
"github.com/klauspost/compress/zip"
"github.com/minio/minio-go/v7/pkg/set"
"github.com/minio/minio/cmd/crypto"
"github.com/minio/minio/cmd/logger"
bandwidth "github.com/minio/minio/pkg/bandwidth"
bucketBandwidth "github.com/minio/minio/pkg/bucket/bandwidth"
"github.com/minio/minio/pkg/bucket/policy"
"github.com/minio/minio/pkg/event"
"github.com/minio/minio/pkg/madmin"
xnet "github.com/minio/minio/pkg/net"
"github.com/minio/minio/pkg/sync/errgroup"
"github.com/willf/bloom"
)
// NotificationSys - notification system.
type NotificationSys struct {
sync.RWMutex
targetList *event.TargetList
targetResCh chan event.TargetIDResult
bucketRulesMap map[string]event.RulesMap
bucketRemoteTargetRulesMap map[string]map[event.TargetID]event.RulesMap
peerClients []*peerRESTClient // Excludes self
allPeerClients []*peerRESTClient // Includes nil client for self
}
// GetARNList - returns available ARNs.
func (sys *NotificationSys) GetARNList(onlyActive bool) []string {
arns := []string{}
if sys == nil {
return arns
}
region := globalServerRegion
for targetID, target := range sys.targetList.TargetMap() {
// httpclient target is part of ListenNotification
// which doesn't need to be listed as part of the ARN list
// This list is only meant for external targets, filter
// this out pro-actively.
if !strings.HasPrefix(targetID.ID, "httpclient+") {
if onlyActive && !target.HasQueueStore() {
if _, err := target.IsActive(); err != nil {
continue
}
}
arns = append(arns, targetID.ToARN(region).String())
}
}
return arns
}
// NotificationPeerErr returns error associated for a remote peer.
type NotificationPeerErr struct {
Host xnet.Host // Remote host on which the rpc call was initiated
Err error // Error returned by the remote peer for an rpc call
}
// A NotificationGroup is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero NotificationGroup is valid and does not cancel on error.
type NotificationGroup struct {
wg sync.WaitGroup
errs []NotificationPeerErr
}
// WithNPeers returns a new NotificationGroup with length of errs slice upto nerrs,
// upon Wait() errors are returned collected from all tasks.
func WithNPeers(nerrs int) *NotificationGroup {
return &NotificationGroup{errs: make([]NotificationPeerErr, nerrs)}
}
// Wait blocks until all function calls from the Go method have returned, then
// returns the slice of errors from all function calls.
func (g *NotificationGroup) Wait() []NotificationPeerErr {
g.wg.Wait()
return g.errs
}
// Go calls the given function in a new goroutine.
//
// The first call to return a non-nil error will be
// collected in errs slice and returned by Wait().
func (g *NotificationGroup) Go(ctx context.Context, f func() error, index int, addr xnet.Host) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
g.errs[index] = NotificationPeerErr{
Host: addr,
}
for i := 0; i < 3; i++ {
if err := f(); err != nil {
g.errs[index].Err = err
// Last iteration log the error.
if i == 2 {
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", addr.String())
ctx := logger.SetReqInfo(ctx, reqInfo)
logger.LogIf(ctx, err)
}
// Wait for one second and no need wait after last attempt.
if i < 2 {
time.Sleep(1 * time.Second)
}
continue
}
break
}
}()
}
// DeletePolicy - deletes policy across all peers.
func (sys *NotificationSys) DeletePolicy(policyName string) []NotificationPeerErr {
ng := WithNPeers(len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
client := client
ng.Go(GlobalContext, func() error {
return client.DeletePolicy(policyName)
}, idx, *client.host)
}
return ng.Wait()
}
// LoadPolicy - reloads a specific modified policy across all peers
func (sys *NotificationSys) LoadPolicy(policyName string) []NotificationPeerErr {
ng := WithNPeers(len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
client := client
ng.Go(GlobalContext, func() error {
return client.LoadPolicy(policyName)
}, idx, *client.host)
}
return ng.Wait()
}
// LoadPolicyMapping - reloads a policy mapping across all peers
func (sys *NotificationSys) LoadPolicyMapping(userOrGroup string, isGroup bool) []NotificationPeerErr {
ng := WithNPeers(len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
client := client
ng.Go(GlobalContext, func() error {
return client.LoadPolicyMapping(userOrGroup, isGroup)
}, idx, *client.host)
}
return ng.Wait()
}
// DeleteUser - deletes a specific user across all peers
func (sys *NotificationSys) DeleteUser(accessKey string) []NotificationPeerErr {
ng := WithNPeers(len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
client := client
ng.Go(GlobalContext, func() error {
return client.DeleteUser(accessKey)
}, idx, *client.host)
}
return ng.Wait()
}
// LoadUser - reloads a specific user across all peers
func (sys *NotificationSys) LoadUser(accessKey string, temp bool) []NotificationPeerErr {
ng := WithNPeers(len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
client := client
ng.Go(GlobalContext, func() error {
return client.LoadUser(accessKey, temp)
}, idx, *client.host)
}
return ng.Wait()
}
// LoadGroup - loads a specific group on all peers.
func (sys *NotificationSys) LoadGroup(group string) []NotificationPeerErr {
ng := WithNPeers(len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
client := client
ng.Go(GlobalContext, func() error { return client.LoadGroup(group) }, idx, *client.host)
}
return ng.Wait()
}
// DeleteServiceAccount - deletes a specific service account across all peers
func (sys *NotificationSys) DeleteServiceAccount(accessKey string) []NotificationPeerErr {
ng := WithNPeers(len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
client := client
ng.Go(GlobalContext, func() error {
return client.DeleteServiceAccount(accessKey)
}, idx, *client.host)
}
return ng.Wait()
}
// LoadServiceAccount - reloads a specific service account across all peers
func (sys *NotificationSys) LoadServiceAccount(accessKey string) []NotificationPeerErr {
ng := WithNPeers(len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
client := client
ng.Go(GlobalContext, func() error {
return client.LoadServiceAccount(accessKey)
}, idx, *client.host)
}
return ng.Wait()
}
// BackgroundHealStatus - returns background heal status of all peers
func (sys *NotificationSys) BackgroundHealStatus() ([]madmin.BgHealState, []NotificationPeerErr) {
ng := WithNPeers(len(sys.peerClients))
states := make([]madmin.BgHealState, len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
idx := idx
client := client
ng.Go(GlobalContext, func() error {
st, err := client.BackgroundHealStatus()
if err != nil {
return err
}
states[idx] = st
return nil
}, idx, *client.host)
}
return states, ng.Wait()
}
// StartProfiling - start profiling on remote peers, by initiating a remote RPC.
func (sys *NotificationSys) StartProfiling(profiler string) []NotificationPeerErr {
ng := WithNPeers(len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
client := client
ng.Go(GlobalContext, func() error {
return client.StartProfiling(profiler)
}, idx, *client.host)
}
return ng.Wait()
}
// DownloadProfilingData - download profiling data from all remote peers.
func (sys *NotificationSys) DownloadProfilingData(ctx context.Context, writer io.Writer) bool {
profilingDataFound := false
// Initialize a zip writer which will provide a zipped content
// of profiling data of all nodes
zipWriter := zip.NewWriter(writer)
defer zipWriter.Close()
for _, client := range sys.peerClients {
if client == nil {
continue
}
data, err := client.DownloadProfileData()
if err != nil {
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String())
ctx := logger.SetReqInfo(ctx, reqInfo)
logger.LogIf(ctx, err)
continue
}
profilingDataFound = true
for typ, data := range data {
// Send profiling data to zip as file
header, zerr := zip.FileInfoHeader(dummyFileInfo{
name: fmt.Sprintf("profile-%s-%s", client.host.String(), typ),
size: int64(len(data)),
mode: 0600,
modTime: UTCNow(),
isDir: false,
sys: nil,
})
if zerr != nil {
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String())
ctx := logger.SetReqInfo(ctx, reqInfo)
logger.LogIf(ctx, zerr)
continue
}
header.Method = zip.Deflate
zwriter, zerr := zipWriter.CreateHeader(header)
if zerr != nil {
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String())
ctx := logger.SetReqInfo(ctx, reqInfo)
logger.LogIf(ctx, zerr)
continue
}
if _, err = io.Copy(zwriter, bytes.NewReader(data)); err != nil {
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", client.host.String())
ctx := logger.SetReqInfo(ctx, reqInfo)
logger.LogIf(ctx, err)
continue
}
}
}
// Local host
thisAddr, err := xnet.ParseHost(GetLocalPeer(globalEndpoints))
if err != nil {
logger.LogIf(ctx, err)
return profilingDataFound
}
data, err := getProfileData()
if err != nil {
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", thisAddr.String())
ctx := logger.SetReqInfo(ctx, reqInfo)
logger.LogIf(ctx, err)
return profilingDataFound
}
profilingDataFound = true
// Send profiling data to zip as file
for typ, data := range data {
header, zerr := zip.FileInfoHeader(dummyFileInfo{
name: fmt.Sprintf("profile-%s-%s", thisAddr, typ),
size: int64(len(data)),
mode: 0600,
modTime: UTCNow(),
isDir: false,
sys: nil,
})
if zerr != nil {
return profilingDataFound
}
header.Method = zip.Deflate
zwriter, zerr := zipWriter.CreateHeader(header)
if zerr != nil {
return profilingDataFound
}
if _, err = io.Copy(zwriter, bytes.NewReader(data)); err != nil {
return profilingDataFound
}
}
return profilingDataFound
}
// ServerUpdate - updates remote peers.
func (sys *NotificationSys) ServerUpdate(ctx context.Context, u *url.URL, sha256Sum []byte, lrTime time.Time) []NotificationPeerErr {
ng := WithNPeers(len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
client := client
ng.Go(ctx, func() error {
return client.ServerUpdate(ctx, u, sha256Sum, lrTime)
}, idx, *client.host)
}
return ng.Wait()
}
// SignalService - calls signal service RPC call on all peers.
func (sys *NotificationSys) SignalService(sig serviceSignal) []NotificationPeerErr {
ng := WithNPeers(len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
client := client
ng.Go(GlobalContext, func() error {
return client.SignalService(sig)
}, idx, *client.host)
}
return ng.Wait()
}
// updateBloomFilter will cycle all servers to the current index and
// return a merged bloom filter if a complete one can be retrieved.
func (sys *NotificationSys) updateBloomFilter(ctx context.Context, current uint64) (*bloomFilter, error) {
var req = bloomFilterRequest{
Current: current,
Oldest: current - dataUsageUpdateDirCycles,
}
if current < dataUsageUpdateDirCycles {
req.Oldest = 0
}
// Load initial state from local...
var bf *bloomFilter
bfr, err := intDataUpdateTracker.cycleFilter(ctx, req)
logger.LogIf(ctx, err)
if err == nil && bfr.Complete {
nbf := intDataUpdateTracker.newBloomFilter()
bf = &nbf
_, err = bf.ReadFrom(bytes.NewReader(bfr.Filter))
logger.LogIf(ctx, err)
}
var mu sync.Mutex
g := errgroup.WithNErrs(len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
client := client
g.Go(func() error {
serverBF, err := client.cycleServerBloomFilter(ctx, req)
if false && intDataUpdateTracker.debug {
b, _ := json.MarshalIndent(serverBF, "", " ")
logger.Info("Disk %v, Bloom filter: %v", client.host.Name, string(b))
}
// Keep lock while checking result.
mu.Lock()
defer mu.Unlock()
if err != nil || !serverBF.Complete || bf == nil {
logger.LogOnceIf(ctx, err, fmt.Sprintf("host:%s, cycle:%d", client.host, current), client.cycleServerBloomFilter)
bf = nil
return nil
}
var tmp bloom.BloomFilter
_, err = tmp.ReadFrom(bytes.NewReader(serverBF.Filter))
if err != nil {
logger.LogIf(ctx, err)
bf = nil
return nil
}
if bf.BloomFilter == nil {
bf.BloomFilter = &tmp
} else {
err = bf.Merge(&tmp)
if err != nil {
logger.LogIf(ctx, err)
bf = nil
return nil
}
}
return nil
}, idx)
}
g.Wait()
return bf, nil
}
// collectBloomFilter will collect bloom filters from all servers from the specified cycle.
func (sys *NotificationSys) collectBloomFilter(ctx context.Context, from uint64) (*bloomFilter, error) {
var req = bloomFilterRequest{
Current: 0,
Oldest: from,
}
// Load initial state from local...
var bf *bloomFilter
bfr, err := intDataUpdateTracker.cycleFilter(ctx, req)
logger.LogIf(ctx, err)
if err == nil && bfr.Complete {
nbf := intDataUpdateTracker.newBloomFilter()
bf = &nbf
_, err = bf.ReadFrom(bytes.NewReader(bfr.Filter))
logger.LogIf(ctx, err)
}
if !bfr.Complete {
// If local isn't complete just return early
return nil, nil
}
var mu sync.Mutex
g := errgroup.WithNErrs(len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
client := client
g.Go(func() error {
serverBF, err := client.cycleServerBloomFilter(ctx, req)
if false && intDataUpdateTracker.debug {
b, _ := json.MarshalIndent(serverBF, "", " ")
logger.Info("Disk %v, Bloom filter: %v", client.host.Name, string(b))
}
// Keep lock while checking result.
mu.Lock()
defer mu.Unlock()
if err != nil || !serverBF.Complete || bf == nil {
logger.LogIf(ctx, err)
bf = nil
return nil
}
var tmp bloom.BloomFilter
_, err = tmp.ReadFrom(bytes.NewReader(serverBF.Filter))
if err != nil {
logger.LogIf(ctx, err)
bf = nil
return nil
}
if bf.BloomFilter == nil {
bf.BloomFilter = &tmp
} else {
err = bf.Merge(&tmp)
if err != nil {
logger.LogIf(ctx, err)
bf = nil
return nil
}
}
return nil
}, idx)
}
g.Wait()
return bf, nil
}
// findEarliestCleanBloomFilter will find the earliest bloom filter across the cluster
// where the directory is clean.
// Due to how objects are stored this can include object names.
func (sys *NotificationSys) findEarliestCleanBloomFilter(ctx context.Context, dir string) uint64 {
// Load initial state from local...
current := intDataUpdateTracker.current()
best := intDataUpdateTracker.latestWithDir(dir)
if best == current {
// If the current is dirty no need to check others.
return current
}
var req = bloomFilterRequest{
Current: 0,
Oldest: best,
OldestClean: dir,
}
var mu sync.Mutex
g := errgroup.WithNErrs(len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
client := client
g.Go(func() error {
serverBF, err := client.cycleServerBloomFilter(ctx, req)
// Keep lock while checking result.
mu.Lock()
defer mu.Unlock()
if err != nil {
// Error, don't assume clean.
best = current
logger.LogIf(ctx, err)
return nil
}
if serverBF.OldestIdx > best {
best = serverBF.OldestIdx
}
return nil
}, idx)
}
g.Wait()
return best
}
// GetLocks - makes GetLocks RPC call on all peers.
func (sys *NotificationSys) GetLocks(ctx context.Context, r *http.Request) []*PeerLocks {
locksResp := make([]*PeerLocks, len(sys.peerClients))
g := errgroup.WithNErrs(len(sys.peerClients))
for index, client := range sys.peerClients {
index := index
g.Go(func() error {
if client == nil {
return nil
}
serverLocksResp, err := sys.peerClients[index].GetLocks()
if err != nil {
return err
}
locksResp[index] = &PeerLocks{
Addr: sys.peerClients[index].host.String(),
Locks: serverLocksResp,
}
return nil
}, index)
}
for index, err := range g.Wait() {
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress",
sys.peerClients[index].host.String())
ctx := logger.SetReqInfo(ctx, reqInfo)
logger.LogOnceIf(ctx, err, sys.peerClients[index].host.String())
}
locksResp = append(locksResp, &PeerLocks{
Addr: getHostName(r),
Locks: globalLockServer.DupLockMap(),
})
return locksResp
}
// LoadBucketMetadata - calls LoadBucketMetadata call on all peers
func (sys *NotificationSys) LoadBucketMetadata(ctx context.Context, bucketName string) {
ng := WithNPeers(len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
client := client
ng.Go(ctx, func() error {
return client.LoadBucketMetadata(bucketName)
}, idx, *client.host)
}
for _, nErr := range ng.Wait() {
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", nErr.Host.String())
if nErr.Err != nil {
logger.LogIf(logger.SetReqInfo(ctx, reqInfo), nErr.Err)
}
}
}
// DeleteBucketMetadata - calls DeleteBucketMetadata call on all peers
func (sys *NotificationSys) DeleteBucketMetadata(ctx context.Context, bucketName string) {
globalBucketMetadataSys.Remove(bucketName)
if localMetacacheMgr != nil {
localMetacacheMgr.deleteBucketCache(bucketName)
}
ng := WithNPeers(len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
client := client
ng.Go(ctx, func() error {
return client.DeleteBucketMetadata(bucketName)
}, idx, *client.host)
}
for _, nErr := range ng.Wait() {
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", nErr.Host.String())
if nErr.Err != nil {
logger.LogIf(logger.SetReqInfo(ctx, reqInfo), nErr.Err)
}
}
}
// Loads notification policies for all buckets into NotificationSys.
func (sys *NotificationSys) load(buckets []BucketInfo) {
for _, bucket := range buckets {
ctx := logger.SetReqInfo(GlobalContext, &logger.ReqInfo{BucketName: bucket.Name})
config, err := globalBucketMetadataSys.GetNotificationConfig(bucket.Name)
if err != nil {
logger.LogIf(ctx, err)
continue
}
config.SetRegion(globalServerRegion)
if err = config.Validate(globalServerRegion, globalNotificationSys.targetList); err != nil {
if _, ok := err.(*event.ErrARNNotFound); !ok {
logger.LogIf(ctx, err)
}
continue
}
sys.AddRulesMap(bucket.Name, config.ToRulesMap())
}
}
// Init - initializes notification system from notification.xml and listenxl.meta of all buckets.
func (sys *NotificationSys) Init(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) error {
if objAPI == nil {
return errServerNotInitialized
}
// In gateway mode, notifications are not supported - except NAS gateway.
if globalIsGateway && !objAPI.IsNotificationSupported() {
return nil
}
logger.LogIf(ctx, sys.targetList.Add(globalConfigTargetList.Targets()...))
go func() {
for res := range sys.targetResCh {
if res.Err != nil {
reqInfo := &logger.ReqInfo{}
reqInfo.AppendTags("targetID", res.ID.Name)
logger.LogOnceIf(logger.SetReqInfo(GlobalContext, reqInfo), res.Err, res.ID)
}
}
}()
go sys.load(buckets)
return nil
}
// AddRulesMap - adds rules map for bucket name.
func (sys *NotificationSys) AddRulesMap(bucketName string, rulesMap event.RulesMap) {
sys.Lock()
defer sys.Unlock()
rulesMap = rulesMap.Clone()
for _, targetRulesMap := range sys.bucketRemoteTargetRulesMap[bucketName] {
rulesMap.Add(targetRulesMap)
}
// Do not add for an empty rulesMap.
if len(rulesMap) == 0 {
delete(sys.bucketRulesMap, bucketName)
} else {
sys.bucketRulesMap[bucketName] = rulesMap
}
}
// RemoveRulesMap - removes rules map for bucket name.
func (sys *NotificationSys) RemoveRulesMap(bucketName string, rulesMap event.RulesMap) {
sys.Lock()
defer sys.Unlock()
sys.bucketRulesMap[bucketName].Remove(rulesMap)
if len(sys.bucketRulesMap[bucketName]) == 0 {
delete(sys.bucketRulesMap, bucketName)
}
}
// ConfiguredTargetIDs - returns list of configured target id's
func (sys *NotificationSys) ConfiguredTargetIDs() []event.TargetID {
if sys == nil {
return nil
}
sys.RLock()
defer sys.RUnlock()
var targetIDs []event.TargetID
for _, rmap := range sys.bucketRulesMap {
for _, rules := range rmap {
for _, targetSet := range rules {
for id := range targetSet {
targetIDs = append(targetIDs, id)
}
}
}
}
// Filter out targets configured via env
var tIDs []event.TargetID
for _, targetID := range targetIDs {
if !globalEnvTargetList.Exists(targetID) {
tIDs = append(tIDs, targetID)
}
}
return tIDs
}
// RemoveNotification - removes all notification configuration for bucket name.
func (sys *NotificationSys) RemoveNotification(bucketName string) {
sys.Lock()
defer sys.Unlock()
delete(sys.bucketRulesMap, bucketName)
targetIDSet := event.NewTargetIDSet()
for targetID := range sys.bucketRemoteTargetRulesMap[bucketName] {
targetIDSet[targetID] = struct{}{}
delete(sys.bucketRemoteTargetRulesMap[bucketName], targetID)
}
sys.targetList.Remove(targetIDSet)
delete(sys.bucketRemoteTargetRulesMap, bucketName)
}
// RemoveAllRemoteTargets - closes and removes all notification targets.
func (sys *NotificationSys) RemoveAllRemoteTargets() {
sys.Lock()
defer sys.Unlock()
for _, targetMap := range sys.bucketRemoteTargetRulesMap {
targetIDSet := event.NewTargetIDSet()
for k := range targetMap {
targetIDSet[k] = struct{}{}
}
sys.targetList.Remove(targetIDSet)
}
}
// Send - sends event data to all matching targets.
func (sys *NotificationSys) Send(args eventArgs) {
sys.RLock()
targetIDSet := sys.bucketRulesMap[args.BucketName].Match(args.EventName, args.Object.Name)
sys.RUnlock()
if len(targetIDSet) == 0 {
return
}
sys.targetList.Send(args.ToEvent(true), targetIDSet, sys.targetResCh)
}
// NetInfo - Net information
func (sys *NotificationSys) NetInfo(ctx context.Context) madmin.ServerNetHealthInfo {
var sortedGlobalEndpoints []string
/*
Ensure that only untraversed links are visited by this server
i.e. if net perf tests have been performed between a -> b, then do
not run it between b -> a
The graph of tests looks like this
a b c d
a | o | x | x | x |
b | o | o | x | x |
c | o | o | o | x |
d | o | o | o | o |
'x's should be tested, and 'o's should be skipped
*/
hostSet := set.NewStringSet()
for _, ez := range globalEndpoints {
for _, e := range ez.Endpoints {
if !hostSet.Contains(e.Host) {
sortedGlobalEndpoints = append(sortedGlobalEndpoints, e.Host)
hostSet.Add(e.Host)
}
}
}
sort.Strings(sortedGlobalEndpoints)
var remoteTargets []*peerRESTClient
search := func(host string) *peerRESTClient {
for index, client := range sys.peerClients {
if client == nil {
continue
}
if sys.peerClients[index].host.String() == host {
return client
}
}
return nil
}
for i := 0; i < len(sortedGlobalEndpoints); i++ {
if sortedGlobalEndpoints[i] != GetLocalPeer(globalEndpoints) {
continue
}
for j := 0; j < len(sortedGlobalEndpoints); j++ {
if j > i {
remoteTarget := search(sortedGlobalEndpoints[j])
if remoteTarget != nil {
remoteTargets = append(remoteTargets, remoteTarget)
}
}
}
}
netInfos := make([]madmin.NetPerfInfo, len(remoteTargets))
for index, client := range remoteTargets {
if client == nil {
continue
}
var err error
netInfos[index], err = client.NetInfo(ctx)
addr := client.host.String()
reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr)
ctx := logger.SetReqInfo(GlobalContext, reqInfo)
logger.LogIf(ctx, err)
netInfos[index].Addr = addr
if err != nil {
netInfos[index].Error = err.Error()
}
}
return madmin.ServerNetHealthInfo{
Net: netInfos,
Addr: GetLocalPeer(globalEndpoints),
}
}
// DispatchNetPerfInfo - Net perf information from other nodes
func (sys *NotificationSys) DispatchNetPerfInfo(ctx context.Context) []madmin.ServerNetHealthInfo {
serverNetInfos := []madmin.ServerNetHealthInfo{}
for index, client := range sys.peerClients {
if client == nil {
continue
}
serverNetInfo, err := sys.peerClients[index].DispatchNetInfo(ctx)
if err != nil {
serverNetInfo.Addr = client.host.String()
serverNetInfo.Error = err.Error()
}
serverNetInfos = append(serverNetInfos, serverNetInfo)
}
return serverNetInfos
}
// DispatchNetPerfChan - Net perf information from other nodes
func (sys *NotificationSys) DispatchNetPerfChan(ctx context.Context) chan madmin.ServerNetHealthInfo {
serverNetInfos := make(chan madmin.ServerNetHealthInfo)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
for _, client := range sys.peerClients {
if client == nil {
continue
}
serverNetInfo, err := client.DispatchNetInfo(ctx)
if err != nil {
serverNetInfo.Addr = client.host.String()
serverNetInfo.Error = err.Error()
}
serverNetInfos <- serverNetInfo
}
wg.Done()
}()
go func() {
wg.Wait()
close(serverNetInfos)
}()
return serverNetInfos
}
// NetPerfParallelInfo - Performs Net parallel tests
func (sys *NotificationSys) NetPerfParallelInfo(ctx context.Context) madmin.ServerNetHealthInfo {
netInfos := []madmin.NetPerfInfo{}
wg := sync.WaitGroup{}
for index, client := range sys.peerClients {
if client == nil {
continue
}
wg.Add(1)
go func(index int) {
netInfo, err := sys.peerClients[index].NetInfo(ctx)
netInfo.Addr = sys.peerClients[index].host.String()
if err != nil {
netInfo.Error = err.Error()
}
netInfos = append(netInfos, netInfo)
wg.Done()
}(index)
}
wg.Wait()
return madmin.ServerNetHealthInfo{
Net: netInfos,
Addr: GetLocalPeer(globalEndpoints),
}
}
// DrivePerfInfo - Drive perf information
func (sys *NotificationSys) DrivePerfInfo(ctx context.Context) []madmin.ServerDrivesInfo {
reply := make([]madmin.ServerDrivesInfo, len(sys.peerClients))
g := errgroup.WithNErrs(len(sys.peerClients))
for index, client := range sys.peerClients {
if client == nil {
continue
}
index := index
g.Go(func() error {
var err error
reply[index], err = sys.peerClients[index].DriveInfo(ctx)
return err
}, index)
}
for index, err := range g.Wait() {
if err != nil {
addr := sys.peerClients[index].host.String()
reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr)
ctx := logger.SetReqInfo(GlobalContext, reqInfo)
logger.LogIf(ctx, err)
reply[index].Addr = addr
reply[index].Error = err.Error()
}
}
return reply
}
// DrivePerfInfoChan - Drive perf information
func (sys *NotificationSys) DrivePerfInfoChan(ctx context.Context) chan madmin.ServerDrivesInfo {
updateChan := make(chan madmin.ServerDrivesInfo)
wg := sync.WaitGroup{}
for _, client := range sys.peerClients {
if client == nil {
continue
}
wg.Add(1)
go func(client *peerRESTClient) {
reply, err := client.DriveInfo(ctx)
addr := client.host.String()
reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr)
ctx := logger.SetReqInfo(GlobalContext, reqInfo)
logger.LogIf(ctx, err)
reply.Addr = addr
if err != nil {
reply.Error = err.Error()
}
updateChan <- reply
wg.Done()
}(client)
}
go func() {
wg.Wait()
close(updateChan)
}()
return updateChan
}
// CPUInfo - CPU information
func (sys *NotificationSys) CPUInfo(ctx context.Context) []madmin.ServerCPUInfo {
reply := make([]madmin.ServerCPUInfo, len(sys.peerClients))
g := errgroup.WithNErrs(len(sys.peerClients))
for index, client := range sys.peerClients {
if client == nil {
continue
}
index := index
g.Go(func() error {
var err error
reply[index], err = sys.peerClients[index].CPUInfo(ctx)
return err
}, index)
}
for index, err := range g.Wait() {
if err != nil {
addr := sys.peerClients[index].host.String()
reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr)
ctx := logger.SetReqInfo(GlobalContext, reqInfo)
logger.LogIf(ctx, err)
reply[index].Addr = addr
reply[index].Error = err.Error()
}
}
return reply
}
// DiskHwInfo - Disk HW information
func (sys *NotificationSys) DiskHwInfo(ctx context.Context) []madmin.ServerDiskHwInfo {
reply := make([]madmin.ServerDiskHwInfo, len(sys.peerClients))
g := errgroup.WithNErrs(len(sys.peerClients))
for index, client := range sys.peerClients {
if client == nil {
continue
}
index := index
g.Go(func() error {
var err error
reply[index], err = sys.peerClients[index].DiskHwInfo(ctx)
return err
}, index)
}
for index, err := range g.Wait() {
if err != nil {
addr := sys.peerClients[index].host.String()
reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr)
ctx := logger.SetReqInfo(GlobalContext, reqInfo)
logger.LogIf(ctx, err)
reply[index].Addr = addr
reply[index].Error = err.Error()
}
}
return reply
}
// OsInfo - Os information
func (sys *NotificationSys) OsInfo(ctx context.Context) []madmin.ServerOsInfo {
reply := make([]madmin.ServerOsInfo, len(sys.peerClients))
g := errgroup.WithNErrs(len(sys.peerClients))
for index, client := range sys.peerClients {
if client == nil {
continue
}
index := index
g.Go(func() error {
var err error
reply[index], err = sys.peerClients[index].OsInfo(ctx)
return err
}, index)
}
for index, err := range g.Wait() {
if err != nil {
addr := sys.peerClients[index].host.String()
reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr)
ctx := logger.SetReqInfo(GlobalContext, reqInfo)
logger.LogIf(ctx, err)
reply[index].Addr = addr
reply[index].Error = err.Error()
}
}
return reply
}
// MemInfo - Mem information
func (sys *NotificationSys) MemInfo(ctx context.Context) []madmin.ServerMemInfo {
reply := make([]madmin.ServerMemInfo, len(sys.peerClients))
g := errgroup.WithNErrs(len(sys.peerClients))
for index, client := range sys.peerClients {
if client == nil {
continue
}
index := index
g.Go(func() error {
var err error
reply[index], err = sys.peerClients[index].MemInfo(ctx)
return err
}, index)
}
for index, err := range g.Wait() {
if err != nil {
addr := sys.peerClients[index].host.String()
reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr)
ctx := logger.SetReqInfo(GlobalContext, reqInfo)
logger.LogIf(ctx, err)
reply[index].Addr = addr
reply[index].Error = err.Error()
}
}
return reply
}
// ProcInfo - Process information
func (sys *NotificationSys) ProcInfo(ctx context.Context) []madmin.ServerProcInfo {
reply := make([]madmin.ServerProcInfo, len(sys.peerClients))
g := errgroup.WithNErrs(len(sys.peerClients))
for index, client := range sys.peerClients {
if client == nil {
continue
}
index := index
g.Go(func() error {
var err error
reply[index], err = sys.peerClients[index].ProcInfo(ctx)
return err
}, index)
}
for index, err := range g.Wait() {
if err != nil {
addr := sys.peerClients[index].host.String()
reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr)
ctx := logger.SetReqInfo(GlobalContext, reqInfo)
logger.LogIf(ctx, err)
reply[index].Addr = addr
reply[index].Error = err.Error()
}
}
return reply
}
// ServerInfo - calls ServerInfo RPC call on all peers.
func (sys *NotificationSys) ServerInfo() []madmin.ServerProperties {
reply := make([]madmin.ServerProperties, len(sys.peerClients))
var wg sync.WaitGroup
for i, client := range sys.peerClients {
if client == nil {
continue
}
wg.Add(1)
go func(client *peerRESTClient, idx int) {
defer wg.Done()
info, err := client.ServerInfo()
if err != nil {
info.Endpoint = client.host.String()
info.State = "offline"
} else {
info.State = "ok"
}
reply[idx] = info
}(client, i)
}
wg.Wait()
return reply
}
// GetLocalDiskIDs - return disk ids of the local disks of the peers.
func (sys *NotificationSys) GetLocalDiskIDs(ctx context.Context) (localDiskIDs [][]string) {
localDiskIDs = make([][]string, len(sys.peerClients))
var wg sync.WaitGroup
for idx, client := range sys.peerClients {
if client == nil {
continue
}
wg.Add(1)
go func(idx int, client *peerRESTClient) {
defer wg.Done()
localDiskIDs[idx] = client.GetLocalDiskIDs(ctx)
}(idx, client)
}
wg.Wait()
return localDiskIDs
}
// returns all the peers that are currently online.
func (sys *NotificationSys) getOnlinePeers() []*peerRESTClient {
var peerClients []*peerRESTClient
for _, peerClient := range sys.allPeerClients {
if peerClient != nil && peerClient.IsOnline() {
peerClients = append(peerClients, peerClient)
}
}
return peerClients
}
// restClientFromHash will return a deterministic peerRESTClient based on s.
// Will return nil if client is local.
func (sys *NotificationSys) restClientFromHash(s string) (client *peerRESTClient) {
if len(sys.peerClients) == 0 {
return nil
}
peerClients := sys.getOnlinePeers()
if len(peerClients) == 0 {
return nil
}
idx := xxhash.Sum64String(s) % uint64(len(peerClients))
return peerClients[idx]
}
// NewNotificationSys - creates new notification system object.
func NewNotificationSys(endpoints EndpointServerPools) *NotificationSys {
// targetList/bucketRulesMap/bucketRemoteTargetRulesMap are populated by NotificationSys.Init()
remote, all := newPeerRestClients(endpoints)
return &NotificationSys{
targetList: event.NewTargetList(),
targetResCh: make(chan event.TargetIDResult),
bucketRulesMap: make(map[string]event.RulesMap),
bucketRemoteTargetRulesMap: make(map[string]map[event.TargetID]event.RulesMap),
peerClients: remote,
allPeerClients: all,
}
}
// GetPeerOnlineCount gets the count of online and offline nodes.
func GetPeerOnlineCount() (nodesOnline, nodesOffline int) {
nodesOnline = 1 // Self is always online.
nodesOffline = 0
servers := globalNotificationSys.ServerInfo()
for _, s := range servers {
if s.State == "ok" {
nodesOnline++
continue
}
nodesOffline++
}
return
}
type eventArgs struct {
EventName event.Name
BucketName string
Object ObjectInfo
ReqParams map[string]string
RespElements map[string]string
Host string
UserAgent string
}
// ToEvent - converts to notification event.
func (args eventArgs) ToEvent(escape bool) event.Event {
eventTime := UTCNow()
uniqueID := fmt.Sprintf("%X", eventTime.UnixNano())
respElements := map[string]string{
"x-amz-request-id": args.RespElements["requestId"],
"x-minio-origin-endpoint": globalMinioEndpoint, // MinIO specific custom elements.
}
// Add deployment as part of
if globalDeploymentID != "" {
respElements["x-minio-deployment-id"] = globalDeploymentID
}
if args.RespElements["content-length"] != "" {
respElements["content-length"] = args.RespElements["content-length"]
}
keyName := args.Object.Name
if escape {
keyName = url.QueryEscape(args.Object.Name)
}
newEvent := event.Event{
EventVersion: "2.0",
EventSource: "minio:s3",
AwsRegion: args.ReqParams["region"],
EventTime: eventTime.Format(event.AMZTimeFormat),
EventName: args.EventName,
UserIdentity: event.Identity{PrincipalID: args.ReqParams["accessKey"]},
RequestParameters: args.ReqParams,
ResponseElements: respElements,
S3: event.Metadata{
SchemaVersion: "1.0",
ConfigurationID: "Config",
Bucket: event.Bucket{
Name: args.BucketName,
OwnerIdentity: event.Identity{PrincipalID: args.ReqParams["accessKey"]},
ARN: policy.ResourceARNPrefix + args.BucketName,
},
Object: event.Object{
Key: keyName,
VersionID: args.Object.VersionID,
Sequencer: uniqueID,
},
},
Source: event.Source{
Host: args.Host,
UserAgent: args.UserAgent,
},
}
if args.EventName != event.ObjectRemovedDelete && args.EventName != event.ObjectRemovedDeleteMarkerCreated {
newEvent.S3.Object.ETag = args.Object.ETag
newEvent.S3.Object.Size = args.Object.Size
newEvent.S3.Object.ContentType = args.Object.ContentType
newEvent.S3.Object.UserMetadata = args.Object.UserDefined
}
return newEvent
}
func sendEvent(args eventArgs) {
args.Object.Size, _ = args.Object.GetActualSize()
// remove sensitive encryption entries in metadata.
crypto.RemoveSensitiveEntries(args.Object.UserDefined)
crypto.RemoveInternalEntries(args.Object.UserDefined)
// globalNotificationSys is not initialized in gateway mode.
if globalNotificationSys == nil {
return
}
if globalHTTPListen.NumSubscribers() > 0 {
globalHTTPListen.Publish(args.ToEvent(false))
}
globalNotificationSys.Send(args)
}
// GetBandwidthReports - gets the bandwidth report from all nodes including self.
func (sys *NotificationSys) GetBandwidthReports(ctx context.Context, buckets ...string) bandwidth.Report {
reports := make([]*bandwidth.Report, len(sys.peerClients))
g := errgroup.WithNErrs(len(sys.peerClients))
for index := range sys.peerClients {
if sys.peerClients[index] == nil {
continue
}
index := index
g.Go(func() error {
var err error
reports[index], err = sys.peerClients[index].MonitorBandwidth(ctx, buckets)
return err
}, index)
}
for index, err := range g.Wait() {
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress",
sys.peerClients[index].host.String())
ctx := logger.SetReqInfo(ctx, reqInfo)
logger.LogOnceIf(ctx, err, sys.peerClients[index].host.String())
}
reports = append(reports, globalBucketMonitor.GetReport(bucketBandwidth.SelectBuckets(buckets...)))
consolidatedReport := bandwidth.Report{
BucketStats: make(map[string]bandwidth.Details),
}
for _, report := range reports {
if report == nil || report.BucketStats == nil {
continue
}
for bucket := range report.BucketStats {
d, ok := consolidatedReport.BucketStats[bucket]
if !ok {
consolidatedReport.BucketStats[bucket] = bandwidth.Details{}
d = consolidatedReport.BucketStats[bucket]
d.LimitInBytesPerSecond = report.BucketStats[bucket].LimitInBytesPerSecond
}
if d.LimitInBytesPerSecond < report.BucketStats[bucket].LimitInBytesPerSecond {
d.LimitInBytesPerSecond = report.BucketStats[bucket].LimitInBytesPerSecond
}
d.CurrentBandwidthInBytesPerSecond += report.BucketStats[bucket].CurrentBandwidthInBytesPerSecond
consolidatedReport.BucketStats[bucket] = d
}
}
return consolidatedReport
}
// GetClusterMetrics - gets the cluster metrics from all nodes excluding self.
func (sys *NotificationSys) GetClusterMetrics(ctx context.Context) chan Metric {
g := errgroup.WithNErrs(len(sys.peerClients))
peerChannels := make([]<-chan Metric, len(sys.peerClients))
for index := range sys.peerClients {
if sys.peerClients[index] == nil {
continue
}
index := index
g.Go(func() error {
var err error
peerChannels[index], err = sys.peerClients[index].GetPeerMetrics(ctx)
return err
}, index)
}
ch := make(chan Metric)
var wg sync.WaitGroup
for index, err := range g.Wait() {
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress",
sys.peerClients[index].host.String())
ctx := logger.SetReqInfo(ctx, reqInfo)
if err != nil {
logger.LogOnceIf(ctx, err, sys.peerClients[index].host.String())
continue
}
wg.Add(1)
go func(ctx context.Context, peerChannel <-chan Metric, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case m, ok := <-peerChannel:
if !ok {
return
}
ch <- m
case <-ctx.Done():
return
}
}
}(ctx, peerChannels[index], &wg)
}
go func(wg *sync.WaitGroup, ch chan Metric) {
wg.Wait()
close(ch)
}(&wg, ch)
return ch
}